暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

摆脱云数据库限制:一种行之有效的非常规 CDC

DataFlow范式 2021-11-30
636

平台组逸凡同学提供完善的功能验证和压力测试,掌声鼓励👏

笔者在前面的文章中大篇幅地介绍过数据库同步的 CDC 方案以及 NiFi 实时数据流处理分发系统,感兴趣的读者可以抽空(立马)去阅读一下,必将功力大增。

今天,笔者将介绍一种场景,专注于将业务数据库的表数据准实时地抽取到 Kafka,然后根据各自的业务需求,基于 Flink 进行实时分析并通过可视化展示。

背景

截至目前,不少云厂商都提供了 PaaS 数据库,但是据以了解的都不愿意为用户提供免费的 CDC 常规方案,而是提供基于 binlog 的增量数据订阅功能服务,说白了,除了购买 PaaS 数据库外,想使用 binlog 等方式实时同步数据库数据,那就得额外准备一笔费用。

订阅原理

本篇文章以 MySQL 为例,其架构和实战方式具有普适性。

PaaS 数据库(MySQL)提供的数据订阅,通过模拟备库向主库获取对应 binlog 内容进行分析。通过解析 binlog,按照订阅通道配置的库表进行分析,对主库几乎无影响。架构图如下:

大部分云厂商提供的功能大同小异,不再细说,读者一看便知。

回归简单

初步想法

基于 binlog 方式的 CDC,不花 Q 肯定是无法使用了,我们只能变换思路。

前面读者提到 NiFi,其实是提前做了提示。

Apache NiFi 具有用于从关系数据库提取数据的处理器,比如 ExecuteSQL(ExecuteSQLRecord)、QueryDatabaseTable(QueryDatabaseTableRecord)和 GenerateTableFetch 等。

到这里,读者第一时间想到的应该是使用 NiFi 的 ExecuteSQL 和 PutDatabaseRecord 对 MySQL 等数据库数据进行抽取,并存入到 MySQL 等数据库中。

想,当然是没问题的,但是实战中就会存在一些问题。如果数据库的表数据量很大,比如千万或亿级,那么使用 ExecuteSQL 将所有的数据一次性抽取,是不是疯掉了,除了对 NiFi 节点内存占用很大,而且也很难实时获取增量数据,更不要说做大屏报表实时展示。

针对这种情况,读者可能会想到设置 ExecuteSQL 分批次抽取数据,但是对于大数据量来说效率太差,而且还要将数据包装为 Avro 格式,另外如果分批查询中间断了,要重新开始,想想就觉得坑在多了。

三思而行

一些读者可能已经想到了要使用分页查询。

在实际场景中,如果实时获取数据的 MySQL 表有自增主键,或者含有创建/更新时间字段,那么思路就很清晰了,即根据 ID 或创建/更新时间字段进行增量分页查询。

接下来将以实战的方式讲解如何使用 GenerateTableFetch 和 ExecuteSQLRecord 等 处理器来实现流式抽取数据库表的历史数据和最新增量数据,最终满足准实时分析需求的场景。

在本篇文章中,笔者将基于 NiFi + Kafka 来实现准实时同步数据库表数据,然后使用 Flink 消费 Kafka 数据并根据业务指标进行计算(本篇文章不涉及该部分内容),最终通过报表平台实时展示。

场景演练

架构

图中虚线部分自行考虑是否必要。

功能点

根据上述简单的架构图,笔者列一下涉及的数据流设计点:

  • 根据设置数据库表的创建/更新时间字段属性,使用 NiFi GenerateTableFetch 处理器访问数据库备库,生成在表中执行分页查询的 SQL 语句
  • 使用 NiFi ExecuteSQLRecord 处理器执行分页查询的 SQL,并将数据写入 Kafka 集群
  • Flink 消费 Kafka 数据进行计算,将结果实时写入目标数据库

NiFi 数据流编排

整体流程如上图所示,接下来笔者详细讲解。

GenerateTableFetch

