暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

太强了! Flink CDC 3.0 支持表结构变更自动同步,再也不用挨个建表了↗

大数据技能圈 2024-10-10
35
Flink CDC 是基于数据库日志 CDC(Change Data Capture)技术的实时数据集成框架,配合 Flink 优秀的管道能力和丰富的上下游生态,Flink CDC 可以高效实现海量数据的实时集成。

01

Flink CDC 演进历程

2020年7月,Flink CDC作为一个源于个人兴趣的项目,迎来了其首个代码提交,标志着实时数据集成新篇章的开启。这使得用户能够仅通过创建一个简单的Flink SQL任务来实现CDC(Change Data Capture)数据的同步、处理与分析。然而,在这一初期阶段,Flink CDC依赖于锁定机制以确保数据的一致性,同时面临无法水平扩展的局限。为了解决这些问题,Flink CDC借鉴了DBLog论文中的设计理念,成功实现了无需锁定即可并发读取的全量与增量数据同步功能,从而完成了从1.0版本到2.0版本的重大升级。
Flink CDC 2.0获得了用户的广泛认可,但在实际应用中也逐渐显现出一些需要改进之处。主要问题包括:当上游数据库表结构发生变化(如添加或删除列)时,需要手动调整Flink作业的SQL定义;在进行整个数据库的同步时,必须为每个表单独创建一个作业,导致连接数过多及计算资源的大量消耗。得益于社区用户与开发者们的共同努力,Flink CDC在2023年12月推出了3.0版本,该版本实现了强大的端到端全量与增量同步能力、自动同步表结构变更、整库同步以及分库分表同步等高级功能,有效缓解了用户的使用难题。

这或许是一个对你有用的开源项目data-warehouse-learning 项目是一套基于 MySQL + Kafka + Hadoop + Hive + Dolphinscheduler + Doris + Seatunnel + Paimon + Hudi + Iceberg + Flink + Dinky + DataRT + SuperSet 实现的实时离线数仓(数据湖)系统,以大家最熟悉的电商业务为切入点,详细讲述并实现了数据产生、同步、数据建模、数仓(数据湖)建设、数据服务、BI报表展示等数据全链路处理流程。

https://gitee.com/wzylzjtn/data-warehouse-learning

https://github.com/Mrkuhuo/data-warehouse-learning

https://bigdatacircle.top/

项目演示:

02

Flink CDC 3.0 的架构设计

Flink CDC 3.0的核心特性包含以下几点:

  1. 端到端的数据集成能力,用户仅需配置一个YAML文件即可迅速搭建起数据导入数据湖或数据仓库的任务。

  2. 提供全面的数据同步功能,能够自动从全量读取无缝过渡到增量数据同步,并且能够自动将上游表结构的任何变更反映到下游。

  3. 单个作业实例即可支持对多个表的读取和写入操作,减少了对数据库连接的需求;在增量读取过程中,系统会自动关闭未使用的读取器,从而节约计算资源。


Flink CDC 3.0的整体架构由上至下可以分为四个层次:

  1. Flink CDC API:这是面向最终用户的API层,允许用户通过YAML格式配置数据同步流程,并利用Flink CDC CLI工具提交任务。

  2. Flink CDC Connect:作为连接外部系统的连接器层,它通过封装Flink及其现有的CDC数据源,实现了与外部系统之间的数据读取和写入同步。

  3. Flink CDC Composer:此为同步任务的构建层,负责将用户定义的同步任务转换成Flink DataStream作业。

  4. Flink CDC Runtime:这是运行时层,针对不同的数据同步场景,对Flink算子进行了高度定制化设计,以实现诸如模式变更、数据路由、数据转换等高级功能。


03

Flink CDC 3.0 的核心实现

01

数据抽象

Event是Flink CDC 3.0内部用于数据处理和传输的数据结构接口,其功能与Flink SQL中的RowData接口类似。目前,Event的所有实现如下图所示。

3.1.1 ChangeEvent

