暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何自定义Flink LookupTable 丨直播回顾

数栈研习社 2021-07-02
1575
6月23日,袋鼠云数栈技术研发团队工程师无刃为大家直播分享《如何自定义Flink LookupTable》。

错过直播的朋友可以钉钉扫描文末的二维码,加入钉钉群回看直播,或者在b站搜索“袋鼠云”观看视频。b站视频网址:
https://www.bilibili.com/video/BV1864y1t7Kv

如果需要课件资料,请加技术交流群:
30537511 获取

下面带大家来回顾下本次直播的内容,本次直播无刃大佬主要从以下三部分讲解Flink LookupTable的实现。
1、何为(What)
2、为何(Why)
3、如何(How)
研究任何技术都依次从这三个角度分析:何为、为何、如何。

何为LookupTable


LookupTable是Flink SQL读取数据的一种方式。一般用来做实时数仓维表,与实时的数据进行JOIN。如下图Kafka数据进入Flink后查询MongoDB维表并将拼接(JOIN)后的结果写入MySQL。MongoDB在这次计算任务中的抽象就是LookupTable。



具体使用SQL样例:



1、LookupTable的种类

一共有两种
1)同步LookupTable,串行逐条查询数据。
2)异步LookupTable,并发查询数据。可以看下图,一段时间内因为采用了异步,吞吐量提升了。

同步和异步分别对应代码中的TableFunction类和AsyncTableFunction类。

本文将以AsyncTableFunction举例讲解。



为何使用LookupTable


LookupTable是实时计算基础功能之一。


例如:张学友演唱会抓逃犯

可以将参加演唱会人员的数据实时发送给Flink,与数据库里的逃犯数据进行对比。



或者:寻找被拐儿童

可以将城市各处摄像头人脸数据实时发送给Flink,模型识别此人为被拐儿童,查询数据库中所属地区警方数据,自动向警方报警。



如何实现LookupTable


1、Flink提交任务流程

在编写Flink 自定义的Connector时一定要清楚Flink任务提交的流程。
提交结构图

在编写自己的LookupTable时,一定要清楚哪些代码是在Client端执行,也就是图中最左边节点。哪些是在最右端的TaskManager Slot节点执行。如果不清晰很容易写出Bug。哪些方法是在Slot中执行,可以通过看Java文档注释,或者远程Debug TaskMananger分析出来。当然还有些特殊的方法可能在JobManager节点执行。

因为以上原因,所以在实现LookupTable之前,才先让大家熟悉下上面的Flink提交任务结构图。

2、实现LookupTable步骤

就如同把大象放冰箱中一样,只需三步。
  • 解析SQL

  • 读取数据

  • 转换数据


    1)第一步:根据Flink给予标准接口和工具类,解析下图的SQL语句。


    2)第一步启动时SQL解析流程图

    后续只需要实现下图中紫色块和蓝色块中的接口,即可完成第一步解析SQL。


    3)第二步、第三步示意图

    这一步我们只需要根据数据源类库、Table API、DataStreamAPI去实现即可。最终目的是把数据库中的数据转换成Flink的RowData类型。

    具体代码讲解


    具体代码可以参考FlinkX中各个Connector的实现。

    项目地址:https://github.com/DTStack/flinkx

    以下的介绍可以参考FlinkX中Connector源码,边看文章边对照源码。多调试几次即可熟悉。


    1、解析SQL

    1)主要两个接口DynamicTableSourceFactory和LookupTableSource。
    • DynamicTableSourceFactory 接口作用是解析建表DDL。

    • LookupTableSource接口实现对 JOIN中等式信息的解析。


    2)DynamicTableSourceFactory接口方法介绍
    • createDynamicTableSource():实现具体的解析、校验DDL中参数的逻辑。

    • factoryIdentifier():设置DDL中'connector'选项,用来不同connector,所以不要和其他connector重名。

    • requiredOptions():设置DDL中必选参数

    • optionalOptions():设置DDL中可选参数


    3)LookupTableSource接口方法介绍
    • getLookupRuntimeProvider():获取JOIN等式中的字段名,并通过LookupRuntimeProvider初始化一个AsyncTableFunction类。代码编写到LookupRuntimeProvider,就说明所有在Client端执行的启动时逻辑已经编写完成。


    2、读取数据

    • AsyncTableFunction<RowData>

    • open():开启数据库连接

    • eval():根据传入的Key,异步查询远端数据库。

    • close():关闭数据库连接


    3、转换数据

    RowDataConverter:这个类的目的是将数 据源类库返回的类型如Mongodb的Document类型或JDBC的ResultSet类型转换成Flink SQL标准的RowData类型。从数据层面与Flink SQL Runtime对接。

    这个官方没有标准接口我们可以参考两个类
    • HBase Connector中的org.apache.flink.connector.hbase.util.HBaseSerde

    • JDBC Connector中的org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter

    模仿其实现即可。

    技术要点总结


    1、理清Connector代码中那些是启动时执行,那些是运行时执行。换句话说,那些在Client端执行,那些在TaskManager的Slot端执行。

    2、根据Flink SQL中官方接口,去解析并获取所有DDL中配置,以及DML中JOIN信息。

    3、解析好所有SQL信息后,通过实现AsyncTableFunction和自定义的RowDataConverter,来把数据库中的数据转化成Flink的RowData类型发送给下游即可。

    本文作者

    无刃

    数栈大数据引擎工程师



    FlinkX-Oracle Logminer模块介绍丨直播回顾

    Flink提交流程&如何debug和跟踪流程(on yarn)丨直播回顾

    Flink jm、tm启动过程和资源分配丨直播回顾


    更多技术交流方式


    想面对面的进行技术交流吗?想及时参与直播活动吗?可扫码加入钉钉群“袋鼠云开源框架技术交流群”(群号:30537511)


    想体验更多的数栈开源项目吗?可以在Github社区或Gitee社区搜索“FlinkX”开源项目

    Github开源项目地址:
    https://github.com/DTStack/flinkx


    Gitee开源项目地址:

    https://gitee.com/dtstack_dev_0/flinkx


    点击阅读原文,一键到达FlinkX开源项目!

    文章转载自数栈研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

    评论