暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【eLakehouse】Apache Kafka 分层存储(Tiered Storage)技术解析

原创 手机用户0271 2023-11-26
530

前言

消息中间件可以用来实现不同平台之间的消息传递,客户端通常基于订阅机制获取最新数据,历史数据的价值较低,因此消息中间件中通过设置消息的留存时间进行数据的清理,同时为了提高消息的吞吐量,使用内存或者SSD来存储刚写入的热数据。

但是目前几乎所有的消息中间件都以打造分布式消息流平台为目标,其中一个关键能力是持久化存储消息流,这就需要将历史数据持久化存储而不是设置留存周期进行自动删除,综上分析,分层存储的能力对于消息中间件的架构演进至关重要。

分层存储是一种数据存储方法或系统,由两种或更多存储介质类型组成,如Tape、硬盘、SSD(固态硬盘)、内存等,存储数据的介质类型的选用由成本、数据可用性及数据可恢复性等要求来决定。

图片

使用分层存储是为了实现成本管理和存储空间效率,主要思想是让处理器需要的数据存储在更快的存储层中,而将不活跃的数据文件存放到速度慢、廉价且容量大的存储设备中

Apache Kafka开源的版本中分层存储架构如下图所示:

图片

Page Cache是操作系统实现的一种磁盘缓存,将磁盘中的数据缓存到内存中,当进程读取磁盘文件内容时会先读取Page Cache,如果存在则直接返回数据,没有命中则操作系统向磁盘发起读取请示并将读取的数据页写入到缓存中,之后再将数据返回到进程。

SSD相较于HDD在带宽上有数量级别的提升,其使用可以带来性能上的提升,有两种方式:使用SSD替代HDD;将其作为PageCache与HDD直接的缓存,解决PageCache出现竞争后承接部分流量。以上存储介质上的数据到了留存时间后会自动清除,如果需要持久化存储可以Offload到第三方存储系统中,例如AWS S3、Hadoop等。

但是目前社区代码中还不支持第三方存储。当前业界比较常用的消息中间件,Pulsar和Pravega底层使用Bookkeeper,原生支持将数据OffLoad到云数据库、HDFS等,RocketMQ和Kafka数据的分层存储均在开发中,估计在今年都会发布开源实现。

快速入门

Kafka的分层存储在Confluent平台中已经支持但是未开源,社区在持续开发中,功能进展情况可以参考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-405

Kafka已经提供了Remote Storage的核心代码,用户可以扩展相应的接口来将数据移动到第三方存储系统中。下面介绍github上实现的将数据offload的hdfs的持久化存储的开源项目。

1、编译安装

$ git clone -b 2.8.x-tiered-storage https://github.com/satishd/kafka.git $ cd kafka $ ./gradlew releaseTarGz
复制

编译成功后可以在core/build/distributions目录下找到安装包,将安装包复制到安装主机中进行解压。

2、完成配置

分层存储依赖HDFS,其安装过程在这里不再介绍。在server.properties中增加分层存储配置,配置示例如下所示:

