This article combines the official documentation with other online material to explain Flink's asynchronous I/O API for accessing external data stores. If you are not familiar with asynchronous or event-driven programming, it helps to first read up on Futures and event-driven programming.
The official Flink async I/O documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
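As a quick refresher on the Future-style callback model that async I/O builds on, here is a minimal, self-contained sketch using the plain JDK CompletableFuture (no Flink involved; the lookup value is a made-up placeholder):

import java.util.concurrent.CompletableFuture;

public class FutureRefresher {
    public static void main(String[] args) {
        // Run a (pretend) slow lookup on another thread...
        CompletableFuture<String> lookup =
                CompletableFuture.supplyAsync(() -> "some-result");
        // ...and register a callback instead of blocking for the result.
        lookup.thenAccept(result -> System.out.println("got: " + result));
        lookup.join(); // only so this demo JVM does not exit before the callback runs
    }
}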
The program works as follows: it receives user records (three fields: id, userNum, age) from a socket stream, uses each record's userNum field to look up the user's name (userName) in the MySQL dictionary table userCode, fills the retrieved userName back into the user record, and finally writes the enriched record to the MySQL table userInfo.
1. Table DDL:
DDL statements for the userInfo table (target) and the userCode table (dictionary):
CREATE TABLE `userInfo` (
`id` int(11) NOT NULL,
`userNum` int(11) DEFAULT NULL,
`userName` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
CREATE TABLE `userCode` (
`id` int(11) NOT NULL,
`name` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
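Before running the job, the userCode dictionary table needs a few rows to look up, otherwise every userName comes back null. A minimal seeding sketch (the names zhangsan and lisi are made-up sample values; the connection settings are the ones used by the DBUtils class later in this article):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SeedUserCode {
    public static void main(String[] args) throws Exception {
        // Connection settings taken from DBUtils below; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.0.111:3306/lujisen?characterEncoding=utf8",
                "root", "123456a?");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO userCode(id, name) VALUES (?, ?)")) {
            ps.setInt(1, 1001); ps.setString(2, "zhangsan"); ps.executeUpdate(); // sample row
            ps.setInt(1, 1002); ps.setString(2, "lisi");     ps.executeUpdate(); // sample row
        }
    }
}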
2. The pom.xml file. Note that besides the Flink modules it pulls in Alibaba's Druid connection pool:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.20</version>
    </dependency>
</dependencies>
3. The code is organized as follows:
Main class: FlinkMysqlAsyncIOMain
Async I/O function that performs the MySQL lookup: MysqlAsyncRichMapFunction
Sink that writes the results to MySQL: MysqlSinkFunction
Database utility class: DBUtils
User entity class: User
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 14:50
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class FlinkMysqlAsyncIOMain {
    public static void main(String[] args) throws Exception {
        int maxIORequestCount = 20; // maximum number of concurrent async requests (operator capacity)
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> socketDataStream = senv.socketTextStream("192.168.0.111", 9999);
        DataStream<User> userDataStream = AsyncDataStream.orderedWait(
                socketDataStream,
                new MysqlAsyncRichMapFunction(), // custom async MySQL lookup function
                500000,                          // async timeout: 500,000 ms = 500 s
                TimeUnit.MILLISECONDS,           // time unit of the timeout
                maxIORequestCount                // maximum number of in-flight async requests
        );
        userDataStream.print();
        userDataStream.addSink(new MysqlSinkFunction());
        senv.execute("FlinkMysqlAsyncIOMain");
    }
}
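Note that orderedWait buffers completed results so they are emitted in input order. AsyncDataStream also offers unorderedWait, which emits each result as soon as it completes; this matters for the timing experiment at the end of this article. A sketch with the same parameters as above:

// Sketch: same operator wiring, but results are emitted as soon as they
// complete, so one slow lookup (e.g. the 30-second sleep in the lookup
// function below) does not hold back later records.
DataStream<User> unorderedStream = AsyncDataStream.unorderedWait(
        socketDataStream,
        new MysqlAsyncRichMapFunction(),
        500000, TimeUnit.MILLISECONDS,
        maxIORequestCount);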
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Supplier;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 10:25
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class MysqlAsyncRichMapFunction extends RichAsyncFunction<String, User> {
    /* A database client that can issue concurrent requests via callbacks would live here,
       marked transient so it is not serialized with the function. */
    /* open() is the place to create the thread pool and the MySQL connection pool. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<User> resultFuture) throws Exception {
        String[] split = line.split(",");
        User user = new User();
        user.setId(Integer.valueOf(split[0]));
        user.setUserNum(Integer.valueOf(split[1]));
        user.setAge(Integer.valueOf(split[2]));
        // Run the blocking JDBC lookup on the shared thread pool.
        Future<String> dbResult = DBUtils.executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                ResultSet resultSet = null;
                PreparedStatement statement = null;
                String sql = "SELECT id, name FROM userCode WHERE id = ?";
                String userName = null;
                if (user.getUserNum() == 1001) {
                    System.out.println("userNum " + user.getUserNum() + ": sleeping for 30 seconds!");
                    Thread.sleep(30000);
                }
                try {
                    statement = DBUtils.getConnection().prepareStatement(sql);
                    statement.setInt(1, user.getUserNum());
                    resultSet = statement.executeQuery();
                    while (resultSet.next()) {
                        userName = resultSet.getString("name");
                    }
                } finally {
                    if (resultSet != null) {
                        resultSet.close();
                    }
                    if (statement != null) {
                        statement.close();
                    }
                }
                return userName;
            }
        });
        // Bridge the Future to a callback and hand the enriched record back to Flink.
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    return dbResult.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Handle the exception explicitly.
                    return null;
                }
            }
        }).thenAccept((String userName) -> {
            user.setUserName(userName);
            resultFuture.complete(Collections.singleton(user));
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
    }
}
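The Future-inside-CompletableFuture double hop above works, but the blocking lookup can also be submitted to the dedicated pool directly, and failures can be surfaced to Flink instead of being swallowed as a null name. A sketch, where queryUserName is a hypothetical helper wrapping the JDBC code from the Callable above (also requires importing java.util.concurrent.CompletionException):

@Override
public void asyncInvoke(String line, ResultFuture<User> resultFuture) {
    String[] split = line.split(",");
    User user = new User(Integer.parseInt(split[0]), Integer.parseInt(split[1]),
            null, Integer.parseInt(split[2]));
    CompletableFuture
            .supplyAsync(() -> {
                try {
                    return queryUserName(user.getUserNum()); // hypothetical helper: the JDBC lookup above
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, DBUtils.executorService) // reuse the dedicated pool instead of the common ForkJoinPool
            .whenComplete((userName, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err); // let Flink fail/restart instead of writing null
                } else {
                    user.setUserName(userName);
                    resultFuture.complete(Collections.singleton(user));
                }
            });
}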
package com.hadoop.ljs.flink112.study.asyncIO;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.PreparedStatement;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 15:02
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class MysqlSinkFunction extends RichSinkFunction<User> {
    // Plain INSERT into the target table (no upsert semantics).
    private static final String INSERT_SQL = "INSERT INTO userInfo(id,userNum,userName,age) VALUES (?,?,?,?)";
    private PreparedStatement statement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        statement = DBUtils.getConnection().prepareStatement(INSERT_SQL);
    }

    @Override
    public void invoke(User user, Context context) throws Exception {
        statement.setInt(1, user.getId());
        statement.setInt(2, user.getUserNum());
        statement.setString(3, user.getUserName());
        statement.setInt(4, user.getAge());
        statement.addBatch();
        statement.executeBatch(); // flushed once per record here; see the batching note below
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (statement != null) {
            statement.close();
        }
    }
}
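Calling executeBatch() right after every addBatch() flushes one row at a time, which forfeits the point of batching. A minimal sketch of buffered flushing (BATCH_SIZE is a hypothetical tuning knob; rows still buffered when the job fails would be lost unless this is tied into checkpointing):

private static final int BATCH_SIZE = 100;
private int pending = 0;

@Override
public void invoke(User user, Context context) throws Exception {
    statement.setInt(1, user.getId());
    statement.setInt(2, user.getUserNum());
    statement.setString(3, user.getUserName());
    statement.setInt(4, user.getAge());
    statement.addBatch();
    if (++pending >= BATCH_SIZE) { // flush only every BATCH_SIZE rows
        statement.executeBatch();
        pending = 0;
    }
}

@Override
public void close() throws Exception {
    if (pending > 0) {
        statement.executeBatch(); // flush whatever is still buffered
    }
    super.close();
    if (statement != null) {
        statement.close();
    }
}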
package com.hadoop.ljs.flink112.study.asyncIO;

import com.alibaba.druid.pool.DruidDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newFixedThreadPool;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 15:19
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class DBUtils {
    private static String jdbcUrl = "jdbc:mysql://192.168.0.111:3306/lujisen?characterEncoding=utf8";
    private static String username = "root";
    private static String password = "123456a?";
    private static String driverName = "com.mysql.jdbc.Driver";
    public static Connection connection = null;
    public static ExecutorService executorService = null;
    public static DruidDataSource dataSource = null;
    /* maximum pool size, shared by the thread pool and the connection pool */
    private static int maxPoolConn = 20;

    /* static initializer: create the thread pool and the Druid connection pool */
    static {
        executorService = newFixedThreadPool(maxPoolConn);
        dataSource = new DruidDataSource();
        dataSource.setDriverClassName(driverName);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setUrl(jdbcUrl);
        dataSource.setMaxActive(maxPoolConn);
    }

    public static Connection getConnection() throws SQLException {
        // Caches one Connection and shares it across all threads;
        // see the thread-safety note after this class.
        if (connection == null) {
            connection = dataSource.getConnection();
        }
        return connection;
    }
}
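Caching a single static Connection and sharing it across all 20 executor threads is risky: java.sql.Connection implementations are not guaranteed to be thread-safe, and a dropped connection is never replaced. Since Druid already pools connections, a safer sketch is to hand one out per call and close it to return it to the pool:

// Callers use try-with-resources so the connection goes back to the pool:
//   try (Connection conn = DBUtils.getConnection()) { ... }
public static Connection getConnection() throws SQLException {
    return dataSource.getConnection();
}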
package com.hadoop.ljs.flink112.study.asyncIO;

/**
 * @author: Created By lujisen
 * @company China JiNan
 * @date: 2021-08-18 10:13
 * @version: v1.0
 * @description: com.hadoop.ljs.flink112.study.asyncIO
 */
public class User {
    int id;
    int userNum;
    String userName;
    int age;

    public User() {
    }

    public User(int id, int userNum, String userName, int age) {
        this.id = id;
        this.userNum = userNum;
        this.userName = userName;
        this.age = age;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public int getUserNum() { return userNum; }
    public void setUserNum(int userNum) { this.userNum = userNum; }
    public String getUserName() { return userName; }
    public void setUserName(String userName) { this.userName = userName; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "User{" + "id=" + id + ", userNum=" + userNum +
                ", userName='" + userName + '\'' + ", age=" + age + '}';
    }
}
A note on testing: you only need to send the two records below (first start a socket server on 192.168.0.111, for example with nc -lk 9999). The main function sets the async I/O timeout to 500 seconds (500,000 ms), and in asyncInvoke of MysqlAsyncRichMapFunction the lookup thread sleeps for 30 seconds when userNum is 1001, so the first record is blocked in its lookup while the second record is processed immediately. One caveat: because the job uses AsyncDataStream.orderedWait, results are still emitted downstream in input order, so the second record's insert into userInfo waits for the first to finish. To actually observe the second record landing in the userInfo table first, with the first record following about 30 seconds later, switch to unorderedWait as in the sketch after the main class.
1,1001,23 (first record)
2,1002,24 (second record)
If you found this article helpful, please follow the WeChat official account 大数据开发运维架构 and share it with your friends. Thank you for your support!