暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink源码分析 | 读取HBase配置

伦少的博客 2023-12-19
549

1、前言

上面文章中总结了Flink 获取 HBase 配置的逻辑和优先级,但是并没有对源码进行分析,本文主要是补充这一部分的源码分析。

2、版本

  • Flink 1.15.4

  • HBase 2.0.2

3、入口

从我之前写的文章:Flink用户自定义连接器(Table API Connectors)学习总结 中可知其实Flink Table API 读写 HBase 其实和通过自定义实现一个Table API Connectors ('connector' = 'hbase-2.2')差不多,只不过 HBase Connector 是Flink源码自带的,具体的模块为flink-connector-hbase-2.2,相关的类为HBase2DynamicTableFactory
HBaseDynamicTableSource
HBaseDynamicTableSink
,入口为 HBase2DynamicTableFactory
。并且在Hudi Flink SQL源码调试学习(一)我们也总结了从tableEnv.executeSql
FactoryUtil.createDynamicTableSink
这部分的源码分析。而在FactoryUtil.createDynamicTableSink
方法中会根据'connector'='hbase-2.2' 找到factory为org.apache.flink.connector.hbase2.HBase2DynamicTableFactory,接着调用 HBase2DynamicTableFactory.createDynamicTableSink
。所以我们前面的源码逻辑已经分析过了,现在只需要从HBase2DynamicTableFactory
开始进行分析就好了。
读:HBase2DynamicTableFactory
.createDynamicTableSource

写:HBase2DynamicTableFactory
.createDynamicTableSink

4、调试代码

代码地址:https://github.com/dongkelun/flink-learning/tree/master/flink-hbase

代码实现了Flink 本地读写远程服务器上带有 kerberos 认证的 HBase ,方便对Flink源码不熟悉的新手调试代码。代码和之前总结的Hudi Flink SQL代码示例及本地调试差不多,用tableEnv执行对应的Flink SQL即可。不同点是之前的读写hudi的路径是在Windows本地(因为hudi支持),但是本地没有 HBase,需要连接远程服务器上的 HBase 服务,我们的环境开启了kerberos认证,所以如何本地认证kerberos是个问题,因为之前用 Flink 本地认证kerberos的经验不多,所以尝试了一下,并总结了一点经验:

  • Flink 连接 HBase 仅仅有上篇文章 中提到的两个配置项(hbase.security.authentication 和 hbase.regionserver.kerberos.principal)是不够的。

  • 对于在服务器上通过sql-client跑sql来说,以下两个kerberos配置是必要的:security.kerberos.login.keytab 和 security.kerberos.login.principal。对于sql-client而言,如果提交模式是yarn,那么还需要本地通过 kinit
    缓存票据,否则提不到yarn上,如果提交模式是其他,比如提到standalone集群(默认提交方式),是不需要通过 kinit
    缓存票据的,因为standalone集群没有配置kerberos。

  • 而对于通过 bin/flink run
    命令提交jar包的方式也不需要kinit
    缓存票据,因为它会先获取 security.kerberos.login.keytab 和 security.kerberos.login.principal 两个配置项的值先进行kerberos认证。我们在它的提交日志里就可以看出来:


    而sql-client的提交日志里就没有这个信息,说明sql-client 和 bin/flink run
    提交任务时认证kerberos的逻辑是不一致的,具体的原因还要看对应的脚本和源码,本文先不进行研究~

  • 对于本地程序而言,又不一样了。因为我最开始不知道本地程序缺少什么配置(本地相对于服务器上面的配置会缺很多,比如flink-conf.yaml),只能从服务端通过sql-client来验证(比如修改flink-conf.yaml,去掉kerberos缓存),最终验证结论如上述所言。所以在本地程序最开始尝试通过添加
    security.kerberos.login.keytab 和 security.kerberos.login.principal 两个配置项来解决,但是发现没有效果。添加配置项代码如下:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

Configuration configuration = new Configuration();
configuration.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
configuration.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, principal);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

