一个问题引出本篇文章,作为工具类,方便维护 Kafka
基于这篇文章:在Docker环境上使用Kafka Connect JDBC将变更数据从Kafka应用到PostgreSQL
- 我把目标端 PostgreSQL 里的表给删除了,怎么让 Kafka 重新同步这个表呢?
需要修改或删除消费者组的偏移量offset
- 怎么修改或删除消费者组的偏移量offset? --见文章末尾
- 这一个问题研究了一天,太难了
Kafka Connect REST API
参考文档:https://kafka.apache.org/documentation.html#connect_rest
获取 Connect 集群的基本信息
# curl -s -X GET localhost:8083/ | jq
列出 Kafka Connect Worker 上安装的插件
# curl -s -X GET localhost:8083/connector-plugins | jq
创建一个连接器
# vi pgsql-scott-jdbc-sink.json
{
"name": "pgsql-scott-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.17.0.7:5432/scott?user=postgres&password=postgres&sslMode=require",
"tasks.max": "1",
"topics": "oracle19c.SCOTT.DEPT",
"table.name.format": "dept",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
获取所有现有的连接器名称
# curl -s -X GET localhost:8083/connectors/ | jq
获取连接器的配置信息
# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink | jq
获取连接器的状态信息
# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/status | jq
获取当前为连接器运行的任务列表
# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks | jq
获取任务的当前状态
# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq
获取连接器使用的主题(topics)列表
# curl -s -X GET localhost:8083/connectors/oracle-scott-connector/topics | jq
清空连接器的活动主题(topics)列表
# curl -s -X PUT localhost:8083/connectors/oracle-scott-connector/topics/reset
暂停连接器任务
# curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/pause
恢复连接器任务
# curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/resume
删除连接器
# curl -s -X DELETE localhost:8083/connectors/pgsql-scott-jdbc-sink
更新连接器
# cat pgsql-scott-jdbc-sink.json
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslMode=require",
"tasks.max": "1",
"topics": "oracle19c.SCOTT.DEPT",
"table.name.format": "DEPT",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
# curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/pgsql-scott-jdbc-sink/config -d @pgsql-scott-jdbc-sink.json
重启连接器和任务(tasks)
- 语法
POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false>
# "includeTasks=true": 重新启动连接器实例和任务实例
# "includeTasks=false"(默认): 仅重新启动连接器实例
# "onlyFailed=true": 仅重新启动具有 FAILED 状态的实例
# "onlyFailed=false"(默认): 重新所有实例
- 示例
# curl -s -X POST localhost:8083/connectors/pgsql-scott-jdbc-sink/restart
- 默认只重新启动连接器并不会重新启动其所有任务。因此,您也可以重新启动失败的单个任务,然后再次获取其状态:
# curl -s -X POST localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/restart
# curl -s -X GET localhost:8083/connectors/pgsql-scott-jdbc-sink/tasks/0/status | jq
kafka-consumer-groups.sh 消费者组管理
参考文章:https://blog.csdn.net/u010634066/article/details/119670405
相关可选参数
参数 | 描述 |
---|---|
–bootstrap-server | 连接到指定的kafka服务 |
–list | 列出所有消费者组名称 |
–describe | 查询消费者描述信息 |
–group | 指定消费者组 |
–all-groups | 指定所有消费者组 |
–members | 查询消费者组的成员信息 |
–state | 查询消费者的状态信息 |
–offsets | 列出消息的偏移量信息 |
–delete | 删除消费者组 |
–reset-offsets | 重置消费组的偏移量 |
–dry-run | 重置偏移量的命令预执行 |
–excute | 真正的执行重置偏移量的操作 |
–delete-offsets | 删除偏移量 |
查看消费者列表 --list
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
查看消费者组详情 --describe --group/all-groups
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-pgsql-scott-jdbc-sink oracle19c.SCOTT.DEPT 0 7 7 0 connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0
# 查看所有消费者组信息
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --all-groups
查看消费者成员信息 --members
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --group connect-pgsql-scott-jdbc-sink
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
connect-pgsql-scott-jdbc-sink connector-consumer-pgsql-scott-jdbc-sink-0-17144e72-7b1b-49ef-a3a6-bb4677a43ece /172.17.0.5 connector-consumer-pgsql-scott-jdbc-sink-0 1
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --members --all-groups
消费者状态信息 --state
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --group connect-pgsql-scott-jdbc-sink
GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
connect-pgsql-scott-jdbc-sink 172.17.0.4:9092 (1) range Stable 1
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --state --all-groups
删除消费者组 --delete
- 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --group pgsql-scott-jdbc-sink
Deletion of requested consumer groups ('pgsql-scott-jdbc-sink') was successful.
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete --all-groups
重置消费组的偏移量 --reset-offsets
- 能够执行成功的一个前提是 消费组是不可用状态
- 相关重置 offset 的模式
参数 | 描述 |
---|---|
–to-earliest | 重置 offset 到最开始的那条 offset (找到还未被删除最早的那个 offset ) |
–to-current | 直接重置 offset 到当前的 offset,也就是 LOE |
–to-latest | 重置到最后一个 offset |
–to-datetime | 重置到指定时间的 offset,格式为:YYYY-MM-DDTHH:mm:SS.sss; |
- | 例如: --to-datetime “2021-6-26T00:00:00.000” |
–to-offset | 重置到指定的 offset,一般不用这个,例如:–to-offset 3465 |
–shift-by | 按照偏移量增加或者减少多少个 offset,例如:–shift-by 100 、–shift-by -100 |
–from-file | 根据CVS文档来重置 |
- 上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的 offset;不过 --from-file 可以让我们更灵活一点
# 先配置cvs文档,格式为: Topic:分区号:重置目标偏移量
oracle19c.SCOTT.DEPT:0:100
oracle19c.SCOTT.DEPT:1:200
oracle19c.SCOTT.DEPT:2:300
# 执行命令
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --from-file config/reset-offset.csv --group test2_consumer_group --dry-run
- 示例
# --dry-run 预执行,不会真正执行命令,为了看看效果
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --dry-run --group connect-pgsql-scott-jdbc-sink
# --execute 真正执行命令
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --all-topics --to-earliest --execute --group connect-pgsql-scott-jdbc-sink
# --topic 指定主题
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-earliest --execute --group connect-pgsql-scott-jdbc-sink
# --to-offset 指定偏移量
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink
# --topic :0 指定分区
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT:0 --to-offset 2 --execute --group connect-pgsql-scott-jdbc-sink
删除偏移量 --delete-offsets
- 能够执行成功的一个前提是 消费组是不可用状态
- 偏移量被删除了之后,Consumer Group下次启动的时候,会从头消费
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.SCOTT.DEPT --group connect-pgsql-scott-jdbc-sink
解决问题
- 怎么修改或删除消费者组的偏移量offset?
# 查询需要修改的主题名
bin/kafka-topics.sh --list --bootstrap-server kafka:9092
# 查询需要修改的消费者组名
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# 确认修改的主题名和消费者组名是否正确
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink
- 修改或删除消费组的偏移量的前提是:消费组是不可用状态
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group connect-pgsql-scott-jdbc-sink
如果 CONSUMER-ID、HOST、CLIENT-ID 存在信息,则消费组是处于活动状态,修改或删除消费组的偏移量将会失败,报出以下错误信息:
Error: Assignments can only be reset if the group 'connect-pgsql-scott-jdbc-sink' is inactive, but the current state is Stable.
- 尝试暂停连接器任务,消费者组还是处于活动状态
curl -s -X PUT localhost:8083/connectors/pgsql-scott-jdbc-sink/pause
- 先删除连接器,消费者组处于非活动状态,不知道这个地方有什么好的方法能让消费者组处于非活动状态
curl -s -X DELETE localhost:8083/connectors/pgsql-scott-jdbc-sink
5. 修改或删除消费组的偏移量
# 修改消费组的偏移量
bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --reset-offsets --topic oracle19c.SCOTT.DEPT --to-earliest --execute --group connect-pgsql-scott-jdbc-sink
GROUP TOPIC PARTITION NEW-OFFSET
connect-pgsql-scott-jdbc-sink oracle19c.SCOTT.DEPT 0 0
# 删除消费者组的偏移量
# bin/kafka-consumer-groups.sh --bootstrap-server kafka:9092 --delete-offsets --topic oracle19c.SCOTT.DEPT --group connect-pgsql-scott-jdbc-sink
Request succeed for deleting offsets with topic oracle19c.SCOTT.DEPT group connect-pgsql-scott-jdbc-sink
TOPIC PARTITION STATUS
oracle19c.SCOTT.DEPT 0 Successful
- 添加连接器,此时 PostgreSQL 上的目标表已重新同步
vi pgsql-scott-jdbc-sink.json
{
"name": "pgsql-scott-jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.17.0.6:5432/scott?user=postgres&password=postgres&sslMode=require",
"tasks.max": "1",
"topics": "oracle19c.SCOTT.DEPT",
"table.name.format": "scott.dept",
"dialect.name": "PostgreSqlDatabaseDialect",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true"
}
}
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.0.40:8083/connectors/ -d @pgsql-scott-jdbc-sink.json
最后修改时间:2022-04-19 09:04:16
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。