自我介绍
我叫蒋晓峰,来自于阿里云开源大数据平台生态技术组,负责 Apache Flink 的 Connector 生态,同时负责集团和开源使用 RocketMQ Connector 的用户答疑。
最初接触到 RocketMQ 的时候,却是在来阿里以前,作为开源和大数据爱好者,慢慢发现上下游 RocketMQ 出现的次数也越来越多,其作为业务消息领域的首选,大量高价值的业务数据通过 RocketMQ 流转,自然而然的就产生了数据处理的需求,而 Flink 在实时计算中已经近乎成为事实标准,两相结合,于是就有了 rocketmq-flink[1] 的诞生。
当然,不仅仅是在一次次流量洪峰挑战下与阿里云 RocketMQ 团队结成的深厚战斗友谊,也包括逐渐喜欢上了 RocketMQ 稳定至上、重剑无锋的风格,所有的原因都促使我产生了对 rocketmq-flink 进行深度优化的念头。因此,在随后的日子里,我不仅与社区一起优化了原有的 RocketMQ DataStream Source/sink 的实现,而且在消息领域率先实现了 FLIP-27 的全新接口,并开始对 FLIP-143 进行支持;为了给用户提供基于 SQL 的开发体验,在 Datastream 实现的基础上,提供了 ScanTableSource/DynamicTableSink Table 接口的实现,目前 RocketMQ Connector 这三套公共接口的实现,均已经过了 “双十一” 海量 RocketMQ 消息数据考验洗礼表现稳定无任何故障,而这一切均已贡献给了社区。
经过 Apache RocketMQ 社区投票,有幸成为一名 Committer,Apache RocketMQ社区的开放、友好、积极的氛围,激励着我一路成长,在此也感谢我的社区领路人杜恒。成为 Committer,既是社区对自己的肯定,也是一份沉甸甸的责任,我也将努力与 RocketMQ 社区开发者一起,将 RocketMQ 打造成为最符合实时计算引擎Flink的消息系统,全面融入 Flink 生态;除此之外,也将与社区一起,持续优化符合流计算语义的轻量级 rocketmq-flink connect,共同将 Apache RocketMQ 打造成新一代“消息、事件、流”融合处理平台。
RocketMQ-Flink 介绍
Apache Flink 的 RocketMQ 集成 RocketMQ-Flink 模块包括 RocketMQSource/RocketMQSink 和 RocketTableSource/RocketMQSink,允许用户使用其 Source 从 RocketMQ 的主题中实例订阅消息和 Sink 发布消息到 RocketMQ 的主题。
RocketMQ SQL Connector
如何创建 Apache Flink 的 RocketMQ 表?
下面的例子展示了如何创建 RocketMQ 表:
CREATE TABLE rocketmq_source (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
'consumeGroup' = 'behavior_consume_group',
'nameServerAddress' = '127.0.0.1:9876'
);
CREATE TABLE rocketmq_sink (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
'produceGroup' = 'behavior_produce_group',
'nameServerAddress' = '127.0.0.1:9876'
);
可用元数据
以下的连接器元数据可以在表定义中通过元数据列的形式获取。
R/W
列定义了一个元数据是可读的(R
)还是可写的(W
)。
只读列必须声明为 VIRTUAL
以在 INSERT INTO
操作中排除它们。
KEY | DATA TYPE | DESCRIPTION | DEFAULT |
---|---|---|---|
topic | STRING NOT NULL | Topic name of the RocketMQ record. | R |
扩展的 CREATE TABLE
示例演示公开这些元数据字段的语法:
CREATE TABLE rocketmq_source (
`topic` STRING METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
'consumeGroup' = 'behavior_consume_group',
'nameServerAddress' = '127.0.0.1:9876'
);
欢迎社区用户试用 rocketmq-flink 并进行积极反馈,也欢迎Flink 与 RocketMQ 社区开发者积极参与进来,共同完善 rocketmq flink connector。
[1] https://github.com/apache/rocketmq-flink