暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kafka#(一)基本认知

小怂读书 2021-12-02
337

《Kafka权威指南》整理。

Kafka,基于发布与订阅的消息系统,被称为“分布式提交日志”或“分布式流平台”。Kafka的数据具有有序、持久、按需读取、分布式存储、故障保护、可伸缩等特点。


1、消息系统

消息,Kafka的数据单元被称为消息,可以将消息看成DB中的一个数据行(一条记录);一组消息称为批次,消息分批次写入Kafka。对于Kafka来说,消息就是字节数组,这种字节数组不方便人来看,于是出现了消息模式(Schema),可读性好的结构。有了模式就要考虑传输过程中的序列化和反序列化问题。Kafka通过主题进行消息的分类,主题好比数据库中的表,表中存放着一类数据。主题可以被分为若干个分区(Partition),消息以追加的方式写入分区,然后以FIFO的顺序读取。我们没有办法保证整个主题范围内的消息有序,但可以保证分区内有序。分区可以分布在不同的机器(Broker)上。这个分区的机制也是Kafka实现数据冗余和伸缩性的实现。

1)生产者和消费者

Kafka的客户端包括两类:生产者(发布者、写入者,创建消息)和消费者(订阅者、读者,读取消息)。程序A向topicA中生产数据,是生产者;程序B订阅topicA消费数据,是消费者。A和B可以为一个程序,即该程序既是生产者也是消费者。

消费者订阅一个或多个主题进行消费,并通过检查消息偏移量(一个元数据,不断递增的int值)来区分是否已经读取过消息,消费者把每个分区最后读取的消息偏移量存到zk上。会有多个消费者共同读取一个主题,这部分消费者成为一个消费组

2)集群

一个独立的机器(Kafka服务器)成为broker,生产者和消费者都是Client。broker接收来自生产者的消息,设置偏移量并提交磁盘保存。多台机器(broker)构成集群。集群中都有一个broker扮演集群控制器的角色(分布式系统中的主节点Leader角色)。一个主题有多个分区,多个分区也有主分区副本分区之分。

2、Why Kafka?

  • 支持多个生产者,适合多个数据源的数据采集和对外提供的场景

  • 支持多个消费者,不同消费组的消费者互不影响;同一消费组每个消息只处理一次

  • 基于磁盘的数据存储,消息提交到磁盘,根据设置的规则进行保留,允许消费者非实时地进行读取;每个Topic可设置独立保留规则

  • 灵活伸缩的系统 -> 动态扩展broker

  • 横向扩展消费者、生产者和broker,kafka可以处理海量消息流,保证毫秒级延迟


3、使用场景

  • 用户trace,跟踪用户交互信息

  • 消息传递,程序发送通知等

  • 用于手机应用程序的度量指标和系统日志

  • 提交日志,Kafka来源于提交日志,将DB的更新操作发布到Kafka中,应用程序监听更新

  • 流处理


4、使用

  • Kafka可以运行在Win、Mac、Linux多种操作系统,Linux最常见。

  • zk和kafka的运行依赖Java,安装jdk8

  • zk保存Kafka元数据(参考zk安装:Zookeeper#ZAB、使用场景),不建议zk节点超过7个,过多会降低整个群组性能

  • 安装kafka broker

tar -zxf kafka_2.11-0.9.0.1.tgz
# mv kafka_2.11-0.9.0.1 /usr/local/kafka
# mkdir /tmp/kafka-logs
# export JAVA_HOME=/usr/java/jdk1.8.0_51
# /usr/local/kafka/bin/kafka-server-start.sh -daemon
/usr/local/kafka/config/server.properties
#
  • broker配置

1)broker.id 标识符,默认是0.这个值要保证整个Kafka集群中唯一

2)port 启动Kafka后,默认监听9092端口,port参数可修改此端口

3)zookeeper.connect 保存broker元数据的地址通过此配置指定