通过上面的代码添加配置项发现无效后,最终又尝试了通过UserGroupInformation.loginUserFromKeytab
认证kerberos来解决,发现这样就可以解决问题了。附运行成功图:

  • 对于上篇文章 提到的不清楚为啥不需要配置:hbase.regionserver.keytab.file,好像也有了答案,只是我的一种推测,不一定是对的。就是认证kerberos是在别的地方提前认证的,比如先缓存票据,或者先获取security.kerberos.login.keytab 和 security.kerberos.login.principal 再认证,然后连接HBase时就不需要了,至于为啥需要hbase.regionserver.kerberos.principal,可能就和连接Spark Thrift Server 一样,需要在url里指定 principal=HTTP/indata-192-168-44-128.indata.com@INDATA.COM,但是这个principal和实际认证keytab的principal的值是可以不一样的。

  • 因为我们本篇文章只是想研究获取HBase配置的部分源码逻辑,所以其实也可以不用连通HBbase就行,因为先获取HBase配置,再去连接。所以获取配置在前面,我们调试的话只需要调试前面获取配置的部分。

5、DynamicTableFactory

HBase2DynamicTableFactory

  • 5.1 tableOptions 我们在建表语句中的配置,如:'connector' = 'hbase-2.2',也就是用户自定义参数,优先级最高的配置

  • 5.2 然后 HBaseConnectorOptionsUtil.getHBaseConfiguration(tableOptions) 获取 hbaseConf

  • 5.3 最后将 hbaseConf 传给 HBaseDynamicTableSource 和 HBaseDynamicTableSink

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validateExcept(PROPERTIES_PREFIX);
        // tableOptions 我们在建表语句中的配置,如:'connector' = 'hbase-2.2'
        // 也就是用户自定义参数,优先级最高的配置
        final ReadableConfig tableOptions = helper.getOptions();

        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

        String tableName = tableOptions.get(TABLE_NAME);
        // 然后 getHBaseConfiguration(tableOptions) 获取 hbaseConf
        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
        HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
        String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
        HBaseTableSchema hbaseSchema =
                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
        // 最后将 hbaseConf 传给 HBaseDynamicTableSource
        return new HBaseDynamicTableSource(
                hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validateExcept(PROPERTIES_PREFIX);
        // tableOptions 我们在建表语句中的配置,如:'connector' = 'hbase-2.2'
        // 也就是用户自定义参数,优先级最高的配置
        final ReadableConfig tableOptions = helper.getOptions();

        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

        String tableName = tableOptions.get(TABLE_NAME);
        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
        // 然后 getHBaseConfiguration(tableOptions) 获取 hbaseConf
        HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions);
        String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
        HBaseTableSchema hbaseSchema =
                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
        // 最后将 hbaseConf 传给 HBaseDynamicTableSink
        return new HBaseDynamicTableSink(
                tableName, hbaseSchema, hbaseConf, hBaseWriteOptions, nullStringLiteral);
    }

6、ConnectorOptionsUtil

HBaseConnectorOptionsUtil.getHBaseConfiguration

  • 6.1 首先通过 HBaseConfigurationUtil.getHBaseConfiguration() 获取 hbaseClientConf ,这个方法里就包含了 classpath 和 环境变量两个优先级的获取

  • 6.2 最后根据用户自定义参数更新 hbaseClientConf 并返回。从这里就可以看出用户自定义参数是要比 classpath 和 环境变量 优先级高的。

    public static Configuration getHBaseConfiguration(ReadableConfig tableOptions) {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // 首先通过 HBaseConfigurationUtil.getHBaseConfiguration() 获取 hbaseClientConf
        // 这个方法里就包含了 classpath 和 环境变量两个优先级的获取
        Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
        // 最后根据用户自定义参数更新 hbaseClientConf 并返回
        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.get(ZOOKEEPER_QUORUM));
        hbaseClientConf.set(
                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.get(ZOOKEEPER_ZNODE_PARENT));
        // add HBase properties
        final Properties properties =
                getHBaseClientProperties(
                        ((org.apache.flink.configuration.Configuration) tableOptions).toMap());
        properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString()));
        return hbaseClientConf;
    }

