
Kafka MirrorMaker 2.0 Architecture and Practice in Depth (Part 2)

大数据从业者 2021-10-01

Background

This is the second article in the MM2 architecture and practice series. It covers the deployment modes MM2 currently supports, the configuration file syntax and parameter set, and the monitoring metrics, with brief detours into the Kafka Connect framework and Confluent's connectors.


MirrorMaker 2.0 Deployment Modes

    * As a dedicated MirrorMaker cluster.
    * As a Connector in a distributed Connect cluster.
    * As a standalone Connect worker.


Running a dedicated MirrorMaker cluster

      #does not require an existing Connect cluster. Instead, a high-level driver manages a collection of Connect workers.
      $ ./bin/connect-mirror-maker.sh mm2.properties
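
The mm2.properties file passed to the driver uses the syntax detailed later in this article; a minimal sketch (the cluster aliases and broker addresses are the same placeholders used throughout this post) looks like this:

  # minimal mm2.properties sketch
  clusters = us-west, us-east
  us-west.bootstrap.servers = broker3-west:9092
  us-east.bootstrap.servers = broker5-east:9092
  us-west->us-east.enabled = true
  us-west->us-east.topics = .*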


Running in standalone mode

  #In this mode, a single Connect worker runs MirrorSourceConnector.
  #This does not support multinode clusters, but is useful for small workloads or for testing.
  $ ./bin/connect-standalone.sh worker.properties connector.properties
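
The two files are a standalone Connect worker configuration and a connector configuration. A rough sketch of each is shown below; the broker addresses reuse the placeholders from this article, while the converter and offset-file choices are assumptions, not prescribed values:

  # worker.properties (standalone Connect worker)
  bootstrap.servers = broker5-east:9092
  key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
  value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
  offset.storage.file.filename = /tmp/connect.offsets

  # connector.properties (a single MirrorSourceConnector)
  name = us-west-source
  connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
  source.cluster.alias = us-west
  target.cluster.alias = us-east
  source.cluster.bootstrap.servers = broker3-west:9092
  target.cluster.bootstrap.servers = broker5-east:9092
  topics = .*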


Running in distributed mode

  #If you already have a Connect cluster, you can configure it to run MirrorMaker connectors.
  #There are four such connectors: MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpointConnector, MirrorHeartbeatConnector.
  Configure these using the Connect REST API:

  PUT /connectors/us-west-source/config HTTP/1.1
  {
    "name": "us-west-source",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "us-west",
    "target.cluster.alias": "us-east",
    "source.cluster.bootstrap.servers": "us-west-host1:9091",
    "topics": ".*"
  }
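
As a concrete way to issue that request, the sketch below uses curl against a Connect worker's REST endpoint; the host and port (localhost:8083 is the default Connect REST port) are assumptions, and the connector name matches the example above:

  # send the connector config to an assumed Connect worker at localhost:8083
  curl -s -X PUT -H "Content-Type: application/json" \
    http://localhost:8083/connectors/us-west-source/config \
    -d '{
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "us-west",
      "target.cluster.alias": "us-east",
      "source.cluster.bootstrap.servers": "us-west-host1:9091",
      "topics": ".*"
    }'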

As we all know, MM2 is implemented on top of the Kafka Connect framework, so being familiar with a few of that framework's core concepts is essential for understanding how it runs (a small REST example follows the list below):

    * Connectors: the high-level abstraction that coordinates data streaming by managing tasks
    * Tasks: the implementation of how data is copied to or from Kafka
    * Workers: the running processes that execute connectors and tasks
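
For instance, on a distributed Connect worker you can see connectors and the tasks they spawn through the REST API; the address (localhost:8083) and the connector name below are assumptions for illustration:

  # list the connectors running on this Connect cluster
  curl -s http://localhost:8083/connectors
  # list the tasks created for one connector; each task copies a subset of topic-partitions
  curl -s http://localhost:8083/connectors/us-west-source/tasks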

Confluent Platform ships with 120+ built-in connectors for syncing data between Kafka and other systems. Although a community edition is available, note that its license is not the familiar Apache 2.0 License but the Confluent Community License (you may freely download, modify, and redistribute the code, but you may not offer the software as a SaaS product, e.g. KSQL-as-a-service).



MirrorMaker 2.0 Configuration File Syntax

The configuration file defaults to connect-mirror-maker.properties (a custom name is allowed) and consists of three parts:

            1. MirrorMaker settings  

Settings can be defined at the cluster level and at the replication-flow level.

  # Cluster-level settings
  clusters = us-west, us-east # defines cluster aliases
  us-west.bootstrap.servers = broker3-west:9092
  us-east.bootstrap.servers = broker5-east:9092
  topics = .* # all topics to be replicated by default
  topics.exclude = .*[\-\.]internal, .*\.replica, __.*
  groups = .*
  groups.exclude = console-consumer-.*, connect-.*, __.*
  ## detect newly created topics
  refresh.topics.enabled = true
  refresh.topics.interval.seconds = 600
  ## detect newly created consumer groups
  refresh.groups.enabled = true
  refresh.groups.interval.seconds = 600
  ## sync topic configs
  sync.topic.configs.enabled = true
  ## sync ACLs
  sync.topic.acls.enabled = true
  ## heartbeat emission
  emit.heartbeats.enabled = true
  emit.heartbeats.interval.seconds = 1
  heartbeats.topic.replication.factor = 3
  ## checkpoint settings
  emit.checkpoints.enabled = true
  emit.checkpoints.interval.seconds = 60
  checkpoints.topic.replication.factor = 3
  ## sync consumer group offsets
  sync.group.offsets.enabled = true
  sync.group.offsets.interval.seconds = 60
  offset-syncs.topic.replication.factor = 3


  # Flow-level settings
  us-west->us-east.enabled = true # flow from us-west to us-east
  us-west->us-east.topics = foo.*, bar.* # override the default above
  us-west->us-east.groups = foo.*, bar.* # override the default above
  ## customize the separator used in target topic names
  us-west->us-east.replication.policy.separator = _ # default is "."
  ## For custom topic-name mappings, implement the ReplicationPolicy and Configurable interfaces, using DefaultReplicationPolicy as a reference; see the source:
  ## https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
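
With the DefaultReplicationPolicy, a replicated topic on the target cluster is prefixed with the source cluster alias plus the separator. A quick check (the broker address is the placeholder used above, and "foo1" is just an illustrative topic name):

  # list topics on the target cluster; replicated topics carry the source-alias prefix
  $ ./bin/kafka-topics.sh --bootstrap-server broker5-east:9092 --list
  # a topic "foo1" mirrored from us-west shows up as "us-west.foo1" by default,
  # or as "us-west_foo1" if replication.policy.separator = _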

              2. Kafka Connect and connector settings

MirrorMaker 2.0 is built on the Kafka Connect framework, so you can set Kafka Connect framework-level parameters as well as worker-level parameters for each connector.

  # Framework-level settings
  # Setting Kafka Connect defaults for MirrorMaker
  tasks.max = 5 # most framework defaults work out of the box; tasks.max is the exception and should be tuned per deployment (at least 2 is recommended, taking hardware resources and the number of replicated topic-partitions into account)


  # Worker-level settings per connector
  # Use the format of {cluster}.{config_name}
  us-west.offset.storage.topic = my-mirrormaker-offsets

                3. Kafka producer, consumer, and admin client settings

Internally, MirrorMaker 2.0 uses Kafka producer, consumer, and admin clients, and it is often necessary to customize these clients' settings.

The settings use the following formats:

    * {source}.consumer.{consumer_config_name}
    * {target}.producer.{producer_config_name}
    * {source_or_target}.admin.{admin_config_name}

Consumer example: us-west cluster (from which to consume)

  us-west.consumer.isolation.level = read_committed
  us-west.admin.bootstrap.servers = broker57-primary:9092


Producer example: us-east cluster (to which to produce)

  us-east.producer.compression.type = gzip
  us-east.producer.buffer.memory = 32768


Admin example:

  us-east.admin.bootstrap.servers = broker8-secondary:9092


MirrorMaker 2.0 Monitoring Metrics

It is strongly recommended to monitor the MM2 process metrics to make sure all defined replication flows are healthy. MM2 inherits all of the Connect metrics from the Kafka Connect framework (e.g. source-record-poll-rate).

MM2's own metrics live in the kafka.connect.mirror metric group.

  # MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)
  record-count # number of records replicated source -> target
  record-age-ms # age of records when they are replicated
  record-age-ms-min
  record-age-ms-max
  record-age-ms-avg
  replication-latency-ms # time it takes records to propagate source->target
  replication-latency-ms-min
  replication-latency-ms-max
  replication-latency-ms-avg
  byte-rate # average number of bytes/sec in replicated records
  # MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)
  checkpoint-latency-ms # time it takes to replicate consumer offsets
  checkpoint-latency-ms-min
  checkpoint-latency-ms-max
  checkpoint-latency-ms-avg
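
One way to get at these MBeans is to expose JMX on the MM2 driver process and attach a JMX tool (jconsole, or a Prometheus JMX exporter). A minimal sketch, assuming the standard JMX_PORT environment variable honored by the Kafka startup scripts and a placeholder port:

  # expose JMX on the MirrorMaker 2.0 driver (port 9996 is a placeholder)
  $ JMX_PORT=9996 ./bin/connect-mirror-maker.sh mm2.properties
  # then attach jconsole (or a JMX exporter) and browse the kafka.connect.mirror MBeans listed above
  $ jconsole localhost:9996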