4)log.dirs Kafka把所有消息都保存在磁盘上,存放这些日志片段的目录通过此配置执行。如果指定了多个路径(逗号分割),broker会根据“最少使用(Broker会往拥有最少分区数的路径新增分区,而不是最小磁盘空间的路径 -> 新增的路径肯定保存新创建的分区)”原则,将同一分区的日志片段保存同一路径下。

  • topic默认配置

1)num.partitions 指定新创建的主题包含多少分区。默认启用,默认1。


5、生产者(写入消息->Kafka)

1)消息发送的过程如下所示。

  • 创建ProducerRecord对象,包含目标主题和要发送的内容

  • 发送内容序列化成字节数组

  • Producer可以指定发送的分区,如果不指定,Kafka分区器会根据键来选择一个分区;确定分区后,该记录添加到批次(分区同一个)中,独立线程发送这一批次到相应broker的分区上

  • 服务器收到消息会返回一个响应(主题、分区信息和偏移量);写入失败会返回错误,生产者可重试

2)Kafka生产者属性

  • Bootstrap.servers 服务器地址,指定broker的地址清单,host:port格式。不需要指定所有的,生产者会从给定的找到其他的broker。但至少两个,容灾。

  • key.serializer|value.serializer broker希望接收的消息是kv字节数组,这两个属性配置了kv的序列化方式(兼容性问题、版本问题等不建议使用自定义的序列化器,已有JSON、Avro、Thrift、Protobuf等

3)消息发送的三种模式

  • 啥都不管(fire-and-forget):不关心是否成功,发完就OK

  • 同步发送:等待确定消息成功才干别的

  • 异步发送:发送后干别的,有个回调通知是否真的成功

4)生产者配置

  • acks。执行必须要有N个分区副本收到消息,生产者才会认为消息写入成功。ack=0,生产者不等服务器响应;ack=1,集群主节点收到消息返回响应即可;ack=all所有副本都收到消息返回响应才算成功。

  • buffer.memory。设置生产者内缓冲区的大小,该缓冲区用于缓冲发送到服务器的消息。

  • retries。该配置指定生产者可以重发消息的次数,达到这个次数生产者会停止重试返回错误。

  • batch.size。多个消息发送同一个分区时会放到同一个批次中,这个配置规定了批次的内存大小

  • max.in.flight.requests.per.connection。指定生产者在收到Kafka服务器响应前可以发送多少消息。如果设置为1,可保证消息是按照发送顺序写入服务器的。(也就是说每条消息必须等到响应后才算成功,才能发下一条

顺序保证?

Kafka可以保证同一个分区中的消息是有序的。如果生产者按照一定顺序发送消息,broker会按照这个顺序写入分区,消费者按照同样顺序读。将max.in.flight..connection设置为1,保证生产者发送第一批消息时不会有其他消息发给broker。

5)分区

Kafka的消息是一个键值对,ProducerRecord对象中键可以为null,键有两个用途:

  • 消息的附加信息

  • 决定消息被写到主题的哪个分区(没有指定分区时用到),如果为null并没有指定分区,分区器使用轮询(Round Robin)将消息均衡分布到各个分区上


6、消费者(读取消息<-kafka)

消费速度跟不上生产的速度会怎样?消息积压。

这个时候需要对消费者进行横向的扩展,使用多个消费者从同一个主题读取消息,对消息进行分流。Kafka消费者从属于消费者群组,消费组订阅同一个主题消息,每个消费者接收主题一部分分区的消息,一条消息在消费组中只消费一次

消费者和主题分区的关系如下所示:

不能多个消费者消费同一个分区!!

1)分区再均衡(Rebalance)

一个新的消费者加入群组时,它读取什么内容?原来大家分区都分好了,各个消费者读自己的分区。当一个消费者关闭或者崩溃离开时,它的分区给谁?管理员增加新的分区时,新的分区谁来读
分区的所有权从一个消费者转移到另一个消费者的过程称为“再均衡”。再均衡机制为消费者群组带来高可用和伸缩性,但正常情况下并不希望这种情况发生,再均衡期间消费者无法读取消息,消费组不可用;分区被分给另一个消费者时,消费者当前读取状态丢失
消费者向群组协调器的broker发送心跳来维持它们和群组的从属关系以及对分区的所有权,发送心跳的时机在轮询消息或提交偏移时顺带发送。如果超过一定时间没有发送,协调器就认为这个消费者下线了。协调器会等待几秒确认它崩溃后执行再均衡。这几秒内,这个消费者不会读取分区的消息,尽量保证消息不会丢失

