前言概述
流处理应用程序通常是有状态的,通过保存已处理事件的信息,用于影响未来事件的处理。Flink中保存的事件信息,即状态,会被存储在已经配置的状态后端中。为避免应用程序故障时造成数据丢失,状态后端会定期将其快照持久化到预先配置的持久存储中。RocksDB状态后端(RocksDBStateBackend)是Flink三个内置状态后端之一。本文主要描述使用RocksDB管理Flink作业状态的好处、如何、何时使用它,并澄清一些常见的误区。尽管如此,本文并不负责解释RocksDB内部机制及如何进行故障排除和性能调优;有这方面需求的同学,可以访问Flink用户邮件列表获得帮助。
State in Flink
为了更好地理解Flink中的状态和状态后端,很重要的一点:区分in-flight state和state snapshots。in-flight state即是Flink作业正在处理中的状态;该类型的状态总是存储在内存中(有可能溢出到磁盘),在作业失败时可能会丢失但是不会影响作业的可恢复性。state snapshots(即checkpoints和savepoints)会存储在远程持久存储中,用于在作业失败时恢复本地状态。生产场景如何选择状态后端,取决于对可伸缩性、吞吐量和延迟的需求。
What is RocksDB ?
纠正一个常见的误区:认为RocksDB是一个需要在集群上运行并由专门管理员管理的分布式数据库,是错误的。RocksDB是一个用于快速存储的可嵌入持久化键值存储。它通过Java Native接口(JNI)与Flink进行交互。
下图描述RocksDB在Flink集群节点中的位置。以下各节将详细说明。
RocksDB inFlink
将RocksDB用于状态后端的话,要求部署 Flink的环境包括本机共享库:
$ jar -tvf lib/flink-dist_2.12-1.12.0.jar|grep librocksdbjni-linux64
8695334 Wed Nov 27 02:27:06 CET 2019librocksdbjni-linux64.so
复制
Flink作业运行时,RocksDB会被内嵌到TaskManager进程中。RocksDB以本地线程方式运行来读写本地文件。例如:配置了RocksDBStateBendback的Flink作业,32513是TaskManager进程ID。如下:
$ ps -T -p 32513 | grep -i rocksdb
32513 32633 ? 00:00:00 rocksdb:low0
32513 32634 ? 00:00:00 rocksdb:high0
复制
When to useRocksDBStateBackend
除了RocksDBStateBackend之外,Flink还有另外两个内置的状态后端:MemoryStateBackend和FsStateBackend。这两种状态后端都是基于JVM堆的。当前暂且抛开MemoryStateBackend,因为绝大多数场景,它只用于本地开发和调试,不用于生产。
使用RocksDBStateBendback时,运行中的状态首先写入堆外/本机内存,然后当达到配置的阈值时刷新到本地磁盘。这意味着RocksDBStateBendback可以支持大于总配置堆容量的状态,或者说其状态大小只受限于整个集群中的可用磁盘空间。另外,因为RocksDBStateBendback不使用JVM堆来存储运行中的状态,故它也不受JVM垃圾回收的影响,具有可预测的延迟。
除了完整的、自包含的状态快照之外,RocksDBStateBackend还支持作为性能调优选项的增量checkpoint。增量checkpoint仅存储上次checkpoint之后发生的改变。与执行完整快照相比,这大大减少了checkpoint的时间。RocksDBStateBendback是当前唯一支持增量checkpoint的状态后端。
如下场景,优选RocksDB:
作业的状态大小大于本地内存(如:跨度较长的窗口,较大的Keyed状态)
作业需要使用增量checkpoint,以减少checkpoint的时间
作业需要保证可预测的延迟,不受JVM垃圾回收的影响
如果Flink作业的状态很小或需要很低的延迟,则应该考虑FsStateBackend。根据经验,RocksDBStateBackend会比基于堆的状态后端慢几倍,因为它将键/值对存储为序列化字节。这意味着任何(读/写)状态的操作都需要经过一个跨JNI的反序列化/序列化的过程,这比直接使用堆上的状态更为昂贵。但是好处就是:对于相同数量的状态,与堆上状态相比,内存占用率更低。
How to use RocksDBStateBackend
RocksDB是完全嵌入到TaskManager进程中,并完全由TaskManager进程管理。RocksDBStateBackend可以在集群级别配置为整个集群的默认值,也可以在作业级别配置为单个作业的默认值。注意:作业级别的配置优先级高于集群级别的配置。
Cluster Level
增加如下配置到 conf/flink-conf.yaml:
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir:hdfs:///flink-checkpoints # location to store checkpoints
复制
Job Level
创建StreamExecutionEnvironment 后,增加如下代码:
# 'env' is the createdStreamExecutionEnvironment
# 'true' is to enable incrementalcheckpointing
env.setStateBackend(newRocksDBStateBackend("hdfs:///fink-checkpoints", true));
复制
注意:除了使用HDFS外,如果在FLINK_HOME/plugins下添加了相应的依赖项的话,也可以使用其他的本地或者基于云的对象存储。
Best Practices and Advanced Configuration
本文希望帮助用户更好地理解RocksDB在Flink中的作用,以及如何使用RocksDBStatebend成功地运行作业。最后,我们将探讨一些最佳实践和一些参考点,以便进一步进行故障排除和性能调优。
State Location in RocksDB
如前所述,RocksDBStatebend中的运行中状态会溢出到磁盘上的文件,这些文件位于Flink配置state.backend.rocksdb.localdir指定的目录下。因为磁盘性能直接影响RocksDB的性能,所以建议将此目录放在本地磁盘上。不建议将其配置到基于网络的远程位置,如NFS或HDFS,因为写入远程磁盘通常比较慢。高可用性也不是in-flight state(工作状态)的要求。如果需要高吞吐量,则首选本地SSD磁盘。
在状态快照期间,TaskManager会对工作中的状态进行快照并远程存储。将状态快照传输到远程存储完全由TaskManager自身处理,不需要状态后端的参与。所以,state.checkpoints.dir目录或者在代码中为特定作业设置的参数,可以是不同的位置,如本地HDFS或基于云的对象存储,如Amazon S3、Azure Blob Storage、Google cloud Storage、Alibaba OSS等。
Troubleshooting RocksDB
使用RocksDB时,可以查找名为LOG的RocksDB日志文件,以掌握RocksDB的运行信息。默认情况下,此日志文件与数据文件位于同一目录中,即Flink配置指定的目录state.backend.rocksdb.localdir。一旦启用,RocksDB统计信息也会记录在那里,以帮助诊断潜在的问题。有关更多信息,请查看RocksDB Wiki中的RocksDB故障排除指南。如果您对RocksDB行为信息感兴趣,可以考虑为您的Flink工作启用RocksDB metrics(会影响性能哦)。
注意:从Flink1.10开始,通过将日志级别设置为HEADER,RocksDB日志记录被有效地禁用。要启用它,参考
https://ververica.zendesk.com/hc/en-us/articles/360015933320-How-to-get-RocksDB-s-LOG-file-back-for-advanced-troubleshooting。
Tuning RocksDB
从flink1.10开始,Flink默认将RocksDB的内存大小配置为每个task slot的托管内存。调试内存性能的问题主要是通过调整配置项
taskmanager.memory.managed.size
或者 taskmanager.memory.managed.fraction
以增加Flink的托管内存(即堆外内存)。对于更细粒度的控制,应该首先通过设置 state.backend.rocksdb.memory.managed
为false,禁用自动内存管理,然后调整如下配置项:
state.backend.rocksdb.block.cache-size
(corresponding to block_cache_size in RocksDB),
state.backend.rocksdb.writebuffer.size
(corresponding to write_buffer_size in RocksDB),
state.backend.rocksdb.writebuffer.count
(corresponding to max_write_buffer_number in RocksDB)。
详细的可以参看
https://www.ververica.com/blog/manage-rocksdb-memory-size-apache-flink
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
在RocksDB中写入或覆盖数据时,RocksDB线程在后台运行负责将数据从内存刷新到本地磁盘以及压缩数据。在多CPU核的机器上,通过调整配置项 state.backend.rocksdb.thread.num
可以增加后台用于刷新数据和压缩数据的并行度。对于生产环境来说,默认配置通常太小。如果需要频繁的从RocksDB读取内容,应该考虑启用bloom过滤器。
对于其他RocksDBStateBackend配置,请查看有关高级RocksDB状态后端选项的Flink文档。有关进一步的调优,请查看RocksDB Wiki中的RocksDB调优指南。
Conclusion
RocksDB状态后端(即RocksDBStateBackend)是Flink内置的三种状态后端之一,对于Flink作业配置是一个强有力的选择。它使Flink作业能够保存TB级别的状态,并保证exactly-once语义。如果你的Flink作业的状态太大导致无法放入JVM堆中、如果你对增量checkpoint感兴趣、如果你希望有可预测的延迟,那么你应该使用RocksDBStateBackend。
RocksDB以本地线程形式嵌入到TaskManager进程中,并且可以处理本地磁盘上的文件,因此支持RocksDBStateBackend,无需进一步设置和管理任何外部系统或进程。