解析binlog可以做很多的事情,比如在ES做增量索引,可靠的同步消息(本地事物提交,也要成功发送到MQ),缓存一致性(数据库中数据与缓存一致),Delta实时数仓(Delta中数据与数据库在某个时间点之前一致),Hive记录数据变更等等。
MySQL主备复制原理:
MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看);
MySQL slave将master的binary log events拷贝到它的中继日志(relay log);
MySQL slave 重放relay log中事件,将数据变更反映它自己的数据。
canal 工作原理:
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即 canal )
canal解析binary log对象(原始为byte流)
而且在canal 1.1.1版本之后, 支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的Kafka和RocketMQ。
CREATE USER test IDENTIFIED BY 'test';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test'@'%' IDENTIFIED BY 'test';
FLUSH PRIVILEGES;
//MySQL连接信息与binlog信息
case class MySQLConnectionInfo(
host: String, port: Int, userName: String, password: String
, binlogFileName: Option[String], recordPos: Option[Long])
//处理时间类型,这里只处理增删改三种
object EventInfo {
val INSERT_EVENT = "insert"
val DELETE_EVENT = "delete"
val UPDATE_EVENT = "update"
}
//实现Binlog Server
class MyBinlogServer {
def connectMySQL(connect: MySQLConnectionInfo) = {
val client = new BinaryLogClient(connect.host, connect.port, connect.userName, connect.password)
//设置binlog文件与偏移量
client.setBinlogFilename(connect.binlogFileName.get)
client.setBinlogPosition(connect.recordPos.get)
//注册事件监听器 ,并对相应事件进行处理
client.registerEventListener(new BinaryLogClient.EventListener() {
def onEvent(event: Event): Unit = {
val header = event.getHeader[EventHeaderV4]()
val eventType = header.getEventType
eventType match {
case TABLE_MAP => println(event.getData[TableMapEventData]())
case EXT_WRITE_ROWS | PRE_GA_WRITE_ROWS | WRITE_ROWS =>
printRecord(event, client.getBinlogFilename, EventInfo.INSERT_EVENT ,header)
case EXT_UPDATE_ROWS | PRE_GA_UPDATE_ROWS | UPDATE_ROWS=>
printRecord(event, client.getBinlogFilename, EventInfo.UPDATE_EVENT ,header)
case EXT_DELETE_ROWS | PRE_GA_DELETE_ROWS | DELETE_ROWS =>
printRecord(event, client.getBinlogFilename, EventInfo.DELETE_EVENT ,header)
// case ROTATE =>
// val rotateEventData = event.getData[RotateEventData]()
// val currentBinlogFile = rotateEventData.getBinlogFilename
// val currentBinlogPosition = rotateEventData.getBinlogPosition
// case QUERY =>
// println("qe: " + event.getData())
case _ =>
}
}
})
client.connect()
}
}
//printRecord为打印信息,具体实现见源码
//启动Server
object MyBinlogServer{
def main(args: Array[String]) {
val bls = new MyBinlogServer
bls.connectMySQL(MySQLConnectionInfo("127.0.0.1" ,3306 ,"root" ,"mlsql" ,Option("master-bin.000069") ,Option(4)))
}
}
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
EXT_WRITE_ROWS ,newValue:1,1, pos:7831, next pos:7877
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
EXT_WRITE_ROWS ,newValue:2,2, pos:8093, next pos:8139
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
EXT_UPDATE_ROWS ,oldValue: 1,1 ,newValue: 1,3 ,pos: 8355 ,next pos: 8435
EXT_UPDATE_ROWS ,oldValue: 2,2 ,newValue: 2,3 ,pos: 8355 ,next pos: 8435
TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
EXT_DELETE_ROWS ,newValue:2,3, pos:8651, next pos:8697
到这里可以看到,增删改的所有变更都可以捕获到,而且可以解析出具体变化的情况。
下面介绍几个关于MySQL binlog的命令:
show VARIABLES like 'log_%';
show master logs;
show master status;
sh mysqlbinlog --start-datetime="2019-06-19 01:00:00" --stop-datetime="2019-06-20 23:00:00" --base64-output=decode-rows -vv master-bin.000050
connect jdbc where
url="jdbc:mysql://172.16.2.120:3308/wow"
and driver="com.mysql.jdbc.Driver"
and user="root"
and password="mlsql"
as wow;
load jdbc.`wow.test` as wow_test;
save overwrite wow_test as delta.`/dw/delta/wow/test`;
--流的mlsql脚本必须要设置流的名字
set streamName="binlog-delta";
--加载binlog
load binlog.`` where
host="172.16.2.120"
and port="3308"
and userName="root"
and password="mlsql"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="240"
and binlogFileOffset="1613280"
and databaseNamePattern="wow"
and tableNamePattern="test"
as wow_test;
--bingLogNamePrefix:mysql binlog文件前缀
--binlogIndex:binlog文件的索引
--binlogFileOffset:开始同步binlog的偏移量
--databaseNamePattern:要同步的库
--tableNamePattern:要同步的表(正则表达式可以匹配多个表,进行同步)
--!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
--流式程序可以配置回调函数,用于监控流的状态
save append wow_test
as binlogRate.`/dw/delta/{db}/{table}`
options mode="Append"
and idCols="id"
and duration="10"
and checkpointLocation="/chk/delta/binlog";
--mode:append 追加模式
--idCols:数据的唯一键,用于更新和删除数据
--duration:同步时间间隔,单位秒
--checkpointLocation:存储binlog offset等状态信息,流状态可以从这个检查点恢复
load delta.`/dw/delta/wow/test` as wow_test;
mysql show master status;
!kafkaTool streamOffset chk/delta/binlog;
当然,还可以先读取binlog的数据写入到Kafka,然后读取Kafka在写入到Delta。如果需要保存binlog,这种方式比较好。但是要增加一个Kafka到Delta的延时检查。
spark-binlog还有需要完善的地方,比如如何提高写Kafka的并发,目前支持Topic只有一个Partition,如果连接的库挂掉该如何处理,如何在原有的Source中动态增加或删除表,对DDL的支持等。优点是使用简单,而且可以读取从库。Canal在并发上做的比较好,可以根据表的名字和主键设置并发,并且支持HA。在canal版本>=1.1.4,还有canal-admin提供web管理集群、Server和Instance,使用很方便。
源码地址:https://github.com/latincross/mlsqlwechat(c7-binlog)