暂无图片
暂无图片
3
暂无图片
暂无图片
暂无图片

Kafka 维护笔记--Kafka Connect REST API 和管理消费组

原创 张玉龙 2022-04-18
4525

一个问题引出本篇文章,作为工具类,方便维护 Kafka

基于这篇文章:在Docker环境上使用Kafka Connect JDBC将变更数据从Kafka应用到PostgreSQL

  1. 我把目标端 PostgreSQL 里的表给删除了,怎么让 Kafka 重新同步这个表呢?
    需要修改或删除消费者组的偏移量offset
    image.png
  2. 怎么修改或删除消费者组的偏移量offset? --见文章末尾
  3. 这一个问题研究了一天,太难了

Kafka Connect REST API

参考文档:https://kafka.apache.org/documentation.html#connect_rest

获取 Connect 集群的基本信息

# curl -s -X GET localhost:8083/ | jq

image.png

列出 Kafka Connect Worker 上安装的插件

# curl -s -X GET localhost:8083/connector-plugins | jq

image.png

创建一个连接器

# vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslMode=require", "tasks.max": "1", "topics": "oracle19c.SCOTT.DEPT", "table.name.format": "dept", "dialect.name": "PostgreSqlDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } } # curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

获取所有现有的连接器名称

# curl -s -X GET localhost:8083/connectors/ | jq

image.png

获取连接器的配置信息

# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink | jq

image.png

获取连接器的状态信息

# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq

image.png

获取当前为连接器运行的任务列表

# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks | jq

image.png

获取任务的当前状态

# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq

image.png

获取连接器使用的主题(topics)列表

# curl -s -X GET localhost:8083/connectors/oracle-scott-connector/topics | jq

image.png

清空连接器的活动主题(topics)列表

# curl -s -X PUT localhost:8083/connectors/oracle-scott-connector/topics/reset

暂停连接器任务

# curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/pause

恢复连接器任务

# curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/resume

删除连接器

# curl -s -X DELETE localhost:8083/connectors/pgsql-scott-jdbc-sink

更新连接器

# cat pgsql-scott-jdbc-sink.json { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslMode=require", "tasks.max": "1", "topics": "oracle19c.SCOTT.DEPT", "table.name.format": "DEPT", "dialect.name": "PostgreSqlDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } # curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/pgsql-scott-jdbc-sink/config -d @pgsql-scott-jdbc-sink.json

重启连接器和任务(tasks)

  • 语法
POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false> # "includeTasks=true": 重新启动连接器实例和任务实例 # "includeTasks=false"(默认): 仅重新启动连接器实例 # "onlyFailed=true": 仅重新启动具有 FAILED 状态的实例 # "onlyFailed=false"(默认): 重新所有实例
  • 示例
# curl -s -X POST localhost:8083/connectors/pgsql-scott-jdbc-sink/restart
  • 默认只重新启动连接器并不会重新启动其所有任务。因此,您也可以重新启动失败的单个任务,然后再次获取其状态:
# curl -s -X POST localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/restart # curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq

kafka-consumer-groups.sh 消费者组管理

参考文章:https://blog.csdn.net/u010634066/article/details/119670405

相关可选参数

参数 描述
–bootstrap-server 连接到指定的kafka服务
–list 列出所有消费者组名称
–describe 查询消费者描述信息
–group 指定消费者组
–all-groups 指定所有消费者组
–members 查询消费者组的成员信息
–state 查询消费者的状态信息
–offsets 列出消息的偏移量信息
–delete 删除消费者组
–reset-offsets 重置消费组的偏移量
–dry-run 重置偏移量的命令预执行
–excute 真正的执行重置偏移量的操作
–delete-offsets 删除偏移量

查看消费者列表 --list

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

查看消费者组详情 --describe --group/all-groups

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID connect-pgsql-scott-jdbc-sink oracle19c.SCOTT.DEPT 0 7 7 0 connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0 # 查看所有消费者组信息 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --all-groups

查看消费者成员信息 --members

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --group connect-pgsql-scott-jdbc-sink GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS connect-pgsql-scott-jdbc-sink connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0 1 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --all-groups

消费者状态信息 --state

# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --group connect-pgsql-scott-jdbc-sink GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS connect-pgsql-scott-jdbc-sink 172.17.0.4:9092 (1) range Stable 1 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --all-groups

删除消费者组 --delete

  • 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --group pgsql-scott-jdbc-sink Deletion of requested consumer groups ('pgsql-scott-jdbc-sink') was successful. # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --all-groups

重置消费组的偏移量 --reset-offsets

  • 能够执行成功的一个前提是 消费组是不可用状态
  • 相关重置 offset 的模式
参数 描述
–to-earliest 重置 offset 到最开始的那条 offset (找到还未被删除最早的那个 offset )
–to-current 直接重置 offset 到当前的 offset,也就是 LOE
–to-latest 重置到最后一个 offset
–to-datetime 重置到指定时间的 offset,格式为:YYYY-MM-DDTHH:mm:SS.sss;
- 例如: --to-datetime “2021-6-26T00:00:00.000”
–to-offset 重置到指定的 offset,一般不用这个,例如:–to-offset 3465
–shift-by 按照偏移量增加或者减少多少个 offset,例如:–shift-by 100 、–shift-by -100
–from-file 根据CVS文档来重置
  • 上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的 offset;不过 --from-file 可以让我们更灵活一点
# 先配置cvs文档,格式为: Topic:分区号:重置目标偏移量 oracle19c.SCOTT.DEPT:0:100 oracle19c.SCOTT.DEPT:1:200 oracle19c.SCOTT.DEPT:2:300 # 执行命令 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --from-file config/reset-offset.csv --group test2_consumer_group --dry-run
  • 示例
# --dry-run 预执行,不会真正执行命令,为了看看效果 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --dry-run --group connect-pgsql-scott-jdbc-sink # --execute 真正执行命令 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --execute --group connect-pgsql-scott-jdbc-sink # --topic 指定主题 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-earliest --execute --group connect-pgsql-scott-jdbc-sink # --to-offset 指定偏移量 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink # --topic :0 指定分区 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT:0 --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink

删除偏移量 --delete-offsets

  • 能够执行成功的一个前提是 消费组是不可用状态
  • 偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.SCOTT.DEPT --group connect-pgsql-scott-jdbc-sink

解决问题

  1. 怎么修改或删除消费者组的偏移量offset?
# 查询需要修改的主题名 bin/kafka-topics.sh --list --bootstrap-server kafka:9092 # 查询需要修改的消费者组名 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list # 确认修改的主题名和消费者组名是否正确 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink
  1. 修改或删除消费组的偏移量的前提是:消费组是不可用状态
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink

如果 CONSUMER-ID、HOST、CLIENT-ID 存在信息,则消费组是处于活动状态,修改或删除消费组的偏移量将会失败,报出以下错误信息:

Error: Assignments can only be reset if the group 'connect-pgsql-scott-jdbc-sink' is inactive, but the current state is Stable.
  1. 尝试暂停连接器任务,消费者组还是处于活动状态
curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/pause
  1. 先删除连接器,消费者组处于非活动状态,不知道这个地方有什么好的方法能让消费者组处于非活动状态
curl -s -X DELETE localhost:8083/connectors/pgsql-scott-jdbc-sink

image.png
5. 修改或删除消费组的偏移量

# 修改消费组的偏移量 bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-earliest --execute --group connect-pgsql-scott-jdbc-sink GROUP TOPIC PARTITION NEW-OFFSET connect-pgsql-scott-jdbc-sink oracle19c.SCOTT.DEPT 0 0 # 删除消费者组的偏移量 # bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.SCOTT.DEPT --group connect-pgsql-scott-jdbc-sink Request succeed for deleting offsets with topic oracle19c.SCOTT.DEPT group connect-pgsql-scott-jdbc-sink TOPIC PARTITION STATUS oracle19c.SCOTT.DEPT 0 Successful
  1. 添加连接器,此时 PostgreSQL 上的目标表已重新同步
vi pgsql-scott-jdbc-sink.json { "name": "pgsql-scott-jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslMode=require", "tasks.max": "1", "topics": "oracle19c.SCOTT.DEPT", "table.name.format": "scott.dept", "dialect.name": "PostgreSqlDatabaseDialect", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "auto.evolve": "true" } } curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json

image.png

最后修改时间:2022-04-19 09:04:16
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论