7、HBaseConfigurationUtil

HBaseConfigurationUtil.getHBaseConfiguration()

  • 7.1 HBaseConfiguration.create() 方法会获取 classpath 中的 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml

  • 7.2 先判断有没有HBASE_HOME环境变量,如果有则读取 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml (具体可以看addHBaseConfIfFound方法)

  • 7.3 然后判断有没有HBASE_CONF_DIR环境变量,如果有则读取 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml (具体可以看addHBaseConfIfFound方法)
    后面的配置会覆盖前面的配置,所以从优先级上来看 HBASE_CONF_DIR环境变量 > HBASE_HOME环境变量 >HBaseConfiguration.create() (classpath)

备注:addResource方法会读取新的配置覆盖旧的配置,也就是会更新配置,所以最后读取的配置优先级最高

    public static Configuration getHBaseConfiguration() {

        // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from
        // the classpath.
        // 获取 classpath 中的 hbase-site.xml 和 hbase-default.xml
        // hbase-site.xml 优先级要高于 hbase-default.xml
        Configuration result = HBaseConfiguration.create();
        boolean foundHBaseConfiguration = false;

        // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration
        // The properties of a newly added resource will override the ones in previous resources, so
        // a configuration
        // file with higher priority should be added later.

        // Approach 1: HBASE_HOME environment variables
        String possibleHBaseConfPath = null;

        final String hbaseHome = System.getenv("HBASE_HOME");
        // 先判断有没有HBASE_HOME环境变量
        if (hbaseHome != null) {
            LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome);
            possibleHBaseConfPath = hbaseHome + "/conf";
        }

        // 如果有则读取 hbase-site.xml 和 hbase-default.xml
        if (possibleHBaseConfPath != null) {
            foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath);
        }

        // Approach 2: HBASE_CONF_DIR environment variable
        String hbaseConfDir = System.getenv("HBASE_CONF_DIR");
        // 然后判断有没有HBASE_CONF_DIR环境变量
        // 如果有则读取 hbase-site.xml 和 hbase-default.xml
        if (hbaseConfDir != null) {
            LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir);
            foundHBaseConfiguration =
                    addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration;
        }

        if (!foundHBaseConfiguration) {
            LOG.warn(
                    "Could not find HBase configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
        }

        return result;
    }

HBaseConfiguration.create()

    public static Configuration create() {
        Configuration conf = new Configuration();
        conf.setClassLoader(HBaseConfiguration.class.getClassLoader());
        return addHbaseResources(conf);
    }

    public static Configuration addHbaseResources(Configuration conf) {
        // classpath 中的 hbase-site.xml 
        conf.addResource("hbase-default.xml");
        // classpath 中的 hbase-default.xml
        conf.addResource("hbase-site.xml");
        checkDefaultsVersion(conf);
        HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
        return conf;
    }    

8、HBaseDynamicTableSink

对于 5.3 提到的 HBaseDynamicTableSource 和 HBaseDynamicTableSink ,我们先只分析 HBaseDynamicTableSink 。我们在 Hudi Flink SQL源码调试学习(一) 有总结过从 tableEnv.executeSql
到 Hudi 的 getSinkRuntimeProvider
的源码分析,类似的HBase也是一样的也会走到 (CommonExecSink)createSinkTransformation
继而调用 HBaseDynamicTableSink
.getSinkRuntimeProvider

    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        HBaseSinkFunction<RowData> sinkFunction =
                new HBaseSinkFunction<>(
                        tableName,
                        hbaseConf,
                        new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral),
                        writeOptions.getBufferFlushMaxSizeInBytes(),
                        writeOptions.getBufferFlushMaxRows(),
                        writeOptions.getBufferFlushIntervalMillis());
        return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
    }