GenerateTableFetch 使用指定的数据库连接生成包含 SQL 语句的 FlowFiles,这些 SQL 语句可用于从表中获取分页的数据。

GenerateTableFetch 执行对数据库的查询,以确定当前行数和最大值,如果指定了最大值列,则收集其最大值列的值大于 GenerateTableFetch 最后观察到的值的行数,允许增量获取新行,而不是每次生成 SQL 来获取整个表,为了更加深入理解,请接着看。

Database Connection Pooling Service 属性是配置数据库的连接,这是一个 Controller Services,EACHPLUS_EVENTS_DBCPPool 值的内容为:

EACHPLUS_EVENTS_DBCPPool 配置如下信息:

  • Database Connection URL
  • Database Driver Class Name
  • Database Driver Location(s)
  • Database User
  • Password

其他参数,根据自己实际情况进行调整。

我们再回到 GenerateTableFetch 处理器配置中,配置如下内容:

  • Database Connection Pooling Service

    选择上面配置的 EACHPLUS_EVENTS_DBCPPool。

  • Database Type

    选择 MySQL。

  • Table Name

  • Columns to Return

    要在查询中使用的以逗号分隔的列名列表。根据需求,选择配置。

  • Maximum-value Columns

    跟踪处理器开始运行以来返回的每个列的最大值。笔者选择 c_time 字段用于增量实时抽取数据。

  • Max Wait Time

    允许运行 SQL select 查询的最大时间量,默认为 0,表示没有限制。

  • Partition Size

    每个生成的 SQL 语句要获取的结果行数。表总行数除以分区大小给出生成的SQL语句(FlowFiles)的数量。值为 0 表示将生成一个 FlowFile,获取表中的所有行。

  • Column for Value Partitioning

    值将用于分区列的名称。默认行为是使用结果集中的行号,使用偏移或限制策略将结果分区到要从数据库获取的分页中。只有当默认查询执行得不好、没有最大值列或只有一个最大值列(其类型可以强制为长整数(即不是日期或时间戳))且列值均匀分布而不是稀疏时,才应使用此属性。

  • Additional WHERE clause

    在构建 SQL 查询时,要在 WHERE 条件中添加一个自定义子句。比如可以抽取指定时间以来的数据,忽略历史数据。

  • Custom ORDER BY Column

    如果未提供 Maximum-value Columns 并且启用了分区,则用于排序结果的列名称。

  • Output Empty FlowFile on Zero Results

    根据指定的属性,执行此处理器可能不会导致生成任何 SQL 语句。当此属性为 true 时,将生成一个空的 FlowFile 并将其转换到 success。如果此属性为 false,则不会生成任何输出 FlowFile。

执行 GenerateTableFetch

上面整个 NiFi 流程中,我们先把除 GenerateTableFetch 之外的其他处理器都停止,观察 GenerateTableFetch 生成的结果。

Queued 采用 Load Balance 方式:

查看 Queued 里面的数据:

为了方便演示,我们先来看一下 10 条数据(GenerateTableFetch 处理器每隔 15 秒运行一次,达到 10 条数据后,我们立即停止 GenerateTableFetch 运行):

Position 1:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:21.0' AND c_time <= '2020-06-25 12:47:36.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:36.0' AND c_time <= '2020-06-25 12:47:51.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:51.0' AND c_time <= '2020-06-25 12:48:06.0' ORDER BY c_time LIMIT 5000


Position 2:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:06.0' AND c_time <= '2020-06-25 12:48:21.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:21.0' AND c_time <= '2020-06-25 12:48:36.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:36.0' AND c_time <= '2020-06-25 12:48:51.0' ORDER BY c_time LIMIT 5000


Position 3:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:48:51.0' AND c_time <= '2020-06-25 12:49:06.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:06.0' AND c_time <= '2020-06-25 12:49:21.0' ORDER BY c_time LIMIT 5000
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:21.0' AND c_time <= '2020-06-25 12:49:43.0' ORDER BY c_time LIMIT 5000


Position 4:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:43.0' AND c_time <= '2020-06-25 12:49:58.0' ORDER BY c_time LIMIT 5000
复制

