错过直播的朋友可以钉钉扫描文末的二维码,加入钉钉群回看直播,或者在b站搜索“袋鼠云”观看视频。b站视频网址:
如果需要课件资料,请加技术交流群:30537511 获取
何为LookupTable
LookupTable是Flink SQL读取数据的一种方式。一般用来做实时数仓维表,与实时的数据进行JOIN。如下图Kafka数据进入Flink后查询MongoDB维表并将拼接(JOIN)后的结果写入MySQL。MongoDB在这次计算任务中的抽象就是LookupTable。

具体使用SQL样例:

1、LookupTable的种类
同步和异步分别对应代码中的TableFunction类和AsyncTableFunction类。
本文将以AsyncTableFunction举例讲解。

为何使用LookupTable
LookupTable是实时计算基础功能之一。
例如:张学友演唱会抓逃犯

或者:寻找被拐儿童

如何实现LookupTable
1、Flink提交任务流程

因为以上原因,所以在实现LookupTable之前,才先让大家熟悉下上面的Flink提交任务结构图。
2、实现LookupTable步骤
解析SQL
读取数据
转换数据
1)第一步:根据Flink给予标准接口和工具类,解析下图的SQL语句。

2)第一步启动时SQL解析流程图

3)第二步、第三步示意图

具体代码讲解
项目地址:https://github.com/DTStack/flinkx
以下的介绍可以参考FlinkX中Connector源码,边看文章边对照源码。多调试几次即可熟悉。
1、解析SQL
DynamicTableSourceFactory 接口作用是解析建表DDL。
LookupTableSource接口实现对 JOIN中等式信息的解析。
createDynamicTableSource():实现具体的解析、校验DDL中参数的逻辑。
factoryIdentifier():设置DDL中'connector'选项,用来不同connector,所以不要和其他connector重名。
requiredOptions():设置DDL中必选参数
optionalOptions():设置DDL中可选参数
getLookupRuntimeProvider():获取JOIN等式中的字段名,并通过LookupRuntimeProvider初始化一个AsyncTableFunction类。代码编写到LookupRuntimeProvider,就说明所有在Client端执行的启动时逻辑已经编写完成。
2、读取数据
AsyncTableFunction<RowData>
open():开启数据库连接
eval():根据传入的Key,异步查询远端数据库。
close():关闭数据库连接
3、转换数据
这个官方没有标准接口我们可以参考两个类
HBase Connector中的org.apache.flink.connector.hbase.util.HBaseSerde
JDBC Connector中的org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter
技术要点总结
2、根据Flink SQL中官方接口,去解析并获取所有DDL中配置,以及DML中JOIN信息。
3、解析好所有SQL信息后,通过实现AsyncTableFunction和自定义的RowDataConverter,来把数据库中的数据转化成Flink的RowData类型发送给下游即可。
本文作者
无刃
数栈大数据引擎工程师
FlinkX-Oracle Logminer模块介绍丨直播回顾
Flink提交流程&如何debug和跟踪流程(on yarn)丨直播回顾
Flink jm、tm启动过程和资源分配丨直播回顾

更多技术交流方式

想面对面的进行技术交流吗?想及时参与直播活动吗?可扫码加入钉钉群“袋鼠云开源框架技术交流群”(群号:30537511)


Gitee开源项目地址:

点击“阅读原文”,一键到达FlinkX开源项目!





