暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL实时消费Kafka并写入到MySQL

大数据研习社 2022-07-07
2149

长按二维码关注

大数据领域必关注的公众号



1.需求
Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。

2.添加依赖
添加Flink SQL客户端实时消费Kafka并写入MySQL的相关依赖。
flink-connector-jdbc_2.11-1.13.5.jar
flink-sql-connector-kafka_2.11-1.13.5.jar
mysql-connector-java-5.1.38.jar

依赖包冲突:
错误:Cannot load user class:org.apache.flink.
connector.jdbc.internal.GenericJdbcSinkFunction

解决思路:
修改flink集群的配置中加载包的顺序

解决步骤:
在flink-conf.yaml中添加如下内容:
classloader.resolve-order: parent-first

3.启动相关服务
(1)启动Flink 集群服务
bin/start-cluster.sh

(2)进入Flink SQL CLI
./sql-client.sh

(3)启动Kafka集群
bin/kafka-server-start.sh -daemon config/server.
properties

(4)启动MySQL服务
service mysqld start

4.FlinkSQL创建表
(1)在Flink SQL 客户端使用如下命令创建kafka source表。
CREATE TABLE sourceTable (
`user` STRING,
 url STRING,
 cTime STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'clicklog_input',
 'properties.bootstrap.servers' = 'hadoop1:9092',
 'properties.group.id' = 'clicklog',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);

(2)在Flink SQL 客户端使用如下命令创建jdbc sink表。
CREATE TABLE sinkTable (
 `user` STRING,
 cnt BIGINT,
 PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://hadoop1:3306/test',
 'username' = 'hive',
 'password' = 'hive',
 'table-name' = 'clickcount'
);

(3)在MySQL的test数据库中创建clickcount表
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
  `user` varchar(20) NOT NULL DEFAULT '',
  `cnt` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`user`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.Flink SQL进行聚合分析
(1)Flink SQL客户端从Kafka表中实时读取数据,然后做数据聚合,最后再写入MySQL数据库。
#Flink SQL读取kafka 属于流式计算模式 
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO sinkTable
SELECT user,count(url) as cnt
FROM sourceTable
group by user;

(2)打开Kafka生产者模拟产生数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

(3)查看聚合分析结果

欢迎点赞 + 收藏 + 在看  素质三连 


往期精彩回顾
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码

长按识别左侧二维码

     关注领福利    

  领10本经典大数据书

文章转载自大数据研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论