排行
数据库百科
核心案例
行业报告
月度解读
大事记
产业图谱
中国数据库
向量数据库
时序数据库
实时数据库
搜索引擎
空间数据库
图数据库
数据仓库
大调查
2021年报告
2022年报告
年度数据库
2020年openGauss
2021年TiDB
2022年PolarDB
2023年OceanBase
首页
资讯
活动
大会
学习
课程中心
推荐优质内容、热门课程
学习路径
预设学习计划、达成学习目标
知识图谱
综合了解技术体系知识点
课程库
快速筛选、搜索相关课程
视频学习
专业视频分享技术知识
电子文档
快速搜索阅览技术文档
文档
问答
服务
智能助手小墨
关于数据库相关的问题,您都可以问我
数据库巡检平台
脚本采集百余项,在线智能分析总结
SQLRUN
在线数据库即时SQL运行平台
数据库实训平台
实操环境、开箱即用、一键连接
数据库管理服务
汇聚顶级数据库专家,具备多数据库运维能力
数据库百科
核心案例
行业报告
月度解读
大事记
产业图谱
我的订单
登录后可立即获得以下权益
免费培训课程
收藏优质文章
疑难问题解答
下载专业文档
签到免费抽奖
提升成长等级
立即登录
登录
注册
登录
注册
首页
资讯
活动
大会
课程
文档
排行
问答
我的订单
首页
专家团队
智能助手
在线工具
SQLRUN
在线数据库即时SQL运行平台
数据库在线实训平台
实操环境、开箱即用、一键连接
AWR分析
上传AWR报告,查看分析结果
SQL格式化
快速格式化绝大多数SQL语句
SQL审核
审核编写规范,提升执行效率
PLSQL解密
解密超4000字符的PL/SQL语句
OraC函数
查询Oracle C 函数的详细描述
智能助手小墨
关于数据库相关的问题,您都可以问我
精选案例
新闻资讯
云市场
登录后可立即获得以下权益
免费培训课程
收藏优质文章
疑难问题解答
下载专业文档
签到免费抽奖
提升成长等级
立即登录
登录
注册
登录
注册
首页
专家团队
智能助手
精选案例
新闻资讯
云市场
微信扫码
复制链接
新浪微博
分享数说
采集到收藏夹
分享到数说
首页
/
Flink SQL实时消费Kafka并写入到MySQL
Flink SQL实时消费Kafka并写入到MySQL
大数据研习社
2022-07-07
2149
长按二维码关注
大数据领域必关注的公众号
1.需求
Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。
2.添加依赖
添加Flink SQL客户端实时消费Kafka并写入MySQL的相关依赖。
flink-connector-jdbc_2.11-1.13.5.jar
flink-sql-connector-kafka_2.11-1.13.5.jar
mysql-connector-java-5.1.38.jar
依赖包冲突:
错误:Cannot load user class:org.apache.flink.
connector.jdbc.internal.GenericJdbcSinkFunction
解决思路:
修改flink集群的配置中加载包的顺序
解决步骤:
在flink-conf.yaml中添加如下内容:
classloader.resolve-order: parent-first
3.启动相关服务
(1)启动Flink 集群服务
bin/start-cluster.sh
(2)进入Flink SQL CLI
./sql-client.sh
(3)启动Kafka集群
bin/kafka-server-start.sh -daemon config/server.
properties
(4)启动MySQL服务
service mysqld start
4.FlinkSQL创建表
(1)在Flink SQL 客户端使用如下命令创建kafka source表。
CREATE TABLE sourceTable (
`user` STRING,
url STRING,
cTime STRING
) WITH (
'connector' = 'kafka',
'topic' = 'clicklog_input',
'properties.bootstrap.servers' = 'hadoop1:9092',
'properties.group.id' = 'clicklog',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
(2)在Flink SQL 客户端使用如下命令创建jdbc sink表。
CREATE TABLE sinkTable (
`user` STRING,
cnt BIGINT,
PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/test',
'username' = 'hive',
'password' = 'hive',
'table-name' = 'clickcount'
);
(3)在MySQL的test数据库中创建clickcount表
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
`user` varchar(20) NOT NULL DEFAULT '',
`cnt` bigint(11) DEFAULT NULL,
PRIMARY KEY (`user`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.Flink SQL进行聚合分析
(1)Flink SQL客户端从Kafka表中实时读取数据,然后做数据聚合,最后再写入MySQL数据库。
#Flink SQL读取kafka 属于流式计算模式
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO sinkTable
SELECT user,count(url) as cnt
FROM sourceTable
group by user;
(2)打开Kafka生产者模拟产生数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}
(3)查看聚合分析结果
欢迎点赞 + 收藏 + 在看
素质三连
完
▼
往期精彩回顾
▼
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码
长按识别左侧二维码
关注领福利
领10本经典大数据书
sql
mysql
flink
kafka
大数据
文章转载自
大数据研习社
,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
领墨值
有奖问卷
意见反馈
客服小墨