暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka权威指南学习总结三--生产者

释福 2021-08-30
267


1、ProducerRecord 对象

    ProducerRecord 对象:包含 目标主题,发送内容(必选:值,可选:键,分区)。

    

    键对象、值对象均由生产者先序列化为字节数组,方可进行网络传输。

    数据被传给分区器,根据是否指定分区,否则根据是否由键值而使用不同的分发规则,将消息发送到目标分区。


2、创建Kafka生产者。

    创建 Kafka 生产者 3 个必选属性:

bootstrap.servers

        该属性指定broker的地址清单,地址的格式为 host:port。从给定的broker里可找到其他broker的信息。因此,此属性值至少提供两个broker的信息。

key.serializer

    此值必须设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把键对象序列化成字节数组。

    Kafka 客户端默认提供了 ByteArraySerializer、StringSerializer、IntegerSerializer。


value.serializer

    与 key.serializer 一样,value.serializer 指定的类会将值序列化。


创建生产者示例:

    private Properties kafkaProps = new Properties();

    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092" );

    kafkaProps.put("key.serializer", "org.apache.kafka.common.serializer.StringSerializer" );

kafkaProps.put("value.serializer", "org.apache.kafka.common.serializer.StringSerializer" );


producer = new KafkaProducer<String, String>(kafkaProps);


发送消息的三种方式:

    发送并忘记:发送消息给服务器,不关心是否到达。由自动重发次数,仍有丢失数据可能。

    同步发送:使用send()发送消息,返回Future对象,调用get()方法进行等待,根据结果可知是否发送成功。

    异步发送:调用send()方法发送消息,并指定一个回调函数,服务器在返回响应时调用该函数。


回调函数示例(需实现org.apache.kafka.clients.producer.Callback):

private class DemoProducerCallback 

implements Callback {

    @Override

    publlic void onComletion(RecordMetadata 

recordMetadata, Exception e) {

        if (e != null) {

            e.printStackTrace();

        }

    }   

}


ProducerRecord<String, String> record = 

new ProducerRecord<>("CustomerCountry", 

"Biomedical Materials", "USA");


发送消息,传递一个回调对象:

producer.send(record, new DemoProducerCallback());


3、发送消息到Kafka

    ProducerRecord<String, String> record = 

new ProducerRecord<>("CustomerCountry", 

"Precision Products", "France");

try{

    producer.send(record);

} catch (Exception e) {

    e.printStackTrace();

}


3.1、同步发送消息

ProducerRecord<String, String> record = 

new ProducerRecord<>("CustomerCountry", 

"Precision products", "France");

try {

    producer.send(record).get();

} catch (Exception e) {

    e.printStackTrace();

}


3.2、异步发送消息

    需要一个实现 org.apache.kafka.clients.producer.Callback 接口的回调类:

private class DemoProducerCallback 

implements Callback {

    @Override

    public void onCompletion(RecordMetadata 

recordMetadata, Exception e) {

        if (e != null) {

            e.printStackTrace();

        }

    }

}


3.4、生产者的配置

  1. acks

    这参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入时成功时成功的。

    acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应。

    acks=1:只要集群的首领节点收到消息,生产者就会收到一个服务器的成功相应。

    acks=all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功相应。

  2. buffer.memory

    生产者内存缓冲区的大小,生产者用它缓冲要发送服务器的消息。

  3. compression.type

    默认情况下,消息发送时不压缩。此参数可设置为 snappy、gzip 或 lz4,它指定消息被发送给 broker 之前用哪一种压缩算法进行压缩。

  4. retries

    生产者可以因服务器临时性错误发生消息失败,为避免这种偶发性错误,retries 参数值决定了生产者可进行重发消息的次数。

    重试发送消息的间隔参数 retry.backoff.ms,默认为 100ms,可修改。

  5. batch.size

    发送到同一个分区一个批次的消息可以使用内存的总大小,按字节计算。与linger.ms协同作用,KafkaProducer会在批次填满batch.size或linger.ms达到上限时把批次发送出去。

  6. linger.ms

    此参数指定了生产者在发送批次之前等待更多消息加入批次的时间。与batch.size协同作用,KafkaProducer会在批次填满batch.sizelinger.ms达到上限时把批次发送出去。

  7. client.id

    任意字符串,识别消息的来源。

  8. max.in.flight.requests.per.connection

    指定生产者在收到服务器相应前可以发送多少个消息。

  9. timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

    request.timeout.ms 指定了生产者在发送数据时等待服务器返回相应的时间。

    metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。

    若等待响应超时,生产者重发数据。

    time.ms 指定broker等待同步副本返回确认的时间,与 acks 的配置协同作用 -- 指定时间内没有收到同步副本的确认,broker 就返回一个错误。

  10. max.block.ms

    指定调用 send() 方法或使用 partitionFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法发生阻塞,时间达到 max.block.ms 时,生产者抛出异常。

  11. max.request.size

    用于控制生产者发送的请求大小(一个或多个消息的总大小)。

  12. broker 可接受消息的最大值 message.max.bytes。

  13. receive.buffer.bytes 和 send.buffer.bytes

    分别指定TCP socket 接收和发送数据包的缓冲区大小。

    若被设置为 -1,则使用操作系统的默认值。


    顺序保证:生产者按照一定的顺序发送消息,broke 按照这个顺序将消息写入分区,消费者也按照同样的顺序读取。


4、自定义序列化器 -- Auro(示例)

    所有写入数据需要用到的 Schema 保存在注册表里;

    记录引用 schema 的标识;

    读取数据的应用程序使用标识从注册表里拉去 schema 来反序列化记录。


    序列化器和反序列化器分别负责处理 schema 的注册和拉取。  


    “Scheme注册表”:例如使用开源的 Confluent Schema Registry。

    

5、分区

    ProducerRecord 对象包含了目标主题、键(可为null)和值。

    键的两个用途:消息的附加信息;决定消息写到哪个分区(相同键的消息写到同一个分区)。

    键为null,且使用默认分区器,记录将被随机发送到主题内各个可用的分区上。若使用 轮询(Round Robin)算法将消息均衡地分布到各个分区上。

    键非空,且使用默认分区器,Kafka 对键进 行散列(内部算法,升级Java,散列值不变,不受影响),后根据散列值把消息映射到特定的分区上。同一键值的消息总是映射到同一个分区。由于映射时使用了主题所有的分区,可能将消息写入不可用分区而发生错误。

    主题分区数量不变的情况下,键与分区之间的映射保持不变。

    使用键值映射分区,最好在创建分区时规划好。

文章转载自释福,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论