
本文将探讨如何利用 RisingWave 和 PuppyGraph 构建实时图分析框架,以应对数据处理中的挑战,实现高效的实时图数据分析。首先,让我们分别了解 RisingWave 和 PuppyGraph。

了解 RisingWave
RisingWave[1] 是一款现代化的 SQL 数据平台,完全兼容 PostgreSQL,专为满足当今数据驱动型应用的动态需求而打造。它具备广泛的应用场景,可用于事件驱动架构、实时 ETL 管道、持续分析服务以及 AI 特征存储,使企业能够充分挖掘数据的价值。无论是处理实时事件流、通过 CDC(变更数据捕获)监控数据库变更,还是分析时序数据,RisingWave 都能提供最新、稳定的洞察。
其高性能架构支持毫秒级延迟计算,这对需要快速决策的场景至关重要。RisingWave 无缝结合了流式和批量处理的优势,使用户能够在统一框架下高效摄取、关联和分析实时及历史数据。这种融合不仅简化了复杂的数据处理流程,还提升了分析的灵活性。
此外,RisingWave 采用云原生设计,能够根据业务需求弹性扩展,同时保证高可靠性。无论是在托管云环境还是自托管基础设施中部署,RisingWave 都能凭借其可扩展性和稳健性能,胜任各种高负载数据应用。它帮助企业实时洞察数据,推动创新,提供灵活高效的现代数据处理方案。

图示:RisingWave 实时数据技术栈

了解 PuppyGraph
PuppyGraph[2] 是市场上首个也是目前唯一的实时零 ETL(无需数据提取、转换和加载)图查询引擎,使数据团队能够直接将现有的关系型数据存储作为图模型查询,并在不到 10 分钟内完成设置,无需传统图数据库的高成本、长延迟和复杂维护。PuppyGraph 适用于现代数据驱动型应用,消除了数据迁移和冗余存储的复杂性,同时确保数据始终最新、同步。无论是在云端还是自托管环境中部署,这种零 ETL 方式都能帮助企业高效利用现有数据,而无需额外的存储成本或数据复制延迟。
PuppyGraph 具备卓越的性能与可扩展性,能够处理 TB 级甚至 PB 级数据,并管理数十亿个相互关联的实体。它支持 Gremlin[3] 和 openCypher[4] 等主流图查询语言,使用户能够轻松探索复杂的数据关系,发现潜在的模式。这种对图模型和关系型数据库的无缝整合,不仅简化了数据分析流程,也极大地提升了分析的灵活性,广泛适用于欺诈检测[5]、网络安全[6]、日志分析等多个领域。
PuppyGraph 彻底改变了企业访问和分析关联数据的方式,使其能够在保障数据完整性和运维简洁性的同时,迅速获取深层次洞察。
图示:图数据库架构 vs. PuppyGraph 图查询引擎架构
RisingWave + PuppyGraph
基于 RisingWave 提供的高质量数据,PuppyGraph 可在无需 ETL 的情况下,将其直接转换为完整的图模型。通过将多个数据源视为统一的图结构,数据团队能够揭示复杂的关系网络,发现传统 SQL 分析难以挖掘的潜在模式。
图示:RisingWave & PuppyGraph 架构示意图

