抛出疑无路?
【Flink 1.10】- 使用flink-jdbc连接器的方式与MySQL交互,读数据和写数据都能完成,但是在写数据时,发现Flink程序执行完毕之后,才能在MySQL中查询到插入的数据。即,虽然是流计算,但却不能实时的输出计算结果?From @罗鹏程

JDBCAppendTableSink.builder()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/flink")
.setUsername("root")
.setPassword("123456")
.setParameterTypes(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
.setQuery("insert into batch_size values(?,?)")
.build()
复制
再现又一村!
【Flink-1.10】这个问题是知道一秒钟,不知磨洋工的Case,在初学时候非常容易遇上,那么真的是Flink不能实时写入MySQL吗?当然不是,上面代码基础之上简单的加上一行,就解决问题了:
...
.setBatchSize(1) //将写入MySQL的buffer大小为1。
..
复制
向前一小步...
那么问题虽然解决了,根本原因是个啥呢?也许你看到这里会说,这问题很明显,就是Flink设计JDBC Sink的时候出于性能因素考虑,对写入buffer做了默认值设置,没错,这一点你说的很对,在【Flink-1.10】中JDBC OutputFormat的基类 AbstractJDBCOutputFormat里面和这相关的变量 DEFAULT_FLUSH_MAX_SIZE 默认值是5000,所以在你学习测试时候由于测试数据少(少于5000),数据一直在buffer中,直到数据源数据结束,作业也结束了,才将计算结果刷入MySQL,所以没有实时的(每条)写入MySQL。如下:
但这里还有个因素需要注意,那就是时间因素,上面DEFAULT_FLUSH_INTERVAL_MILLS默认值是0,这个相当于没有时间限制,一直等到buffer满了或者作业结束才能触发写出动作。也就是有些初学者,发现问题,即使故意 debug时候打上断点,不让作业结束,但是等到花儿都谢了,数据也没有写入到MySQL。
在【Flink 1.10】中AbstractJDBCOutputFormat有两个实现类,
分别对应了如下两类Sink:
所以在【Flink 1.10】中不论是 AppendTableSink和UpsertTableSink都会有同样的问题。不过UpsertTableSink时候用户可以设置时间,而AppendTableSink是连时间设置的入口都木有。
Flink 的锅?...
就这个问题而言,我个人认为不是用户的问题,是Flink 1.10代码设计有进一步改进的空间。在Flink 1.11 中社区的确重构了,对JDBCOutputFormat 打了 @Deprecated。感兴趣可以查阅 FLINK-17537 了解变化过程。但是在这个改进中,并没有对 DEFAULT_FLUSH_MAX_SIZE 默认值和 DEFAULT_FLUSH_INTERVAL_MILLS 默认值做变化,社区也在积极的讨论改进方案,想参与社区贡献或者了解最终讨论结果的可以查阅 FLINK-16497。
举一反三
当然在你学习过程中使用任何Sink的时候,只要没有实时写入,都可以找找是否有写出buffer和写出时间的限制设置。在这一点上,罗鹏程 也提到了Elasticsearch也有类似问题,需要调用setBulkFlushMaxActions进行设置。
众人拾柴
期待你典型问题的抛出... 我将知无不言...言无不尽... 我在又一村等你...
关注我的订阅号,我们并肩续航...