暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink Sql Lookup Join Hbase 表非主键

原创 jj 2022-06-16
1101
  • flink 关联 hbase 表非主键

关联 Hbase 表非主键的功能,是我们一直都在做的事情,只是实现的方式不同。

在 Flink 1.10 版本的时候,SQL 关联 Hbase,都是在 SqlSubmit 程序启动的时候,基于配置文件生成 UDF 并注册成临时函数,直到 Flink 官方的 Hbase connector 支持 Lookup join,使用 lookup join 替换 udf 关联 hbase 表主键的部分。

udf 相对于 connector 还是有比较大的差距,udf 的输入输出都要基于配置文件,并且生成的 udf 只能查询固定的表的指定字段,条件也是提前确定好的,虽然也可以用,但是相对于 connector 灵活性上还是有很大的差距。

特别是这段时间,有个项目需要关联hbase 表的场景多,如果使用 udf 需要在启动的时候生成很多 udf,虽然对任务没什么影响,但是看着就很傻,所以就有了自己实现 hbase 的 lookup join source。

最近花了一些时间,尝试了一下自定义 sql source、mysql Table source 和支持 lookup join 的source,并实现了支持 lookup join hbase 非主键的 table source。

Flink 1.10 hbase udf

Flink 1.10 官方在源码的里面提供了 hbase udf 的样例,我们用这种方法实现流和hbase 表的关联

HBaseConnectorITCase.java


// 配置信息和 hbase 表、列族、列 信息
private static Map<String, String> hbaseTableProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
    properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
    properties.put(CONNECTOR_PROPERTY_VERSION, "1");
    properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
    // get zk quorum from "hbase-site.xml" in classpath
    String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
    properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
    // schema
    String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
    TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
    TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG);
    TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING);
    TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3};

    DescriptorProperties descriptorProperties = new DescriptorProperties(true);
    TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
    descriptorProperties.putTableSchema(SCHEMA, tableSchema);
    descriptorProperties.putProperties(properties);
    return descriptorProperties.asMap();
}

// lateral 方式 join hhbase 表
@Test
public void testHBaseLookupFunction() throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings);
    StreamITCase.clear();

    // prepare a source table
    DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2);
    Table in = streamTableEnv.fromDataStream(ds, "a, b, c");
    streamTableEnv.registerTable("src", in);

    Map<String, String> tableProperties = hbaseTableProperties();
    TableSource source = TableFactoryService
        .find(HBaseTableFactory.class, tableProperties)
        .createTableSource(tableProperties);

    streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY}));

    // perform a temporal table join query
    String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))";
    Table result = streamTableEnv.sqlQuery(sqlQuery);

    DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class);
    resultSet.addSink(new StreamITCase.StringSink<>());

    streamEnv.execute();

    List<String> expected = new ArrayList<>();
    expected.add("1,10,Welt-1");
    expected.add("2,20,Welt-2");
    expected.add("3,30,Welt-3");
    expected.add("3,30,Welt-3");

    StreamITCase.compareWithList(expected);
}

复制

Flink lookup join rowkey

join hbase 表主键的样例,之前已经有博客写过,也不再赘述: flink sql join hbase demo

这里简单介绍一下 HBaseRowDataLookupFunction 的实现

还是一样的三件套,通过 java spi api 注册工厂 HBase2DynamicTableFactory,工厂创建 HBaseDynamicTableSource, TableSource 创建 HBaseRowDataLookupFunction(同步,或:HBaseRowDataAsyncLookupFunction 异步)

工厂和 TableSource 没什么好说的,直接看 HBaseRowDataLookupFunction

构造方法中传入需要的参数, open 方法初始化 缓存对象和hbase 连接

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论