
Hands-On: Flink 1.12 Async I/O Against External Data (MySQL)

大数据开发运维架构 2021-08-18

    Drawing on the official documentation and other online material, this article walks through Flink's async I/O API for accessing external data stores. If you are not familiar with asynchronous or event-driven programming, it helps to first read up on Futures and event-driven programming.
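As a quick refresher (plain JDK, no Flink required), a `CompletableFuture` lets you run a slow lookup on a worker thread and attach callbacks instead of blocking — the same pattern `asyncInvoke` uses later in this article. The names below are illustrative only:

```java
import java.util.concurrent.CompletableFuture;

public class FutureRefresher {
    public static void main(String[] args) {
        // supplyAsync runs the supplier on a worker thread; thenApply registers
        // a callback that transforms the result once it is available.
        CompletableFuture<String> lookup = CompletableFuture
                .supplyAsync(() -> "alice")             // stands in for a slow DB query
                .thenApply(name -> "userName=" + name); // runs when the value arrives
        // join() blocks only for this demo; real async code keeps chaining callbacks.
        System.out.println(lookup.join());
    }
}
```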

Official Flink async I/O documentation:

    https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/

        The overall flow of the program: receive user records (three fields: id, userNum, age) from a socket stream; for each record, use its userNum field to look up the user name (userName) in the MySQL dictionary table userCode; fill the userName back into the user record; and finally write the enriched record to the userInfo table in MySQL.

    1. Table DDL:

        DDL for the userCode (dictionary) and userInfo (target) tables:

      CREATE TABLE `userInfo` (
      `id` int(11) NOT NULL,
      `userNum` int(11) DEFAULT NULL,
      `userName` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
        CREATE TABLE `userCode` (
        `id` int(11) NOT NULL,
        `name` varchar(50) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
        PRIMARY KEY (`id`)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

        2. pom.xml — note the Alibaba druid dependency, used here for connection pooling:

          <dependencies>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-java</artifactId>
              <version>${flink.version}</version>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-clients_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
              <version>${flink.version}</version>
            </dependency>
            <dependency>
              <groupId>mysql</groupId>
              <artifactId>mysql-connector-java</artifactId>
              <version>5.1.44</version>
            </dependency>
            <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>druid</artifactId>
              <version>1.1.20</version>
            </dependency>
          </dependencies>
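The dependency list above references the `${flink.version}` and `${scala.binary.version}` properties without defining them; a matching `<properties>` block would look like the following. The 1.12.0 and 2.11 values are assumptions (the article targets Flink 1.12) — use the versions that match your cluster:

```xml
<properties>
  <!-- assumed versions; adjust to match your cluster -->
  <flink.version>1.12.0</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>
```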

          3. Overall code structure:

              Main class: FlinkMysqlAsyncIOMain

              Async I/O handler that queries MySQL: MysqlAsyncRichMapFunction

              Sink that writes results to MySQL: MysqlSinkFunction

              Database utility class: DBUtils

              User entity class: User

            package com.hadoop.ljs.flink112.study.asyncIO;

            import org.apache.flink.streaming.api.datastream.AsyncDataStream;
            import org.apache.flink.streaming.api.datastream.DataStream;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import java.util.concurrent.TimeUnit;
            /**
             * @author: Created By lujisen
             * @company China JiNan
             * @date: 2021-08-18 14:50
             * @version: v1.0
             * @description: com.hadoop.ljs.flink112.study.asyncIO
             */
            public class FlinkMysqlAsyncIOMain {
                public static void main(String[] args) throws Exception {
                    int maxIORequestCount = 20; /* maximum number of concurrent async requests */
                    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

                    DataStream<String> socketDataStream = senv.socketTextStream("192.168.0.111", 9999);

                    DataStream<User> userDataStream = AsyncDataStream.orderedWait(
                            socketDataStream,
                            new MysqlAsyncRichMapFunction(), // custom MySQL async handler
                            500000,                          // async timeout: 500000 ms, i.e. 500 s
                            TimeUnit.MILLISECONDS,           // time unit
                            maxIORequestCount                // maximum concurrent async requests
                    );
                    userDataStream.print();
                    userDataStream.addSink(new MysqlSinkFunction<User>());
                    senv.execute("FlinkMysqlAsyncIOMain");
                }
            }
              package com.hadoop.ljs.flink112.study.asyncIO;

              import org.apache.flink.configuration.Configuration;
              import org.apache.flink.streaming.api.functions.async.ResultFuture;
              import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
              import java.sql.PreparedStatement;
              import java.sql.ResultSet;
              import java.util.Collections;
              import java.util.concurrent.*;
              import java.util.function.Supplier;
              /**
               * @author: Created By lujisen
               * @company China JiNan
               * @date: 2021-08-18 10:25
               * @version: v1.0
               * @description: com.hadoop.ljs.flink112.study.asyncIO
               */
              public class MysqlAsyncRichMapFunction extends RichAsyncFunction<String, User> {
                  /* A database client that issues concurrent requests via callbacks would normally
                     be a transient field here (transient so it is not serialized); this example
                     keeps the thread pool and MySQL connection pool in the static DBUtils class,
                     so open() only calls the superclass. */
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      super.open(parameters);
                  }

                  @Override
                  public void asyncInvoke(String line, ResultFuture<User> resultFuture) throws Exception {
                      String[] split = line.split(",");
                      User user = new User();
                      user.setId(Integer.valueOf(split[0]));
                      user.setUserNum(Integer.valueOf(split[1]));
                      user.setAge(Integer.valueOf(split[2]));

                      // Run the blocking JDBC lookup on DBUtils' thread pool so that
                      // asyncInvoke itself returns immediately.
                      Future<String> dbResult = DBUtils.executorService.submit(new Callable<String>() {
                          @Override
                          public String call() throws Exception {
                              ResultSet resultSet = null;
                              PreparedStatement statement = null;
                              String sql = "SELECT id, name FROM userCode WHERE id = ?";
                              String userName = null;
                              if (user.getUserNum() == 1001) {
                                  System.out.println("userNum " + user.getUserNum() + ": sleeping for 30 seconds!!!");
                                  Thread.sleep(30000);
                              }

                              try {
                                  statement = DBUtils.getConnection().prepareStatement(sql);
                                  statement.setInt(1, user.getUserNum());
                                  resultSet = statement.executeQuery();
                                  while (resultSet.next()) {
                                      userName = resultSet.getString("name");
                                  }
                              } finally {
                                  if (resultSet != null) {
                                      resultSet.close();
                                  }
                                  if (statement != null) {
                                      statement.close();
                                  }
                              }
                              return userName;
                          }
                      });
                      // Bridge the Future to Flink's ResultFuture: complete the async record
                      // once the lookup result is available.
                      CompletableFuture.supplyAsync(new Supplier<String>() {
                          @Override
                          public String get() {
                              try {
                                  return dbResult.get();
                              } catch (InterruptedException | ExecutionException e) {
                                  // Handle the exception explicitly.
                                  return null;
                              }
                          }
                      }).thenAccept((String userName) -> {
                          user.setUserName(userName);
                          resultFuture.complete(Collections.singleton(user));
                      });
                  }

                  @Override
                  public void close() throws Exception {
                      super.close();
                  }
              }
                package com.hadoop.ljs.flink112.study.asyncIO;
                import org.apache.flink.configuration.Configuration;
                import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
                import java.sql.PreparedStatement;
                /**
                 * @author: Created By lujisen
                 * @company China JiNan
                 * @date: 2021-08-18 15:02
                 * @version: v1.0
                 * @description: com.hadoop.ljs.flink112.study.asyncIO
                 */
                public class MysqlSinkFunction<U> extends RichSinkFunction<User> {
                    private static final String UPSERT_CASE = "INSERT INTO userInfo(id,userNum,userName,age) VALUES (?,?,?,?)";
                    private PreparedStatement statement = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        statement = DBUtils.getConnection().prepareStatement(UPSERT_CASE);
                    }

                    @Override
                    public void invoke(User user, Context context) throws Exception {
                        statement.setInt(1, user.getId());
                        statement.setInt(2, user.getUserNum());
                        statement.setString(3, user.getUserName());
                        statement.setInt(4, user.getAge());
                        // The batch holds a single record here, so this is effectively
                        // one INSERT per invoke() call.
                        statement.addBatch();
                        statement.executeBatch();
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        if (statement != null) {
                            statement.close();
                        }
                    }
                }

                  package com.hadoop.ljs.flink112.study.asyncIO;

                  import com.alibaba.druid.pool.DruidDataSource;
                  import java.sql.Connection;
                  import java.sql.SQLException;
                  import java.util.concurrent.ExecutorService;
                  import static java.util.concurrent.Executors.newFixedThreadPool;
                  /**
                   * @author: Created By lujisen
                   * @company China JiNan
                   * @date: 2021-08-18 15:19
                   * @version: v1.0
                   * @description: com.hadoop.ljs.flink112.study.asyncIO
                   */
                  public class DBUtils {

                      private static String jdbcUrl = "jdbc:mysql://192.168.0.111:3306/lujisen?characterEncoding=utf8";
                      private static String username = "root";
                      private static String password = "123456a?";
                      private static String driverName = "com.mysql.jdbc.Driver";
                      public static Connection connection = null;
                      public static transient ExecutorService executorService = null;
                      public static transient DruidDataSource dataSource = null;
                      /* maximum number of pooled connections / worker threads */
                      private static int maxPoolConn = 20;

                      /* static initializer */
                      static {
                          // create the thread pool
                          executorService = newFixedThreadPool(maxPoolConn);
                          // create the Druid connection pool
                          dataSource = new DruidDataSource();
                          dataSource.setDriverClassName(driverName);
                          dataSource.setUsername(username);
                          dataSource.setPassword(password);
                          dataSource.setUrl(jdbcUrl);
                          dataSource.setMaxActive(maxPoolConn);
                      }

                      // Note: this lazily caches a single Connection that all worker threads
                      // and the sink then share -- convenient for a demo, but JDBC connections
                      // are not safe for concurrent use.
                      public static Connection getConnection() throws SQLException {
                          if (connection == null) {
                              connection = dataSource.getConnection();
                          }
                          return connection;
                      }
                  }
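One caveat worth flagging: `getConnection()` hands every caller the same cached `Connection`, shared across all executor threads and the sink, which also defeats the Druid pool. A safer discipline is to borrow a connection per query and let try-with-resources return it to the pool. A minimal JDK-only sketch of that borrow/return pattern (the `PooledConn` class below is a hypothetical stand-in for a pooled JDBC connection, not Druid's API):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PooledDemo {
    // Stand-in for a pooled JDBC connection: close() returns it to the pool
    // instead of destroying it, which is exactly what Druid-style pools do.
    static class PooledConn implements AutoCloseable {
        private final BlockingQueue<PooledConn> home;
        PooledConn(BlockingQueue<PooledConn> home) { this.home = home; }
        String query(String sql) { return "ok:" + sql; } // simulated query
        @Override public void close() { home.offer(this); } // return, don't destroy
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<PooledConn> pool = new ArrayBlockingQueue<>(2);
        pool.offer(new PooledConn(pool));
        pool.offer(new PooledConn(pool));

        // Borrow per query; try-with-resources guarantees the return,
        // even if the query throws.
        try (PooledConn conn = pool.take()) {
            System.out.println(conn.query("SELECT 1"));
        }
        System.out.println("available after use: " + pool.size());
    }
}
```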
                  package com.hadoop.ljs.flink112.study.asyncIO;

                  /**
                   * @author: Created By lujisen
                   * @company China JiNan
                   * @date: 2021-08-18 10:13
                   * @version: v1.0
                   * @description: com.hadoop.ljs.flink112.study.asyncIO
                   */
                  public class User {
                      int id;
                      int userNum;
                      String userName;
                      int age;

                      public User() {
                      }

                      public User(int id, int userNum, String userName, int age) {
                          this.id = id;
                          this.userNum = userNum;
                          this.userName = userName;
                          this.age = age;
                      }

                      public int getId() { return id; }
                      public void setId(int id) { this.id = id; }
                      public int getUserNum() { return userNum; }
                      public void setUserNum(int userNum) { this.userNum = userNum; }
                      public String getUserName() { return userName; }
                      public void setUserName(String userName) { this.userName = userName; }
                      public int getAge() { return age; }
                      public void setAge(int age) { this.age = age; }

                      @Override
                      public String toString() {
                          return "User{" +
                                  "id=" + id +
                                  ", userNum=" + userNum +
                                  ", userName='" + userName + '\'' +
                                  ", age=" + age +
                                  '}';
                      }
                  }

                     A note on testing: you can ignore everything else and just send the two records below. The async I/O timeout set in the main class is 500 s (500000 ms), and in MysqlAsyncRichMapFunction's asyncInvoke logic the lookup thread sleeps for 30 seconds when a record's userNum is 1001, so the first record's lookup stalls while the second record is looked up immediately. Be aware, though, that the job uses AsyncDataStream.orderedWait, which emits results in input order: the second record's result is held back until the first one completes, so both rows reach the userInfo table only after the 30-second sleep, first record first. To actually observe the second record landing in MySQL first, switch orderedWait to unorderedWait.

                    1,1001,23  -- first record
                    2,1002,24  -- second record
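The ordering behavior can be sketched with plain CompletableFutures (no Flink needed): the slow "record-1" below plays the role of the first record's 30-second sleep, and joining the futures in submission order mirrors what orderedWait does — completion order and emission order differ:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedWaitSketch {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> completed = new CopyOnWriteArrayList<>();   // completion order

        List<CompletableFuture<String>> requests = new ArrayList<>();
        requests.add(lookup(pool, completed, "record-1", 200));  // slow (the "30 s sleep")
        requests.add(lookup(pool, completed, "record-2", 0));    // fast

        List<String> emitted = new ArrayList<>();                // orderedWait-style emission
        for (CompletableFuture<String> f : requests) {
            emitted.add(f.join());                               // wait in request order
        }
        System.out.println("completed: " + completed);           // record-2 finishes first
        System.out.println("emitted:   " + emitted);             // emission keeps input order
        pool.shutdown();
    }

    static CompletableFuture<String> lookup(ExecutorService pool, List<String> log,
                                            String id, long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add(id);                                         // record completion order
            return id;
        }, pool);
    }
}
```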

                      If this article helped you, please follow the WeChat public account 大数据开发运维架构 and share it with your friends. Thanks for the support!
