一、基本介绍
RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
1.1 架构
- Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
- Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
- Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份
- Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费
1.2 组成部分
1.2.1 NameServer
可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
1.2.2 Broker
核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。
1.2.3 生产者
生产者(Producer)就是消息的发送者,Apache RocketMQ 拥有丰富的消息类型,可以支持不同的应用场景,在不同的场景中,需要使用不同的消息进行发送。支持多种消息,普通消息、顺序消息、事务消息、延迟消息。
1.2.4 消费者
用来消费生产者消息的一方。
消费者组:如果多个消费者设置了相同的Consumer Group,我们认为这些消费者在同一个消费组内。消费者组和生产者组都是一个逻辑概念。
- 当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。可以通过调整消费者消费的实例数量来调整消息的消费速度。
- 当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。调整消费者的数量无法影响消费速度。
1.2.5 队列
为了支持高并发和水平扩展,需要对 Topic 进行分区,在 RocketMQ 中这被称为队列,一个 Topic 可能有多个队列,并且可能分布在不同的 Broker 上。
1.2.6 消息
topic(主题):必填。可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。
Tag(子主题) :选填。比topic低一级,消息标签,方便服务器过滤使用。目前只支持每个消息设置一个。
Keys: 选填。服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等。根据keys的值来选择投递的队列。
body :必填。表示消息的存储内容。
properties :选填。表示消息属性。RocketMQHeaders枚举中的配置。
transactionId :选填。会在事务消息中使用。
DelayTimeLevel:选填。消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费。
WaitStoreMsgOK:选填。表示消息是否在服务器落盘后才返回应答。
1.3 消费概念
1.3.1 负载均衡
集群模式下,同一个消费组内的消费者会分担收到的全量消息。Apache RocketMQ 提供了多种集群模式下的分配策略,包括平均分配策略、机房优先分配策略、一致性hash分配策略等。默认是平均分配策略。
在消费时如果要提升消费速度,必须保证队列数量和消费者实例数量相适应。每一个队列只会分配给一个消费者消费,队列不够的情况下增加消费者无法提升消费速度。
1.3.2 消费位点
在Apache RocketMQ中每个队列都会记录自己的最小位点、最大位点。
针对于消费组,还有消费位点的概念,在集群模式下,消费位点是由客户端提给交服务端保存的,在广播模式下,消费位点是由客户端自己保存的。
一般情况下消费位点正常更新,不会出现消息重复,但如果消费者发生崩溃或有新的消费者加入群组,就会触发重平衡,重平衡完成后,每个消费者可能会分配到新的队列,而不是之前处理的队列。为了能继续之前的工作,消费者需要读取每个队列最后一次的提交的消费位点,然后从消费位点处继续拉取消息。但在实际执行过程中,由于客户端提交给服务端的消费位点并不是实时的,所以重平衡就可能会导致消息少量重复。
1.3.3 推、拉和长轮询
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
- Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
- Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Apache RocketMQ既提供了Push模式也提供了Pull模式。适用原生SDK来处理。
PUSH
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
复制
PULL
RocketMQ 4.6.0推出的Lite Pull Consumer,相比于原始的Pull Consumer更加简单易用,它提供了Subscribe和Assign两种模式。
//原始的Pull Consumer
public class PullConsumerTest {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("TopicTest");
mq.setBrokerName("jinrongtong-MacBook-Pro.local");
long offset = 26;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
System.out.printf("%s%n", pullResult.getMsgFoundList());
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
}
复制
//subscribe方法定义方式和push类似,但是使用的`LitePullConsumer`拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过`setPullBatchSize`可以设置每一次拉取的最大消息数量,此外如果不额外设置,`LitePullConsumer`默认是自动提交位点。在subscribe模式下,同一个消费组下的多个`LitePullConsumer`会负载均衡消费,与PushConsumer一致。
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.setPullBatchSize(20);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
litePullConsumer.shutdown();
}
}
}
//Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}
复制