简单的演示
接下来,我们通过一个简单的演示来了解二者如何协同工作。详细的操作指南和示例数据可在 GitHub 仓库[7]中获取。本次演示使用 LDBC Financial Benchmark[8] 数据集,该基准测试模拟了真实的金融业务场景,如反欺诈和风险管理。
准备工作:
Docker 和 Docker Compose[9] PostgreSQL 客户端[10] Python 3 和虚拟环境[11]
启动服务
运行以下命令启动 Kafka、RisingWave 和 PuppyGraph 服务:
docker compose up -d
复制
创建 Python 虚拟环境
创建一个 Python 虚拟环境并安装 confluent_kafka
包:
python3 -m venv myvenv
source myvenv/bin/activate
pip install confluent-kafka复制
创建 Kafka 主题
运行 Python 脚本 topics.py
来创建 Kafka 主题:
python topics.py -c
复制
RisingWave 连接 Kafka
使用 PostgreSQL 客户端执行 SQL 脚本 rw_kafka.sql
以创建 Sources 和物化视图:
psql -h localhost -p 4566 -d dev -U root -f rw_kafka.sql
复制
例如,对于 kafka-Account
主题,可以使用以下语句:
CREATE SOURCE IF NOT EXISTS account_stream (
"label" varchar,
"accountId" bigint,
"createTime" timestamptz,
"isBlocked" boolean,
"accountType" varchar,
"nickname" varchar,
"phonenum" varchar,
"email" varchar,
"freqLoginType" varchar,
"lastLoginTime" timestamptz,
"accountLevel" varchar
)
WITH (
connector='kafka',
topic='kafka-Account',
properties.bootstrap.server='kafka:9092',
scan.startup.mode='earliest'
)
FORMAT PLAIN ENCODE JSON;复制
以及
CREATE MATERIALIZED VIEW IF NOT EXISTS account_mv AS
SELECT * FROM account_stream;复制
导入快照数据
运行 topics.py
脚本导入快照数据:
python topics.py -s
复制
在 RisingWave 中查询数据(可选)
psql -h localhost -p 4566 -d dev -U root
复制
例如,列出数据库中的表、视图和序列:
\d
复制

图建模
在浏览器中访问 PuppyGraph 界面:http://localhost:8081
,登录凭据如下:
用户名: puppygraph
密码: puppygraph123
上传模型架构:在界面中,找到 Upload Graph Schema JSON,选择文件 schema.json
,然后点击 Upload 进行上传。
图示:金融数据的图模式
通过 PuppyGraph 进行查询
你可以尝试使用 Gremlin 或 Cypher 语句对 PuppyGraph 的快照数据进行查询。
在左侧导航栏进入 Query 面板。在 Gremlin Query 选项卡中,你可以使用 Gremlin 交互式查询图数据。 每次查询后,请清除图形面板,以保持可视化界面的整洁。在页面右上角点击 Clear 按钮。 如果要使用 Cypher 进行查询,可以参考 Graph Notebook 和 Cypher Console[12]。
示例:
Gremlin 查询账户数量:
g.V().hasLabel('Account').count()
复制Cypher 查询账户数量:
MATCH (x:Account) RETURN count(x)
复制
导入增量数据
运行 Python 脚本 topics.py
以导入增量数据:
python topics.py -i
复制
实时查询 PuppyGraph
你可以在 PuppyGraph 中持续查询,并观察随着新数据的加入,查询结果的变化。
清理与关闭
如需停止并移除容器及网络,请运行以下命令:
docker compose down -v
复制
如果您对这种集成方案感兴趣,欢迎免费试用 RisingWave Cloud[13](点击阅读原文立即体验),并下载 PuppyGraph Developer Edition[14] 进行体验!
RisingWave: https://risingwave.com/
[2]PuppyGraph: https://www.puppygraph.com/
[3]Gremlin: https://www.puppygraph.com/blog/gremlin-graph
[4]openCypher: https://docs.puppygraph.com/reference/cypher-query-language/?h=cy
[5]欺诈检测: https://www.puppygraph.com/finserv
[6]网络安全: https://www.puppygraph.com/cybersecurity
[7]GitHub 仓库: https://github.com/puppygraph/puppygraph-getting-started/tree/main/integration-demos/risingwave-demo
[8]LDBC Financial Benchmark: https://ldbcouncil.org/benchmarks/finbench/
[9]Docker 和 Docker Compose: https://docs.docker.com/compose/
[10]PostgreSQL 客户端: https://docs.risingwave.com/deploy/install-psql-without-postgresql
[11]Python 3 和虚拟环境: https://docs.python.org/3/library/venv.html
[12]Graph Notebook 和 Cypher Console: https://docs.puppygraph.com/querying/querying-using-opencypher/
[13]RisingWave Cloud: https://cloud.risingwave.com/auth/signup/?hsCtaAttrib=171085305230
[14]PuppyGraph Developer Edition: https://www.puppygraph.com/download-confirmation
关于 RisingWave

技术内幕
👇 点击阅读原文,立即体验 RisingWave!