暂无图片
暂无图片
4
暂无图片
暂无图片
9
暂无图片

RisingWave 2.2 发布!

Doc 团队
来源|RisingWave 官网

我们很高兴地宣布,RisingWave  2.2 版本正式上线。该版本包含了许多重要更新,包括替换 AWS PrivateLink 的现有连接、支持 webhook source connector、以及新版 SQL 函数等。

如果您对 2.2 版本的完整更新信息感兴趣,请查看更新日志[1]以下是 RisingWave 2.2 版本的一些重点更新。
Kafka connector 的可复用连接

此前,如果想从不同 VPC 的 Kafka 服务中接收消息或向其发送消息,需使用 AWS PrivateLink。2.2 版本弃用了这种连接方法。

新款连接方式允许在连接到 Kafka source、Kafka sink 或 Schema registry 时复用 Connector 属性(如代理地址和安全协议细节)。这样,您只需为集群设置一次必要的凭证,为创建多个 Sources 节省时间。同时,限制对敏感信息的访问也增强了安全性。

要创建连接,需使用 CREATE CONNECTION
命令并指定所需的参数。例如:

CREATE CONNECTION kafka_conn1 WITH (
 type = 'kafka',
 properties.bootstrap.server = 'localhost:9092'
) ;


复制

必要的参数为 type
properties.bootstrap.server
,同时也支持添加与 SSL/SASL 认证、private link connection 及 AWS 认证相关的其他参数。

使用此连接可以创建 Kafka source、table 或 sink。该功能也简化了从同一集群中的不同主题中创建 Kafka source 的过程。

 CREATE SOURCE kafka1 (
  id int,
  name varchar,
  email varchar,
  age int
 ) WITH (
  connector = 'kafka',
  connection = 'kafka_conn1',
  topic = 'topic1',
  scan.startup.mode='latest'
 ) FORMAT PLAIN ENCODE JSON;

复制

若想使用 CREATE CONNECTION
命令连接到 Schema registry,请参照以下示例:

CREATE CONNECTION schema_1 WITH (
  type = 'schema_registry',
  schema.registry = 'http://...',
  schema.registry.username = 'superuser',
  schema.registry.password = 'pass123'
);


复制

详细内容,请参见:

  • CREATE CONNECTION
    [2]

用 EXPLAIN FORMAT 优化查询

RisingWave 中的 EXPLAIN
命令会返回 SELECT
语句的执行计划。对于特别复杂的语句,这有助于了解如何优化查询,轻松识别出消耗最多资源的操作,并找到可能的解决方法。此外,如果查询行为与预期不符,EXPLAIN
命令有助于定位查询设计中的错误。

不过,返回的执行计划可能难以阅读,因此我们引入了全新的 FORMAT
选项,以不同格式提供执行计划,让其更加易于理解,方便用户根据自己的偏好选择格式。

目前支持的格式包括:

  • JSON

  • TEXT

  • XML

  • YAML

例如,在以下查询使用 EXPLAIN
命令而不使用 FORMAT
选项,将看到未经格式化的输出。

CREATE TABLE t(v1 int);

EXPLAIN CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamTableScan { table: t, columns: [v1, _row_id] }


复制

使用 FORMAT
选项后,可以用 JSON 格式查看查询执行计划。

