上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。完整分享可以观看 Meetup 视频回顾 :https://developer.aliyun.com/live/1416
演示代码已经开源到了 GitHub 上:
https://github.com/wuchong/flink-sql-submit
这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。
通过本实战,你将学到:
如何使用 Blink Planner
一个简单的 SqlSubmit 是如何实现的
如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表
运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业
设置调优参数,观察对作业的影响
SqlSubmit 的实现
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 读取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 通过正则表达式匹配前缀,来区分不同的 SQL 语句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根据不同的 SQL 语句,调用 TableEnvironment 执行
for (SqlCommandCall call : calls) {
switch (call.command) {
case SET:
String key = call.operands[0];
String value = call.operands[1];
// 设置参数
tEnv.getConfig().getConfiguration().setString(key, value);
break;
case CREATE_TABLE:
String ddl = call.operands[0];
tEnv.sqlUpdate(ddl);
break;
case INSERT_INTO:
String dml = call.operands[0];
tEnv.sqlUpdate(dml);
break;
default:
throw new RuntimeException("Unsupported command: " + call.command);
}
}
// 提交作业
tEnv.execute("SQL Job");
复制
使用 DDL 连接 Kafka 源表
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
复制
复制
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 数据源格式为 json
'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)
复制
注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。
使用 DDL 连接 MySQL 结果表
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc', -- 使用 jdbc connector
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
'connector.table' = 'pvuv_sink', -- 表名
'connector.username' = 'root', -- 用户名
'connector.password' = '123456', -- 密码
'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条
)
复制
PV UV 计算
INSERT INTO pvuv_sink
SELECT
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')
复制
注:在深圳 Meetup 中,我们有对这种查询的性能调优做了深度的介绍。
实战演示
环境准备
Flink 本地集群:用来运行 Flink SQL 任务。 Kafka 本地集群:用来作为数据源。 MySQL 数据库:用来作为结果表。
Flink 本地集群安装
下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz 下载以下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。因为我们运行时需要依赖各个 connector 实现。 flink-sql-connector-kafka_2.11-1.9.0.jar
http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jarflink-json-1.9.0-sql-jar.jar
http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jarflink-jdbc_2.11-1.9.0.jar
http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jarmysql-connector-java-5.1.48.jar
https://dev.mysql.com/downloads/connector/j/5.1.html将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们的演示任务可能会消耗多于1个的 slot。 在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0
复制
Kafka 本地集群安装
下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz 将安装路径填到 flink-sql-submit 项目的 env.sh 中,如我的路径是 KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0 复制在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。 在命令行执行 jps,如果看到 Kafka 进程和 QuorumPeerMain 进程即表明启动成功。
MySQL 安装
可以在官方页面下载 MySQL 并安装: https://dev.mysql.com/downloads/mysql/ 如果有 Docker 环境的话,也可以直接通过 Docker 安装 https://hub.docker.com/_/mysql
$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql
复制
提交 SQL 任务
在 flink-sql-submit 目录下运行 ./source-generator.sh,会自动创建 user_behavior,并实时往里灌入数据。

在 flink-sql-submit 目录下运行 ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓扑。


结尾

文章转载自Flink 中文社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
【MySQL 30周年庆】MySQL 8.0 OCP考试限时免费!教你免费领考券
墨天轮小教习
833次阅读
2025-04-25 18:53:11
MySQL 30 周年庆!MySQL 8.4 认证免费考!这次是认真的。。。
严少安
523次阅读
2025-04-25 15:30:58
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
473次阅读
2025-04-14 09:40:20
墨天轮个人数说知识点合集
JiekeXu
452次阅读
2025-04-01 15:56:03
MySQL数据库当前和历史事务分析
听见风的声音
428次阅读
2025-04-01 08:47:17
最近我为什么不写评论国产数据库的文章了
白鳝的洞穴
419次阅读
2025-04-07 09:44:54
MySQL 生产实践-Update 二级索引导致的性能问题排查
chengang
391次阅读
2025-03-28 16:28:31
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
372次阅读
2025-04-17 17:02:24
MySQL 9.3 正式 GA,我却大失所望,新特性亮点与隐忧并存?
JiekeXu
358次阅读
2025-04-15 23:49:58
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
341次阅读
2025-04-18 10:01:22