暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

人大金仓KFS数据同步系统中Applier模块冲突处理详解

KFS补给站 2023-10-27
355


KFS数据同步系统中Applier模块冲突处理详解

关键字:

Kingbase FlySync、applier、conflictResolve

关于冲突处理方法

Applier模块的作用之一就是解析kufl中的内容,生成对应数据库可识别的Sql语句并执行,保证源端数据库的同步数据可以同步到目标端数据库。但是在同步程序应用过程中,经常会遇到这样的应用场景,即在源端数据库向目标端同步增量数据的同时,目标端对应的数据库的表也存在业务,对表进行写入操作,那个当源端解析出来的数据与目标端写入数据存在冲突时,就需要进行冲突处理。典型的例子是,在将源端数据库主键为id的A表的数据同步至目标端数据库A表的应用场景中,如果源端解A表解析出来一条id=100的数据,在数据未同步至目标端间隙,目标端对表A进行业务写入,写入一条id为100的数据,此时当Applier解析出来的数据进行入库时,源端就会因为违反主键唯一约束而报错,此时Applier提供的conflictResolve能力可以根据预先配置的冲突处理策略对冲突数据进行处理,策略包括忽略Applier解析出来的id冲突的一条数据或者用解析出来的数据覆盖目标自行写入的数据。在实际的情况中,经常会遇到源端和目标端同步时数据的时候同时存在业务写入操作,因为冲突处理是非常关键的逻辑,但是冲突处理过程中存在数据的舍弃和覆盖,因此有数据丢失的风险,因此在使用过程中需要根据实际应用场景进行选择性设置。

ConflictResolve在Applier中的位置

在Applier模块中,JdbcApplier类中定义了数据Apply到数据库中的关键逻辑和方法,其中一个重要的方法就是applyOneRowChangePrepared(),该方法用来解析一个准备好的OneRowChange中的数据,并生成数据库对应的sql语句并执行,冲突处理逻辑就定义在JdbcApplier的OneRowChangePrepared()方法中,由该方法调用conflictResolve()方法执行。此外,MysqlApplier、OracleApplier和KingbaseApplier类中都重写了OneRowChangePrepared()方法,子类重写的方法中没有关系冲突处理的逻辑,是否执行冲突处理由参数optimizeRowEvents来控制,参数optimizeRowEvents是一个boolean值,当optimizeRowEvents为true时,表示数据入库时需要进行性能优化,即将多条sql合并执行,多条语句的合并执行就意味着无法对单条语句进行冲突处理,因此无法走冲突处理逻辑,与之相对的是,当optimizeRowEvents为false时,表示不需要对数据入库进行性能优化,代码执行父类中的OneRowChangePrepared()方法,可以进行冲突处理。

ConflictResolve()方法执行

在JdbcApplier的OneRowChangePrepared()方法中会在bindColumnValues()方法中对数据进行绑定并执行,当语句执行失败报错后,会走冲突处理流程,在ConflictResolve()方法中,首先判断语句类型,当语句存在insert时,会对为事务中未提交的语句进行回滚。随后进行冲突处理,首先获取冲突解决规则,需要进行如下配置:

源端配置过滤器:

property = replicator.extractor.dbms.beforeColsFile=…/beforeColsRule.json

property = replicator.extractor.dbms.getBeforeCols=true

json配置:

{

"mytest.table_test1":"id",

"mytest.table_test2":"id"

}#用来进行比对的列,可以写多个,但是目标端只会用指定的哪一个,不是一定的是主键列

property = replicator.filter.bic.includeAllCols=true #配置此参数,json中不用写列名,默认全部列名(V2R2C3及之后)

目标端配置过滤器:

property = replicator.applier.dbms.conflictResolve=true

property = replicator.applier.dbms.beforeColsFile=…/conflictResolveRule.json

property = replicator.applier.dbms.optimizeRowEvents=false(v1r6C4之前必须配置,之后的版本可以不管)

property =replicator.applier.dbms.enable_handle_error=true(v1r6C4之前配置为false,之后的版本必须为true,否则事务冲突不生效)

conflictResolveRule.json配置:

{

"DEFAULT": "DISCARD", #此处支持DISCARD | OVERWRITE | IGNORE

#DISCARD 忽略并记录冲突sql,目标端不入库

# OVERWRITE 重写,将insert语句改为update然后入库

# IGNORE 忽略,就当做这个功能不存在,按照以前老逻辑处理,insert该报错就报错

"*test.tabl*": #模式名.表名 可以有通配符

{

"DETECTACTION":"UPDATE,DELETE", #insert冲突也必须写这两个

"COMPARECOLS":"id", #用来做比对的列,需要源端存并且是可是进行大小比对的列,可以是非主键列

"RESOLVERULE":

{

"INSERTROWEXISTS":"USEMINEQ:id", #insert冲突处理,可以选择这些规则USEMAX(col)|USEMAXEQ(col)|USEMIN(col)|USEMINEQ (col) | DISCARD | OVERWRITE | IGNORE

"UPDATEROWEXISTS":"USEMINEQ: id",

"UPDATEROWMISSING":"DISCARD",

"DELETEROWEXISTS":"DISCARD",

"DELETEROWMISSING":"DISCARD"

}

}

}

在apply()操作执行过程中,封装好的sql进行提交时,如果报错,系统会捕获异常,此时如果配置optimizeRowEvents=false,则系统会执行jdbcApplier类中applyOneRowChangePrepared()方法,该方法在捕获异常会根据异常类型执行conflictResolve()方法,该方法会读取配置文件中的异常处理规则,并生成异常处理规则实例conflictResolveRule,resolve()方法会通过getConflictType()方法获取冲突类型,以主键冲突INSERTROWEXISTS为例,系统会根据配置规则重构OneRowChange,匹配keySep和keyCloumn列,并将action由insert改为update,使源端冲突数据以更新的形式入库到目标端。

参考资料

KFS项目源代码

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论