背景
最近有项目需要调研MirrorMaker2.0,之前已经发布两篇文章(侧重于概念、理论)。本文主要侧重于实战演练,利用mm2.0模拟实现两个Kafka集群的双活方案。需要查看之前发文的可以到公众号专栏合集中查看,如图:
环境说明
两台服务器,每台服务器部署一个单节点Kafka集群。
服务器主机名 | 集群别名 |
felixzh1 | A |
felixzh2 | B |
Kafka version | 2.7.1 |
Zookeeper version | Kafka内置版本 |
Kafka参数配置
主机felixzh1:
auto.create.topics.enable=false
broker.id=0
listeners=PLAINTEXT://felixzh1:9092
主机felixzh2:
auto.create.topics.enable=false
broker.id=0
listeners=PLAINTEXT://felixzh2:9092
启动Zookeeper&Kafka服务
nohup bin/zookeeper-server-start.shconfig/zookeeper.properties 2>&1 &
nohup bin/kafka-server-start.shconfig/server.properties 2>&1 &
主机felixzh1:
主机felixzh2:
创建测试Topic
bin/kafka-topics.sh --zookeeperlocalhost:2181 --topic test --partitions 1 --replication-factor 1 –create
主机felixzh1:
主机felixzh2:
MirrorMaker2.0参数配置
注意:两个集群配置相同
[root@felixzh1 kafka_2.12-2.7.1]# cat config/connect-mirror-maker.properties
# Licensed to the Apache Software Foundation (ASF) under A or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = A, B
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = felixzh1:9092
B.bootstrap.servers = felixzh2:9092
# enable and configure individual replication flows
A->B.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
A->B.topics = .*
A->B.groups = .*
B->A.enabled = true
B->A.topics = .*
B->A.groups = .*
# Setting replication factor of newly created remote topics
replication.factor=1
############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
#########################felixzh#######################################
refresh.topics.enabled = true
refresh.topics.interval.seconds = 30
refresh.groups.enabled = true
refresh.groups.interval.seconds = 30
sync.topic.configs.enabled = true
sync.topic.acls.enabled = false
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 30
启动MirrorMaker2.0服务
[root@felixzh1 kafka_2.12-2.7.1]#bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters A
[root@felixzh2 kafka_2.12-2.7.1]#bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters B
查看Topic列表
主机felixzh1:
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-topics.sh --zookeeper localhost:2181 --list
主机felixzh2:
[root@felixzh2 kafka_2.12-2.7.1]#bin/kafka-topics.sh --zookeeper localhost:2181 --list
测试1:数据同步
说明:写集群A (topic:test),消费集群B(topic:A.test)
主机felixzh1:
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-console-producer.sh --broker-list felixzh1:9092 --topic test
主机felixzh2:
[root@felixzh2 kafka_2.12-2.7.1]#bin/kafka-console-consumer.sh --bootstrap-server felixzh2:9092 --topic A.test--from-beginning
结论:集群A(topic:test)数据已经同步到集群B(topic:A.test)
测试2:消费者组offset同步
说明:使用group组test消费集群A(topic:test),查看集群B消费者组offset信息
主机felixzh1:
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-console-consumer.sh --bootstrap-server felixzh1:9092 --topic test--group test
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-consumer-groups.sh --bootstrap-server felixzh1:9092 --all-groups –describe
主机felixzh2:
[root@felixzh2 kafka_2.12-2.7.1]#bin/kafka-consumer-groups.sh --bootstrap-server felixzh2:9092 --all-groups –describe
结论:集群A消费者组offset信息已经同步到集群B。
文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。