暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

高途基于 Iceberg 和 Amoro 的湖仓一体架构实践

Amoro Community 2025-03-18
33

导读 本文将分享高途在数据湖方面的落地实践,以及对未来工作的展望。

主要内容包括以下几大部分:

1. 使用数据湖背景

2. 数据湖落地

3. 效果和收益

4. 未来展望和规划

分享嘉宾|王士达高途大数据高级开发工程师

编辑整理|曹加印

内容校对|李瑶

出品社区|DataFun



01

使用数据湖背景

首先分享一下使用数据湖的背景。我们希望过数据湖解决 Databus 增量同步任务遇到的一些问题,以及其它一些数仓问题。

1. Databus 增量同步任务问题

Databus 同步任务链路由 MySQL->Canal->Kafka->HDFS->Hive 临时表->全量表和增量表合并组成。
在实际生产中,Databus 存在如下一些问题,对数据链路产出效率造成了影响:
(1)链路长
增量同步链路和使用技术栈多,维护成本高。
(2)集群运维有概率导致数据丢失
合并过程中,EMR 集群组件重启有小概率导致历史数据丢失。此外,Databus 链路的服务器扩缩容有概率导致数据丢失。
(3)大表合并费资源
30 亿以上的大表,全量和增量做数据合并时花费资源约 1 万+/月。
(4)链路时效性低
由于每小时使用 Hive 引擎做全量表和增量表的数据合并,30 亿大表合并时运行时间超 25 分钟,导致小时级链路产出数据较晚。

2. 其它传统数仓问题

(1)离线全量同步速度慢
每天/每小时全量抽取上亿大表时,并发高会导致 MySQL 负载满影响业务,并发低导致抽取时间可能超 30 分钟,影响数据链路时效性。
(2)实时链路问题排查困难
实时链路通过 Kafka 存储,排查问题困难。以前需要在服务器消费上亿 Kafka 数据或者导入 CK 去排查数据问题,查询费时费力。现在通过分钟级写入数据湖,可以通过 SQL 直观地分析数据,提高解决问题的效率。
(3)离线、实时两套链路问题
由于当前数仓采用 Lambda 架构,实时数据和离线数据分别存储于不同介质中,会因两套技术栈导致以下几个问题:
  • 计算数据可能存在差异
  • 数据口径难以统一
  • 需要开发两套,浪费开发资源
(4)数据冗余和资源浪费问题
  • MySQL 入仓使用分区方式存储,每天/每小时存储一份 MySQL 全量数据,不是所有表都需要历史镜像,导致数据存在冗余。
  • 数仓 ODS 层存在多次同步同一张 MySQL 表的情况,造成存储、传输、MySQL 资源浪费。


02

数据湖落地

1. 同步工具

背景
由于期望业务数据实时入湖,要实现实时入湖,技术上选择了业界主流的流批一体框架 Flink,需要使用基于 Flink 的同步工具来支持高途线上任务稳定运行。
高途在同步工具上考虑了以下问题:
  • 目前高途大航海离线调度平台无法满足实时入湖诉求。实时 GLink 平台也已经在迁移上云,没有人力迭代实时开发平台。
  • 目前只有多半个人力投入到数据湖,包含调研、选型、工具搭建、高途大航海平台接入数据湖、快速处理用户问题等。
  • 需要保证实时同步工具的稳定性、同步数据质量
  • 离线数仓和 EMR 集群在腾讯云,使用腾讯云独有的 CHDFS+Goosefs 本地缓存架构, 以及开启了 Kerberos 认证,尽量不跨云。
同步工具选择
调研了几款开源和腾讯云工具,结合现状最后选择与腾讯云合作,使用全托管的腾讯数据集成工具来支持同步数据入湖。数据集成是基于 Flink 做的全托管同步工具,解决了上述提到的问题,可以释放高途在开发和运维上的人力投入。在近 1 年的测试和线上使用过程中,一起改造数据集成,解决同步过程中遇到的几十个问题,现在基本可用,但还需一起完善。

2. Iceberg 治理

背景
由于入湖使用 Flink 技术实现,每次写入都会生成数据和标记删除的小文件,而小文件多会影响查询性能,小文件超 1 千以上时可能会导致查询内存不足,阻塞大航海下游任务使用。所以需要一个治理程序对小文件、脏数据、快照等内容做清理,来保证 Iceberg 表可以被正常使用。高途和腾讯一起经历了 3 个阶段,最终使用今年 3 月进入 Apache 孵化器的 Amoro 工具来解决 Iceberg 小文件治理问题。
高图和腾讯合作,在这一年中经历了三个阶段来解决 Iceberg 治理的问题。
第一阶段
时间:23 年 11 月-23 年 12 月
在高途离线大航海调度平台,通过定时提交 spark-sql 执行存储过程对 Iceberg 表进行治理。


