暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

debezium_mysql-connect指定binlog

原创 Temi 2021-01-17
841

1.方法1,使用自带工具:

  1. 查看消费信息
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.134:9092 --property print.key=true --topic connect-offsets --from-beginning
  2. 往topic生产消息
    bin/kafka-console-producer.sh --broker-list 192.168.253.135:9092 --topic connect-offsets --property parse.key=true

[“connector-mysql_new”,{“server”:“testadmin”}] {“ts_sec”:1606132362,“file”:“mysql-bin1og.000064”,“pos”:4,“gtids”:“5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23655”,“row”:1,“server_id”:3306100,“event”:2}

2.方法2,使用python
1)python3版
from kafka import KafkaProducer
import json
from kafka.errors import KafkaError

def kafka_producer():
producer = KafkaProducer(
key_serializer=lambda k: json.dumps(k,separators=(’,’,’:’)).encode(‘utf-8’),
value_serializer=lambda v: json.dumps(v,separators=(’,’,’:’)).encode(‘utf-8’),
bootstrap_servers=[‘192.168.253.133:9092’,‘192.168.253.134:9092’,‘192.168.253.135:9092’]
)

data={
    "ts_sec":1605529559,
    "file":"mysql-bin1og.000041",
    "pos":259,
    "gtids":"5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-22650",
    "row":1,
    "server_id":3306100,
    "event":2}
keynm = ["connector-mysql_new",{"server":"testadmin"}]
try:
    producer.send('connect-offsets', key=keynm,value=data)
    producer.close()
except KafkaError as e:
    print(e)
复制

if name == “main”:
kafka_producer()

  1. python2 版本
    from kafka import KafkaProducer
    import json
    from kafka.errors import KafkaError

def kafka_producer():
producer = KafkaProducer(
key_serializer=lambda k: json.dumps(k,separators=(’,’,’:’)).encode(‘utf-8’),
value_serializer=lambda v: json.dumps(v,separators=(’,’,’:’)).encode(‘utf-8’),
bootstrap_servers=[‘192.168.253.133:9092’,‘192.168.253.134:9092’,‘192.168.253.135:9092’]
)

data={
    "ts_sec":1606132362,
    "file":"mysql-bin1og.000083",
    "pos":4,
    "gtids":"5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23672",
    "row":1,
    "server_id":3306100,
    "event":2}
keynm = ["connector-mysql_new",{"server":"testadmin"}]
try:
    producer.send('connect-offsets', key=keynm,value=data)
    producer.close()
except KafkaError as e:
    print e
复制

if name == “main”:
kafka_producer()

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论