
Kafka MirrorMaker 2.0 Architecture and Hands-On Guide (Part 3)

大数据从业者 2021-10-12

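Background

A recent project required evaluating MirrorMaker 2.0 (MM2). Two earlier articles covered the concepts and theory. This one is hands-on: it uses MM2 to simulate an active-active setup across two Kafka clusters. The earlier articles are available in the column collection of the author's WeChat official account.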



Environment

Two servers, each running a single-node Kafka cluster.

Hostname     Cluster alias
felixzh1     A
felixzh2     B

Component    Version
Kafka        2.7.1
ZooKeeper    bundled with Kafka

Kafka configuration

Host felixzh1:

    auto.create.topics.enable=false
    broker.id=0
    listeners=PLAINTEXT://felixzh1:9092

Host felixzh2:

    auto.create.topics.enable=false
    broker.id=0
    listeners=PLAINTEXT://felixzh2:9092

       

Start ZooKeeper and Kafka

Run on both hosts, felixzh1 and felixzh2:

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties 2>&1 &
    nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

Create the test topic

Run on both hosts, felixzh1 and felixzh2:

    bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1 --create

           

MirrorMaker 2.0 configuration

Note: both clusters use the same configuration file.

    [root@felixzh1 kafka_2.12-2.7.1]# cat config/connect-mirror-maker.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details

    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

    # specify any number of cluster aliases
    clusters = A, B

    # connection information for each cluster
    # This is a comma separated host:port pairs for each cluster
    # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
    A.bootstrap.servers = felixzh1:9092
    B.bootstrap.servers = felixzh2:9092

    # enable and configure individual replication flows
    A->B.enabled = true

    # regex which defines which topics gets replicated. For eg "foo-.*"
    A->B.topics = .*
    A->B.groups = .*

    B->A.enabled = true
    B->A.topics = .*
    B->A.groups = .*

    # Setting replication factor of newly created remote topics
    replication.factor=1

    ############################# Internal Topic Settings #############################
    # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
    # "mm2-offset-syncs.B.internal"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1

    # The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
    # "mm2-status.B.internal"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1

    # customize as needed
    # replication.policy.separator = _
    # sync.topic.acls.enabled = false
    # emit.heartbeats.interval.seconds = 5

    #########################felixzh#######################################
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 30
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 30
    sync.topic.configs.enabled = true
    sync.topic.acls.enabled = false
    sync.group.offsets.enabled = true
    sync.group.offsets.interval.seconds = 30
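Before starting MM2, it can help to confirm which replication flows the file actually enables. The following is a minimal sketch, not part of Kafka: `enabled_flows` is a hypothetical helper that scans properties text for `<source>-><target>.enabled = true` entries, and the sample text is a shortened copy of the configuration above.

```python
import re


def enabled_flows(properties_text):
    """Return sorted (source, target) pairs whose '<source>-><target>.enabled' is true.

    Hypothetical helper for illustration; it only understands simple
    'key = value' lines and skips blank lines and '#' comments.
    """
    flows = []
    for raw_line in properties_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        match = re.fullmatch(r"(.+)->(.+)\.enabled", key)
        if match and value.lower() == "true":
            flows.append((match.group(1), match.group(2)))
    return sorted(flows)


# Shortened copy of the configuration shown above.
sample = """
clusters = A, B
A->B.enabled = true
A->B.topics = .*
B->A.enabled = true
B->A.topics = .*
"""

print(enabled_flows(sample))  # [('A', 'B'), ('B', 'A')]
```

With both flows enabled, each cluster is both a source and a target, which is what the active-active simulation relies on.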

             

Start MirrorMaker 2.0

    [root@felixzh1 kafka_2.12-2.7.1]# bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters A
    [root@felixzh2 kafka_2.12-2.7.1]# bin/connect-mirror-maker.sh config/connect-mirror-maker.properties --clusters B
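The `--clusters` argument decides which flows each process handles. As far as the Kafka 2.7 driver is understood here, a process only runs flows whose target cluster is named in `--clusters`. The sketch below models that rule; `flows_for_process` is a hypothetical name, not a Kafka API.

```python
def flows_for_process(flows, clusters_arg):
    """Keep only the flows whose target cluster is listed in --clusters.

    Simplified model of the assumed selection rule; 'flows' is a list of
    (source, target) cluster alias pairs.
    """
    targets = set(clusters_arg)
    return [(source, target) for (source, target) in flows if target in targets]


flows = [("A", "B"), ("B", "A")]

print(flows_for_process(flows, ["A"]))  # process on felixzh1: [('B', 'A')]
print(flows_for_process(flows, ["B"]))  # process on felixzh2: [('A', 'B')]
```

Under this assumption, the process on felixzh1 pulls from cluster B and writes into cluster A, and the process on felixzh2 does the reverse, so each process produces to its local cluster.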

               

List topics

Host felixzh1:

    [root@felixzh1 kafka_2.12-2.7.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

Host felixzh2:

    [root@felixzh2 kafka_2.12-2.7.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --list

                   


Test 1: data replication

Setup: produce to cluster A (topic: test) and consume from cluster B (topic: A.test).

Host felixzh1:

    [root@felixzh1 kafka_2.12-2.7.1]# bin/kafka-console-producer.sh --broker-list felixzh1:9092 --topic test

Host felixzh2:

    [root@felixzh2 kafka_2.12-2.7.1]# bin/kafka-console-consumer.sh --bootstrap-server felixzh2:9092 --topic A.test --from-beginning

Result: data written to topic test on cluster A is replicated to topic A.test on cluster B.
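The name A.test follows from MM2's default replication policy, which prefixes the topic with the source cluster alias and a separator (`.` by default, see `replication.policy.separator` in the configuration). The sketch below is a simplified model of that naming rule and of the cycle check that keeps a mirrored topic from being copied back to the cluster it came from. All function names are hypothetical and this is not Kafka's code.

```python
SEPARATOR = "."  # assumed default for replication.policy.separator


def format_remote_topic(source_alias, topic, separator=SEPARATOR):
    """Name a topic receives on the target cluster."""
    return f"{source_alias}{separator}{topic}"


def topic_source(topic, separator=SEPARATOR):
    """Alias prefix of a topic name, or None when there is no separator.

    Note: in this model any name containing the separator is read as
    having a source prefix.
    """
    head, found, _rest = topic.partition(separator)
    return head if found else None


def would_cycle(topic, target_alias, separator=SEPARATOR):
    """True if the topic already passed through target_alias."""
    source = topic_source(topic, separator)
    while source is not None:
        if source == target_alias:
            return True
        # Strip one prefix and inspect the upstream topic name.
        topic = topic[len(source) + len(separator):]
        source = topic_source(topic, separator)
    return False


print(format_remote_topic("A", "test"))  # A.test
print(would_cycle("A.test", "A"))        # True
print(would_cycle("test", "B"))          # False
```

In this model, A.test on cluster B is not mirrored back to cluster A even though `B->A.topics = .*` matches it, which is why a bidirectional setup does not loop.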

                       

Test 2: consumer group offset sync

Setup: consume topic test on cluster A with consumer group test, then inspect the consumer group offsets on cluster B.

Host felixzh1:

    [root@felixzh1 kafka_2.12-2.7.1]# bin/kafka-console-consumer.sh --bootstrap-server felixzh1:9092 --topic test --group test

    [root@felixzh1 kafka_2.12-2.7.1]# bin/kafka-consumer-groups.sh --bootstrap-server felixzh1:9092 --all-groups --describe

Host felixzh2:

    [root@felixzh2 kafka_2.12-2.7.1]# bin/kafka-consumer-groups.sh --bootstrap-server felixzh2:9092 --all-groups --describe

Result: the consumer group offsets from cluster A have been synced to cluster B.
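Offsets on the two clusters need not be numerically equal, because the mirrored topic A.test has its own log. MM2 records pairs of matching source and target offsets and uses them to translate a group's committed offset. The sketch below is a simplified model of that translation under one assumption: a single sync pair is available and later records map one to one. `translate_offset` and the numbers are hypothetical, chosen only for illustration.

```python
def translate_offset(committed_upstream, sync_upstream, sync_downstream):
    """Translate a committed source offset to the mirrored topic.

    Simplified model: one known pair (sync_upstream -> sync_downstream),
    with every later record assumed to advance both logs by one.
    Returns None when the committed offset is older than the sync pair
    and cannot be translated from it.
    """
    if committed_upstream < sync_upstream:
        return None
    return sync_downstream + (committed_upstream - sync_upstream)


# Hypothetical numbers: offset 100 in A's 'test' matches offset 40 in B's 'A.test'.
print(translate_offset(105, 100, 40))  # 45
print(translate_offset(90, 100, 40))   # None
```

When checking the result on cluster B, the synced offsets for group test would be expected under topic A.test rather than test.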