主要解决的问题为:
(1)定时对 Iceberg 表的小文件、元数据、过期快照进行治理。
(2)由于有定时治理,实时写入 Iceberg 后,可查询。
仍存在的问题包括:
(1)配置治理任务繁琐。不同表由于数据量不同,配置不同调度间隔。
(2)治理时费资源,每次都是全量重写,需要大量内存对小文件做合并。
(3)流量突增时,需要手动调整治理间隔。
(4)同步全量 Kafka 写入 Iceberg 后,部分表 Executor 给 40GB 内存也会 OOM,需要在全量同步过程中多次治理才可用。
(5)治理任务治理时有 Flink 写入,存在冲突导致治理失败,需要暂停写入然后进行小文件治理。
第二阶段
时间:23 年 12 月-24 年 7 月
通过腾讯 luoshu 工具进行治理。
解决问题包括:
(1)配置单个表治理任务简单,输入 Iceberg 表即可添加治理任务。
(2)支持定时对 Iceberg 表的小文件、孤儿文件、元数据、过期快照等进行治理,修改也方便。
(3)支持微批提交,在遇到写入治理冲突时,尽可能治理一部分小文件。
(4)降低治理时费资源问题,共用一个弹性内存的 Yarn 任务做治理,客户端定时提交治理 SQL。
还存在问题:
(1)整库同步场景需要添加非常多治理任务,一个一个添加,不方便。
(2)每次治理都是重写全表数据,存在资源浪费。
(3)流量突增时,需要手动调整治理间隔,否则会影响下游查询。
(4)全量 Kafka 写入 Iceberg 后,第一次治理,大表给 40GB 内存也治理不动,需要在全量同步过程中多次治理才可用。
第三阶段
时间:24 年 8 月-至今
通过腾讯劲松团队 Amoro 工具进行治理。
解决的问题包括:
(1)自动扫描数仓 Iceberg 表并自动治理,降低使用成本。
(2)支持增量小文件治理,解决 luoshu 每次治理重写整表的资源浪费问题。
(3)在全量同步或流量突增时,根据 Iceberg 表的小文件数量来自动发起治理,解决治理不及时无法查询问题。
(4)治理冲突时,3 分钟会自动重试,解决因定时治理冲突,导致治理不及时问题。
还存在问题:
(1)治理资源运行在 Yarn 上,任务失败时不会自动补资源,导致治理排队。
(2)缺少治理大盘和治理告警功能。
(3)异常时任务隔离能力待完善。
3. Iceberg 同步质量校验
背景
由于流式同步链路可能存在数据质量问题,需要通过数据对比工具来发现和解决数据质量问题。经调研,已知的数据质量校验工具都是基于规则对单数据源的数据内容进行校验,如单数据源行数、字段空值个数等规则,无法跨源对 MySQL、Iceberg 两端的数据内容做比对。而开发跨源对比工具基于以上背景
遇到数据对不上,PolarDB 问题,Hive 钩子函数问题
由于公司伙伴对新的流式同步工具存在质疑,认为流式链路可能存在数据质量问题,需要一套数据对比工具,通过对比 MySQL 和 Iceberg 数据内容,来保障数据准确性。经过调研,已知数据质量校验工具都是基于规则,对其中一方的数据内容进行校验,如目标表行数、目标表空值个数、目标表字段期望值等规则,无法对 MySQL、Iceberg 两端的数据内容做比对。
跨源对比工具需要考虑的问题点:
(1)MySQL 和 Iceberg 表是不同数据源,数据内容如何对比?
需要一个工具,能同步两端的数据到一起,然后进行数据内容比对
(2)比对哪些内容?
对比 MySQL 和 Iceberg 近 7 天的全部数据内容,以及当天 0 点之前的所有主键是否一致
(3)MySQL 大表如何比对?
因 MySQL 大表存在主键不连续、主键雪花算法、创建时间和更新时间列没有索引情况,直接查询 MySQL 近 7 天数据会查询超时。为了支持并行对比,抽取 MySQL 数据时采用滚动切片方式做任务划分,划分任务后并行抽取数据。在抽取数据时,为了减少无用的数据传输提高抽取速度,where 条件增加近 7 天限制。
(4)因为流式同步,导致数据一直变化,如何比对?
离线调度任务在每天 0 点 30 分后才开始运行,对比当天 0 点-7 天前 0 点数据。因为有半小时调度延迟,对比时昨天的数据已经写入进 Iceberg。主键对比时也是抽取 0 点之前所有主键进行对比。
工具收益:发现数据质量问题
(1)阿里云 PolarDB 版本低问题
公司使用 PolarDB 较早,由于版本太低 binlog 记录信息不完善,导致同步时使用 PolarDB 集群地址读取 binlog 存在数据丢失、恢复任务时报 binlog 不存在问题。Q4 升级测试环境 PolarDB 版本,25 年 Q1 升级线上 PolarDB 版本解决。
(2)阿里云只读库 rds 不记录 binlog 问题
阿里云 5.7 以下 MySQL 只读库 binlog 没记录 insert、update 信息,需要阿里云后台修改 rds_slave_minor_log=OFF。
(3)Hive 钩子函数导致丢数
在 Hivemetastore 中有个钩子函数用来记录表的最新访问时间。每次有用户或程序查询 Iceberg 表时,钩子函数会查询两次表元数据信息,第一次是进入钩子时查询元数据放入本地缓存,第二次是更新元数据前查询,如果发现信息不一致,会用缓存中的旧元数据信息覆盖新元数据信息。
例如,有程序查询表触发钩子函数时,钩子函数在更新表访问时间过程中,Flink 写入了新的快照,则钩子函数发现两次查询的元数据信息不一致,会用缓存中的元数据覆盖 Flink 写入的元数据,造成回滚现象,导致 Flink 写入的数据丢失。
修复后,线上 36 个数据对比任务每天跑,暂未发现新的丢数问题。
03