EXPLAIN(physicalFORMAT jsonCREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
  {
   "name": "StreamMaterialize",
   "fields": {
     "columns": [
       "v1",
       "t._row_id(hidden)"
     ],
     "pk_columns": [
       "t._row_id"
     ],
     "pk_conflict": "NoCheck",
     "stream_key": [
       "t._row_id"
     ]
   },
   "children": [
     {
       "name": "StreamTableScan",
       "fields": {
         "columns": [
           "v1",
           "_row_id"
         ],
         "table": "t"
       },
       "children": []
     }
   ] }

复制

详细内容,请参见:

  • EXPLAIN
    [3]

ALTER...SWAP WITH... 命令

使用 ALTER ... SWAP WITH...
命令可以同时重命名两个对象,让两者的定义实现无缝切换,避免任何停机时间。

典型的应用场景为维护期间需要更新或替换对象,用此命令能够确保依赖的对象持续正常工作。

假设有物化视图 mv_old
定义如下:

CREATE MATERIALIZED VIEW mv_old AS
SELECT
    product_category,
    SUM(order_amount) AS total_sales
FROM
    orders
GROUP BY
    product_category;


复制

如果想更新 total_sales
的计算方式,但有其他与 mv_old
相关的对象(如物化视图和 Sinks)需要保持不变。

在这种情况下,可以创建新物化视图 mv_new
,然后使用 ALTER...SWAP WITH...
mv_old
替换为 mv_new
,无需停机。

CREATE MATERIALIZED VIEW mv_new AS
SELECT
    product_category,
    SUM(order_amount - discount_amount) AS total_sales
FROM
    orders
GROUP BY
    product_category;

ALTER MATERIALIZED VIEW mv_old SWAP WITH mv_new;


复制

以下数据库对象均支持使用 ALTER...SWAP WITH...
命令:

  • 物化视图
  • 视图
  • Source
  • Sink
  • 订阅

详细内容,请参见:

  • ALTER SWAP
    [4]

从 webhooks 获取数据

自 2.2 版本起,RisingWave 支持从 webhook 获取数据,允许实时接收和处理外部 HTTP 请求。有了全新的 webhook connector 便无需再维护中间的 Kafka 集群,减少了 Data pipeline 中的麻烦,使得构建实时处理和自动化关键应用变得更加简单。

将数据从 webhook sources 导入 RisingWave 为 Premium 版本功能,想了解更多有关 RisingWave Premium 的信息,请参见RisingWave Premium|常见问题解答

webhook 在多种场景中应用广泛,如发送通知和警报、数据同步以及触发 CI/CD 流程等。RisingWave 中的 webhook connector 支持从以下 Sources 获取事件:

  • GitHub
  • Segment
  • HubSpot
  • Amazon EventBridge
  • RudderStack

通过 CREATE TABLE
命令,可以迅速开始在 RisingWave 中接收 webhook 请求。以下示例使用了 SECRET
来验证传入的请求。

CREATE SECRET test_secret WITH (backend = 'meta'AS 'secret_value';

CREATE TABLE wbhtable (
  data JSONB
WITH (
  connector = 'webhook'
VALIDATE SECRET test_secret AS secure_compare(
  headers->>'{header of signature}',
  {signature generation expressions}
);


复制

详细内容,请参见:

  • 从 webhook 获取数据[5]

MySQL 表值函数

此次更新引入了表值函数(table-valued function)mysql_query
。和设置 CDC source 相比,这是一种轻量级的替代方案, 它允许按需从 MySQL 表中检索数据。对于数据更新频率很低或几乎不更新的 MySQL 表,运行此函数比创建 CDC source 和 table 要节省计算资源。

假设 MySQL 中有下表 users

first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC


复制

使用 mysql_query
函数可以查询该表,还可以使用 CREATE TABLE
函数从返回的结果中创建表。

SELECT * FROM mysql_query(
 'localhost',
 '3336',
 'superuser',
 'password123',
 'mydb',
 'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC   

复制

详细内容,请参见:

  • 从 MySQL 表中获取数据[6]

原生 PostgreSQL sink

此前,RisingWave 使用 JDBC API 作为 PostgreSQL sink connector。2.2 版本引入了基于 Rust 实现的原生 PostgreSQL sink connector,旨在优化 PostgreSQL 连接的性能和控制。支持的 Sink type 包括 upsert
append-only

使用时,若想让 Sink connector 从 JDBC 切换到 Rust,请将 streaming.developer
的配置项 stream_switch_jdbc_pg_to_native
设置为 true

后就可以像往常一样使用 CREATE SINK
命令创建 PostgreSQL sink。

CREATE SINK rust_pg_sink FROM sink_content WITH (
    connector = 'postgres',
    host = 'localhost',
    port = 4566,
    user = 'superuser',
    password = 'pass123',
    database = 'dev',
    table = 'sink_table',
    type = 'append_only'
);


复制

详细内容,请参见:

  • 从 RisingWave 导出数据至 PostgreSQL[7]
参考资料
[1]

更新日志: https://docs.risingwave.com/changelog/release-notes

[2]

CREATE CONNECTION
: https://docs.risingwave.com/sql/commands/sql-create-connection

[3]

EXPLAIN
: https://docs.risingwave.com/sql/commands/sql-explain

[4]

ALTER SWAP
: https://docs.risingwave.com/sql/commands/sql-alter-swap

[5]

从 webhook 获取数据: https://docs.risingwave.com/integrations/sources/webhook

[6]

从 MySQL 表中获取数据: https://docs.risingwave.com/integrations/sources/mysql-table

[7]

从 RisingWave 导出数据至 PostgreSQL: https://docs.risingwave.com/integrations/destinations/postgresql

关于 RisingWave 

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。
RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

技术内幕

如何上手 RisingWave 👉 新手入门教程

RisingWave 中文用户文档上线,阅读更高效!

深入探索 RisingWave 中的高可用性与容错机制

深入理解 RisingWave 流处理引擎(三):触发机制

深入理解 RisingWave 流处理引擎(二):计算模型

深入理解 RisingWave 流处理引擎(一):总览

用户案例
视源股份(CVTE)IT 流计算应用历程
尘锋 SCRM 如何使用 RisingWave 实时打宽
RisingWave 在超百亿管理规模对冲基金公司中的应用
金融科技公司 Kaito 使用 RisingWave 实现实时智能化
龙腾出行如何通过 RisingWave 实现实时数据分析

RisingWave 助力乾象投资打造实时监控平台



👇 点击阅读原文立即体验 RisingWave

最后修改时间:2025-02-17 09:37:54
文章转载自RisingWave中文开源社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

微笑
暂无图片
1月前
评论
暂无图片 0
云服务支持智能问答
1月前
暂无图片 点赞
评论
数据中心
暂无图片
1月前
评论
暂无图片 0
Cloudv6.5发布,支持GaussDBAI智能问答,带来专属顾问体验
1月前
暂无图片 点赞
评论
洗了个头
暂无图片
1月前
评论
暂无图片 0
科蓝SUNDB分布式数据库管理系统v5.0入围
1月前
暂无图片 点赞
评论
乌蒙
暂无图片
1月前
评论
暂无图片 0
火山引擎ByteHouse助力抖音集团降本60%
1月前
暂无图片 点赞
评论
心有阳光
暂无图片
1月前
评论
暂无图片 0
RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。
1月前
暂无图片 点赞
评论
巴比龙
暂无图片
1月前
评论
暂无图片 0
科蓝SUNDB分布式数据库管理系统v5.0入围
1月前
暂无图片 点赞
评论
查看更多 >