平台组逸凡同学提供完善的功能验证和压力测试,掌声鼓励👏
笔者在前面的文章中大篇幅地介绍过数据库同步的 CDC 方案以及 NiFi 实时数据流处理分发系统,感兴趣的读者可以抽空(立马)去阅读一下,必将功力大增。
今天,笔者将介绍一种场景,专注于将业务数据库的表数据准实时地抽取到 Kafka,然后根据各自的业务需求,基于 Flink 进行实时分析并通过可视化展示。
背景
截至目前,不少云厂商都提供了 PaaS 数据库,但是据以了解的都不愿意为用户提供免费的 CDC 常规方案,而是提供基于 binlog 的增量数据订阅功能服务,说白了,除了购买 PaaS 数据库外,想使用 binlog 等方式实时同步数据库数据,那就得额外准备一笔费用。
订阅原理
本篇文章以 MySQL 为例,其架构和实战方式具有普适性。
PaaS 数据库(MySQL)提供的数据订阅,通过模拟备库向主库获取对应 binlog 内容进行分析。通过解析 binlog,按照订阅通道配置的库表进行分析,对主库几乎无影响。架构图如下:
大部分云厂商提供的功能大同小异,不再细说,读者一看便知。
回归简单
初步想法
基于 binlog 方式的 CDC,不花 Q 肯定是无法使用了,我们只能变换思路。
前面读者提到 NiFi,其实是提前做了提示。
Apache NiFi 具有用于从关系数据库提取数据的处理器,比如 ExecuteSQL(ExecuteSQLRecord)、QueryDatabaseTable(QueryDatabaseTableRecord)和 GenerateTableFetch 等。
到这里,读者第一时间想到的应该是使用 NiFi 的 ExecuteSQL 和 PutDatabaseRecord 对 MySQL 等数据库数据进行抽取,并存入到 MySQL 等数据库中。
想,当然是没问题的,但是实战中就会存在一些问题。如果数据库的表数据量很大,比如千万或亿级,那么使用 ExecuteSQL 将所有的数据一次性抽取,是不是疯掉了,除了对 NiFi 节点内存占用很大,而且也很难实时获取增量数据,更不要说做大屏报表实时展示。
针对这种情况,读者可能会想到设置 ExecuteSQL 分批次抽取数据,但是对于大数据量来说效率太差,而且还要将数据包装为 Avro 格式,另外如果分批查询中间断了,要重新开始,想想就觉得坑在多了。
三思而行
一些读者可能已经想到了要使用分页查询。
在实际场景中,如果实时获取数据的 MySQL 表有自增主键,或者含有创建/更新时间字段,那么思路就很清晰了,即根据 ID 或创建/更新时间字段进行增量分页查询。
接下来将以实战的方式讲解如何使用 GenerateTableFetch 和 ExecuteSQLRecord 等 处理器来实现流式抽取数据库表的历史数据和最新增量数据,最终满足准实时分析需求的场景。
在本篇文章中,笔者将基于 NiFi + Kafka 来实现准实时同步数据库表数据,然后使用 Flink 消费 Kafka 数据并根据业务指标进行计算(本篇文章不涉及该部分内容),最终通过报表平台实时展示。
场景演练
架构
图中虚线部分自行考虑是否必要。
功能点
根据上述简单的架构图,笔者列一下涉及的数据流设计点:
根据设置数据库表的创建/更新时间字段属性,使用 NiFi GenerateTableFetch 处理器访问数据库备库,生成在表中执行分页查询的 SQL 语句 使用 NiFi ExecuteSQLRecord 处理器执行分页查询的 SQL,并将数据写入 Kafka 集群 Flink 消费 Kafka 数据进行计算,将结果实时写入目标数据库
NiFi 数据流编排
整体流程如上图所示,接下来笔者详细讲解。
GenerateTableFetch
GenerateTableFetch 使用指定的数据库连接生成包含 SQL 语句的 FlowFiles,这些 SQL 语句可用于从表中获取分页的数据。
GenerateTableFetch 执行对数据库的查询,以确定当前行数和最大值,如果指定了最大值列,则收集其最大值列的值大于 GenerateTableFetch 最后观察到的值的行数,允许增量获取新行,而不是每次生成 SQL 来获取整个表,为了更加深入理解,请接着看。
Database Connection Pooling Service 属性是配置数据库的连接,这是一个 Controller Services,EACHPLUS_EVENTS_DBCPPool 值的内容为:
EACHPLUS_EVENTS_DBCPPool 配置如下信息:
Database Connection URL Database Driver Class Name Database Driver Location(s) Database User Password
其他参数,根据自己实际情况进行调整。
我们再回到 GenerateTableFetch 处理器配置中,配置如下内容:
Database Connection Pooling Service
选择上面配置的 EACHPLUS_EVENTS_DBCPPool。
Database Type
选择 MySQL。
Table Name
Columns to Return
要在查询中使用的以逗号分隔的列名列表。根据需求,选择配置。
Maximum-value Columns
跟踪处理器开始运行以来返回的每个列的最大值。笔者选择 c_time 字段用于增量实时抽取数据。
Max Wait Time
允许运行 SQL select 查询的最大时间量,默认为 0,表示没有限制。
Partition Size
每个生成的 SQL 语句要获取的结果行数。表总行数除以分区大小给出生成的SQL语句(FlowFiles)的数量。值为 0 表示将生成一个 FlowFile,获取表中的所有行。
Column for Value Partitioning
值将用于分区列的名称。默认行为是使用结果集中的行号,使用偏移或限制策略将结果分区到要从数据库获取的分页中。只有当默认查询执行得不好、没有最大值列或只有一个最大值列(其类型可以强制为长整数(即不是日期或时间戳))且列值均匀分布而不是稀疏时,才应使用此属性。
Additional WHERE clause
在构建 SQL 查询时,要在 WHERE 条件中添加一个自定义子句。比如可以抽取指定时间以来的数据,忽略历史数据。
Custom ORDER BY Column
如果未提供 Maximum-value Columns 并且启用了分区,则用于排序结果的列名称。
Output Empty FlowFile on Zero Results
根据指定的属性,执行此处理器可能不会导致生成任何 SQL 语句。当此属性为 true 时,将生成一个空的 FlowFile 并将其转换到 success。如果此属性为 false,则不会生成任何输出 FlowFile。
执行 GenerateTableFetch
上面整个 NiFi 流程中,我们先把除 GenerateTableFetch 之外的其他处理器都停止,观察 GenerateTableFetch 生成的结果。
Queued 采用 Load Balance 方式:
查看 Queued 里面的数据:
为了方便演示,我们先来看一下 10 条数据(GenerateTableFetch 处理器每隔 15 秒运行一次,达到 10 条数据后,我们立即停止 GenerateTableFetch 运行):
Position 1:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:21.0' AND c_time <= '2020-06-25 12:47:36.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:36.0' AND c_time <= '2020-06-25 12:47:51.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:51.0' AND c_time <= '2020-06-25 12:48:06.0' ORDER BY c_time LIMIT 5000
Position 2:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:06.0' AND c_time <= '2020-06-25 12:48:21.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:21.0' AND c_time <= '2020-06-25 12:48:36.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:36.0' AND c_time <= '2020-06-25 12:48:51.0' ORDER BY c_time LIMIT 5000
Position 3:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:51.0' AND c_time <= '2020-06-25 12:49:06.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:06.0' AND c_time <= '2020-06-25 12:49:21.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:21.0' AND c_time <= '2020-06-25 12:49:43.0' ORDER BY c_time LIMIT 5000
Position 4:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:43.0' AND c_time <= '2020-06-25 12:49:58.0' ORDER BY c_time LIMIT 5000
复制
可以看到根据 c_time 日期字段增量抽取数据,实际使用中最好使用时间戳。
再来观察 GenerateTableFetch 保存的 State:
State 里面保存的 c_time 最新时间为 2020-06-25 12:49:58.0,也是 Position 4 最后的时间:
Position 4:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:43.0' AND c_time <= '2020-06-25 12:49:58.0' ORDER BY c_time LIMIT 5000
复制
当再次执行 GenerateTableFetch 处理器时,State 状态值会实时更新。
ExecuteSQLRecord
ExecuteSQLRecord 处理器可以根据 Record Writer 指定结果的格式。
配置如下参数:
Database Connection Pooling Service
同 GenerateTableFetch 配置。
Record Writer
JsonRecordSetWriter 内容为:
AvroSchemaRegistry_card_tablename 配置为:
其他参数自行配置或保持默认值
执行 ExecuteSQLRecord
单独执行 ExecuteSQLRecord 处理器,将从 GenerateTableFetch 的 Queued 中获取 SQL 查询语句进行执行,比如:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:21.0' AND c_time <= '2020-06-25 12:47:36.0' ORDER BY c_time LIMIT 5000
复制
ExecuteSQLRecord 执行完成后,查看输出到下游的 Queued:
点击 List queue 查看:
每一个 Position 返回的是一个包含多个 json 数据的 Array 格式,Array 中的每个 json 为数据库表中的一条数据,格式如下:
[{"id":1000111,"to_username":"xxx",....},{"id":1000222,"to_username":"xxx",....},...]
复制
Array 中 json 总数大小不会超过 GenerateTableFetch 中的 Partition Size。
细心的读者查看 List queue 会发现,有些 FlowFile 大小接近了 1 MB,如果大小超过 1MB,会导致无法将数据写入 Kafka topic 中。为了避免这种情况发生,可以设置 Partition Size,也可以拆分 ExecuteSQLRecord 结果为单个 json 格式。
SplitJson
SplitJson 处理器使用 JsonPath 表达式将 Array 元素切分为一个个 json 片段,即每个 json 数据对应数据库表的一条记录。
执行 SplitJson
点击 List queue 查看信息:
查看其中一条数据:
输出结果已经被切分为每一个 json 格式。
PublishKafka_2_0
这一块比较简单,笔者就简单梳理一下 NiFi 支持的 Kafka 特性:
Security Protocol
支持 PLAINTEXT、SSL、SASL_PLAINTEXT 和 SASL_SSL。
SASL Mechanism
支持 GSSAPI、PLAIN 和 SCRAM-SHA-256。
Delivery Guarantee
支持 Best Effort、Guarantee Single Node Delivery 和 Guarantee Replicated Delivery。
Use Transactions
支持 Transaction。
Partitioner class
支持 DefaultPartitioner(random 方式)、RoundRobinPartitioner 和 Expression Language Partitioner。
Compression Type
支持 none、gzip、snappy 和 lz4。
收个尾
对于一张需要实时抽取数据的表来说,如果包含的历史数据太大,可以一次性将历史数据导出来,然后使用 NiFi GenerateTableFetch 处理器从某一个时间点开始实时抽取数据。当然如果不想这么麻烦,其实也是可以使用 GenerateTableFetch 处理器分页慢慢抽取历史数据,直到追赶上最新数据,但是这不是最佳合理的方式。
在生产环境中,ExecuteSQLRecord、SplitJson 和 PublishKafka_2_0 处理器都可以设置在 NiFi 集群上分布式并发执行,只要数据库备库扛得住,一切都不是问题。
总结
在本篇文章中,笔者回归到最简单的方式,基于 NiFi 提供的多种处理器,实现了从数据库备库实时抽取增量数据的方案,希望对读者有所帮助。
