中秋佳节到来之际,小编在此预祝大家中秋快乐,人月两圆🥮🥮
5分钟了解RocketMQ,今天为大家带来的是【轻松上手 RocketMQ 专栏】第三篇——集群模式和广播模式。
RocketMQ的消费者消费消息,有2种模式:
负载均衡模式(集群模式)
广播模式
首先我们先启动一个生产者:发送了10条消息,主题是TopicTest,tag是TagA
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//设置生产者组
defaultMQProducer.setProducerGroup("syncProducer");
//设置nameserver
defaultMQProducer.setNamesrvAddr("localhost:9876");
//启动生产者
defaultMQProducer.start();
for (int i = 0; i < 10; i++) {
//构建消息 topic tag 内容
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//同步发送,且返回结果
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("发送结果"+sendResult);
}
//关闭生产者
defaultMQProducer.shutdown();
}
}
//运行结果
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true复制
接下来,我们启动2个消费者,分别用负载均衡模式和广播模式去进行消费信息:
负载均衡(或叫集群模式)
public class ClusterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");
consumer.setNamesrvAddr("localhost:9876");
//设置集群模式,也就是负载均衡模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//订阅主题和标签
consumer.subscribe("TopicTest","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费信息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
//运行2个实例,结果如下:
实例1:
消费信息:Hello RocketMQ 3
消费信息:Hello RocketMQ 2
消费信息:Hello RocketMQ 7
消费信息:Hello RocketMQ 6
实例2:
消费信息:Hello RocketMQ 1
消费信息:Hello RocketMQ 0
消费信息:Hello RocketMQ 4
消费信息:Hello RocketMQ 5
消费信息:Hello RocketMQ 8
消费信息:Hello RocketMQ 9复制
广播模式
public class BoardConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");
consumer.setNamesrvAddr("localhost:9876");
//设置 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//订阅主题和标签
consumer.subscribe("TopicTest","TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费信息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
//运行2个实例,结果如下:
实例1:
消费信息:Hello RocketMQ 0
消费信息:Hello RocketMQ 1
消费信息:Hello RocketMQ 2
消费信息:Hello RocketMQ 3
消费信息:Hello RocketMQ 5
消费信息:Hello RocketMQ 9
消费信息:Hello RocketMQ 4
消费信息:Hello RocketMQ 6
消费信息:Hello RocketMQ 7
消费信息:Hello RocketMQ 8
实例2:
消费信息:Hello RocketMQ 2
消费信息:Hello RocketMQ 3
消费信息:Hello RocketMQ 1
消费信息:Hello RocketMQ 0
消费信息:Hello RocketMQ 5
消费信息:Hello RocketMQ 9
消费信息:Hello RocketMQ 4
消费信息:Hello RocketMQ 8
消费信息:Hello RocketMQ 7
消费信息:Hello RocketMQ 6复制
今天分享了消费者负载均衡模式和广播模式。在生产中,一般都是用负载均衡模式。广播模式比较少用。但还是得具体场景具体分析。
后续文章
RocketMQ-入门(已更新)
RocketMQ-发送消息(已更新)
RocketMQ-集群模式和广播模式(已更新) RocketMQ-顺序消息 RocketMQ-延迟消息 RocketMQ-批量消息 RocketMQ-过滤消息 RocketMQ-事务消息 RocketMQ-消息存储 RocketMQ-高可用 RocketMQ-高性能 RocketMQ-主从复制 RocketMQ-刷盘机制 RocketMQ-幂等性 RocketMQ-消息重试 RocketMQ-死信队列
社区征稿
欢迎社区小伙伴积极分享 RocketMQ 使用案例及最佳实践,投稿一经采用,会有 RocketMQ 精美 T 恤衫、书包等精美周边赠送,投稿邮箱:duhengforever@apache.org
文章转载自RocketMQ官微,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1329次阅读
2025-04-27 16:53:22
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
808次阅读
2025-04-30 15:24:06
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
542次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
507次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
484次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
370次阅读
2025-04-18 10:01:22
给准备学习国产数据库的朋友几点建议
白鳝的洞穴
357次阅读
2025-05-07 10:06:14
XCOPS广州站:从开源自研之争到AI驱动的下一代数据库架构探索
韩锋频道
320次阅读
2025-04-29 10:35:54
MySQL 30 周年庆!MySQL 8.4 认证免费考!这次是认真的。。。
数据库运维之道
306次阅读
2025-04-28 11:01:25
国产数据库图谱又上新|82篇精选内容全览达梦数据库
墨天轮编辑部
290次阅读
2025-04-23 12:04:21