ChangeEvent接口用于表示发生在特定表上的变更事件,主要分为两类实现:一是数据变更事件,由DataChangeEvent类表示;二是表结构变更事件,这类事件实现了SchemaChangeEvent接口。其中,DataChangeEvent类存储了完整的数据变更详情,包括变更前后(before和after)每条记录的字段值。而SchemaChangeEvent则具体涵盖了增加列、删除列、修改列类型等多种实现。
Flink CDC将表结构变更信息作为事件流的一部分进行处理,这种方式避免了在数据变更事件中直接存储类型信息。任何需要从DataChangeEvent读取数据的节点都会依据SchemaChangeEvent来维护最新的表结构信息。此外,Flink CDC还特别设计了自己的序列化机制,每条记录均以二进制形式存储于Flink的MemorySegment中。通过这样的底层结构优化,显著提升了数据在各节点间传输的效率。
3.1.2 FlushEvent
FlushEvent是一种包含数据刷写控制逻辑的特殊事件。在表结构变更事件发生后,可能会出现旧表结构的数据还未完全处理完毕的情况,导致数据链路中同时存在两种不同表结构的数据。由于大多数数据库不支持在同一事务批次中混合处理不同版本的数据格式,因此在处理新版数据前,必须先确保所有旧版数据已完成刷写。FlushEvent的作用正是为了隔离这两种数据,在Sink端接收到FlushEvent时,它会触发将之前缓存的所有数据立即刷写到目标存储中。

02

算子编排

Flink CDC依据数据集成的具体场景,对Flink DataStream的算子链路进行了深度定制,当前设计的数据处理流程如下图所示:

下面对这些模块的具体实现做进一步的介绍。

