
我们很高兴地宣布,RisingWave 2.2 版本正式上线。该版本包含了许多重要更新,包括替换 AWS PrivateLink 的现有连接、支持 webhook source connector、以及新版 SQL 函数等。

此前,如果想从不同 VPC 的 Kafka 服务中接收消息或向其发送消息,需使用 AWS PrivateLink。2.2 版本弃用了这种连接方法。
新款连接方式允许在连接到 Kafka source、Kafka sink 或 Schema registry 时复用 Connector 属性(如代理地址和安全协议细节)。这样,您只需为集群设置一次必要的凭证,为创建多个 Sources 节省时间。同时,限制对敏感信息的访问也增强了安全性。
要创建连接,需使用 CREATE CONNECTION
命令并指定所需的参数。例如:
CREATE CONNECTION kafka_conn1 WITH (
type = 'kafka',
properties.bootstrap.server = 'localhost:9092'
) ;复制
必要的参数为 type
和 properties.bootstrap.server
,同时也支持添加与 SSL/SASL 认证、private link connection 及 AWS 认证相关的其他参数。
使用此连接可以创建 Kafka source、table 或 sink。该功能也简化了从同一集群中的不同主题中创建 Kafka source 的过程。
CREATE SOURCE kafka1 (
id int,
name varchar,
email varchar,
age int
) WITH (
connector = 'kafka',
connection = 'kafka_conn1',
topic = 'topic1',
scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;复制
若想使用 CREATE CONNECTION
命令连接到 Schema registry,请参照以下示例:
CREATE CONNECTION schema_1 WITH (
type = 'schema_registry',
schema.registry = 'http://...',
schema.registry.username = 'superuser',
schema.registry.password = 'pass123'
);复制
详细内容,请参见:
CREATE CONNECTION
[2]

用 EXPLAIN FORMAT 优化查询
RisingWave 中的 EXPLAIN
命令会返回 SELECT
语句的执行计划。对于特别复杂的语句,这有助于了解如何优化查询,轻松识别出消耗最多资源的操作,并找到可能的解决方法。此外,如果查询行为与预期不符,EXPLAIN
命令有助于定位查询设计中的错误。
不过,返回的执行计划可能难以阅读,因此我们引入了全新的 FORMAT
选项,以不同格式提供执行计划,让其更加易于理解,方便用户根据自己的偏好选择格式。
目前支持的格式包括:
JSON
TEXT
XML
YAML
例如,在以下查询使用 EXPLAIN
命令而不使用 FORMAT
选项,将看到未经格式化的输出。
CREATE TABLE t(v1 int);
EXPLAIN CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamTableScan { table: t, columns: [v1, _row_id] }复制
使用 FORMAT
选项后,可以用 JSON 格式查看查询执行计划。
EXPLAIN(physical, FORMAT json) CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
{
"name": "StreamMaterialize",
"fields": {
"columns": [
"v1",
"t._row_id(hidden)"
],
"pk_columns": [
"t._row_id"
],
"pk_conflict": "NoCheck",
"stream_key": [
"t._row_id"
]
},
"children": [
{
"name": "StreamTableScan",
"fields": {
"columns": [
"v1",
"_row_id"
],
"table": "t"
},
"children": []
}
] }复制
详细内容,请参见:
EXPLAIN
[3]

使用 ALTER ... SWAP WITH...
命令可以同时重命名两个对象,让两者的定义实现无缝切换,避免任何停机时间。
典型的应用场景为维护期间需要更新或替换对象,用此命令能够确保依赖的对象持续正常工作。
假设有物化视图 mv_old
定义如下:
CREATE MATERIALIZED VIEW mv_old AS
SELECT
product_category,
SUM(order_amount) AS total_sales
FROM
orders
GROUP BY
product_category;复制
如果想更新 total_sales
的计算方式,但有其他与 mv_old
相关的对象(如物化视图和 Sinks)需要保持不变。
在这种情况下,可以创建新物化视图 mv_new
,然后使用 ALTER...SWAP WITH...
将 mv_old
替换为 mv_new
,无需停机。
CREATE MATERIALIZED VIEW mv_new AS
SELECT
product_category,
SUM(order_amount - discount_amount) AS total_sales
FROM
orders
GROUP BY
product_category;
ALTER MATERIALIZED VIEW mv_old SWAP WITH mv_new;复制
以下数据库对象均支持使用 ALTER...SWAP WITH...
命令:
表 物化视图 视图 Source Sink 订阅
详细内容,请参见:
ALTER SWAP
[4]
自 2.2 版本起,RisingWave 支持从 webhook 获取数据,允许实时接收和处理外部 HTTP 请求。有了全新的 webhook connector 便无需再维护中间的 Kafka 集群,减少了 Data pipeline 中的麻烦,使得构建实时处理和自动化关键应用变得更加简单。
将数据从 webhook sources 导入 RisingWave 为 Premium 版本功能,想了解更多有关 RisingWave Premium 的信息,请参见RisingWave Premium|常见问题解答。
webhook 在多种场景中应用广泛,如发送通知和警报、数据同步以及触发 CI/CD 流程等。RisingWave 中的 webhook connector 支持从以下 Sources 获取事件:
GitHub Segment HubSpot Amazon EventBridge RudderStack
通过 CREATE TABLE
命令,可以迅速开始在 RisingWave 中接收 webhook 请求。以下示例使用了 SECRET
来验证传入的请求。
CREATE SECRET test_secret WITH (backend = 'meta') AS 'secret_value';
CREATE TABLE wbhtable (
data JSONB
) WITH (
connector = 'webhook'
) VALIDATE SECRET test_secret AS secure_compare(
headers->>'{header of signature}',
{signature generation expressions}
);复制
详细内容,请参见:
从 webhook 获取数据[5]
此次更新引入了表值函数(table-valued function)mysql_query
。和设置 CDC source 相比,这是一种轻量级的替代方案, 它允许按需从 MySQL 表中检索数据。对于数据更新频率很低或几乎不更新的 MySQL 表,运行此函数比创建 CDC source 和 table 要节省计算资源。
假设 MySQL 中有下表 users
:
first_name | last_name | age | city
-----------+-----------+-----+------
Aaron | Jones | 24 | NYC
Shelly | Doe | 36 | SF
Taylor | Smith | 52 | NYC复制
使用 mysql_query
函数可以查询该表,还可以使用 CREATE TABLE
函数从返回的结果中创建表。
SELECT * FROM mysql_query(
'localhost',
'3336',
'superuser',
'password123',
'mydb',
'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly | Doe | 36 | SF
Taylor | Smith | 52 | NYC复制
详细内容,请参见:
从 MySQL 表中获取数据[6]
原生 PostgreSQL sink
此前,RisingWave 使用 JDBC API 作为 PostgreSQL sink connector。2.2 版本引入了基于 Rust 实现的原生 PostgreSQL sink connector,旨在优化 PostgreSQL 连接的性能和控制。支持的 Sink type 包括 upsert
和 append-only
。
使用时,若想让 Sink connector 从 JDBC 切换到 Rust,请将 streaming.developer
的配置项 stream_switch_jdbc_pg_to_native
设置为 true
。
然后就可以像往常一样使用 CREATE SINK
命令创建 PostgreSQL sink。
CREATE SINK rust_pg_sink FROM sink_content WITH (
connector = 'postgres',
host = 'localhost',
port = 4566,
user = 'superuser',
password = 'pass123',
database = 'dev',
table = 'sink_table',
type = 'append_only'
);复制
详细内容,请参见:
从 RisingWave 导出数据至 PostgreSQL[7]
更新日志: https://docs.risingwave.com/changelog/release-notes
[2]CREATE CONNECTION
: https://docs.risingwave.com/sql/commands/sql-create-connection
EXPLAIN
: https://docs.risingwave.com/sql/commands/sql-explain
ALTER SWAP
: https://docs.risingwave.com/sql/commands/sql-alter-swap
从 webhook 获取数据: https://docs.risingwave.com/integrations/sources/webhook
[6]从 MySQL 表中获取数据: https://docs.risingwave.com/integrations/sources/mysql-table
[7]从 RisingWave 导出数据至 PostgreSQL: https://docs.risingwave.com/integrations/destinations/postgresql
关于 RisingWave

技术内幕
👇 点击阅读原文,立即体验 RisingWave!
评论





