作者:腾讯云流计算 Oceanus 团队
流计算 Oceanus 简介
创建流计算 Oceanus 集群
创建 ClickHouse 集群
注意:创建流计算 Oceanus 集群和 ClickHouse 集群时所选的 VPC 必须相同。
# 下载 ClickHouse-Client 命令wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpmwget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm# 安装客户端rpm -ivh *.rpm# 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
CREATE TABLE default.datagen_to_ck on cluster default_cluster (win_start TIMESTAMP,win_end TIMESTAMP,user_id String,amount_total Int16,Sign Int8 )ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/datagen_to_ck', '{replica}',Sign)ORDER BY (win_start,win_end,user_id);
流计算 Oceanus 作业
1. 创建 Source
CREATE TABLE random_source (user_id VARCHAR,amount INT,pre_time AS CURRENT_TIMESTAMP,WATERMARK FOR pre_time AS pre_time - INTERVAL '3' SECOND) WITH ('connector' = 'datagen','rows-per-second' = '5', -- 每秒产生的数据条数'fields.user_id.length' = '1', -- 随机字符串的长度'fields.amount.kind' = 'random', -- 无界的随机数'fields.amount.min' = '1', -- 随机数的最小值'fields.amount.max' = '10' -- 随机数的最大值);
2. 创建 Sink
CREATE TABLE clickhouse (win_start TIMESTAMP(3),win_end TIMESTAMP(3),user_id VARCHAR,amount_total BIGINT,PRIMARY KEY (win_start,win_end,user_id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH (-- 指定数据库连接参数'connector' = 'clickhouse','url' = 'clickhouse://10.0.0.178:8123',-- 如果ClickHouse集群未配置账号密码可以不指定--'username' = 'root',--'password' = 'root','database-name' = 'default','table-name' = 'datagen_to_ck','table.collapsing.field' = 'Sign' -- CollapsingMergeTree 类型列字段的名称);
3. 编写业务 SQL
INSERT INTO clickhouseSELECTTUMBLE_START(pre_time,INTERVAL '1' MINUTE) AS win_start,TUMBLE_END(pre_time,INTERVAL '1' MINUTE) AS win_end,user_id,CAST(SUM(amount) AS BIGINT) AS amount_totalFROM random_sourceGROUP BY TUMBLE(pre_time,INTERVAL '1' MINUTE),user_id;
4. 选择 Connector
flink-connector-clickhouse,点击【保存】>【发布草稿】运行作业。
新版 Flink 1.13 集群不需要用户自己选择内置 Connector
总结
参考链接

文章转载自腾讯云大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。





