Flink 版本: 1.14.3
- 主要测试一个任务中同时关联多个 MySql 中的表
MySQL 表
MySQL 1
两个mysql 表: lookup_join_config、lookup_join_config_2
表结构:
create table lookup_join_config
(
id int auto_increment
primary key,
code varchar(10) null,
value varchar(10) null,
create_time datetime default CURRENT_TIMESTAMP null,
update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';
create table lookup_join_config_2
(
id int auto_increment
primary key,
code varchar(10) null,
value varchar(10) null,
create_time datetime default CURRENT_TIMESTAMP null,
update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';
数据lookup_join_config:
数据lookup_join_config:
MySQL 2
两个mysql 表: lookup_join_config
表结构:
create table lookup_join_config
(
id int auto_increment
primary key,
code varchar(10) null,
value varchar(10) null,
create_time datetime default CURRENT_TIMESTAMP null,
update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';
数据lookup_join_config:
Flink SQL
-- flink lookup mysql test
-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,ts TIMESTAMP(3)
,process_time as proctime()
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);
drop table if exists mysql_behavior_conf ;
CREATE TEMPORARY TABLE mysql_behavior_conf (
id int
,code STRING
,`value` STRING
,update_time TIMESTAMP(3)
-- ,primary key (id) not enforced
-- ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'lookup_join_config'
,'username' = 'root'
,'password' = '123456'
,'scan.partition.column' = 'id'
,'scan.partition.num' = '5'
,'scan.partition.lower-bound' = '5'
,'scan.partition.upper-bound' = '99999'
,'lookup.cache.max-rows' = '28'
,'lookup.cache.ttl' = '5555' -- ttl time 超过这么长时间无数据才行
);
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




