暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

7 - 教你如何读取MySQL binlog

MLSQL之道 2021-09-23
4516

解析binlog可以做很多的事情,比如在ES做增量索引,可靠的同步消息(本地事物提交,也要成功发送到MQ),缓存一致性(数据库中数据与缓存一致),Delta实时数仓(Delta中数据与数据库在某个时间点之前一致),Hive记录数据变更等等。

Canal是一个很好的解析MySQL数据库增量日志的工具。下面简单介绍下原理:

MySQL主备复制原理:

  1. MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过show binlog events进行查看);

  2. MySQL slave将master的binary log events拷贝到它的中继日志(relay log);

  3. MySQL slave 重放relay log中事件,将数据变更反映它自己的数据。

canal 工作原理:

  1. canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议

  2. MySQL master收到dump请求,开始推送binary log给slave(即 canal )

  3. canal解析binary log对象(原始为byte流)

    而且在canal 1.1.1版本之后, 支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的Kafka和RocketMQ。


今天笔者讲一个与canal原理相似的库,也是通过BINLOG_DUMP(它还支持BINLOG_DUMP_GTID协议)协议网络读取binlog并提供解析MySQL binlog的方法:https://github.com/shyiko/mysql-binlog-connector-java。它有两种模式:BinaryLogFileReader日志读取模式(要求部署到MySQL机器,直接读取binlog文件)和BinaryLogClient客户端访问模式(需要授权,支持读取从库binlog)。
它可以根据binlog文件的position(起始位置)解析binlog event,下面通过一个示例了解一下BinaryLogClient,示例中只处理insert、delete和update事件:
准备条件,授权test连接MySQL账号如下权限
    CREATE USER test IDENTIFIED BY 'test';  
    GRANT SELECTREPLICATION SLAVEREPLICATION CLIENT ON *.* TO 'test'@'%' IDENTIFIED BY 'test';  
    FLUSH PRIVILEGES;
    Scala实现代码:
      //MySQL连接信息与binlog信息
      case class MySQLConnectionInfo(
      host: String, port: Int, userName: String, password: String
       , binlogFileName: Option[String], recordPos: Option[Long])


      //处理时间类型,这里只处理增删改三种
      object EventInfo {
      val INSERT_EVENT = "insert"
      val DELETE_EVENT = "delete"
      val UPDATE_EVENT = "update"
      }


      //实现Binlog Server
      class MyBinlogServer {
      def connectMySQL(connect: MySQLConnectionInfo) = {
      val client = new BinaryLogClient(connect.host, connect.port, connect.userName, connect.password)
          //设置binlog文件与偏移量
      client.setBinlogFilename(connect.binlogFileName.get)
      client.setBinlogPosition(connect.recordPos.get)
          //注册事件监听器 ,并对相应事件进行处理
      client.registerEventListener(new BinaryLogClient.EventListener() {
      def onEvent(event: Event): Unit = {
      val header = event.getHeader[EventHeaderV4]()
      val eventType = header.getEventType


      eventType match {
               case TABLE_MAP => println(event.getData[TableMapEventData]())
                case EXT_WRITE_ROWS | PRE_GA_WRITE_ROWS | WRITE_ROWS =>
      printRecord(event, client.getBinlogFilename, EventInfo.INSERT_EVENT ,header)
      case EXT_UPDATE_ROWS | PRE_GA_UPDATE_ROWS | UPDATE_ROWS=>
      printRecord(event, client.getBinlogFilename, EventInfo.UPDATE_EVENT ,header)
      case EXT_DELETE_ROWS | PRE_GA_DELETE_ROWS | DELETE_ROWS =>
                  printRecord(event, client.getBinlogFilename, EventInfo.DELETE_EVENT ,header)
      // case ROTATE =>
      // val rotateEventData = event.getData[RotateEventData]()
      // val currentBinlogFile = rotateEventData.getBinlogFilename
      // val currentBinlogPosition = rotateEventData.getBinlogPosition
      // case QUERY =>
      // println("qe: " + event.getData())
                case _ =>
      }
      }
      })


      client.connect()
      }
      }


      //printRecord为打印信息,具体实现见源码
      //启动Server
      object MyBinlogServer{
      def main(args: Array[String]) {
      val bls = new MyBinlogServer
      bls.connectMySQL(MySQLConnectionInfo("127.0.0.1" ,3306 ,"root" ,"mlsql" ,Option("master-bin.000069") ,Option(4)))
      }
      }
      下面来建一张wow.test表,有id和name两个字段,来看一下输出情况:
      1. 执行insert wow.test values(1,1)后,输出:
        TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
        EXT_WRITE_ROWS ,newValue:1,1, pos:7831, next pos:7877
        表示在wow.test中插入id=1 name=1,TableMapEventData可以理解为当前操作的表;
        2. 执行insert wow.test values(2,2)后,输出:
          TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
          EXT_WRITE_ROWS ,newValue:2,2, pos:8093, next pos:8139
          表示在wow.test中插入id=2 name=2;
          3. 执行update wow.test set name=3后,输出:
            TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
            EXT_UPDATE_ROWS ,oldValue: 1,1 ,newValue: 1,3 ,pos: 8355 ,next pos: 8435
            EXT_UPDATE_ROWS ,oldValue: 2,2 ,newValue: 2,3 ,pos: 8355 ,next pos: 8435
            可以看到有两条记录被更新,而且可以看到更新前的数据;
            4. 执行delete from wow.test where id=2后,输出:
              TableMapEventData{tableId=122, database='wow', table='test', columnTypes=8, 15, columnMetadata=0, 255, columnNullability={}}
              EXT_DELETE_ROWS ,newValue:2,3, pos:8651, next pos:8697
              表示在wow.test中删除id=2的数据,而且可以看到原始的整条数据。

              到这里可以看到,增删改的所有变更都可以捕获到,而且可以解析出具体变化的情况。

              细心的读者可以看到,ROTATE和QUERY事件被注释掉了,ROTATE是记录binlog文件的变更的,比如在做检查点的时候要记录当前读的文件和它的偏移量。QUERY可以解析出执行的sql,比如执行修改表结构的ddl。在实际使用中,可能还需要去解析一些事件。

              下面介绍几个关于MySQL binlog的命令:

              1. 找到binlog文件的路径:
                show VARIABLES like 'log_%';
                2. 查询binlog文件:
                  show master logs;
                  3. 查询最新binlog的position:
                    show master status;
                    4. 根据开始结束时间查binlog变更,用于寻找binlog在指定时间范围内的position:
                      sh mysqlbinlog --start-datetime="2019-06-19 01:00:00" --stop-datetime="2019-06-20 23:00:00" --base64-output=decode-rows -vv  master-bin.000050
                      在MLSQL中,引用的spark-binlog包就使用了该包,实现了解析binlog的Spark Source,在Source端启动BinlogServer,并实现getOffset、getBatch等方法。详情请参照:https://github.com/allwefantasy/spark-binlog MLSQLBinLogDataSource
                      文章最后,通过一个例子演示MLSQL是如何优雅的同步Binlog到Delta:
                      1. 初始化Delta
                        connect jdbc where
                        url="jdbc:mysql://172.16.2.120:3308/wow"
                        and driver="com.mysql.jdbc.Driver"
                        and user="root"
                        and password="mlsql"
                        as wow;
                        load jdbc.`wow.test` as wow_test;
                        save overwrite wow_test as delta.`/dw/delta/wow/test`;
                        2. 启动流
                          --流的mlsql脚本必须要设置流的名字
                          set streamName="binlog-delta";
                          --加载binlog
                          load binlog.`` where
                          host="172.16.2.120"
                          and port="3308"
                          and userName="root"
                          and password="mlsql"
                          and bingLogNamePrefix="mysql-bin"
                          and binlogIndex="240"
                          and binlogFileOffset="1613280"
                          and databaseNamePattern="wow"
                          and tableNamePattern="test"
                          as wow_test;


                          --bingLogNamePrefix:mysql binlog文件前缀
                          --binlogIndex:binlog文件的索引
                          --binlogFileOffset:开始同步binlog的偏移量
                          --databaseNamePattern:要同步的库
                          --tableNamePattern:要同步的表(正则表达式可以匹配多个表,进行同步)


                          --!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
                          --流式程序可以配置回调函数,用于监控流的状态


                          save append wow_test
                            as binlogRate.`/dw/delta/{db}/{table}` 
                          options mode="Append"
                          and idCols="id"
                          and duration="10"
                            and checkpointLocation="/chk/delta/binlog";


                          --mode:append 追加模式
                          --idCols:数据的唯一键,用于更新和删除数据
                          --duration:同步时间间隔,单位秒
                            --checkpointLocation:存储binlog offset等状态信息,流状态可以从这个检查点恢复
                          可以通过如下脚本查询数据,当然也可以增加时间和版本等条件查询:
                            load delta.`/dw/delta/wow/test` as wow_test;
                            3. 如何检查流延迟:
                            查询MySQL最新binlog的Position:
                              mysql show master status;
                              查看流的CheckPoint Offsets:
                                !kafkaTool streamOffset chk/delta/binlog;
                                根据File和Position计算Offsets与CheckPoint的Offsets比较看消息延时状况。

                                当然,还可以先读取binlog的数据写入到Kafka,然后读取Kafka在写入到Delta。如果需要保存binlog,这种方式比较好。但是要增加一个Kafka到Delta的延时检查。

                                spark-binlog还有需要完善的地方,比如如何提高写Kafka的并发,目前支持Topic只有一个Partition,如果连接的库挂掉该如何处理如何在原有的Source中动态增加或删除表,对DDL的支持等。优点是使用简单,而且可以读取从库。Canal在并发上做的比较好,可以根据表的名字和主键设置并发,并且支持HA。canal版本>=1.1.4,还有canal-admin提供web管理集群、Server和Instance,使用很方便。





                                喜欢就点击最上方的[ MLSQL之道 ]关注下吧,后面精彩不断!

                                源码地址:https://github.com/latincross/mlsqlwechat(c7-binlog)

                                更多介绍请访问:http://docs.mlsql.tech/zh/
                                文章转载自MLSQL之道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                评论