暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink CookBook | JDBC Connector解析和优化

data之道 2021-02-16
2710

背景

Flink的Table API & SQL可以访问外部系统,来读取或写入外部批量或流式表,通过Flink支持的连接器,我们就可以用Table API和SQL语句实现外部系统的访问,完成整个流处理Pipeline的实现,而无需进行Stream Environment和Table Environment的切换。这是原生支持的外部系统,清晰描述了connector支持的是有界数据流还是无界数据流、批次插入还是流式插入:

本文聊的是JDBC Connector的一些重点特性以及可以增强的功能点。

分区并发控制

JDBC Connector的scan.partition.参数定义了数据分区方式,即数据以怎样的方式分片,以便被多个任务并发读取。
  • scan.partition.column: 分区字段,必须是日期、时间戳、数字类型.

  • scan.partition.num: 分区总数.

  • scan.partition.lower-bound: 分区字段最小值.

  • scan.partition.upper-bound: 分区字段最大值.

Flink根据配置的column、lower-bound、upper-bound属性值在SQL里加上对应的查询条件:

每个分区查询的数据量大小:((upper-bound - lower-bound) + 1)/num。
Flink JDBC Connector和Sqoop import两者对rdbms分区的方式异曲同工,不同的是FLINK SQL不支持对source、sink设置并发度,而Sqoop的map task数就是数据表的分片数,还有就是Sqoop会向数据库发送一个类似select min(split-column),max(split-column) from ...这样的sql检索分片字段的最大值和最小值。
Flink Sql并发数使用StreamExecutionEnvironment的并发度,为了尽量提高每个并发任务的利用效率,可以把表分区数和environment的并发数设置成一致,否则可能存在task读取多个分片(会串行读取多个分片)或者没有task读取到分片数据的情况。不要将任务的并行度增加到数据库可以承受的程度,比如 将100个并发客户端连接到数据库,可能会增加数据库负载,从而导致性能下降;如果分片字段的实际值在其范围内分布不均匀,也可能导致任务不平衡,即数据倾斜。

写入一致性

JDBC Sink提供了at-least-once保障,但可以通过upsert幂等性实现exactly-once。Flink实现了三种数据库方言:Derby、Mysql、PostgresSQL,Flink DDL里driver是可选参数,会根据url自动选择适合的数据库方言。
在写入数据库时,如果DDL里定义了主键,将会以upsert方式写入,如果数据库支持upsert语法,执行对应的语句,比如INSERT .. ON DUPLICATE KEY UPDATE ..(Mysql)INSERT .. ON CONFLICT .. DO UPDATE SET ..(PostgreSQL),如果数据库不支持upsert,退化成UPDATE+INSERT:根据主键判断数据是否存在,然后执行update或insert;如果DDL里没有定义主键,那么将会以append方式写入(即只会insert)
数据库不支持upsert语法的情况下执行路径:

周期性增量查询

Flink在连接到数据库并查询分片数据时,对resultSet和hasNext变量初始化:
resultSet = statement.executeQuery();hasNext = resultSet.next()。Flink框架判断有没有把所有数据读取完,如果还有数据,就继续从ResultSet获取数据并将读取到的数据emit出来:
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
if ((returned = format.nextRecord(reuse)) != null) {
output.collect(returned);
}
}

reachedEnd和nextRecord方法的实现:

public boolean reachedEnd() throws IOException {
//判断还有没有数据
return !hasNext;
}
  
public RowData nextRecord(RowData reuse) throws IOException {
try {
if (!hasNext) {
return null;
}
RowData row = rowConverter.toInternal(resultSet);
//update hasNext after we've read the record
hasNext = resultSet.next();
return row;
} catch (SQLException se) {
throw new IOException("Couldn't read data - " + se.getMessage(), se);
} catch (NullPointerException npe) {
throw new IOException("Couldn't access resultSet", npe);
}
}
可以看出来,Flink JDBC Connector只支持单次批量查询,我们很多场景期望flink能够根据某个字段做增量轮询,那就要在上面这两个方法内做些文章,而且要改写查询sql:对增量字段排序。这样就能够获取到读取到的增量字段最大值,然后将增量字段值保存到状态,下个轮询从状态里拿数据、重新执行查询语句,实现的伪代码如下
public boolean reachedEnd() throws IOException{
if (hasNext) {
return false;
} else {
    //轮询查数据
        if (isPolling()) {
try {
              //轮询间隔
               TimeUnit.MILLISECONDS.sleep(getPollingInterval());
               resultSet.close()
                //从LongMaximum累加器里获取保存的最大值,然后查询数据
queryForPolling(endLocationAccumulator.getLocalValue().toString());
return false;
} catch (SQLException e) {


}
}
return true;
}
}
//根据最大值,重新查询数据
protected void queryForPolling(String startLocation) throws SQLException {
boolean isNumber = StringUtils.isNumeric(startLocation);
    //根据增量字段的类型,转换并设置到PreparedStatement
switch (type) {
case TIMESTAMP:
Timestamp ts = isNumber ? new Timestamp(Long.parseLong(startLocation)) : Timestamp.valueOf(startLocation);
pstmt.setTimestamp(1, ts);
break;
case DATE:
Date date = isNumber ? new Date(Long.parseLong(startLocation)) : Date.valueOf(startLocation);
pstmt.setDate(1, date);
break;
default:
if(isNumber){
pstmt.setLong(1, Long.parseLong(startLocation));
}else {
pstmt.setString(1, startLocation);
}
}
    //执行查询sql
resultSet = ps.executeQuery();
    //移动ResultSet游标
hasNext = resultSet.next();
}


public RowData nextRecord(RowData reuse) throws IOException {
try {
if (!hasNext) {
return null;
}
    RowData row = rowConverter.toInternal(resultSet);
    //获取增量字段的值
    Object obj = resultSet.getObject(incrementColumn);
if(obj != null) {
if((obj instanceof java.util.Date
|| obj.getClass().getSimpleName().toUpperCase().contains(ColumnType.TIMESTAMP.name())) ) {
obj = resultSet.getTimestamp(incrementColumn).getTime();
        }
        //将增量字段值保存到LongMaximum累加器
endLocationAccumulator.add(new BigInteger(String.valueOf(obj)));
}
//update hasNext after we've read the record
hasNext = resultSet.next();
return row;
} catch (SQLException se) {
throw new IOException("Couldn't read data - " + se.getMessage(), se);
} catch (NullPointerException npe) {
throw new IOException("Couldn't access resultSet", npe);
}
}
每个并行实例会创建更新维护自己的累加器,不同并行实例的累加器由系统在job结束后合并。
文章转载自data之道,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论