可以看到根据 c_time 日期字段增量抽取数据,实际使用中最好使用时间戳。

再来观察 GenerateTableFetch 保存的 State:

State 里面保存的 c_time 最新时间为 2020-06-25 12:49:58.0,也是 Position 4 最后的时间:

Position 4:
SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:49:43.0' AND c_time <= '2020-06-25 12:49:58.0' ORDER BY c_time LIMIT 5000
复制

当再次执行 GenerateTableFetch 处理器时,State 状态值会实时更新。

ExecuteSQLRecord

ExecuteSQLRecord 处理器可以根据 Record Writer 指定结果的格式。

配置如下参数:

  • Database Connection Pooling Service

    同 GenerateTableFetch 配置。

  • Record Writer

    JsonRecordSetWriter 内容为:

    AvroSchemaRegistry_card_tablename 配置为:

  • 其他参数自行配置或保持默认值

执行 ExecuteSQLRecord

单独执行 ExecuteSQLRecord 处理器,将从 GenerateTableFetch 的 Queued 中获取 SQL 查询语句进行执行,比如:

SELECT * FROM xxx_event_v2 WHERE c_time > '2020-06-25 12:47:21.0' AND c_time <= '2020-06-25 12:47:36.0' ORDER BY c_time LIMIT 5000
复制

ExecuteSQLRecord 执行完成后,查看输出到下游的 Queued:

点击 List queue 查看:

每一个 Position 返回的是一个包含多个 json 数据的 Array 格式,Array 中的每个 json 为数据库表中的一条数据,格式如下:

[{"id":1000111,"to_username":"xxx",....},{"id":1000222,"to_username":"xxx",....},...]
复制

Array 中 json 总数大小不会超过 GenerateTableFetch 中的 Partition Size。

细心的读者查看 List queue 会发现,有些 FlowFile 大小接近了 1 MB,如果大小超过 1MB,会导致无法将数据写入 Kafka topic 中。为了避免这种情况发生,可以设置 Partition Size,也可以拆分 ExecuteSQLRecord 结果为单个 json 格式。

SplitJson

SplitJson 处理器使用 JsonPath 表达式将 Array 元素切分为一个个 json 片段,即每个  json 数据对应数据库表的一条记录。

执行 SplitJson

点击 List queue 查看信息:

查看其中一条数据:

输出结果已经被切分为每一个 json 格式。

PublishKafka_2_0

这一块比较简单,笔者就简单梳理一下 NiFi 支持的 Kafka 特性:

  • Security Protocol

    支持 PLAINTEXT、SSL、SASL_PLAINTEXT 和 SASL_SSL。

  • SASL Mechanism

    支持 GSSAPI、PLAIN 和 SCRAM-SHA-256。

  • Delivery Guarantee

    支持 Best Effort、Guarantee Single Node Delivery 和 Guarantee Replicated Delivery。

  • Use Transactions

    支持 Transaction。

  • Partitioner class

    支持 DefaultPartitioner(random 方式)、RoundRobinPartitioner 和 Expression Language Partitioner。

  • Compression Type

    支持 none、gzip、snappy 和 lz4。

收个尾

对于一张需要实时抽取数据的表来说,如果包含的历史数据太大,可以一次性将历史数据导出来,然后使用 NiFi GenerateTableFetch 处理器从某一个时间点开始实时抽取数据。当然如果不想这么麻烦,其实也是可以使用 GenerateTableFetch 处理器分页慢慢抽取历史数据,直到追赶上最新数据,但是这不是最佳合理的方式。

在生产环境中,ExecuteSQLRecord、SplitJson 和 PublishKafka_2_0 处理器都可以设置在 NiFi 集群上分布式并发执行,只要数据库备库扛得住,一切都不是问题。

总结

在本篇文章中,笔者回归到最简单的方式,基于 NiFi 提供的多种处理器,实现了从数据库备库实时抽取增量数据的方案,希望对读者有所帮助。

              你若喜欢,点个在看
文章转载自DataFlow范式,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论