暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

canal源码剖析——parse模块

神机喵算 2020-06-29
354

https://github.com/alibaba/canal

首先,我们来看看该模块下面的类图,通过类图就可以清晰地掌握整个模块的骨架结构。

EventTransactionBuffer是事件事务缓存区。它主要是在内存中开辟一个缓冲区,避免过高的flush频率导致的IO次数过度而导致的性能问题。

CanalEventParser是数据复制的控制器。该接口是核心的数据复制接口。

CanalLogPositionManager是日志的位置管理器。提供了读取和存储当前日志位置的接口。

CanalHAController是高可用的复制控制器。

图中所有接口都实现了CanalLifeCycle(生命周期接口)。

AbstractEventParser是一个模板方法的抽闲实现类,它 最大化共用mysql/oracle版本的实现类,提供了一些抽象方法交给子类实现。

AbstractMysqlEventParser是抽象的MySQL日志复制控制器的模板类。共享了MySQL的日志复制控制实现。

LocalBinlogEventParser是基于本地MySQL的binlog文件的复制控制器实现类。

MysqlEventParser是基于向mysql server复制binlog实现类。该实现类是MySQL使用最多的一种实现方式。

GroupEventParser是合多个EventParser进行合并处理,group只是做为一个delegate处理。它是一个组合模式的实现。

从上图所示可以看出,canal项目并未实现oracle数据库的日志复制器的实现,也就是不支持oracle数据库。

MysqlEventParser时序图

从类图中的介绍可以看出MysqlEventParser 是我们最核心的一个实现类,本文将重点描述该类的一个时序。

AbstractEventParser类源码解析

该类似parse模块中最核心的一个类, 它是一个事件解析的一个模板方法类,定义了事件解析的一个公共流程,几乎所有的子类都是扩展自该类的,因此阅读该类能够掌握最核心的binlog事件解析流程。

解析器对象实例化

public AbstractEventParser(){
// 初始化一下
transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
if (!running) {
return;
}

if (!successed) {
throw new CanalParseException("consume failed!");
}

LogPosition position = buildLastTransactionPosition(transaction);
if (position != null) { // 可能position为空
logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
}
}
});
}


首先看上述代码,它是构造方法中的代码,实例化本对象的同时,也实例化了一个EventTransactionBuffer对象。传入了一个TransactionFlushCallback的回调匿名类对象。回调类中定义了一个flush方法,该方法实现的内容是先消费事件,如果消费成功了,则存储当前的position。如果消费失败则抛出异常信息。EventTransactionBuffer写缓冲区的使用,是一种应对高并发的手段,它相当于在内存中收集一个个的事件,然后再批量的调用flush方法。这个与日志中的实现是一样的。

启动解析器方法

public void start() {
super.start();
MDC.put("destination", destination);
// 配置transaction buffer
// 初始化缓冲队列
transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
transactionBuffer.start();
// 构造bin log parser
binlogParser = buildParser();// 初始化一下BinLogParser
binlogParser.start();
// 启动工作线程
parseThread = new Thread(new Runnable() {

public void run() {
MDC.put("destination", String.valueOf(destination));
ErosaConnection erosaConnection = null;
while (running) {
try {

// 开始执行replication
// 1. 构造Erosa连接
erosaConnection = buildErosaConnection();

// 2. 启动一个心跳线程
startHeartBeat(erosaConnection);

// 3. 执行dump前的准备工作
preDump(erosaConnection);

erosaConnection.connect();// 链接
// 4. 获取最后的位置信息
final EntryPosition startPosition = findStartPosition(erosaConnection);
if (startPosition == null) {
throw new CanalParseException("can't find start position for " + destination);
}
logger.info("find start position : {}", startPosition.toString());
// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();

final SinkFunction sinkHandler = new SinkFunction<EVENT>() {

private LogPosition lastPosition;

public boolean sink(EVENT event) {
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);

if (!running) {
return false;
}

if (entry != null) {
exception = null; // 有正常数据流过,清空exception
transactionBuffer.add(entry);
// 记录一下对应的positions
this.lastPosition = buildLastPosition(entry);
// 记录一下最后一次有数据的时间
lastEntryTime = System.currentTimeMillis();
}
return running;
} catch (TableIdNotFoundException e) {
throw e;
} catch (Exception e) {
// 记录一下,出错的位点信息
processError(e,
this.lastPosition,
startPosition.getJournalName(),
startPosition.getPosition());
throw new CanalParseException(e); // 继续抛出异常,让上层统一感知
}
}

};

// 4. 开始dump数据
if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
erosaConnection.dump(startPosition.getJournalName(),
startPosition.getPosition(),
sinkHandler);
}

} catch (TableIdNotFoundException e) {
exception = e;
// 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
// Event时间没解析过
needTransactionPosition.compareAndSet(false, true);
logger.error(String.format("dump address %s has an error, retrying. caused by ",
runningInfo.getAddress().toString()), e);
} catch (Throwable e) {
exception = e;
if (!running) {
if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
runningInfo.getAddress().toString()), e);
}
} else {
logger.error(String.format("dump address %s has an error, retrying. caused by ",
runningInfo.getAddress().toString()), e);
sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
}
} finally {
// 重新置为中断状态
Thread.interrupted();
// 关闭一下链接
afterDump(erosaConnection);
try {
if (erosaConnection != null) {
erosaConnection.disconnect();
}
} catch (IOException e1) {
if (!running) {
throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
runningInfo.getAddress().toString()),
e1);
} else {
logger.error("disconnect address {} has an error, retrying., caused by ",
runningInfo.getAddress().toString(),
e1);
}
}
}
// 出异常了,退出sink消费,释放一下状态
eventSink.interrupt();
transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
binlogParser.reset();// 重新置位

if (running) {
// sleep一段时间再进行重试
try {
Thread.sleep(10000 + RandomUtils.nextInt(10000));
} catch (InterruptedException e) {
}
}
}
MDC.remove("destination");
}
});

parseThread.setUncaughtExceptionHandler(handler);
parseThread.setName(String.format("destination = %s , address = %s , EventParser",
destination,
runningInfo == null ? null : runningInfo.getAddress().toString()));
parseThread.start();
}

start()方法是实现了生命周期的启动方法,是被上层的组件调用的,parser组件的start方法应该是被instance组件调用的。该方法开始启动组件,接收binlog,并且解析处理它。该方法的流程是这样的。

  1. 初始化并启动transactionBuffer组件。

  2. 构造binlogParser组件,并启动它。

  3. 开启新的线程并启动它。避免阻塞上级组件的启动。

  • 开启循环,直到终止组件运行。判断标志是protected volatile boolean running = false。定义为volatile修饰的成员变量,让多线程可见。

  • 构造erosa连接。

  • 启动一个心跳线程。用Timer实现。会定期消费一个事件类型为EntryType.HEARTBEAT的事件。应该是告知下游组件,上有组件还活着。

  • dump数据库复制日志前的准备处理。

  • erosa连接创建连接。

  • 查找日志起始位置。

  • erosa连接重建连接。因为在找position过程中可能有状态,需要断开后重建

  • 开始dump数据库复制日志。传入一个回调的SinkFunction匿名类对象。回调方法sink的实现就是解析dump到的日志事件,将其转化为Entry对象。并强Entry对象加入到缓冲区transactionBuffer中,并且记录当前日志位置和时间。

  • 最后dump后的处理。关闭连接等事后处理。

  • 若未停止运行,则再次进入第一步。


bigdata_ny


文章转载自神机喵算,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论