暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink SQL 同时 Join 多个 Mysql 表

原创 kobrey 2022-06-14
928

Flink 版本: 1.14.3

  • 主要测试一个任务中同时关联多个 MySql 中的表

MySQL 表

MySQL 1

两个mysql 表: lookup_join_config、lookup_join_config_2

表结构:

create table lookup_join_config
(
	id int auto_increment
		primary key,
	code varchar(10) null,
	value varchar(10) null,
	create_time datetime default CURRENT_TIMESTAMP null,
	update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';

create table lookup_join_config_2
(
	id int auto_increment
		primary key,
	code varchar(10) null,
	value varchar(10) null,
	create_time datetime default CURRENT_TIMESTAMP null,
	update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';

数据lookup_join_config:

数据lookup_join_config:

MySQL 2

两个mysql 表: lookup_join_config

表结构:

create table lookup_join_config
(
	id int auto_increment
		primary key,
	code varchar(10) null,
	value varchar(10) null,
	create_time datetime default CURRENT_TIMESTAMP null,
	update_time datetime default CURRENT_TIMESTAMP null
)
comment 'lookup join 的配置表';

数据lookup_join_config:

Flink SQL


-- flink lookup mysql test
-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_log'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
);


drop table if exists mysql_behavior_conf ;
CREATE TEMPORARY TABLE mysql_behavior_conf (
   id int
  ,code STRING
  ,`value` STRING
  ,update_time TIMESTAMP(3)
--   ,primary key (id) not enforced
--   ,WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
   'connector' = 'jdbc'
   ,'url' = 'jdbc:mysql://localhost:3306/venn'
   ,'table-name' = 'lookup_join_config'
   ,'username' = 'root'
   ,'password' = '123456'
   ,'scan.partition.column' = 'id'
   ,'scan.partition.num' = '5'
   ,'scan.partition.lower-bound' = '5'
   ,'scan.partition.upper-bound' = '99999'
   ,'lookup.cache.max-rows' = '28'
   ,'lookup.cache.ttl' = '5555' -- ttl time 超过这么长时间无数据才行
);
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论