
kafka-connector-plugin

DBally 2021-11-15


Summary

The previous post covered deploying Debezium as a plugin and showed that the Kafka Connect interface lets you deploy and use a rich set of connectors. Confluent provides many Connect plugins, both home-grown and third-party, while the Debezium open-source project provides CDC connectors for databases. So purely in terms of functionality, Kafka plus connector plugins makes a data bus, or data pipeline, possible, and gets data flowing.
The previous post deployed standalone mode; this post focuses on deploying Kafka Connect in distributed mode and describes the connectors developed by the Debezium community along with the data sources they support.
For now only Kafka Connect source connectors are covered. To prepare for the follow-up post on sink-side data export, this post also deploys the JDBC connector developed by Confluent, which can connect to any database that provides a JDBC driver and supports both importing and exporting data.

Kafka Connect distributed architecture

Kafka Connect is a feature added in Kafka 0.9+ that provides an API for creating and managing data pipelines more conveniently. It offers a simple model for building scalable, reliable streaming data flows between Kafka and other systems: connectors can import large volumes of data from external systems into Kafka, or export data from Kafka to external systems. Kafka Connect can stream an entire database into Kafka topics, or feed server metrics into Kafka, after which the data can be processed like any other Kafka stream. On the export side, data is moved from Kafka topics into other storage, query, or offline analytics systems such as databases, Elasticsearch, or Apache Ignite.
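
As a minimal sketch of the import path, the built-in FileStreamSourceConnector (it appears in the plugin listing later in this post) can stream a local file into a topic once a Connect worker is running. The connector name, file path, and topic here are made-up examples:

# Sketch only: stream lines from a local file into a Kafka topic.
# "file-source-demo", /tmp/demo.txt and demo-topic are hypothetical names.
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "file-source-demo",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/tmp/demo.txt",
    "topic": "demo-topic"
  }
}'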

Plugin components

  • confluentinc-kafka-connect-jdbc-10.2.5.zip
    JDBC connector; supports both source and sink
  • debezium-connector-oracle-1.7.0.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-mysql-1.7.0.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-postgres-1.7.0.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-mongodb-1.7.1.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-sqlserver-1.7.1.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-db2-1.7.1.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-vitess-1.7.1.Final-plugin.tar.gz
    source only; CDC log capture
  • debezium-connector-cassandra-1.7.1.Final-plugin.tar.gz
    source only; CDC log capture
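
Each archive is unpacked into the worker's plugin.path directory (configured below as /app/kafka_2.13-2.8.1/plugin) on every Connect node. A minimal sketch, assuming the archives were downloaded to /tmp:

mkdir -p /app/kafka_2.13-2.8.1/plugin
cd /app/kafka_2.13-2.8.1/plugin
unzip /tmp/confluentinc-kafka-connect-jdbc-10.2.5.zip
tar -xzf /tmp/debezium-connector-oracle-1.7.0.Final-plugin.tar.gz
tar -xzf /tmp/debezium-connector-mysql-1.7.0.Final-plugin.tar.gz
# repeat for the remaining Debezium archives, then start (or restart)
# the Connect workers so the plugins are picked up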

Kafka deployment and plugin configuration

1. ZooKeeper cluster deployment

--Edit the configuration file on all nodes
vi /app/kafka_2.13-2.8.1/config/zookeeper.properties
dataDir=/app/data/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
tickTime=2000
initLimit=5
syncLimit=2
server.1=10.96.80.31:2888:3888
server.2=10.96.80.32:2888:3888
server.3=10.96.80.33:2888:3888

--Set a different myid on each node
echo "1" > /app/data/zookeeper/myid    # node 1
echo "2" > /app/data/zookeeper/myid    # node 2
echo "3" > /app/data/zookeeper/myid    # node 3

--Start the service on all nodes
zookeeper-server-start.sh -daemon /app/kafka_2.13-2.8.1/config/zookeeper.properties
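
To verify the ensemble formed correctly, query each node with the srvr four-letter command (whitelisted by default in the ZooKeeper release bundled with Kafka 2.8); one node should report Mode: leader and the other two Mode: follower:

echo srvr | nc 10.96.80.31 2181 | grep Mode
echo srvr | nc 10.96.80.32 2181 | grep Mode
echo srvr | nc 10.96.80.33 2181 | grep Mode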


2. Kafka cluster configuration

--Edit the configuration file on all nodes

vi /app/kafka_2.13-2.8.1/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.96.80.31:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
min.insync.replicas=2
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

log.retention.hours=2

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.96.80.31:2181,10.96.80.32:2181,10.96.80.33:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0
delete.topic.enable=true

--Node 2
broker.id=2
listeners=PLAINTEXT://10.96.80.32:9092
--Node 3
broker.id=3
listeners=PLAINTEXT://10.96.80.33:9092

--Start the Kafka service on all nodes
kafka-server-start.sh -daemon /app/kafka_2.13-2.8.1/config/server.properties
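
To confirm the three brokers joined the cluster, ask any broker for the cluster's API versions; all three broker endpoints should appear in the reply:

kafka-broker-api-versions.sh --bootstrap-server 10.96.80.31:9092 | grep 9092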


3. Kafka Connect configuration and startup

--Node 1
vi /app/kafka_2.13-2.8.1/config/connect-distributed.properties
bootstrap.servers=10.96.80.31:9092,10.96.80.32:9092,10.96.80.33:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000
rest.advertised.host.name=10.96.80.31

--Note: offset.storage.file.filename is only used in standalone mode; the distributed worker stores offsets in the connect-offsets topic
offset.storage.file.filename=/app/data/connect.offsets
plugin.path=/app/kafka_2.13-2.8.1/plugin
--Node 2
rest.advertised.host.name=10.96.80.32
--Node 3
rest.advertised.host.name=10.96.80.33
--Create the internal topics required at startup
kafka-topics.sh --create --zookeeper 10.96.80.31:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics.sh --create --zookeeper 10.96.80.31:2181 --topic connect-offsets --replication-factor 3 --partitions 3 --config cleanup.policy=compact
kafka-topics.sh --create --zookeeper 10.96.80.31:2181 --topic connect-status --replication-factor 3 --partitions 1 --config cleanup.policy=compact

--Start the distributed Connect worker on all nodes

connect-distributed.sh -daemon /app/kafka_2.13-2.8.1/config/connect-distributed.properties
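
Each worker now listens on port 8083. The REST root endpoint returns the Connect version and the Kafka cluster id, which confirms the worker is up and attached to the cluster:

curl -s localhost:8083/ | jq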


4. List the installed plugins

[kafka@node2 ~]$ curl -s localhost:8083/connector-plugins|jq
[
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "10.2.5"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "10.2.5"
  },
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "1.7.1.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.7.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "1.7.0.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.7.1.Final"
  },
  {
    "class": "io.debezium.connector.vitess.VitessConnector",
    "type": "source",
    "version": "1.7.1.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]


Configuring the Oracle connector

Note in particular that connectors are created through the Kafka Connect REST API.

1. Create the connector

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "hr-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "DB01",
    "database.hostname": "10.96.80.21",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "orcl",
    "database.pdb.name": "PDB1",
    "table.include.list": "hr.test",
    "database.history.kafka.bootstrap.servers": "10.96.80.31:9092,10.96.80.32:9092,10.96.80.33:9092",
    "database.history.kafka.topic": "schema-changes.test"
  }
}'


2. Check the connector status

[kafka@node1 debezium-connector-cassandra]$ curl -s localhost:8083/connectors/hr-connector/status|jq
{
  "name": "hr-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.96.80.31:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.96.80.31:8083"
    }
  ],
  "type": "source"
}
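
With the connector RUNNING, change events for hr.test are written to a topic named <database.server.name>.<schema>.<table>, i.e. DB01.HR.TEST (it shows up in the topic listing at the end of this post). The events can be inspected with the console consumer:

kafka-console-consumer.sh --bootstrap-server 10.96.80.31:9092 --topic DB01.HR.TEST --from-beginning | jq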


Configuring the MySQL connector

1. Create the MySQL connector

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "10.96.80.31",
    "database.port": "3307",
    "database.user": "dbz_mysql",
    "database.password": "dbz",
    "database.server.id": "1234",
    "database.server.name": "MYSQLDB",
    "database.include.list": "TEST",
    "database.history.kafka.bootstrap.servers": "10.96.80.31:9092,10.96.80.32:9092,10.96.80.33:9092",
    "database.history.kafka.topic": "mysql.history",
    "include.schema.changes": "true",
    "database.history.skip.unparseable.ddl": "true"
  }
}'


2. Check the connector status

[kafka@node1 debezium-connector-cassandra]$ curl -s localhost:8083/connectors/mysql-connector/status|jq
{
  "name": "mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.96.80.33:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.96.80.33:8083"
    }
  ],
  "type": "source"
}


Common REST API commands

1. List the connectors in the cluster

[kafka@node1 ~]$ curl -s localhost:8083/connectors|jq
[
  "mysql-connector",
  "hr-connector"
]


2. View connector details

[kafka@node1 ~]$ curl -s localhost:8083/connectors/mysql-connector |jq
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "dbz_mysql",
    "database.server.id": "1234",
    "database.history.kafka.bootstrap.servers": "10.96.80.31:9092,10.96.80.32:9092,10.96.80.33:9092",
    "database.history.kafka.topic": "mysql.history",
    "database.server.name": "MYSQLDB",
    "database.port": "3307",
    "include.schema.changes": "true",
    "database.hostname": "10.96.80.31",
    "database.password": "dbz",
    "name": "mysql-connector",
    "database.history.skip.unparseable.ddl": "true",
    "database.include.list": "TEST"
  },
  "tasks": [
    {
      "connector": "mysql-connector",
      "task": 0
    }
  ],
  "type": "source"
}


3. View connector status

[kafka@node1 ~]$ curl -s localhost:8083/connectors/mysql-connector/status|jq
{
  "name": "mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.96.80.33:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.96.80.33:8083"
    }
  ],
  "type": "source"
}


4. Pause a connector

curl -s -X PUT localhost:8083/connectors/mysql-connector/pause
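
Pausing is asynchronous; once it takes effect, the status endpoint reports the connector and its tasks as PAUSED:

curl -s localhost:8083/connectors/mysql-connector/status | jq '.connector.state'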


5. Resume a connector

curl -s -X PUT localhost:8083/connectors/mysql-connector/resume


6. Restart a connector (note: the restart endpoint uses POST, not PUT)

curl -s -X POST localhost:8083/connectors/mysql-connector/restart


7. Delete a connector

curl -X DELETE   http://localhost:8083/connectors/mysql-connector


8. Update connector configuration

curl -i -X PUT -H "Accept: application/json" -H "Content-Type: application/json" localhost:8083/connectors/hr-connector/config -d '{
  "connector.class": "io.debezium.connector.oracle.OracleConnector",
  "tasks.max": "1",
  "database.server.name": "ORCL",
  "database.hostname": "10.96.80.21",
  "database.port": "1521",
  "database.user": "c##dbzuser",
  "database.password": "dbz",
  "database.dbname": "orcl",
  "database.pdb.name": "PDB1",
  "schema.include.list": "hr",
  "database.history.kafka.bootstrap.servers": "10.96.80.31:9092",
  "database.history.kafka.topic": "were.wrewrw"
}'
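
Before submitting a new configuration, it can be checked with the REST API's validation endpoint, which returns an error_count and per-field messages. A sketch with a deliberately incomplete config (in practice, pass the full intended config):

curl -s -X PUT -H "Content-Type: application/json" \
  localhost:8083/connector-plugins/io.debezium.connector.oracle.OracleConnector/config/validate \
  -d '{
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "10.96.80.21",
    "database.port": "1521"
  }' | jq '.error_count'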


Common Kafka commands

1. List all topics

[kafka@node1 ~]$ kafka-topics.sh --list --zookeeper localhost:2181
DB01
DB01.HR.TEST
MYSQLDB
MYSQLDB.test.test
MYSQLDB.test.test2
MYSQLDB.test.test3
MYSQLDB.test.test4
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql.history
schema-changes.test


2. Consume a topic's messages

kafka-console-consumer.sh --bootstrap-server 10.96.80.31:9092 --topic MYSQLDB.test.test --from-beginning|jq


3. View topic replica information

[kafka@node1 ~]$ kafka-topics.sh --zookeeper 10.96.80.31:2181  --topic MYSQLDB.test.test --describe 
Topic: MYSQLDB.test.test TopicId: kKZBdzP7SDmRghy9iIlCAg PartitionCount: 1 ReplicationFactor: 3 Configs:
Topic: MYSQLDB.test.test Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
[kafka@node1 ~]$
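
The --zookeeper flag of kafka-topics.sh is deprecated in Kafka 2.8; the same information can be obtained directly from the brokers:

kafka-topics.sh --bootstrap-server 10.96.80.31:9092 --topic MYSQLDB.test.test --describe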


This completes the distributed Kafka Connect deployment.

For detailed connector configuration parameters and usage, see the official Debezium documentation:
https://debezium.io/

