KFS数据同步系统中Applier模块冲突处理详解
关键字:
Kingbase FlySync、applier、conflictResolve
关于冲突处理方法
Applier模块的作用之一就是解析kufl中的内容,生成对应数据库可识别的Sql语句并执行,保证源端数据库的同步数据可以同步到目标端数据库。但是在同步程序应用过程中,经常会遇到这样的应用场景,即在源端数据库向目标端同步增量数据的同时,目标端对应的数据库的表也存在业务,对表进行写入操作,那个当源端解析出来的数据与目标端写入数据存在冲突时,就需要进行冲突处理。典型的例子是,在将源端数据库主键为id的A表的数据同步至目标端数据库A表的应用场景中,如果源端解A表解析出来一条id=100的数据,在数据未同步至目标端间隙,目标端对表A进行业务写入,写入一条id为100的数据,此时当Applier解析出来的数据进行入库时,源端就会因为违反主键唯一约束而报错,此时Applier提供的conflictResolve能力可以根据预先配置的冲突处理策略对冲突数据进行处理,策略包括忽略Applier解析出来的id冲突的一条数据或者用解析出来的数据覆盖目标自行写入的数据。在实际的情况中,经常会遇到源端和目标端同步时数据的时候同时存在业务写入操作,因为冲突处理是非常关键的逻辑,但是冲突处理过程中存在数据的舍弃和覆盖,因此有数据丢失的风险,因此在使用过程中需要根据实际应用场景进行选择性设置。
ConflictResolve在Applier中的位置
在Applier模块中,JdbcApplier类中定义了数据Apply到数据库中的关键逻辑和方法,其中一个重要的方法就是applyOneRowChangePrepared(),该方法用来解析一个准备好的OneRowChange中的数据,并生成数据库对应的sql语句并执行,冲突处理逻辑就定义在JdbcApplier的OneRowChangePrepared()方法中,由该方法调用conflictResolve()方法执行。此外,MysqlApplier、OracleApplier和KingbaseApplier类中都重写了OneRowChangePrepared()方法,子类重写的方法中没有关系冲突处理的逻辑,是否执行冲突处理由参数optimizeRowEvents来控制,参数optimizeRowEvents是一个boolean值,当optimizeRowEvents为true时,表示数据入库时需要进行性能优化,即将多条sql合并执行,多条语句的合并执行就意味着无法对单条语句进行冲突处理,因此无法走冲突处理逻辑,与之相对的是,当optimizeRowEvents为false时,表示不需要对数据入库进行性能优化,代码执行父类中的OneRowChangePrepared()方法,可以进行冲突处理。
ConflictResolve()方法执行
在JdbcApplier的OneRowChangePrepared()方法中会在bindColumnValues()方法中对数据进行绑定并执行,当语句执行失败报错后,会走冲突处理流程,在ConflictResolve()方法中,首先判断语句类型,当语句存在insert时,会对为事务中未提交的语句进行回滚。随后进行冲突处理,首先获取冲突解决规则,需要进行如下配置:
源端配置过滤器:
property = replicator.extractor.dbms.beforeColsFile=…/beforeColsRule.json
property = replicator.extractor.dbms.getBeforeCols=true
json配置:
{
"mytest.table_test1":"id",
"mytest.table_test2":"id"
}#用来进行比对的列,可以写多个,但是目标端只会用指定的哪一个,不是一定的是主键列
property = replicator.filter.bic.includeAllCols=true #配置此参数,json中不用写列名,默认全部列名(V2R2C3及之后)
目标端配置过滤器:
property = replicator.applier.dbms.conflictResolve=true
property = replicator.applier.dbms.beforeColsFile=…/conflictResolveRule.json
property = replicator.applier.dbms.optimizeRowEvents=false(v1r6C4之前必须配置,之后的版本可以不管)
property =replicator.applier.dbms.enable_handle_error=true(v1r6C4之前配置为false,之后的版本必须为true,否则事务冲突不生效)
conflictResolveRule.json配置:
{
"DEFAULT": "DISCARD", #此处支持DISCARD | OVERWRITE | IGNORE
#DISCARD 忽略并记录冲突sql,目标端不入库
# OVERWRITE 重写,将insert语句改为update然后入库
# IGNORE 忽略,就当做这个功能不存在,按照以前老逻辑处理,insert该报错就报错
"*test.tabl*": #模式名.表名 可以有通配符
{
"DETECTACTION":"UPDATE,DELETE", #insert冲突也必须写这两个
"COMPARECOLS":"id", #用来做比对的列,需要源端存并且是可是进行大小比对的列,可以是非主键列
"RESOLVERULE":
{
"INSERTROWEXISTS":"USEMINEQ:id", #insert冲突处理,可以选择这些规则USEMAX(col)|USEMAXEQ(col)|USEMIN(col)|USEMINEQ (col) | DISCARD | OVERWRITE | IGNORE
"UPDATEROWEXISTS":"USEMINEQ: id",
"UPDATEROWMISSING":"DISCARD",
"DELETEROWEXISTS":"DISCARD",
"DELETEROWMISSING":"DISCARD"
}
}
}
在apply()操作执行过程中,封装好的sql进行提交时,如果报错,系统会捕获异常,此时如果配置optimizeRowEvents=false,则系统会执行jdbcApplier类中applyOneRowChangePrepared()方法,该方法在捕获异常会根据异常类型执行conflictResolve()方法,该方法会读取配置文件中的异常处理规则,并生成异常处理规则实例conflictResolveRule,resolve()方法会通过getConflictType()方法获取冲突类型,以主键冲突INSERTROWEXISTS为例,系统会根据配置规则重构OneRowChange,匹配keySep和keyCloumn列,并将action由insert改为update,使源端冲突数据以更新的形式入库到目标端。
参考资料
KFS项目源代码




