Table of Contents
参考
ververica/flink-cdc-connectors: CDC Connectors for Apache Flink® (github.com)
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL - 掘金 (juejin.cn)
安装java
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel mkdir /usr/java ln -s /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.352.b08-2.el7_9.x86_64/ /usr/java/default export JAVA_HOME=/usr/java/default java -version | grep version | grep -v grep
复制
安装flink
wget https://archive.apache.org/dist/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz -O /opt/ ln -s flink-1.16.0 flink cd /opt/flink/lib/ ## jar放入/opt/flink/lib中 flink-sql-connector-elasticsearch7-1.16.0.jar flink-sql-connector-mysql-cdc-2.3.0.jar flink-sql-connector-postgres-cdc-2.3.0.jar
复制
启动flink
/opt/flink/bin/start-cluster.sh
复制
配置docker-compose.yml
[root@fy opt]# cat docker-compose.yml version: '2.1' services: postgres: image: debezium/example-postgres:1.1 ports: - "5432:5432" environment: - POSTGRES_DB=postgres - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw elasticsearch: image: elastic/elasticsearch:7.6.0 environment: - cluster.name=docker-cluster - bootstrap.memory_lock=true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - discovery.type=single-node ports: - "9200:9200" - "9300:9300" ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 kibana: image: elastic/kibana:7.6.0 ports: - "5601:5601" [root@fy opt]#
复制
启动docker-compose
export DOCKER_CONFIG=/root/.docker $DOCKER_CONFIG/cli-plugins/docker-compose up -d
复制
配置mysql
登录mysql
$DOCKER_CONFIG/cli-plugins/docker-compose exec mysql mysql -uroot -p123456
复制
mysql sql
-- MySQL CREATE DATABASE mydb; USE mydb; CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) ); ALTER TABLE products AUTO_INCREMENT = 101; INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"spare tire","24 inch spare tire"); CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL -- Whether order has been placed ) AUTO_INCREMENT = 10001; INSERT INTO orders VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
复制
配置postgres
登录postgres
$DOCKER_CONFIG/cli-plugins/docker-compose exec postgres psql -h localhost -U postgres
复制
postgres sql
-- PG CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
复制
配置flink
登录flink
/opt/flink/bin/sql-client.sh
复制
flink sql
CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'products' ); CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' ); CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'localhost', 'port' = '5432', 'username' = 'postgres', 'password' = 'postgres', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments' ); CREATE TABLE enriched_orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, product_name STRING, product_description STRING, shipment_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'enriched_orders' ); INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id;
复制
关闭docker
$DOCKER_CONFIG/cli-plugins/docker-compose down
复制
关闭flink
/opt/flink/bin/stop-cluster.sh
复制
最后修改时间:2024-03-15 14:44:05
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
文章被以下合辑收录
评论
相关阅读
NineData 社区版征文评测活动圆满收官!
NineData
127次阅读
2025-04-09 11:01:15
下一代DBA必备技能
Bytebase
53次阅读
2025-03-28 14:59:40
如何使用 RisingWave 和 PuppyGraph 构建高性能实时图分析框架
RisingWave中文开源社区
43次阅读
2025-03-18 10:49:54
一键部署 GPU Kind 集群,体验 vLLM 极速推理
Se7en的架构笔记
39次阅读
2025-03-24 09:41:41
Pigsty v3.4发布:更好的备份与PITR,排序,基础设施与应用
非法加冯
29次阅读
2025-04-02 09:34:20
docker安装jenkins
IT那活儿
29次阅读
2025-03-31 10:00:41
知识文档 | docker容器中的操作系统内核调用的是自己还是宿主机层的?
戏说数据那点事
27次阅读
2025-04-02 09:34:18
NineData 社区版初体验,大超预期
老叶茶馆
23次阅读
2025-03-18 10:45:49
Pigsty v3.4 发布:更好的备份与 PITR,排序,基础设施与应用
IvorySQL开源数据库社区
16次阅读
2025-04-11 15:34:47
Docker环境下资源管控实践
IT那活儿
15次阅读
2025-03-18 10:45:36