3.2.1 Source
Source模块负责生成将在链路中传递的变更事件。Flink CDC 2.0已经具备了强大的全增量同步能力和并发读取功能,能够产生包含各种变更事件信息的SourceRecord对象。在此基础上,只需实现一个自定义的DebeziumDeserializationSchema转换器,用于将SourceRecord解析为前述的各种表变更事件,即可实现Flink CDC 3.0数据源的接入。
首次启动时,Source模块需首先获取表结构信息,并生成CreateTableEvent发送给下游节点,以此确保下游节点能够正确解析DataChangeEvent。
    public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
    String removedColName = parser.parseName(ctx.uid());
    changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
    super.enterAlterByDropColumn(ctx);
    }
    复制
    3.2.2 Transform
      transform: 
      - source-table: db.tbl1
      projection: id, age, weight, height, weight (height * height) as bmi
      filter: age > 18 AND name IS NOT NULL
      复制
      3.2.3 Schema
      在表结构变更事件发生后,Schema模块负责暂停上游数据的进一步发送,直至所有旧版本格式的数据完成刷写。这一过程通过FlushEvent来协调,鉴于下游可能包含多个Sink节点,需要借助运行在JobManager上的一个OperatorCoordinator——即SchemaRegistry来进行管理。
      具体而言,处理表结构变更的流程如下图所示:

      在程序启动之初,所有Sink Operator都会向SchemaRegistry注册,SchemaRegistry会记录下Writer的数量。当从Source接收到SchemaChangeEvent时,SchemaOperator会向SchemaRegistry发送一个包含此次表结构变更信息的SchemaChangeRequest,使SchemaRegistry能够缓存这个SchemaChangeEvent。随后,SchemaOperator会下发一个FlushEvent给所有Sink Operator,促使这些Sink Operator将数据刷写到外部系统,并向SchemaRegistry报告FlushSuccessEvent。SchemaRegistry据此统计已响应的Writer数量。接着,SchemaOperator会下发SchemaChangeEvent给所有Sink Operator,指示它们更新相关表的序列化器。之后,SchemaOperator会向SchemaRegistry发送一个ReleaseUpstreamRequest,并自我阻塞,停止处理任何变更事件,直至收到SchemaRegistry的确认回复。一旦SchemaRegistry接收到所有Sink Operator的FlushSuccessEvent,即表明所有Sink Operator已完成数据刷写,它将开始应用第2步中接收的SchemaChangeEvent到外部系统,并回应第4步中的ReleaseUpstreamRequest。如此一来,SchemaOperator便能继续传递新的数据变更事件及表结构变更事件。

      3.2.4 Route

      Route模块提供了表名映射的功能,通过为源表中的每条数据指定目标表,利用一对一或多对一的映射配置,能够实现整个数据库的同步以及简单的分库分表同步。该模块基于Flink的RichMapFunction实现,允许使用source-table参数指定一个正则表达式规则,将所有匹配该规则的表名映射到由sink-table参数指定的目标表名。RouteFunction的关键代码如下所示:

        public Event map(Event event) throws Exception {
        ChangeEvent changeEvent = (ChangeEvent) event;
        TableId tableId = changeEvent.tableId();


        for (Tuple2<Selectors, TableId> route : routes) {
        Selectors selectors = route.f0;
        TableId replaceBy = route.f1;
        if (selectors.isMatch(tableId)) {
        return recreateChangeEvent(changeEvent, replaceBy);
        }
        }
        return event;
        }
        复制

        3.2.5 Partition

        在数据同步的场景中,数据的生成和消费速度往往不一致,用户希望通过增加Sink的并发数量来提升数据处理的速度。Partition模块负责将事件分发至不同的Sink中。
        在Partition阶段,数据变更事件依据表名和主键作为哈希键进行分配,确保了同一张表内具有相同主键的数据在分发过程中不会出现顺序混乱的问题。哈希键的计算方法如下所示:
          public Integer apply(DataChangeEvent event) {
          List<Object> objectsToHash = new ArrayList<>();
          Table ID
          TableId tableId = event.tableId();
          Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
          Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
          objectsToHash.add(tableId.getTableName());


          / Primary key
          RecordData data =
          event.op().equals(OperationType.DELETE) ? event.before() : event.after();
          for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
          objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
          }


          // Calculate hash
          return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
          }
          复制

          同时,由于Sink模块需要维护表结构信息,因此对于表结构变更事件,必须将其广播到每一个并发实例中。而对于控制数据刷写的FlushEvent,同样需要广播到下游的每一个通道中。

          其代码实现如下:

            public void processElement(StreamRecord<Event> element) throws Exception {
            Event event = element.getValue();
            if (event instanceof SchemaChangeEvent) {
            // Update hash function
            TableId tableId = ((SchemaChangeEvent) event).tableId();
            cachedHashFunctions.put(tableId, recreateHashFunction(tableId));
            // Broadcast SchemaChangeEvent
            broadcastEvent(event);
            } else if (event instanceof FlushEvent) {
            // Broadcast FlushEvent
            broadcastEvent(event);
            } else if (event instanceof DataChangeEvent) {
            // Partition DataChangeEvent by table ID and primary keys
            partitionBy(((DataChangeEvent) event));
            }
            }
            复制

            3.2.6 Sink

            在Sink模块中,需要将数据写入外部系统,并将表结构变更应用到外部系统中。Flink CDC的DataSink API通过提供EventSinkProvider和MetaDataApplier接口来实现这两个功能。

            EventSinkProvider用于将表数据变更应用到外部系统中,要求提供一个基于Flink SinkFunction或Flink Sink API的实现,并且能够支持向多个表写入数据。以Flink Sink API为例,SinkWriter需要从DataChangeEvent中提取变更数据,并将其写入相应的表中。遇到SchemaChangeEvent时,SinkWriter会更新其内存中存储的表结构信息。当接收到FlushEvent时,Sink Operator会调用SinkWriter的flush方法,将缓存的数据写入外部系统。
            MetaDataApplier则负责将表结构变更应用到外部系统中。在SchemaRegistry接收到所有Sink算子已完成处理FlushEvent的通知后,SchemaRegistry会调用MetaDataApplier的applySchemaChange方法来应用表结构变更事件。考虑到任务可能重启的情况,MetaDataApplier需要支持对表结构变更事件的幂等处理,确保变更操作即使重复执行也能产生相同的结果。

            04

            代码获取

            https://gitee.com/wzylzjtn/data-warehouse-learning

            https://github.com/Mrkuhuo/data-warehouse-learning

            05

            文档获取

            06

            进交流群群添加作者

            推荐阅读系列文章

            文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论