最近我对于Flowable的数据库访问过程很感兴趣,所以去阅读了一下这块的源码,大概梳理了一下。
直接从ProcessEngineConfigurationImpl内的init()方法作为切入点,跟踪其内部调用的initCommandExecutors()方法。
如果阅读过之前我写的源码分析文章,就可以知道Flowable是基于命令链来实现各种业务逻辑的,也可以知道initCommandExecutors()这个方法其实就是命令链的初始化方法。
着重关注一下getDefaultCommandInterceptors()这个方法,并且需要关注一下这个方法内部,默认的CommandInterceptor的初始化顺序,代码片段如下:
CommandInterceptor transactionInterceptor = createTransactionInterceptor();
if (transactionInterceptor != null) {
interceptors.add(transactionInterceptor);
}
if (commandContextFactory != null) {
String engineCfgKey = getEngineCfgKey();
CommandContextInterceptor commandContextInterceptor = new CommandContextInterceptor(commandContextFactory);
engineConfigurations.put(engineCfgKey, this);
commandContextInterceptor.setEngineConfigurations(engineConfigurations);
commandContextInterceptor.setServiceConfigurations(serviceConfigurations);
commandContextInterceptor.setCurrentEngineConfigurationKey(engineCfgKey);
interceptors.add(commandContextInterceptor);
}
if (transactionContextFactory != null) {
interceptors.add(new TransactionContextInterceptor(transactionContextFactory));
}复制
首先,看看transactionInterceptor,以SpringTransactionInterceptor为例,可以看到其内部的关键实现,如下:
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setPropagationBehavior(transactionPropagation);
return transactionTemplate.execute(status -> next.execute(config, command));复制
这段代码在上一篇有所介绍,这块其实就是对整个命令链的事务包裹。
接下来,看看commandContextInterceptor,关键代码如下:
try {
commandContext.setCurrentEngineConfiguration(engineConfigurations.get(currentEngineConfigurationKey));
// Push on stack
Context.setCommandContext(commandContext);
return next.execute(config, command);
} catch (Exception e) {
commandContext.exception(e);
} finally {
try {
if (!contextReused) {
commandContext.close();
}
} finally {
// Pop from stack
Context.removeCommandContext();
commandContext.setCurrentEngineConfiguration(previousEngineConfiguration);
}
}复制
这段代码其实是在执行整个命令链,执行完毕后推送在这个过程中操作的所有对象,以及在写入数据库发生错误时,进行的回滚处理。
如果直接去看Behavior等的最终代码,会发现其实一直都是在操作一段缓存,真正将这些缓存写入到数据库时,其实是在上面代码中的commandContext.close()方法。
commandContext.close()在整个命令链未发生任何异常时,会调用flushSessions(),将数据写入到数据库中。如果在写入数据库时发生异常,就会调用executeCloseListenersCloseFailure()回滚事务。
所以,整体的结构其实是这样的:
transactionInterceptor{
//开启事务
commandContextInterceptor{
try {
//提交缓存中的数据至数据库
} catch {
//回滚事务
}
}
}复制
至于TransactionContextInterceptor,它其实主要加载了一个监听器,这个监听器会按照不同的EngineConfiguration完成对应的commit和rollback。
至此,整个Flowable对于数据库的操作和事务的管理过程,也应该比较清晰了。
同样,以上,如果有问题欢迎指正。
觉的不错?可以关注我的公众号↑↑↑