效果和收益

实时数仓 DWS 层入湖
公司消转数据治理和推送项目,通过实时和离线链路结合,提高链路时效性,满足业务小时级需求。以前链路:离线数仓 ODS->DWD->DWS 中有多个层级,加工到 DWS 层需要约 5 个小时。现在链路:实时链路加工的 Kafka 数据直接入数据湖 DWS 层,缩短为 5 分钟更新。
替换增量同步任务
通过数据湖替换以前增量同步任务,实现了同步效率显著提升和费用降低,有效解决了同步 MySQL 大表慢的问题。以前增量同步:超 30 亿以上大表使用 Hive 做全量和增量数据合并,预估费用 1 万/月。现在同步方案:MySQL大表改为实时入湖方式,预计 200 元/月。
MySQL 实时入湖
业务 Mysql 数据实时入湖,缩短抽数时间,提高小时级链路时效性。旧同步链路:每天/每小时全量抽取 MySQL 数据,有些大表抽取时间需要 40 分钟以上,造成下游任务等待,数据产出需要 1 小时以上,无法满足小时级链路。新链路:替换为分钟级入湖,下游任务等待时间缩短为分钟级,支持小时级数据链路。
04

未来展望和规划

1. 数据同步方面
  • 完善数据同步开发、运维规范。针对不合规情况有通知和修复机制。
  • 通过数据湖替换 Databus 任务收尾阶段。提高入湖速度,节约同步费用。
  • 由以前单表同步改为整库同步。一个任务同步多张 MySQL 表入湖,提高同步效率,降低费用。
2. 数据湖治理方面
  • 提高 Amoro 治理服务稳定性:支持异常流量时任务发现、隔离、治理能力。
  • 监控大盘完善:通过大盘及时发现治理隐患并告警,针对隐患做修复。
以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


王士达

高途

大数据高级开发工程师

7 年实际工作经验,一直在做大数据相关工作。

在美菜网 3.5 年,公司做生鲜电商业务。曾任职于履约研发部、成立数据仓库部、BI 研发部,参与公司数仓规范制定和数仓建设,以及 BI 工具和报表研发。

目前在高途 3.5 年,公司做在线教育业务。任职于大数据部的数据平台组,大数据高级开发工程师岗位。主要工作职责包括数据同步、数据湖调研落地、埋点上报及管理系统、USQL 统一查询服务、指标字典等

活动推荐

往期推荐


唯品会亿级数据秒级响应!热数据访问效率翻倍!

大模型驱动的DeepInsight Copilot在蚂蚁的技术实践

平安人寿ChatBI:大模型智能化报表的深度实践

Apache Spark SQL 优化器

“人工调参”VS“AI迭代”:基于 AI 的化合物配比优化

策略产品AI转型指南:能力模型与实战策略

广告召回新范式:京东生成式推理实战

lceberg 助力 B 站商业化模型样本行级更新的实践

AutoMQ Table Topic:基于 Iceberg 构建流式数据湖新范式

如何基于DeepSeek搭建AI Agent?

点个在看你最好看

SPRING HAS ARRIVED

文章转载自Amoro Community,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论