暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

一、RocketMQ简单介绍

355

一、基本介绍

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

1.1 架构

image-20230514234644902

  1. Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
  2. Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
  3. Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份
  4. Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

1.2 组成部分

1.2.1 NameServer

可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。

1.2.2 Broker

核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。

1.2.3 生产者

生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。支持多种消息,普通消息、顺序消息、事务消息、延迟消息。

1.2.4 消费者

用来消费生产者消息的一方。

消费者组:如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。消费者组和生产者组都是一个逻辑概念。

  • 当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。可以通过调整消费者消费的实例数量来调整消息的消费速度。
  • 当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。调整消费者的数量无法影响消费速度。

1.2.5 队列

为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。

1.2.6 消息

topic(主题):必填。可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。

Tag(子主题) :选填。比topic低一级,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个。

Keys: 选填。服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。根据keys的值来选择投递的队列。

body :必填。表示消息的存储内容。

properties :选填。表示消息属性。RocketMQHeaders枚举中的配置。

transactionId :选填。会在事务消息中使用。

DelayTimeLevel:选填。消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费。

WaitStoreMsgOK:选填。表示消息是否在服务器落盘后才返回应答。

1.3 消费概念

1.3.1 负载均衡

集群模式下,同一个消费组内的消费者会分担收到的全量消息。Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等。默认是平均分配策略。

在消费时如果要提升消费速度,必须保证队列数量和消费者实例数量相适应。每一个队列只会分配给一个消费者消费,队列不够的情况下增加消费者无法提升消费速度。

1.3.2 消费位点

在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。

针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。

一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复

1.3.3 推、拉和长轮询

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

  • Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

Apache RocketMQ既提供了Push模式也提供了Pull模式。适用原生SDK来处理。

PUSH

public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 初始化consumer,并设置consumer group name DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer地址 consumer.setNamesrvAddr("localhost:9876"); //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息 consumer.subscribe("TopicTest", "*"); //注册回调接口来处理从Broker中收到的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动Consumer consumer.start(); System.out.printf("Consumer Started.%n"); } }
复制

PULL

RocketMQ 4.6.0推出的Lite Pull Consumer,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式。

//原始的Pull Consumer public class PullConsumerTest { public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); try { MessageQueue mq = new MessageQueue(); mq.setQueueId(0); mq.setTopic("TopicTest"); mq.setBrokerName("jinrongtong-MacBook-Pro.local"); long offset = 26; PullResult pullResult = consumer.pull(mq, "*", offset, 32); if (pullResult.getPullStatus().equals(PullStatus.FOUND)) { System.out.printf("%s%n", pullResult.getMsgFoundList()); consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); } } catch (Exception e) { e.printStackTrace(); } consumer.shutdown(); } }
复制
//subscribe方法定义方式和push类似,但是使用的`LitePullConsumer`拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过`setPullBatchSize`可以设置每一次拉取的最大消息数量,此外如果不额外设置,`LitePullConsumer`默认是自动提交位点。在subscribe模式下,同一个消费组下的多个`LitePullConsumer`会负载均衡消费,与PushConsumer一致。 public class LitePullConsumerSubscribe { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.setPullBatchSize(20); litePullConsumer.start(); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } } } //Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列 public class LitePullConsumerAssign { public static volatile boolean running = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); } } }
复制
最后修改时间:2023-05-16 11:13:37
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

目录
  • 一、基本介绍
    • 1.1 架构
    • 1.2 组成部分
      • 1.2.1 NameServer
      • 1.2.2 Broker
      • 1.2.3 生产者
      • 1.2.4 消费者
      • 1.2.5 队列
      • 1.2.6 消息
    • 1.3 消费概念
      • 1.3.1 负载均衡
      • 1.3.2 消费位点
      • 1.3.3 推、拉和长轮询