- flink 关联 hbase 表非主键
关联 Hbase 表非主键的功能,是我们一直都在做的事情,只是实现的方式不同。
在 Flink 1.10 版本的时候,SQL 关联 Hbase,都是在 SqlSubmit 程序启动的时候,基于配置文件生成 UDF 并注册成临时函数,直到 Flink 官方的 Hbase connector 支持 Lookup join,使用 lookup join 替换 udf 关联 hbase 表主键的部分。
udf 相对于 connector 还是有比较大的差距,udf 的输入输出都要基于配置文件,并且生成的 udf 只能查询固定的表的指定字段,条件也是提前确定好的,虽然也可以用,但是相对于 connector 灵活性上还是有很大的差距。
特别是这段时间,有个项目需要关联hbase 表的场景多,如果使用 udf 需要在启动的时候生成很多 udf,虽然对任务没什么影响,但是看着就很傻,所以就有了自己实现 hbase 的 lookup join source。
最近花了一些时间,尝试了一下自定义 sql source、mysql Table source 和支持 lookup join 的source,并实现了支持 lookup join hbase 非主键的 table source。
Flink 1.10 hbase udf
Flink 1.10 官方在源码的里面提供了 hbase udf 的样例,我们用这种方法实现流和hbase 表的关联
HBaseConnectorITCase.java
// 配置信息和 hbase 表、列族、列 信息 private static Map<String, String> hbaseTableProperties() { Map<String, String> properties = new HashMap<>(); properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE); properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143); properties.put(CONNECTOR_PROPERTY_VERSION, "1"); properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1); // get zk quorum from "hbase-site.xml" in classpath String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM); properties.put(CONNECTOR_ZK_QUORUM, hbaseZk); // schema String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3}; TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT); TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{F2COL1, F2COL2}, Types.STRING, Types.LONG); TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{F3COL1, F3COL2, F3COL3}, Types.DOUBLE, Types.BOOLEAN, Types.STRING); TypeInformation[] columnTypes = new TypeInformation[]{f1, Types.INT, f2, f3}; DescriptorProperties descriptorProperties = new DescriptorProperties(true); TableSchema tableSchema = new TableSchema(columnNames, columnTypes); descriptorProperties.putTableSchema(SCHEMA, tableSchema); descriptorProperties.putProperties(properties); return descriptorProperties.asMap(); } // lateral 方式 join hhbase 表 @Test public void testHBaseLookupFunction() throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv, streamSettings); StreamITCase.clear(); // prepare a source table DataStream<Row> ds = streamEnv.fromCollection(testData2).returns(testTypeInfo2); Table in = streamTableEnv.fromDataStream(ds, "a, b, c"); streamTableEnv.registerTable("src", in); Map<String, String> tableProperties = hbaseTableProperties(); TableSource source = TableFactoryService .find(HBaseTableFactory.class, tableProperties) .createTableSource(tableProperties); streamTableEnv.registerFunction("hbaseLookup", ((HBaseTableSource) source).getLookupFunction(new String[]{ROWKEY})); // perform a temporal table join query String sqlQuery = "SELECT a,family1.col1, family3.col3 FROM src, LATERAL TABLE(hbaseLookup(a))"; Table result = streamTableEnv.sqlQuery(sqlQuery); DataStream<Row> resultSet = streamTableEnv.toAppendStream(result, Row.class); resultSet.addSink(new StreamITCase.StringSink<>()); streamEnv.execute(); List<String> expected = new ArrayList<>(); expected.add("1,10,Welt-1"); expected.add("2,20,Welt-2"); expected.add("3,30,Welt-3"); expected.add("3,30,Welt-3"); StreamITCase.compareWithList(expected); }
复制
Flink lookup join rowkey
join hbase 表主键的样例,之前已经有博客写过,也不再赘述: flink sql join hbase demo
这里简单介绍一下 HBaseRowDataLookupFunction 的实现
还是一样的三件套,通过 java spi api 注册工厂 HBase2DynamicTableFactory,工厂创建 HBaseDynamicTableSource, TableSource 创建 HBaseRowDataLookupFunction(同步,或:HBaseRowDataAsyncLookupFunction 异步)
工厂和 TableSource 没什么好说的,直接看 HBaseRowDataLookupFunction
构造方法中传入需要的参数, open 方法初始化 缓存对象和hbase 连接