暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【用户投稿】手把手基于Apache SeaTunnel从PostgreSQL同步到Doris

SeaTunnel 2025-03-13
14

本文详细演示了如何通过 Apache SeaTunnel 2.3.9 实现 PostgreSQL 14.6 到 Apache Doris 3.0.3 的全量数据同步,涵盖从环境部署到生产验证的完整闭环,请各位小伙伴批评指正!

版本要求:

  • PostgreSQL --> Server 14.6
  • Apache SeaTunnel --> Apache-SeaTunnel-2.3.9
  • Apache Doris --> Apache-Doris-3.0.3

自行部署Apache Doris

Apache Doris 1台 Master 、2台节点:

  • 配置好每台的时间同步
  • 配置好每台的文件句柄数
vi /etc/security/limits.conf 
* soft nofile 1000000
* hard nofile 1000000

复制
JAVA选择
  • 在 2.1(含)版本之前,请使用 Java 8,推荐版本:jdk-8u352 之后版本。
  • 从 3.0(含)版本之后,请使用 Java 17,推荐版本:jdk-17.0.10 之后版本。
关闭 swap 分区
swapoff -a #临时关闭
注释修改/etc/fstab #永久关闭

复制
  • 安装包下载
  • 安装步骤
  • FE 集群部署
tar -xf apache-doris-3.0.3.bin.tar.gz

复制

EF配置文件,每个节点都可以使用同一个配置如何java路径一致的话。

cat apache-doris-3.0.3/fe/conf/fe.conf
...
## modify case sensitivity
lower_case_table_names = 1
## modify Java Home
JAVA_HOME = <your-java-home-path>
...
#其他配置不动

复制

启动Master节点

bin/start_fe.sh --daemon

复制

MySQL工具登录Doris添加FE Follower
节点

mysql -uroot -P<fe_query_port> -h<fe_ip_address>
ALTER SYSTEM ADD FOLLOWER "<fe_ip_address>:<fe_edit_log_port>"

复制

启动 FE Follower
 节点填写master-ip
和端口

bin/start_fe.sh --helper <helper_fe_ip>:<fe_edit_log_port> --daemon

复制

查看FE状态

show frontends\G;
#IsMaster: true 代表是master节点

复制

部署 BE集群,其他所有节点都可以使用这个配置

cat be/conf/be.conf
...
storage_root_path=/home/data1;/home/data2;/home/data3
JAVA_HOME = <your-java-home-path>
mem_limit = 4G
...

#创建
mkdir /home/data1
mkdir /home/data2
mkdir /home/data3

复制

在 Doris 中注册 BE 节点

## connect a alive FE node
mysql -uroot -P<fe_query_port> -h<fe_ip_address>
   
## registe BE node
ALTER SYSTEM ADD BACKEND "<be_ip_address>:<be_heartbeat_service_port>"

复制

启动 BE 进程

bin/start_be.sh --daemon

复制

查看 BE 启动状态

 ## connect a alive FE node
mysql -uroot -P<fe_query_port> -h<fe_ip_address>
   
## check BE node status
show backends\G;

复制

验证集群正确性

## connect a alive fe node
mysql -uroot -P<fe_query_port> -h<fe_ip_address>

复制

修改 Doris 集群密码

-- check the current user
select user();  
+------------------------+  
| user()                 |  
+------------------------+  
'root'@'192.168.88.30' |  
+------------------------+  
     
-- modify the password for current user
SET PASSWORD = PASSWORD('doris_new_passwd');

复制

创建测试表并插入数据

-- create a test database
create database testdb;
 
-- create a test table
CREATE TABLE testdb.table_hash
(
    k1 TINYINT,
    k2 DECIMAL(10, 2) DEFAULT "10.5",
    k3 VARCHAR(10) COMMENT "string column",
    k4 INT NOT NULL DEFAULT "1" COMMENT "int column"
)
COMMENT "my first table"
DISTRIBUTED BY HASH(k1) BUCKETS 32;

复制

Doris 兼容 MySQL 协议,可以使用 INSERT 语句插入数据。

-- insert data
INSERT INTO testdb.table_hash VALUES
(1, 10.1, 'AAA', 10),
(2, 10.2, 'BBB', 20),
(3, 10.3, 'CCC', 30),
(4, 10.4, 'DDD', 40),
(5, 10.5, 'EEE', 50);

-- check the data
SELECT * from testdb.table_hash;
+------+-------+------+------+
| k1   | k2    | k3   | k4   |
+------+-------+------+------+
|    3 | 10.30 | CCC  |   30 |
|    4 | 10.40 | DDD  |   40 |
|    5 | 10.50 | EEE  |   50 |
|    1 | 10.10 | AAA  |   10 |
|    2 | 10.20 | BBB  |   20 |
+------+-------+------+------+

复制

PostgreSQL 部署

关于PostgreSQL作者不做详细赘述,请各位自行前往官网安装......

SeaTunnel部署

社区官网:

https://seatunnel.apache.org/

安装包下载:

https://seatunnel.apache.org/download/

tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

复制

手动下载连接器,然后将其移动至connectors/目录下,如果是2.3.5之前则需要放入connectors/seatunnel目录下。

连接器下载地址:

https://repo.maven.apache.org/maven2/org/apache/seatunnel/

需要确保 JDBC 驱动 JAR 包 已放置在目录 ${SEATUNNEL_HOME}/lib/
 中,在社区之前很多文章都提到过部署教程,如果操作不当出现一些小问题,请进入社区交流(SeaTunnel部署还是很简单的)

使用SeaTunnel 引擎快速开始:

PG创建表
CREATE TABLE alarm_receive_record (
    id BIGINT PRIMARY KEY,
    device_id VARCHAR(50),
    alarm_time TIMESTAMP,
    content TEXT
);

复制
PG追加数据
INSERT INTO alarm_receive_record (id, device_id, alarm_time, content)
VALUES 
    (3, 'D003''2025-02-26 16:15:00''通信中断'),
    (4, 'D004''2025-02-26 17:20:00''湿度超限'),
    (5, 'D005''2025-02-26 17:20:00''湿度超限2'),
(6, 'D006''2025-02-26 17:20:00''湿度超限6'),(7, 'D007''2025-02-26 17:20:00''湿度超限7'),(8, 'D008''2025-02-26 17:20:00''湿度超限8'),(9, 'D009''2025-02-26 17:20:00''湿度超限9'),(10, 'D0010''2025-02-26 17:20:00''湿度超限10')

复制
Doris手动创建表结构
CREATE TABLE testdb.alarm_receive_record (
    id BIGINT COMMENT "唯一ID",
    device_id VARCHAR(50) COMMENT "设备ID",
    alarm_time DATETIME COMMENT "告警时间",
    content STRING COMMENT "告警内容"
) ENGINE=OLAP
DUPLICATE KEY(id, device_id)  -- 选择 DUPLICATE 模型(全明细存储)
DISTRIBUTED BY HASH(device_id) BUCKETS 10  -- 按设备ID分桶
PROPERTIES (
    "replication_num" = "3",  -- 副本数(与集群配置一致)
    "storage_format" = "V2"    -- 存储格式
);

复制

pg-doris.yaml

env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    jdbc{
        url = "jdbc:postgresql://192.168.10.17:5432/aaa"
        driver = "org.postgresql.Driver"
        user = "test"
        password = "123123"
        query = "select * from alarm_receive_record"
    }
}

sink {
  Doris {
    fenodes = "192.168.20.174:8030"
    username = "root"
    password = "123123"
    database = "testdb"
    table = "alarm_receive_record"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    sink.label-prefix = "pg_to_doris"
    sink.enable-2pc = "true"
    column = ["id""device_id""alarm_time""content"]
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

复制

启动同步任务,多次执行,会重复数据。

cd "apache-seatunnel-${version}"
./bin/seatunnel.sh --config ./config/pg-doris.yaml  -m local

复制

Doris验证是否同步成功?

大数据时代,选择比努力更重要。让 SeaTunnel 成为您数据流动的超级高速公路,Doris 化作实时洞察的智慧之眼,共同构建面向未来的数据驱动体系!

白鲸开源

白鲸开源是一家开源原生的 DataOps 商业公司,由多个 Apache Foundation Member成立,80%员工都是 Apache Committer,运营2 个全球 Apache 开源项目(DolphinScheduler, SeaTunnel),同时根据全球最佳实践发布商业版版本WhaleScheduler和WhaleTunnel。我们致力于打造下一代开源原生的DataOps 平台,助力企业在大数据和云时代,智能化地完成多数据源、多云及信创环境的数据集成、调度开发和治理,以提高企业解决数据问题的效率,提升企业分析洞察能力和决策能力。

社区介绍

Apache SeaTunnel是一个云原生的高性能海量数据集成工具。北京时间 2023 年 6 月1 日,全球最大的开源软件基金会Apache Software Foundation正式宣布Apache SeaTunnel毕业成为Apache顶级项目。目前,SeaTunnel在GitHub上Star数量已达 8k+,社区达到6000+人规模。SeaTunnel支持在云数据库、本地数据源、SaaS、大模型等160多种数据源之间进行数据实时和批量同步,支持CDC、DDL变更、整库同步等功能,更是可以和大模型打通,让大模型链接企业内部的数据。




同步Demo

 MySQL→Doris | MySQLCDC | MySQL→Hive | HTTP → Doris  | HTTP → MySQL |  MySQL→StarRocks | MySQL→Elasticsearch | Kafka→ClickHouse

新手入门

 SeaTunnel 让数据集成变得 So easy!  3 分钟入门指南
 0 到 1 快速入门 / 初探 / 深入理解  
  分布式集群部署 | CDC数据同步管道 | Oracle-CDC

最佳实践

 OPPO | 清风 | 天翼云 | 马蜂窝 | 孩子王 | 哔哩哔哩 | 唯品会 | 众安保险 | 兆原数通 | 亚信科技 | 映客 | 翼康济世 | 信也科技 | 华润置地 | Shopee | 京东科技 | 58同城 | 互联网银行 | JPMorgan

测试报告

SeaTunnel VS GLUE |  VS Airbyte |  VS DataX | SeaTunnel 与 DataX 、Sqoop、Flume、Flink CDC 对比

源码解析

Zeta引擎源码解析(一) |(二) |(三)| API 源码解析 | 2.1.1源码解析 | 封装 Flink 连接数据库解析

 


Apache SeaTunnel





Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的数据集成平台

仓库地址: 
https://github.com/apache/seatunnel
网址:
https://seatunnel.apache.org/
Apache SeaTunnel 下载地址:
https://seatunnel.apache.org/download
衷心欢迎更多人加入!
我们相信,在Community Over Code(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!
我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!
提交问题和建议:
https://github.com/apache/seatunnel/issues
贡献代码:
https://github.com/apache/seatunnel/pulls
订阅社区开发邮件列表 : 
dev-subscribe@seatunnel.apache.org
开发邮件列表:
dev@seatunnel.apache.org
加入 Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
关注 Twitter: 
https://twitter.com/ASFSeaTunnel

文章转载自SeaTunnel,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论