remote.log.storage.system.enable=trueremote.log.storage.manager.class.name=org.apache.kafka.rsm.hdfs.HDFSRemoteStorageManagerremote.log.storage.manager.class.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/external/hdfs/libs/*:/opt/hadoop-3.3.1/etc/hadoop/remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerremote.log.metadata.manager.class.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/libs/kafka-storage-2.8.1-SNAPSHOT.jarremote.log.storage.hdfs.keytab.path=/opt/satish/kafka_2.13-2.8.1-SNAPSHOT/config/hdfs.headless.keytab
remote.log.metadata.topic.num.partitions=1rremote.log.metadata.topic.replication.factor=1retention.ms=259200000remote.log.storage.manager.impl.prefix=remote.log.storage.remote.log.storage.hdfs.base.dir=kafka-remote-storageremote.log.storage.hdfs.user=hdfs-hctest@BCHKDC
复制


3、启动命令

$cd /opt/satish/kafka_2.13-2.8.1-SNAPSHOT $bin/kafka-server-start -daemon config/server.properties
复制

4、执行测试

•创建topic

sh kafka-topics.sh --bootstrap-server localhost:9092 --topic tieriedstorage --replication-factor 1 --partitions 5 --create --config local.retention.ms=60000 --config segment.bytes=1048576 --config remote.storage.enable=true
复制

核心参数为remote.storage.enable及retention.ms,当日志文件超过保留时间会自动将数据复制到HDFS中。

•写入数据

sh kafka-producer-perf-test.sh --topic tieriedstorage --num-records 100000 --throughput 100 --record-size 10240 --producer.config test-producer.properties
复制

•查看server日志,可以看到数据移动过程,日志信息如下:

Created a new task: class kafka.log.remote.RemoteLogManager$RLMTask[TopicIdPartition{topicId=eRjo9HPJS92E7_RC5D5G0A, topicPartition=tieredstorage-0}] and getting scheduled (kafka.log.remote.RemoteLogManager).....
Copying /tmp/satish/kafka-logs/tieriedstorage/00000000000000022321.log.toPath to remote storage. (kafka.log.remote.RemoteLogManager$RLMTask) Copying timeIndex: /tmp/satish/kafka-logs/tieriedstorage/00000000000000022321.timeindex to remote storage. (kafka.log.remote.RemoteLogManager$RLMTask).....
复制

这时可以查看到HDFS中的日志数据,其目录在

remote.log.storage.hdfs.base.dir=kafka-remote-storage
复制

如下图所示:

图片

这里每一个文件都是LogSegment,具体细节在下面章节进行介绍。

核心流程分析

持久化存储的设计核心逻辑是将冷热数据自动迁移到对应的存储设备上并提供透明的访问方式,流程的核心如下:

•元数据及数据的持久化存储•远程存储数据与原生存储之间的映射关系•提供统一的对外访问入口,对客户端透明•分层存储数据流自动迁移

Kafka的分层架构设计,可见KIP-405,系统架构图下所示:

图片

在示例中的实现是在ReplicaManager中增加RemoteLogManager(RLM)模块进行远端存储的管理,底层实现细节如下图所示:

图片

1、核心组件,RLM运行后根据分区的Leader变化、保留周期等要素触发分区数据的管理,RLM的实现分为两部分:

•RemoteStorageManager(RSM),远程LogSegment和索引文件的周期管理•RemoteLogMetadataManager(RLMM),提供LogSegment的元数据的管理,用于支持数据的强一致性

2、任务机制,RLM在启动后会启动以下任务:

•RLMTask,分区的Leader副本所在的节点会确定RLMTask进行日志的管理,当遇到阈值条件是将LogSegment移动到远端存储•AsyncReadTask,客户端发起读请求,基于Offset在本地LogSegment找不到数据时(OffsetOutOfRangeException),将向远程存储发起读请求,具体通过该Task来具体获取,核心逻辑是创建CacheInputStream,读取后将封装成MemoryRecords后发送给客户端

3、底层文件的封装,在HDFS上LogSegment的所有文件,包括log、index、leaderEpoch等均存放在同一个文件中,结构如下:

图片

在文件头(25字节)中保存了各个文件在文件中的位置,具体长度根据下一个文件的起始位置进行计算,文件类型包括:

public enum FileType {  OFFSET_INDEX((byte) 0),  TIMESTAMP_INDEX((byte) 1),  LEADER_EPOCH_CHECKPOINT((byte) 2),  PRODUCER_SNAPSHOT((byte) 3),  TRANSACTION_INDEX((byte) 4),  SEGMENT((byte) 5);}
复制

客户端在读取时首先可根据RemoteLogSegment的文件头获取Index文件的位置,然后获取Offset对应Record在文件中位置,最后再获取数据。

实现细节可见HDFSRemoteStorageManager,其中获取数据的核心代码如下:

private InputStream fetchData(RemoteLogSegmentMetadata metadata,   LogSegmentDataHeader.FileType fileType,   int startPosition,   int endPosition) throws RemoteStorageException { try {    Path dataFilePath = new Path(getSegmentRemoteDir(metadata.remoteLogSegmentId()));    return new CachedInputStream(dataFilePath, fileType, startPosition,endPosition); } .....}
复制

4、消息读取流程,引入远程存储后读取流程基本上相同,当出现OffsetOutOfRange异常后再从远程读取,即创建AsyncReadTask,核心代码如下:

 case e: OffsetOutOfRangeException =>  if (remoteLogManager.isDefined && log != null && !log.config.compact &&    log.rlmEnabled && log.config.remoteStorageEnable) {    val leaderLogStartOffset = log.logStartOffset    val leaderLogEndOffset = log.logEndOffset     ......    val lastStableOffset = Some(log.lastStableOffset)    val fetchDataInfo = {       FetchDataInfo(LogOffsetMetadata(fetchInfo.fetchOffset), MemoryRecords.EMPTY,          delayedRemoteStorageFetch = Some(       RemoteStorageFetchInfo(          adjustedMaxBytes,           minOneMessage,           tp,           fetchInfo,           fetchIsolation)))    } .......
复制

启动AsyncReadTask的逻辑,获取数据并调用responseCallback,将其发送给客户端,调用逻辑在ReplicaManager#fetchMessage中实现。

logReadResults.foreach { case (topicPartition, logReadResult) =>  if (remoteFetchInfo.isEmpty && logReadResult.info.delayedRemoteStorageFetch.isDefined)     remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch  ......} val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,   fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) if (remoteFetchInfo.isDefined) {   val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition())   val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]   var remoteFetchTask: RemoteLogManager#AsyncReadTask  = null   try {     remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => {       remoteFetchResult.complete(result)       delayedRemoteFetchPurgatory.checkAndComplete(key)  })}
复制


总结

持久化存储能力对于分布式流平台来说是不可缺少的一环,也许是由于Confluent公司在商业方面的考虑,目前社区对于这块功能的开发进度还是比较慢的。但对于该功能的发布还是值得期待,当具备持久化存储能力后,可以省去很多数据流转的工作,一些相对简单的实时处理能力也可以通过Kafka Stream来实现,简化大数据平台的总体框架。



参考链接

•https://cwiki.apache.org/confluence/display/KAFKA/KIP-405

•https://github.com/satishd/kafka

•https://memark.io/index.php/category/tech_blogs/pafka/

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论