1.方法1,使用自带工具:
- 查看消费信息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.253.134:9092 --property print.key=true --topic connect-offsets --from-beginning
- 往topic生产消息
bin/kafka-console-producer.sh --broker-list 192.168.253.135:9092 --topic connect-offsets --property parse.key=true
["connector-mysql_new",{"server":"testadmin"}]	{"ts_sec":1606132362,"file":"mysql-bin1og.000064","pos":4,"gtids":"5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23655","row":1,"server_id":3306100,"event":2}
2.方法2,使用python
1)python3版
from kafka import KafkaProducer
import json
from kafka.errors import KafkaError
def kafka_producer():
    """Publish one offset record to the ``connect-offsets`` topic.

    Both key and value are serialized as compact JSON (no whitespace after
    the separators) and UTF-8 encoded before being handed to the producer.

    Delivery errors (``KafkaError``) are printed rather than raised. The
    producer is always closed, whether or not the send succeeded.
    """
    # Compact separators produce JSON without spaces.
    # NOTE(review): presumably the key bytes must match the key already
    # stored in the topic exactly -- confirm against the consumer output.
    compact = (',', ':')
    producer = KafkaProducer(
        key_serializer=lambda k: json.dumps(k, separators=compact).encode('utf-8'),
        value_serializer=lambda v: json.dumps(v, separators=compact).encode('utf-8'),
        bootstrap_servers=[
            '192.168.253.133:9092',
            '192.168.253.134:9092',
            '192.168.253.135:9092',
        ],
    )
    data = {
        "ts_sec": 1605529559,
        "file": "mysql-bin1og.000041",
        "pos": 259,
        "gtids": "5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-22650",
        "row": 1,
        "server_id": 3306100,
        "event": 2,
    }
    keynm = ["connector-mysql_new", {"server": "testadmin"}]
    try:
        future = producer.send('connect-offsets', key=keynm, value=data)
        # send() is asynchronous: wait for the broker's reply so a failed
        # delivery surfaces here as a KafkaError instead of being lost.
        future.get(timeout=10)
    except KafkaError as e:
        print(e)
    finally:
        # Close even when the send failed so the connection is not leaked.
        producer.close()
# Script entry point: run the producer only when executed directly.
if __name__ == "__main__":
    kafka_producer()
2)python2版
from kafka import KafkaProducer
import json
from kafka.errors import KafkaError
def kafka_producer():
    """Publish one offset record to the ``connect-offsets`` topic.

    Both key and value are serialized as compact JSON (no whitespace after
    the separators) and UTF-8 encoded before being handed to the producer.

    Delivery errors (``KafkaError``) are printed rather than raised. The
    producer is always closed, whether or not the send succeeded.
    """
    # Compact separators produce JSON without spaces.
    # NOTE(review): presumably the key bytes must match the key already
    # stored in the topic exactly -- confirm against the consumer output.
    compact = (',', ':')
    producer = KafkaProducer(
        key_serializer=lambda k: json.dumps(k, separators=compact).encode('utf-8'),
        value_serializer=lambda v: json.dumps(v, separators=compact).encode('utf-8'),
        bootstrap_servers=[
            '192.168.253.133:9092',
            '192.168.253.134:9092',
            '192.168.253.135:9092',
        ],
    )
    data = {
        "ts_sec": 1606132362,
        "file": "mysql-bin1og.000083",
        "pos": 4,
        "gtids": "5ee3587e-fa7b-11ea-8bef-000c296cd7b4:1-23672",
        "row": 1,
        "server_id": 3306100,
        "event": 2,
    }
    keynm = ["connector-mysql_new", {"server": "testadmin"}]
    try:
        future = producer.send('connect-offsets', key=keynm, value=data)
        # send() is asynchronous: wait for the broker's reply so a failed
        # delivery surfaces here as a KafkaError instead of being lost.
        future.get(timeout=10)
    except KafkaError as e:
        # Parenthesized form prints the same on Python 2 and Python 3.
        print(e)
    finally:
        # Close even when the send failed so the connection is not leaked.
        producer.close()
# Script entry point: run the producer only when executed directly.
if __name__ == "__main__":
    kafka_producer()