
[BUG] A Mysterious Exception in Flink CDC 2.0.0!!!

数据基石 2021-12-14

1. Scenario

Using the Flink CDC SQL API to capture MySQL binlog changes in real time and send them to Kafka.

2. Framework Versions

Framework              Version
Flink                  1.13.2
MySQL                  5.7.25
connector-mysql-cdc    2.0.0
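
For reference, these are the Maven coordinates this demo assumes (a sketch, not taken from the original article: since 2.0.0 the MySQL CDC connector is published under the com.ververica group id, and the upsert-kafka connector and JSON format ship with Flink's own artifacts; verify against your build):

<!-- Assumed dependency coordinates; verify against your repository. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.2</version>
</dependency>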

3. Test Code

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CDCWithSqlTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 2 minutes with exactly-once semantics; time out after 1 minute.
        env.enableCheckpointing(120000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Retain externalized checkpoints so the job can be restarted from them after a cancel.
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: capture the MySQL binlog via the mysql-cdc connector.
        String sourceDDL = "CREATE TABLE cdc_from_mysql (" +
                "  id   INT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector'                         = 'mysql-cdc'," +
                "  'scan.startup.mode'                 = 'latest-offset'," +
                "  'server-time-zone'                  = 'Asia/Shanghai'," +
                "  'scan.incremental.snapshot.enabled' = 'true'," +
                "  'hostname'                          = 'mysql_ip'," +
                "  'port'                              = 'mysql_port'," +
                "  'username'                          = 'mysql_username'," +
                "  'password'                          = 'mysql_password'," +
                "  'database-name'                     = 'mysql_database'," +
                "  'table-name'                        = 'mysql_table'," +
                "  'server-id'                         = '5400'" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Sink: write the changelog to Kafka via the upsert-kafka connector.
        String sinkDDL = "CREATE TABLE cdc_to_kafka (" +
                "  id   INT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector'                    = 'upsert-kafka'," +
                "  'topic'                        = 'ZGN_CDC_TEST'," +
                "  'properties.bootstrap.servers' = 'kafka_ip:9092'," +
                "  'key.json.ignore-parse-errors' = 'true'," +
                "  'key.format'                   = 'json'," +
                "  'value.format'                 = 'json'," +
                "  'value.fields-include'         = 'ALL'" +
                ")";
        tableEnv.executeSql(sinkDDL);

        // Submit the continuous pipeline: MySQL changelog -> Kafka.
        tableEnv.executeSql("INSERT INTO cdc_to_kafka SELECT * FROM cdc_from_mysql");
    }
}
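
For completeness, the MySQL table being captured is assumed to look roughly like this (a sketch; mysql_table and the column definitions are hypothetical, matching the placeholders in the DDL above):

-- Hypothetical source table matching the Flink DDL above.
CREATE TABLE mysql_table (
    id   INT NOT NULL,
    name VARCHAR(255),
    PRIMARY KEY (id)
);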

4. Reproducing the Bug

1. First, insert a few rows into MySQL
1.1 On the MySQL side

id    name
1     1
2     2
3     3
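
The rows can be inserted with plain SQL, e.g. against the hypothetical table sketched earlier:

-- First batch of test rows.
INSERT INTO mysql_table (id, name) VALUES (1, '1'), (2, '2'), (3, '3');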
1.2 Consume the Kafka topic from the console
kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}

2. Simulate a Flink job failure (stop the Flink job)

Here I simply cancel the job from the Web UI; the same can be done from the CLI, as sketched below.
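
A sketch of the CLI equivalent (the job id is a placeholder; list the running jobs first to find it):

# List running jobs to find the job id, then cancel it.
bin/flink list
bin/flink cancel <job_id>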

3. Insert more rows into MySQL

id                    name
1 (inserted earlier)  1 (inserted earlier)
2 (inserted earlier)  2 (inserted earlier)
3 (inserted earlier)  3 (inserted earlier)
4 (inserted now)      4 (inserted now)
5 (inserted now)      5 (inserted now)
6 (inserted now)      6 (inserted now)

4. Restart the Flink job from the checkpoint (a CLI sketch follows) and keep watching the Kafka topic from the console
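
Restarting from the retained checkpoint can be done with the -s flag of the flink CLI. A sketch (the chk-<n> directory, job id and jar name are placeholders):

# Resume from the externalized checkpoint retained on cancellation.
bin/flink run \
    -s hdfs://namenode_ip:8020/data/checkpoint/flink_cdc/<job_id>/chk-<n> \
    -c CDCWithSqlTest cdc-demo.jar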

kafka-console-consumer.sh --bootstrap-server kafka_ip:9092 --topic ZGN_CDC_TEST

{"id":1,"name":"1"}
{"id":2,"name":"2"}
{"id":3,"name":"3"}

---------------- job stop/restart boundary ------------------

{"id":4,"name":"4"}
{"id":5,"name":"5"}
{"id":6,"name":"6"}
{"id":4,"name":"4"}    -- problem: duplicate consumption
{"id":5,"name":"5"}    -- problem: duplicate consumption
{"id":6,"name":"6"}    -- problem: duplicate consumption
{"id":4,"name":"4"}    -- problem: duplicate consumption
{"id":5,"name":"5"}    -- problem: duplicate consumption
{"id":6,"name":"6"}    -- problem: duplicate consumption
{"id":4,"name":"4"}    -- problem: duplicate consumption
{"id":5,"name":"5"}    -- problem: duplicate consumption
{"id":6,"name":"6"}    -- problem: duplicate consumption
......

5. Error Log

java.lang.RuntimeException: One or more fetchers have encountered exception
......
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
......
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
......
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1639381876000, eventType=EXT_WRITE_ROWS, serverId=999, headerLength=19, dataLength=23, nextPosition=356319039, flags=0}
......
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.MissingTableMapEventException: No TableMapEventData has been found for table id:1800034. Usually that means that you have started reading binary log 'within the logical event group' (e.g. from WRITE_ROWS and not proceeding TABLE_MAP
......

6. Troubleshooting

1. Judging from the Kafka console output

The expectation was that {"id":4,"name":"4"}, {"id":5,"name":"5"} and {"id":6,"name":"6"} would each be consumed exactly once, but in fact they keep being re-consumed indefinitely. This suggests that after the restart, the Flink CDC job is not correctly interpreting the checkpoint state stored in HDFS.

2. Judging from the error log

The main error is a failure to deserialize the MySQL binlog, which at first glance is hard to reconcile with the guess above. The MissingTableMapEventException is actually a hint, though: row events such as WRITE_ROWS are only meaningful after their preceding TABLE_MAP event, so this exception means the restored binlog offset points into the middle of a logical event group; in other words, the offset recovered from the checkpoint is wrong.
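
As a sanity check, the binlog around the offset in the stack trace can be inspected with the mysqlbinlog tool. A sketch (the binlog file name and position are placeholders; the position would be the offset recovered from the checkpoint):

# Dump events starting at the suspect offset; a row event with no
# preceding TABLE_MAP event confirms the offset is mid-event-group.
mysqlbinlog --start-position=<position> mysql-bin.<nnnnnn> | head -n 50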

3. Searched the issues in the Flink CDC community and found no similar error.
4. On the Flink CDC project page, the 2.0.1 release notes list a relevant fix (item 10):
Improvements and Bug Fixes
1.[postgres] Fix Validator didn't implement Serializable
2.[mysql] Correct the initial binlog offset for MySqlHybridSplitAssigner
3.[mysql] Optimize the checkpoint be optional under single parallelism
4.[postgres] Fix postgres-cdc connector cannot recognize the optional option 'slot.name'
5.[mysql] Improve the code format in SignalEventDispatcher
6.[mysql] Add default value for 'database.history.instance.name' in MySqlParallelSource
7.[mysql] Add tests to check mysql-cdc works well under various timezones
8.[common] Remove useless parameter 'converter.schemas.enable'
9.[build] Run integration tests for each building
10.[changelog] fix changelog-deserialize exception message typo
11.[docs] Add FAQ for MySQL 8.0 Public Key Retrieval setting
12.[docs] Update the debezium document link to version 1.5
13.[docs] Add checkpoint and primary key setting for example in tutorials

Version 2.0.1 fixed a changelog-deserialization exception, which lines up with the information in the error log above; the root cause is therefore considered located.

7. Solution

Upgrade the Flink CDC connector from 2.0.0 to 2.0.2.
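
In Maven terms this is just a version bump of the connector (assuming the com.ververica coordinates sketched in section 2):

<!-- Bump the CDC connector; the fix shipped in the 2.0.1/2.0.2 line. -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.2</version>
</dependency>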




