暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

SeaTunnel 支持 CDC 写入 ClickHouse 了!

SeaTunnel 2023-02-08
2927

点击蓝字 关注我们

作者 | 王海林,Apache SeaTunnel PPMC



    前言

目前,SeaTunnel 已支持数据库变更数据捕获(CDC https://github.com/apache/incubator-seatunnel/issues/3175),以将数据更改实时传送到下游系统,SeaTunnel 将捕获到的数据更改分为以下 4 种类型:
  • INSERT
    • 数据插入
  • UPDATE_BEFORE
    • 数据更改前的旧值
  • UPDATE_AFTER
    • 数据更改后的新值
  • DELETE
    • 数据删除
要处理上述数据更改操作需要 Sink 连接器的写入行为支持,本文将介绍 ClickHouse Sink 连接器如何支持上述 CDC 类型的数据更改写入。
对于 CDC 场景,主键是必要条件,所以首先要支持基于主键的 INSERT、UPDATE、DELETE 等通用需求,并且保证写入的顺序与 CDC 事件顺序一致。此外,考虑到实际的场景中数据来源的复杂性还需要支持 UPSERT 写入。最后,还需要根据 ClickHouse 自身的特性做相应优化处理,例如 UPDATE、DELETE 在 ClickHouse 中是重量级操作应该根据对应表引擎的特性做优化处理。



整体设计

当前 ClickHouse Sink Connector 基于 Jdbc Driver 实现,可以通过设计一组 Jdbc 执行器来封装对不同类型数据的处理,方便根据实际的场景切换或组合实现以及封装实现细节。
JdbcBatchStatementExecutor
是执行器的顶层接口。
public interface JdbcBatchStatementExecutor extends AutoCloseable {

    void prepareStatements(Connection connection) throws SQLException;

    void addToBatch(SeaTunnelRow record) throws SQLException;

    void executeBatch() throws SQLException;

    void closeStatements() throws SQLException;

    @Override
    default void close() throws SQLException {
        closeStatements();
    }
}
JdbcBatchStatementExecutor
具有以下实现类:
SimpleBatchStatementExecutor 实现简单 SQL Batch 执行逻辑
InsertOrUpdateBatchStatementExecutor 实现 INSERT、UPDATE 更新,也支持 UPSERT 模式
ReduceBufferedBatchStatementExecutor 内存攒批,刷新到数据库时根据数据更改类型(INSERT、UPDATE、DELETE)分发到具体的执行 Executor

1

未指定主键的情况处理


当前 CDC 处理中,主键是必要条件,如果配置 Sink Connector 未指定主键列则使用 append-only 模式写入,直接调用 SimpleBatchStatementExecutor

2

实现 CDC 数据处理


我们将数据处理的执行逻辑层次做如下划分,不同的数据类型进入到对应的 Executor,最终转化为各自的 SQL 语句去执行,并且在此过程中使用 Jdbc Batch 攒批提交。
CDC Event
\
\
\
\
DELETE Executor INSERT OR UPDATE Executor
\
\
\
\
INSERT Executor UPDATE Executor

3

写入保持 CDC 数据的顺序


CDC 事件是有序的,处理写入必须按照事件发生的顺序处理,否则可能出现数据不一致。
在上一步逻辑中针对不同类型的数据分发到各自的 Executor 处理,并且使用 Jdbc Batch 攒批提交以提高写入性能,但是分类攒批会导致提交的顺序与 CDC 事件顺序不一致。
我们可以通过添加执行屏障标记,当处理到的数据行与前一行数据的类型相同时可进入 batch 中攒批,当处理到的数据行与前一行数据的类型不同时先刷新前面攒批的 batch 到数据库,保证数据写入顺序与 CDC 事件顺序严格一致。
Example for InsertOrUpdateBatchStatementExecutor
public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        boolean currentChangeFlag = hasInsert(record);
        if (currentChangeFlag) {
            if (preChangeFlag != null && !preChangeFlag) {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, insertStatement);
            insertStatement.addBatch();
        } else {
            if (preChangeFlag != null && preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, updateStatement);
            updateStatement.addBatch();
        }
        preChangeFlag = currentChangeFlag;
        submitted = false;
    }
    
    @Override
    public void executeBatch() throws SQLException {
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            } else {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
        }
        submitted = true;
    }
}
当然,这会显著的降低批处理的速度,所以我们使用 ReduceBufferedBatchStatementExecutor
增加一层内存缓冲,在执行批处理提交时再做分发提交到数据库的动作。
Example for ReduceBufferedBatchStatementExecutor
public class ReduceBufferedBatchStatementExecutor implements JdbcBatchStatementExecutor {
    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> buffer = new LinkedHashMap<>();
    
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        buffer.put(record, ...);
    }
    
    @Override
    public void executeBatch() throws SQLException {
        Boolean preChangeFlag = null;
        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = buffer.entrySet();
        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : entrySet) {
            Boolean currentChangeFlag = entry.getValue().getKey();
            if (currentChangeFlag) {
                if (preChangeFlag != null && !preChangeFlag) {
                    deleteExecutor.executeBatch();
                }
                insertOrUpdateExecutor.addToBatch(entry.getValue().getValue());
            } else {
                if (preChangeFlag != null && preChangeFlag) {
                    insertOrUpdateExecutor.executeBatch();
                }
                deleteExecutor.addToBatch(entry.getKey());
            }
            preChangeFlag = currentChangeFlag;
        }
    
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertOrUpdateExecutor.executeBatch();
            } else {
                deleteExecutor.executeBatch();
            }
        }
        buffer.clear();
    }
}