8.1 HBaseSinkFunction

  • CommonExecSink
    .createSinkTransformation
    方法中拿到 runtimeProvider
    之后会调用 applySinkProvider
    ,从上面代码可知,这里的 runtimeProvider 是 SinkFunctionProvider,所以会先调用 runtimeProvider.createSinkFunction

    private Transformation<?> applySinkProvider(
            Transformation<RowData> inputTransform,
            StreamExecutionEnvironment env,
            SinkRuntimeProvider runtimeProvider,
            int rowtimeFieldIndex,
            int sinkParallelism,
            ExecNodeConfig config) {
        TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            .....
            return provider.consumeDataStream(createProviderContext(config), dataStream)
                    .getTransformation();
        } else if (runtimeProvider instanceof TransformationSinkProvider) {
            .....
        } else if (runtimeProvider instanceof SinkFunctionProvider) {
            // 走到这里
            // 先调用 runtimeProvider.createSinkFunction
            // `createSinkFunction` 返回 `HBaseSinkFunction`
            final SinkFunction<RowData> sinkFunction =
                    ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
            // 接着调用 `createSinkFunctionTransformation`
            return createSinkFunctionTransformation(
                    sinkFunction,
                    env,
                    inputTransform,
                    rowtimeFieldIndex,
                    sinkMeta,
                    sinkParallelism);
        } else if (runtimeProvider instanceof OutputFormatProvider) {
            .....
        } else if (runtimeProvider instanceof SinkProvider) {
           .....
        } else if (runtimeProvider instanceof SinkV2Provider) {
            .....
        } else {
            throw new TableException("Unsupported sink runtime provider.");
        }
    }

  • createSinkFunction
    返回 HBaseSinkFunction

public SinkFunction<RowData> createSinkFunction() {
    return sinkFunction;
}

  • 接着调用 createSinkFunctionTransformation
    , 在 createSinkFunctionTransformation
    会创建 Operator
    Transformation
    ,关于  Operator
    Transformation
    可以参考 Hudi Flink源码总结(二)-Transformation/Operator总结,关于 Function
    是如何运行的,会在后面的文章继续总结,本文先不研究~,我们知道后面会运行  Function
    open
    方法就好了

    private Transformation<?> createSinkFunctionTransformation(
            SinkFunction<RowData> sinkFunction,
            StreamExecutionEnvironment env,
            Transformation<RowData> inputTransformation,
            int rowtimeFieldIndex,
            TransformationMetadata transformationMetadata,
            int sinkParallelism) {
        // 创建 `Operator` 
        final SinkOperator operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex);

        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction)
                    .setInputType(getInputTypeInfo(), env.getConfig());
        }

        // 创建 `Transformation`
        final Transformation<?> transformation =
                new LegacySinkTransformation<>(
                        inputTransformation,
                        transformationMetadata.getName(),
                        SimpleOperatorFactory.of(operator),
                        sinkParallelism);
        transformationMetadata.fill(transformation);
        return transformation;
    }

  • HBaseSinkFunction
    .open

 public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        // 调用 prepareRuntimeConfiguration 获取 配置
        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0);

            if (null == connection) {
                // 根据 config 连接 HBase
                // 所以 config 就是最终配置
                this.connection = ConnectionFactory.createConnection(config);
            }
            ......
        LOG.info("end open.");
    }