2)分区分配过程

  • 消费者A想要入群,向群组协调器(broker)发送JoinGroup申请

  • A是第一个入群的,作为群主

  • 群主从协调器那获取所有的群成员列表,给它们分配分区(实现PartitionAssignor接口的类来进行分区分配)

  • 群主把分配情况反馈给协调器,协调器将结果发送给其他消费者

每个消费者只能看到自己的分配信息,群主知道所有的分配信息,这个过程再分配时重复发生

分区会被分配给消费组里的消费者,PartitionAssignor根据给定的消费者和主题进行分配,有2个默认的分配策略:

  • Range:将主题若干连续的分区分配给消费者。这种方式类似于除法,主题有4个分区,2个消费者,则每人2个;如果3个分区,则2:1分。弊端,如果多个主题都是3个分区,那么第一个消费者分区特别多...

  • RoundRobin:分区逐个分配消费者,类似轮询的方式。

3)轮询

消息轮询是消费者API的核心,通过简单的Round Robin机制向服务器请求数据。轮询会处理所有的操作:群组协调、再均衡、发送心跳、获取数据。这是一个无限循环,消费者实际上是一个长期运行的程序。

如何退出

如果要退出循环,需要通过另一个线程调用consumer.wakeup()方法;如果循环运行在主线程中,可以在ShutdownHook里调用这个方法,这个方法是消费者唯一一个可以从其他线程安全调用的方法。退出线程前调用consumer.close()会提交任何没有提交的东西,向群组协调器发送离开群组的消息,触发再均衡。

轮询不只是简单的数据获取,在第一次调用poll()方法时,轮询会去查找GroupCoordinator(协调者),加入群组,接受分配的分区;轮询时候也会发送心跳。
4)提交和偏移量

消费者用poll()方法从服务器上拉取数据,获取的是还没被消费过的记录,消费者可以使用Kafka来追踪消息在分区的位置(偏移量),更新分区当前位置的操作叫做提交

消费者如何提交偏移量

消费者向_consumer_offset的特殊主题中发送消息,消息包含每个分区的偏移量。如果消费者一直处于运行状态,偏移量实际是没有什么作用的。当消费者发生崩溃或者有新的消费者加入群组,触发再均衡,再均衡之后每个消费者可能会分配新的分区而不是之前拥有的,这个时候,消费者就要读取负责分区的最后一次提交的偏移量,从偏移量的地方继续处理

提交的偏移量和实际客户端处理的偏移量不一致,还有问题。

  • 提交的 < 实际的

少的那块会被重复处理。

  • 提交的 > 实际的

多的那块消息会丢失。

所以这个处理偏移量的方式对于客户端的消息处理影响很大。

① 自动提交。最简单的方式,让消费者自动提交偏移量。enable.auto.commit=true,每过5s(auto.commit.interval.ms),消费者自动把poll()方法接收到的最大偏移量提交上去。重复消息不可避免

② 手动提交。开发者在必要时提交当前偏移量,而不是基于时间。使用commitSync()提交偏移量,简单可靠。commitSync()提交由poll()返回的最新偏移量,所以处理完所有记录后,要记得调用这个方法提交!!!
③ 异步提交。手动提交不足之处是在
broker做出回应前,程序会一直阻塞。commitAsync()只管发提交请求,不等结果。commitSync遇到错误会重试,Async不会重试,这也是Async不好的地方:

如果200的偏移量提交时网络故障没有发出,之后300的提交了,得到了回应;然后重发200的时候也成功了,这样导致200-300之间的数据会重复消费

文章转载自小怂读书,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论