流计算 Oceanus 简介
前置准备
创建流计算 Oceanus 集群
创建消息队列 CKafka
创建 Topic:
数据准备:
Kafka 客户端: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
使用脚本发送:
Java:参考 使用 SDK 收发消息 [7]
Python:参考如下代码
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducerbroker_lists = ['10.0.0.29:9092']topic_oceanus_quickstart = 'oceanus7_test1'producer = KafkaProducer(bootstrap_servers=broker_lists,value_serializer=lambda m: json.dumps(m).encode('ascii'))def generate_oceanus_test_data():results = []for _ in range(0, 10):int_one = random.randint(1000,10000)int_two = random.randint(1,10)random_thr = random.random()msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr}results.append(msg_kv)return resultsdef send_data(topic, msgs):for msg in msgs:import timetime.sleep(1)producer.send(topic, msg)print(msg)producer.flush()if __name__ == '__main__':count = 1while True:msg_oceanus_test_data = generate_oceanus_test_data()send_data(topic_oceanus_quickstart, msg_oceanus_test_data)time.sleep(30)
创建 PostgreSQL 实例
oceanus7_test1表。
-- 建表语句create table public.oceanus7_test1 (id INT,random_thr DOUBLE PRECISION,PRIMARY KEY(id));
笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]
流计算 Oceanus 作业
1. 创建 Source
CREATE TABLE `kafka_json_source_table` (int_one INT,int_two INT,random_thr DOUBLE) WITH ('connector' = 'kafka','topic' = 'oceanus7_test1', -- 替换为您要消费的 Topic'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'oceanus_group2', -- 必选参数, 一定要指定 Group ID-- 定义数据格式 (JSON 格式)'format' = 'json','json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。);
2. 创建 Sink
CREATE TABLE jdbc_sink (id INT,random_thr DOUBLE,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc', -- connector 类型为'jdbc''url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true', -- 请替换为您的实际 PostgreSQL 连接参数'table-name' = 'oceanus7_test1', -- 需要写入的数据表'username' = 'root', -- 数据库用户名(需要提供 INSERT 权限)'password' = 'Tencent123$', -- 数据库密码-- 数据目的 Sink 性能调优参数'sink.buffer-flush.max-rows' = '5000', -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000'sink.buffer-flush.interval' = '2s', -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s'sink.max-retries' = '3' -- 可选参数, 表示数据库写入出错时, 最多重试的次数);
3. 编写业务 SQL
INSERT INTO jdbc_sinkSELECTMOD(int_one,int_two) AS id,TRUNCATE(random_thr,2) AS random_thrFROM kafka_json_source_table;
总结
参考链接

文章转载自腾讯云大数据,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。