HBaseSinkFunction
.open
会调用 prepareRuntimeConfiguration 方法返回 config,然后根据 config 连接 HBase,所以 config 就是最终配置了

  • HBaseSinkFunction
    .prepareRuntimeConfiguration

    这里一共有两个配置 serializedConfig 和 HBaseConfigurationUtil.getHBaseConfiguration()。这里的 serializedConfig 的优先级要比 HBaseConfigurationUtil.getHBaseConfiguration() 高。我们在 7 中分析了 HBaseConfigurationUtil.getHBaseConfiguration()的逻辑,所以就不用再分析了,也就是它包含 classpath和环境变量两个优先级别的配置信息。serializedConfig。而 serializedConfig 是在 HBase2DynamicTableFactory中通过 HBaseConnectorOptionsUtil.getHBaseConfiguration 获取的,我们在6中分析了,它包含了classpath、环境变量、用户自定义三个优先级的配置信息。所以写 HBase 时连接 HBase 所使用的的配置 就是在6中分析的用 HBaseConnectorOptionsUtil.getHBaseConfiguration 获取的配置,和上篇文章分析的逻辑一致。而后面的 prepareRuntimeConfiguration 并没有改变任何配置信息。

    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // and overwrite configuration using serialized configuration from client-side env
        // (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        // 这里一共有两个配置 serializedConfig 和 HBaseConfigurationUtil.getHBaseConfiguration()
        // serializedConfig 的优先级要比 HBaseConfigurationUtil.getHBaseConfiguration() 高
        org.apache.hadoop.conf.Configuration runtimeConfig =
                HBaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());

        // do validation: check key option(s) in final runtime configuration
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "Can not connect to HBase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IOException(
                    "Check HBase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }

        return runtimeConfig;
    }

9、DynamicTableSource

HBaseDynamicTableSource

最后总结一下 5.3 提到的 HBaseDynamicTableSource,一开始以为 source 和 sink 是类似的逻辑:getLookupRuntimeProvider
-> HBaseRowDataLookupFunction
-> open
-> prepareRuntimeConfiguration
, 但是调试是发现并不一样。它的调用逻辑为:
AbstractHBaseDynamicTableSource
(HBaseDynamicTableSource的父类).getScanRuntimeProvider
-> HBaseDynamicTableSource
.getInputFormat
(返回 HBaseRowDataInputFormat) -> AbstractTableInputFormat
.createInputSplits
(HBaseRowDataInputFormat的父类) -> HBaseRowDataInputFormat
.initTable
-> connectToTable

在 connectToTable 通过 getHadoopConfiguration 方法获取配置然后连接 HBase ,这里 getHadoopConfiguration 的逻辑和 HBaseSinkFunction
.prepareRuntimeConfiguration
是一样的。

    private void connectToTable() throws IOException {
        try {
            if (connection == null) {
                connection = ConnectionFactory.createConnection(getHadoopConfiguration());
            }
            TableName name = TableName.valueOf(getTableName());
            table = connection.getTable(name);
            regionLocator = connection.getRegionLocator(name);
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + tableName + " not found ", tnfe);
            throw new RuntimeException("HBase table '" + tableName + "' not found.", tnfe);
        }
    }

    protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
        return HBaseConfigurationUtil.deserializeConfiguration(
                serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
    }

分析一下为啥会调用 getScanRuntimeProvider
而不是 getLookupRuntimeProvider
,因为 AbstractHBaseDynamicTableSource 同时实现了 ScanTableSource 和 LookupTableSource,ScanTableSource 对应 getScanRuntimeProvider
,LookupTableSource对应getLookupRuntimeProvider
,可能是因为先实现了 ScanTableSource  ,所以会调用 getScanRuntimeProvider
吧(不确定原因,以后再进行研究)。(我们在 Flink用户自定义连接器(Table API Connectors)学习总结 中也是实现了 ScanTableSource ,查询时会调用 getScanRuntimeProvider
~)

public abstract class AbstractHBaseDynamicTableSource
        implements ScanTableSourceLookupTableSourceSupportsProjectionPushDown 
{

10、修改源码添加参数

我们在上篇文章讲到了修改源码,添加参数支持通过参数配置自定义 hbase-site.xml,就是在 HBaseConnectorOptionsUtil
.getHBaseConfiguration
添加了如下代码:

        hbaseClientConf.addResource(
                new org.apache.hadoop.fs.Path(
                        tableOptions.get(HBASE_CONF_DIR) + "/hbase-site.xml"));

优先级:如果放在最后那么优先级最高,比通过properties.*
等自定义参数(tableOptions
)要高,如果想比tableOptions
参数优先级低,则可以放在前面。代码:https://github.com/dongkelun/flink/commit/b9db276cc5eb7c68aba029efbac62c7fb9cc46d8

11、相关阅读


🧐 分享、点赞、在看,给个3连击👇

文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论