
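Preface
Change data captured by CDC carries one of four change types:
INSERT: a row was inserted
UPDATE_BEFORE: the old value of a row before the change
UPDATE_AFTER: the new value of a row after the change
DELETE: a row was deleted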
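To make the four change types concrete, here is a small sketch of how a single source-side UPDATE is represented. It becomes an UPDATE_BEFORE event holding the old row, followed by an UPDATE_AFTER event holding the new row, which is the usual convention in CDC pipelines. All names (`ChangeKind`, `ChangeEvent`, `ChangeEvents.fromUpdate`) are hypothetical and are not SeaTunnel's own API.

```java
import java.util.List;

// Hypothetical stand-in for the four change types; not SeaTunnel's own RowKind enum.
enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// A minimal change event: a change type plus the row's column values.
final class ChangeEvent {
    final ChangeKind kind;
    final List<Object> values;

    ChangeEvent(ChangeKind kind, List<Object> values) {
        this.kind = kind;
        this.values = values;
    }

    @Override
    public String toString() {
        return kind + values.toString();
    }
}

final class ChangeEvents {
    // One UPDATE on the source table yields two events:
    // the old row image first, then the new row image.
    static List<ChangeEvent> fromUpdate(List<Object> oldRow, List<Object> newRow) {
        return List.of(
                new ChangeEvent(ChangeKind.UPDATE_BEFORE, oldRow),
                new ChangeEvent(ChangeKind.UPDATE_AFTER, newRow));
    }
}
```

For example, `ChangeEvents.fromUpdate(List.of(1, "a"), List.of(1, "b"))` prints as `[UPDATE_BEFORE[1, a], UPDATE_AFTER[1, b]]`.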
Overall design
JdbcBatchStatementExecutor is the top-level executor interface.
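import java.sql.Connection;
import java.sql.SQLException;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public interface JdbcBatchStatementExecutor extends AutoCloseable {
    // Create the prepared statements on the given connection.
    void prepareStatements(Connection connection) throws SQLException;
    // Add one row to the current batch.
    void addToBatch(SeaTunnelRow record) throws SQLException;
    // Flush the buffered batch to the database.
    void executeBatch() throws SQLException;
    // Release the prepared statements.
    void closeStatements() throws SQLException;

    @Override
    default void close() throws SQLException {
        closeStatements();
    }
}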
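The interface implies a simple lifecycle: prepare the statements once, add rows, execute the batch, then close. The sketch below mirrors that lifecycle with a JDBC-free, hypothetical `MiniBatchExecutor` interface and a tracing implementation, so the call order can be inspected. Flushing every `batchSize` rows is an assumed policy and not necessarily how the SeaTunnel sink triggers flushes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDBC-free analogue of JdbcBatchStatementExecutor.
interface MiniBatchExecutor<R> extends AutoCloseable {
    void prepare();
    void addToBatch(R record);
    void executeBatch();
    void closeStatements();

    @Override
    default void close() {
        closeStatements();
    }
}

// Records every lifecycle call so the order can be checked afterwards.
final class TracingExecutor implements MiniBatchExecutor<String> {
    final List<String> calls = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    public void prepare() { calls.add("prepare"); }
    public void addToBatch(String record) { pending.add(record); }
    public void executeBatch() { calls.add("execute " + pending); pending.clear(); }
    public void closeStatements() { calls.add("closeStatements"); }
}

final class LifecycleDemo {
    // Assumed policy: flush every batchSize rows, then flush the remainder.
    static List<String> run(List<String> records, int batchSize) {
        TracingExecutor executor = new TracingExecutor();
        try (executor) { // close() calls closeStatements(), as in the interface above
            executor.prepare();
            int count = 0;
            for (String record : records) {
                executor.addToBatch(record);
                if (++count % batchSize == 0) {
                    executor.executeBatch();
                }
            }
            if (count % batchSize != 0) {
                executor.executeBatch();
            }
        }
        return executor.calls;
    }
}
```

`LifecycleDemo.run(List.of("a", "b", "c"), 2)` returns `[prepare, execute [a, b], execute [c], closeStatements]`.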
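JdbcBatchStatementExecutor has the following implementations:
SimpleBatchStatementExecutor: implements plain SQL batch execution
InsertOrUpdateBatchStatementExecutor: implements INSERT and UPDATE writes, and also supports an UPSERT mode
ReduceBufferedBatchStatementExecutor: buffers rows in memory and, when flushing to the database, dispatches them to the concrete executor according to their change type (INSERT, UPDATE, DELETE)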
1
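Handling tables without a primary key
When no primary key is configured, rows are written with SimpleBatchStatementExecutor.
2
Implementing CDC data processing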
CDC Event
├── DELETE Executor
└── INSERT OR UPDATE Executor
    ├── INSERT Executor
    └── UPDATE Executor
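The dispatch tree can be read as a routing function. The sketch below is hypothetical, and its names are illustrative rather than SeaTunnel's API. It makes two assumptions. First, UPDATE_BEFORE rows are grouped with DELETE, as section 6 does for MergeTree. Second, UPSERT mode is off, so INSERT and UPDATE_AFTER map directly to the INSERT and UPDATE executors.

```java
import java.util.List;

// Hypothetical change types for this sketch.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class ExecutorRouting {
    // Returns the chain of executors a row passes through (assumptions in the lead-in).
    static List<String> route(Kind kind, boolean hasPrimaryKey) {
        if (!hasPrimaryKey) {
            // Without a primary key every row goes to the simple executor.
            return List.of("SimpleBatchStatementExecutor");
        }
        switch (kind) {
            case DELETE:
            case UPDATE_BEFORE:
                return List.of("DELETE Executor");
            case INSERT:
                return List.of("INSERT OR UPDATE Executor", "INSERT Executor");
            case UPDATE_AFTER:
                return List.of("INSERT OR UPDATE Executor", "UPDATE Executor");
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }
}
```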
3
Preserving the order of CDC data on write
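InsertOrUpdateBatchStatementExecutor
INSERT rows and UPDATE rows are batched in two separate prepared statements. Whenever the change type switches between consecutive rows, the pending batch of the previous type is executed first, so rows reach the database in the order in which they arrived.
public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    // Fields (insertStatement, updateStatement, valueRowConverter, preChangeFlag,
    // submitted) and the remaining interface methods are omitted from this excerpt.

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        // true: write with INSERT; false: write with UPDATE
        boolean currentChangeFlag = hasInsert(record);
        if (currentChangeFlag) {
            // Switching from UPDATE to INSERT: flush the pending UPDATE batch first.
            if (preChangeFlag != null && !preChangeFlag) {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, insertStatement);
            insertStatement.addBatch();
        } else {
            // Switching from INSERT to UPDATE: flush the pending INSERT batch first.
            if (preChangeFlag != null && preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            }
            valueRowConverter.toExternal(record, updateStatement);
            updateStatement.addBatch();
        }
        preChangeFlag = currentChangeFlag;
        submitted = false;
    }

    @Override
    public void executeBatch() throws SQLException {
        // Only the statement of the last seen change type can still hold pending rows.
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertStatement.executeBatch();
                insertStatement.clearBatch();
            } else {
                updateStatement.executeBatch();
                updateStatement.clearBatch();
            }
        }
        submitted = true;
    }
}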
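The flag-switching logic can be checked without a database. The following sketch is a hypothetical in-memory model, not SeaTunnel code. Each "statement" is a list of pending rows, and "executing" a batch appends that batch to a log, so the resulting execution order is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the flag-switching logic shown above.
final class OrderPreservingBatcher {
    private final List<String> insertBatch = new ArrayList<>();
    private final List<String> updateBatch = new ArrayList<>();
    private Boolean preChangeFlag = null; // null until the first row arrives
    final List<String> executed = new ArrayList<>();

    void addToBatch(String row, boolean isInsert) {
        if (isInsert) {
            // Switching from UPDATE to INSERT: flush pending updates first.
            if (preChangeFlag != null && !preChangeFlag) {
                flush("UPDATE", updateBatch);
            }
            insertBatch.add(row);
        } else {
            // Switching from INSERT to UPDATE: flush pending inserts first.
            if (preChangeFlag != null && preChangeFlag) {
                flush("INSERT", insertBatch);
            }
            updateBatch.add(row);
        }
        preChangeFlag = isInsert;
    }

    void executeBatch() {
        // Only the batch of the last seen type can be non-empty here.
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                flush("INSERT", insertBatch);
            } else {
                flush("UPDATE", updateBatch);
            }
        }
    }

    private void flush(String type, List<String> batch) {
        executed.add(type + batch); // record the batch, then empty it
        batch.clear();
    }
}
```

Take the inputs r1 (INSERT), r2 (INSERT), r3 (UPDATE) and r4 (INSERT), followed by `executeBatch()`. The log is `INSERT[r1, r2]`, `UPDATE[r3]`, `INSERT[r4]`. Consecutive rows of the same type share a batch, and the relative order across types is preserved.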
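ReduceBufferedBatchStatementExecutor adds a layer of in-memory buffering. It dispatches the buffered rows to the underlying executors, and from there to the database, only when the batch is executed.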
ReduceBufferedBatchStatementExecutor
public class ReduceBufferedBatchStatementExecutor implements JdbcBatchStatementExecutor {
    // Insertion-ordered buffer. Value = (change flag, row): executeBatch sends entries
    // flagged true to insertOrUpdateExecutor and the others (by key) to deleteExecutor.
    private final LinkedHashMap<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> buffer = new LinkedHashMap<>();
    // deleteExecutor, insertOrUpdateExecutor and the remaining methods are omitted.

    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        buffer.put(record, ...);
    }

    @Override
    public void executeBatch() throws SQLException {
        Boolean preChangeFlag = null;
        Set<Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>>> entrySet = buffer.entrySet();
        for (Map.Entry<SeaTunnelRow, Pair<Boolean, SeaTunnelRow>> entry : entrySet) {
            Boolean currentChangeFlag = entry.getValue().getKey();
            if (currentChangeFlag) {
                // Switching from DELETE to INSERT/UPDATE: flush pending deletes first.
                if (preChangeFlag != null && !preChangeFlag) {
                    deleteExecutor.executeBatch();
                }
                insertOrUpdateExecutor.addToBatch(entry.getValue().getValue());
            } else {
                // Switching from INSERT/UPDATE to DELETE: flush pending inserts/updates first.
                if (preChangeFlag != null && preChangeFlag) {
                    insertOrUpdateExecutor.executeBatch();
                }
                deleteExecutor.addToBatch(entry.getKey());
            }
            preChangeFlag = currentChangeFlag;
        }
        // Flush the executor of the last change type.
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                insertOrUpdateExecutor.executeBatch();
            } else {
                deleteExecutor.executeBatch();
            }
        }
        buffer.clear();
    }
}
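Below is a hypothetical in-memory model of the reduce buffer. It assumes the buffer key is the row's primary key; the article elides the exact mapping, so this is a guess. A later event for the same key replaces the earlier one. `executeBatch` then groups consecutive entries by change flag, just like the code above. Rows are plain strings here, and `ReduceBufferSketch` and `Pending` are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the reduce buffer, keyed by an assumed integer primary key.
final class ReduceBufferSketch {
    // Buffered change for one key: true = INSERT/UPDATE_AFTER, false = DELETE.
    private static final class Pending {
        final boolean upsert;
        final String row;

        Pending(boolean upsert, String row) {
            this.upsert = upsert;
            this.row = row;
        }
    }

    private final LinkedHashMap<Integer, Pending> buffer = new LinkedHashMap<>();
    final List<String> executed = new ArrayList<>();

    void addToBatch(int key, boolean upsert, String row) {
        // Re-putting an existing key replaces its value but keeps its original position.
        buffer.put(key, new Pending(upsert, row));
    }

    void executeBatch() {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        Boolean preChangeFlag = null;
        for (Map.Entry<Integer, Pending> entry : buffer.entrySet()) {
            boolean current = entry.getValue().upsert;
            if (current) {
                if (preChangeFlag != null && !preChangeFlag) {
                    flush("DELETE", deletes);
                }
                upserts.add(entry.getValue().row);
            } else {
                if (preChangeFlag != null && preChangeFlag) {
                    flush("UPSERT", upserts);
                }
                deletes.add("key=" + entry.getKey());
            }
            preChangeFlag = current;
        }
        if (preChangeFlag != null) {
            if (preChangeFlag) {
                flush("UPSERT", upserts);
            } else {
                flush("DELETE", deletes);
            }
        }
        buffer.clear();
    }

    private void flush(String type, List<String> batch) {
        executed.add(type + batch);
        batch.clear();
    }
}
```

Feeding UPSERT 1=a, UPSERT 2=b, DELETE 1 and UPSERT 3=c yields `DELETE[key=1]` followed by `UPSERT[b, c]`. Key 1 keeps its original position in the `LinkedHashMap`, so the delete executes before the upserts even though it arrived third. Each key maps to a different row, so only the last event per key matters for the final table contents.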
4
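Implementing generic UPSERT writes
UPSERT can be enabled in InsertOrUpdateBatchStatementExecutor through configuration. When it is enabled, each INSERT or UPDATE row is first looked up by primary key to check whether it already exists. The executor then decides whether to write it with INSERT or UPDATE SQL.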
InsertOrUpdateBatchStatementExecutor
public class InsertOrUpdateBatchStatementExecutor implements JdbcBatchStatementExecutor {
    @Override
    public void addToBatch(SeaTunnelRow record) throws SQLException {
        boolean currentChangeFlag = hasInsert(record);
        ...
    }

    // Decides whether a row is written with INSERT (true) or UPDATE (false).
    private boolean hasInsert(SeaTunnelRow record) throws SQLException {
        if (upsertMode()) {
            // UPSERT: INSERT if no row with this primary key exists yet, otherwise UPDATE.
            return !exist(keyExtractor.apply(record));
        }
        // Without UPSERT, the change type decides.
        switch (record.getRowKind()) {
            case INSERT:
                return true;
            case UPDATE_AFTER:
                return false;
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + record.getRowKind());
        }
    }

    // Runs the primary-key lookup; any returned row means the key already exists.
    private boolean exist(SeaTunnelRow pk) throws SQLException {
        keyRowConverter.toExternal(pk, existStatement);
        try (ResultSet resultSet = existStatement.executeQuery()) {
            return resultSet.next();
        }
    }
}
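The lookup-then-write decision needs three parameterized statements. The sketch below is a hypothetical builder, not SeaTunnel's actual SQL generation. For the UPDATE path it assumes ClickHouse's `ALTER TABLE ... UPDATE ... WHERE ...` mutation syntax. It keeps key columns out of the SET clause because ClickHouse does not allow updating primary-key columns.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical builder for the statements used by the UPSERT path.
final class UpsertSql {
    // Existence check by primary key; any returned row means "exists".
    static String exists(String table, List<String> keys) {
        return "SELECT 1 FROM " + table + " WHERE " + predicate(keys);
    }

    static String insert(String table, List<String> columns) {
        String marks = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + marks + ")";
    }

    // Assumed ClickHouse mutation syntax; key columns appear only in WHERE.
    static String update(String table, List<String> columns, List<String> keys) {
        String set = columns.stream()
                .filter(c -> !keys.contains(c))
                .map(c -> c + " = ?")
                .collect(Collectors.joining(", "));
        return "ALTER TABLE " + table + " UPDATE " + set + " WHERE " + predicate(keys);
    }

    // Same decision as hasInsert() in upsert mode: a missing row means INSERT.
    static String choose(boolean rowExists, String table, List<String> columns, List<String> keys) {
        return rowExists ? update(table, columns, keys) : insert(table, columns);
    }

    private static String predicate(List<String> keys) {
        return keys.stream().map(k -> k + " = ?").collect(Collectors.joining(" AND "));
    }
}
```

For a table `t(id, name)` keyed by `id`, the statements are `SELECT 1 FROM t WHERE id = ?`, `INSERT INTO t (id, name) VALUES (?, ?)` and `ALTER TABLE t UPDATE name = ? WHERE id = ?`.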
5
Optimizing UPSERT for the ReplacingMergeTree table engine
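The ReplacingMergeTree table engine lets you configure ORDER BY fields. It replaces rows that share the same ORDER BY (sorting key) value, keeping only one of them (by default the most recently inserted), and we can use this property to implement UPSERT as well. Note that this deduplication happens during background merges rather than at INSERT time. Until a merge runs, queries may still see duplicate rows unless they use FINAL.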
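This optimization applies when a table uses the ReplacingMergeTree engine and its ORDER BY fields are the same as the primary key fields configured in the sink connector. In that case both INSERT and UPDATE_AFTER rows are written as INSERT to implement UPSERT.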
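The condition and the resulting mapping can be written down as follows. This is a hypothetical sketch. In practice the engine name and ORDER BY columns would come from table metadata (for example ClickHouse's `system.tables`). The sketch compares the columns as unordered sets of plain names, which is an assumption: it ignores ORDER BY expressions such as `toDate(ts)` and replicated engine variants. When the condition does not hold, UPDATE_AFTER falls back to UPDATE, as in InsertOrUpdateBatchStatementExecutor.

```java
import java.util.HashSet;
import java.util.List;

// Hypothetical change types for this sketch.
enum CdcKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class ReplacingMergeTreeUpsert {
    // Assumption: ORDER BY columns and sink primary keys are compared as sets of plain names.
    static boolean insertOnlyUpsert(String engine, List<String> orderByColumns, List<String> primaryKeys) {
        return "ReplacingMergeTree".equals(engine)
                && new HashSet<>(orderByColumns).equals(new HashSet<>(primaryKeys));
    }

    // Statement type used for INSERT / UPDATE_AFTER rows.
    static String statementFor(CdcKind kind, boolean insertOnlyUpsert) {
        switch (kind) {
            case INSERT:
                return "INSERT";
            case UPDATE_AFTER:
                // With a matching sorting key, ReplacingMergeTree eventually keeps one row per key.
                return insertOnlyUpsert ? "INSERT" : "UPDATE";
            default:
                throw new IllegalArgumentException("this sketch only covers INSERT and UPDATE_AFTER: " + kind);
        }
    }
}
```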
6
Optimizing updates for the MergeTree table engine
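The MergeTree engine has a lightweight DELETE, experimental at the time of writing (https://clickhouse.com/docs/en/sql-reference/statements/delete). It performs better than the heavyweight delete, an ALTER TABLE ... DELETE mutation, so we let users opt in to lightweight delete through configuration.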
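When a table uses the MergeTree engine and lightweight delete is enabled, both DELETE and UPDATE_BEFORE rows are handled as lightweight deletes, and both INSERT and UPDATE_AFTER rows are written as INSERT. This avoids UPDATE operations entirely and takes advantage of lightweight delete.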
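The mapping can be sketched as a small statement builder. This is hypothetical and not SeaTunnel's code. It uses ClickHouse's lightweight `DELETE FROM ... WHERE ...` syntax for DELETE and UPDATE_BEFORE, and a plain INSERT for INSERT and UPDATE_AFTER. The heavyweight `ALTER TABLE ... DELETE` form is included only for comparison.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical change types for this sketch.
enum CdcOp { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class LightweightDeletePlan {
    static String sqlFor(CdcOp op, String table, List<String> columns, List<String> keys) {
        switch (op) {
            case DELETE:
            case UPDATE_BEFORE:
                // Lightweight delete: rows are marked as deleted and physically removed by later merges.
                return "DELETE FROM " + table + " WHERE " + keyPredicate(keys);
            case INSERT:
            case UPDATE_AFTER: {
                String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
                return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
            }
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    // Heavyweight delete for comparison: a mutation that rewrites the affected data parts.
    static String heavyweightDelete(String table, List<String> keys) {
        return "ALTER TABLE " + table + " DELETE WHERE " + keyPredicate(keys);
    }

    private static String keyPredicate(List<String> keys) {
        return keys.stream().map(k -> k + " = ?").collect(Collectors.joining(" AND "));
    }
}
```

A source UPDATE arrives as UPDATE_BEFORE followed by UPDATE_AFTER, so the delete must reach ClickHouse before the insert. This is why executing batches in arrival order, as in section 3, matters here.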
Related PR
https://github.com/apache/incubator-seatunnel/pull/3653
References
https://clickhouse.com/docs/en/sql-reference/statements/delete
https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
This article was reprinted from SeaTunnel.