1、ProducerRecord 对象
ProducerRecord 对象:包含 目标主题,发送内容(必选:值,可选:键,分区)。
键对象、值对象均由生产者先序列化为字节数组,方可进行网络传输。
数据被传给分区器,根据是否指定分区,否则根据是否由键值而使用不同的分发规则,将消息发送到目标分区。
2、创建Kafka生产者。
创建 Kafka 生产者 3 个必选属性:
bootstrap.servers
该属性指定broker的地址清单,地址的格式为 host:port。从给定的broker里可找到其他broker的信息。因此,此属性值至少提供两个broker的信息。
key.serializer
此值必须设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把键对象序列化成字节数组。
Kafka 客户端默认提供了 ByteArraySerializer、StringSerializer、IntegerSerializer。
value.serializer
与 key.serializer 一样,value.serializer 指定的类会将值序列化。
创建生产者示例:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092" );
kafkaProps.put("key.serializer", "org.apache.kafka.common.serializer.StringSerializer" );
kafkaProps.put("value.serializer", "org.apache.kafka.common.serializer.StringSerializer" );
producer = new KafkaProducer<String, String>(kafkaProps);
发送消息的三种方式:
发送并忘记:发送消息给服务器,不关心是否到达。由自动重发次数,仍有丢失数据可能。
同步发送:使用send()发送消息,返回Future对象,调用get()方法进行等待,根据结果可知是否发送成功。
异步发送:调用send()方法发送消息,并指定一个回调函数,服务器在返回响应时调用该函数。
回调函数示例(需实现org.apache.kafka.clients.producer.Callback):
private class DemoProducerCallback
implements Callback {
@Override
publlic void onComletion(RecordMetadata
recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry",
"Biomedical Materials", "USA");
发送消息,传递一个回调对象:
producer.send(record, new DemoProducerCallback());
3、发送消息到Kafka
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry",
"Precision Products", "France");
try{
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
3.1、同步发送消息
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry",
"Precision products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
3.2、异步发送消息
需要一个实现 org.apache.kafka.clients.producer.Callback 接口的回调类:
private class DemoProducerCallback
implements Callback {
@Override
public void onCompletion(RecordMetadata
recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
3.4、生产者的配置
acks
这参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入时成功时成功的。
acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应。
acks=1:只要集群的首领节点收到消息,生产者就会收到一个服务器的成功相应。
acks=all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功相应。
buffer.memory
生产者内存缓冲区的大小,生产者用它缓冲要发送服务器的消息。
compression.type
默认情况下,消息发送时不压缩。此参数可设置为 snappy、gzip 或 lz4,它指定消息被发送给 broker 之前用哪一种压缩算法进行压缩。
retries
生产者可以因服务器临时性错误发生消息失败,为避免这种偶发性错误,retries 参数值决定了生产者可进行重发消息的次数。
重试发送消息的间隔参数 retry.backoff.ms,默认为 100ms,可修改。
batch.size
发送到同一个分区一个批次的消息可以使用内存的总大小,按字节计算。与linger.ms协同作用,KafkaProducer会在批次填满batch.size或linger.ms达到上限时把批次发送出去。
linger.ms
此参数指定了生产者在发送批次之前等待更多消息加入批次的时间。与batch.size协同作用,KafkaProducer会在批次填满batch.size或linger.ms达到上限时把批次发送出去。
client.id
任意字符串,识别消息的来源。
max.in.flight.requests.per.connection
指定生产者在收到服务器相应前可以发送多少个消息。
timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms
request.timeout.ms 指定了生产者在发送数据时等待服务器返回相应的时间。
metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。
若等待响应超时,生产者重发数据。
time.ms 指定broker等待同步副本返回确认的时间,与 acks 的配置协同作用 -- 指定时间内没有收到同步副本的确认,broker 就返回一个错误。
max.block.ms
指定调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法发生阻塞,时间达到 max.block.ms 时,生产者抛出异常。
max.request.size
用于控制生产者发送的请求大小(一个或多个消息的总大小)。
receive.buffer.bytes 和 send.buffer.bytes
分别指定TCP socket 接收和发送数据包的缓冲区大小。
若被设置为 -1,则使用操作系统的默认值。
顺序保证:生产者按照一定的顺序发送消息,broke 按照这个顺序将消息写入分区,消费者也按照同样的顺序读取。
broker 可接受消息的最大值 message.max.bytes。
4、自定义序列化器 -- Auro(示例)
所有写入数据需要用到的 Schema 保存在注册表里;
记录引用 schema 的标识;
读取数据的应用程序使用标识从注册表里拉去 schema 来反序列化记录。
序列化器和反序列化器分别负责处理 schema 的注册和拉取。
“Scheme注册表”:例如使用开源的 Confluent Schema Registry。
5、分区
ProducerRecord 对象包含了目标主题、键(可为null)和值。
键的两个用途:消息的附加信息;决定消息写到哪个分区(相同键的消息写到同一个分区)。
键为null,且使用默认分区器,记录将被随机发送到主题内各个可用的分区上。若使用 轮询(Round Robin)算法将消息均衡地分布到各个分区上。
键非空,且使用默认分区器,Kafka 对键进 行散列(内部算法,升级Java,散列值不变,不受影响),后根据散列值把消息映射到特定的分区上。同一键值的消息总是映射到同一个分区。由于映射时使用了主题所有的分区,可能将消息写入不可用分区而发生错误。
主题分区数量不变的情况下,键与分区之间的映射保持不变。
使用键值映射分区,最好在创建分区时规划好。