4

实现通用的 UPSERT 写入


InsertOrUpdateBatchStatementExecutor
中可以配置开启 UPSERT,开启后处理 INSERT 或 UPDATE 类型数据时会先使用主键查询数据行是否已存在再决定使用 INSERT 或 UPDATE SQL 进行写入。
注意:此配置为可选项,会降低写入的速度,仅当某些特殊场景需要时打开
Example for InsertOrUpdateBatchStatementExecutor
public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        boolean currentChangeFlag = hasInsert(record);
      ...
    }

    private boolean hasInsert(SeaTunnelRow record) throws SQLException {
        if (upsertMode()) {
            return !exist(keyExtractor.apply(record));
        }
        switch (record.getRowKind()) {
            case INSERT:
                return true;
            case UPDATE_AFTER:
                return false;
            default:
                throw new UnsupportedOperationException();
        }
    }
    
    private boolean exist(SeaTunnelRow pk) throws SQLException {
        keyRowConverter.toExternal(pk, existStatement);
        try (ResultSet resultSet = existStatement.executeQuery()) {
            return resultSet.next();
        }
    }
}

5

针对ReplacingMergeTree
表引擎优化UPSERT


ReplacingMergeTree
表引擎可以配置 ORDER BY
字段,并且在执行 INSERT INTO 语句时覆盖 ORDER BY
字段相同的记录,我们也可以利用这个特性实现 UPSERT。
当用户写入 ReplacingMergeTree
表引擎并且表的 ORDER BY
字段与 Sink Connector 中配置的主键字段相同时,将 INSERT/UPDATE_AFTER 两种数据类型都处理为 INSERT 来实现 UPSERT。

6

针对MergeTree
表引擎的优化更新


DELETE、UPDATE 在 ClickHouse 是重量级的操作,但是针对 MergeTree
引擎有实验性的轻量级删除(https://clickhouse.com/docs/en/sql-reference/statements/delete),它比重量级的删除性能更好,我们可以让用户配置使用轻量级的删除。
当用户写入 MergeTree
表引擎并打开轻量级删除之后,我们将 DELETE/UPDATE_BEFORE 两种数据类型都处理为轻量级删除,然后将 INSERT/UPDATE_AFTER 两种数据类型都处理为 INSERT,这样就避免了 UPDATE 操作并且使用到轻量级删除。



相关PR


  • https://github.com/apache/incubator-seatunnel/pull/3653
我们欢迎您贡献完善相关的功能,如果您有任何问题,欢迎在 SeaTunnel GitHub(https://www.github.com/apache/incubator-seatunnel) 上提出 issue,我们会尽快回复。



    参考


  • https://clickhouse.com/docs/en/sql-reference/statements/delete
  • https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree

Apache SeaTunnel


Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/incubator-seatunnel

网址:
https://seatunnel.apache.org/

Proposal:
https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelPro

Apache SeaTunnel(Incubating)  下载地址:
https://seatunnel.apache.org/download
 
衷心欢迎更多人加入!

我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:
https://github.com/apache/incubator-seatunnel/issues

贡献代码:
https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org

开发邮件列表:
dev@seatunnel.apache.org

加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1cmonqu2q-ljomD6bY1PQ~oOzfbxxXWQ

关注 Twitter: 
https://twitter.com/ASFSeaTunnel

往期推荐




技术详解 | SeaTunnel WAL 机制实现内存数据持久化存储




如何使用 SeaTunnel 同步 MySQL 数据到 Hive




从 0 到 1 快速入门 Apache SeaTunnel ,新一代数据集成平台的原理和实践



新年刷好运,点个赞吧!

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论