暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink1.14.3 Table读写MySQL做数据聚合(1)

大数据研习社 2022-04-28
1406

长按二维码关注

大数据领域必关注的公众号

By大数据研习社

概要:1. 使用JDBC SQL ConnectorSource只支持批处理,Sink支持批处理和流处理。

2. Sink支持数据追加和更新,如果Flink Table API做聚合操作,使用Sink必须指定指定主键。

3. 本案例独家使用Flink Table API(非SQL)方式读写MySQL,官网只讲解了SQL的使用方式。


1.需求

需求:Flink Table APIMySQL读取数据,然后做聚合操作,最后将聚合结果写入MySQL

2.添加Maven依赖

FlinkTable集成MySQL引⼊如下依赖:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>

3.准备MySQL数据源

MySQLtest数据库中,创建clicklog表并导入初始数据集。
DROP TABLE IF EXISTS `clicklog`;
CREATE TABLE `clicklog` (
  `user` varchar(20) NOT NULL,
  `url` varchar(100) NOT NULL,
  `cTime` varchar(30) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert  into `clicklog`(`user`,`url`,`cTime`) values ('Mary','./home','2022-02-02 12:00:00'),('Bob','./cart','2022-02-02 12:00:00'),('Mary','./prod?id=1','2022-02-02 12:00:05');

4.代码实现

Flink Table API读写MySQL的完整代码如下所示。
package com.bigdata.chap02;
import org.apache.flink.table.api.*;
import static org.apache.flink.table.api.Expressions.$;
public class FlinkTableAPIMySQL2MySQL {
public static void main(String[] args) {
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
 
//2、创建Mysql source table
Schema sourceschema = Schema.newBuilder()
//.primaryKey("user")
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.column("cTime", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("jdbc")
.schema(sourceschema)
.option("url","jdbc:mysql://hadoop1:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("table-name","clicklog")
.option("username","hive")
.option("password","hive")
.build());
 
tEnv.from("sourceTable").printSchema();
 
//3、创建MySQL sink table
Schema sinkschema = Schema.newBuilder()
//通过notNull()指定主键为非空
.column("username",DataTypes.STRING().notNull())
.column("count", DataTypes.BIGINT())
//指定主键
.primaryKey("username")
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("jdbc")
.schema(sinkschema)
.option("url","jdbc:mysql://hadoop1:3306/test")
.option("driver","com.mysql.jdbc.Driver")
.option("table-name","clickcount")
.option("username","hive")
.option("password","hive")
.build());
//5、输出
Table reusltTable = tEnv.from("sourceTable")
.groupBy($("user"))
.aggregate($("url").count().as("count"))
.select($("user").as("username"), $("count"))
;
reusltTable.printSchema();
reusltTable.executeInsert("sinkTable");
}
}
备注:Flink Table API做聚合操作插入MySQL,必须指定主键(.primaryKey("username")),同时必须指定主键为非空(.column("username",DataTypes.STRING().notNull())

5.MySQL业务建表

MySQLtest数据库中,创建clickcount表用于Flink Table的聚合数据。
 
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
  `username` varchar(20) NOT NULL DEFAULT '',
  `count` int(11) DEFAULT NULL,
  PRIMARY KEY (`username`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
注意:如果clickcount表需要做更新操作,需要指定主键(primary key),如username

6.测试运行

打开MySQL连接工具,查询clickcount表中的数据,如果聚合数据能插入clickcount表,说明Flink Table API能成功将聚合数据写入MySQL数据库。
 


7.注意事项

注意:
4. 使用JDBC SQL connector过程中,作为source只支持批处理,作为sink既可以用于批处理又可以用于流处理。
5. Sink支持数据的追加和更新,如果Flink Table API做聚合操作,使用sink更新聚合数据,必须指定指定主键。
6. 本案例独家使用Flink Table API(非SQL)方式读写MySQL,官网只讲解了SQL的使用方式。
欢迎点赞 + 收藏 + 在看  素质三连 


往期精彩回顾
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码

长按识别左侧二维码

     关注领福利    

  领10本经典大数据书

复制
文章转载自大数据研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论