暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【轻松上手 RocketMQ 专栏】顺序消息与批量消息源码解析及场景分析

RocketMQ官微 2021-09-20
1243

这一讲我们来讲顺序消息与批量消息。

顺序消息是什么

首先,什么是顺序消息?

指的是按照消息的发送顺序来消费。RocketMQ中可以保证消息严格有序,可以分为局部有序和全局有序。

局部有序

什么是局部有序?

在每个MessageQueue里面的每一条消息之间都是保持相对有序的。但是不保证所有MessageQueue的消息都严格有序。

举例例子:订单1:创建-下单-付款-完成 订单2:创建-下单-付款

订单1和订单2,分别在不同的MessageQueue上,它们只需要保证MessageQueue里面的消息有序即可。

一个MessageQueue只能由一个消费者消费,且只能单线程消费。但是这个消费者可以开启多线程,同时消费多个MessageQueue。

全局有序

既然你已经知道了局部有序,那全局有序就更加简单了。

就是只有一个MessageQueue。这样子所有的消息,都会被发送到这个MessageQueue上。这样子就能保证所有的消息严格有序。

一个MessageQueue只能由一个消费者消费,且只能单线程消费。

生产者顺序发送消息

接下来,我们用代码来展示一下局部有序:

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");


        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();


        List<OrderEntity> list = buildOrderList();
        for (int i = 0; i < list.size(); i++) {
            int orderId = list.get(i).getId();
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("orderTopic""TagA""KEY" + i,
                    (list.get(i).toString()).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId);

           System.out.println("订单id:"+orderId+  "  发送结果:"+ sendResult);
        }
        //关闭生产者
        producer.shutdown();
    }

    private static List<OrderEntity> buildOrderList() {
        List<OrderEntity> res =  new ArrayList<>();

        OrderEntity order1 = new OrderEntity(147,"加入购物车");
        OrderEntity order2 = new OrderEntity(147,"下单");
        OrderEntity order3 = new OrderEntity(147,"付款");
        OrderEntity order4 = new OrderEntity(147,"完成");

        OrderEntity order5 = new OrderEntity(258,"加入购物车");
        OrderEntity order6 = new OrderEntity(258,"下单");

        OrderEntity order7 = new OrderEntity(369,"加入购物车");
        OrderEntity order8 = new OrderEntity(369,"下单");
        OrderEntity order9 = new OrderEntity(369,"付款");

        res.add(order1);
        res.add(order2);
        res.add(order3);
        res.add(order4);
        res.add(order5);
        res.add(order6);
        res.add(order7);
        res.add(order8);
        res.add(order9);

        return res;
    }
}
//运行结果:
订单id:147  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1F80000, offsetMsgId=0AFCA6FA00002A9F000000000009FBA7, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=44]
订单id:147  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1FD0001, offsetMsgId=0AFCA6FA00002A9F000000000009FC96, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=45]
订单id:147  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B1FF0002, offsetMsgId=0AFCA6FA00002A9F000000000009FD7C, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=46]
订单id:147  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2010003, offsetMsgId=0AFCA6FA00002A9F000000000009FE62, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=3], queueOffset=47]
订单id:258  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2020004, offsetMsgId=0AFCA6FA00002A9F000000000009FF48, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=2], queueOffset=42]
订单id:258  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2040005, offsetMsgId=0AFCA6FA00002A9F00000000000A0037, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=2], queueOffset=43]
订单id:369  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2050006, offsetMsgId=0AFCA6FA00002A9F00000000000A011D, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=63]
订单id:369  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2060007, offsetMsgId=0AFCA6FA00002A9F00000000000A020C, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=64]
订单id:369  发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000010A0118B4AAC22BE4B2070008, offsetMsgId=0AFCA6FA00002A9F00000000000A02F2, messageQueue=MessageQueue [topic=orderTopic, brokerName=aarondeMacBook-Pro.local, queueId=1], queueOffset=65]



复制

总结:根据不同订单id的取模,把不同订单的消息分配到不同的MessageQueue,把相同订单消息分配到相同的MessageQueue。

你看,订单id=147的消息,都发送都queue3中;

订单id=258的消息,都发送都queue2中;

订单id=369的消息,都发送都queue1中。

消费者顺序消费消息

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer");
        consumer.setNamesrvAddr("localhost:9876");
        //设置从哪里开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("orderTopic""TagA");
        //确保一个queue只被一个线程消费
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("["+Thread.currentThread().getName()+"] "+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //启动消费者
        consumer.start();

        System.out.println("消费者已启动");
    }
}
\\运行结果
[ConsumeMessageThread_1] OrderEntity{id=147, name='加入购物车'}
[ConsumeMessageThread_2] OrderEntity{id=258, name='加入购物车'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='加入购物车'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='下单'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='下单'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='付款'}
[ConsumeMessageThread_2] OrderEntity{id=258, name='下单'}
[ConsumeMessageThread_3] OrderEntity{id=369, name='付款'}
[ConsumeMessageThread_1] OrderEntity{id=147, name='完成'}

复制

总结:可以看到,线程1,都是消费了id=147的消息,证明queue3只被一个线程所消息。

对比分析

我们看一下顺序消息的消费者,消费的时候,我们用的是MessageListenerOrderly。是用来告诉消费者,要顺序消费信息,并且只能一个线程去单独消费消息。

普通消息的消费者:MessageListenerConcurrently。看到Concurrent就知道是并发的意思,就是可以并发消费消息。

顺序消息适用场景

天上飞的理论,终究还得有落地的实现。

适用场景:业务中,需要保持顺序的。比如:数据库的binlog消息,订单的创建、下单、付款等消息。

什么是批量消息

批量发送消息可提高传递小消息的性能,尽可能在一次传递中,发送更多的消息。

批量发送的注意点:

  • 需要同一个topic才能一起批量发送
  • 不能发送延时消息
  • 不能发送重试消息
  • 不能发送一个超过1MB的批量消息

如何发送批量消息

知道批量消息的定义和注意点后,我们接下来先学会如何发送批量消息

生产者
public class Producer {
    public static void main(String[] args) throws Exception{
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group");

        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

        List<Message> messageList = new ArrayList<>();

        //构建消息
        Message message1 = new Message("batchTopic","TagA","batchMessage1".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message2 = new Message("batchTopic","TagA","batchMessage2".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message3 = new Message("batchTopic","TagA","batchMessage3".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message4 = new Message("batchTopic","TagA","batchMessage4".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message5 = new Message("batchTopic","TagA","batchMessage5".getBytes(RemotingHelper.DEFAULT_CHARSET));
        Message message6 = new Message("batchTopic","TagA","batchMessage6".getBytes(RemotingHelper.DEFAULT_CHARSET));

        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        messageList.add(message4);
        messageList.add(message5);
        messageList.add(message6);

        // 发送消息
        SendResult sendResult = producer.send(messageList);
        // 打印发送结果
        System.out.println("发送结果:"+sendResult);
        // 关闭生产者
        producer.shutdown();
    }
}


复制

总结:其实,发送批量消息,就是发送消息的集合。是同步发送。

消费者
// 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("batch_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅主题
        consumer.subscribe("batchTopic","TagA");

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //这里的list默认每次只拉取1条消息
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    System.out.println(Thread.currentThread().getName()+"集合大小size="+list.size()+" 消费结果:"+i+" "+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动了");
        //运行结果:
        消费者启动了
ConsumeMessageThread_1集合大小size=1 消费结果:0 batchMessage1
ConsumeMessageThread_2集合大小size=1 消费结果:0 batchMessage2
ConsumeMessageThread_3集合大小size=1 消费结果:0 batchMessage3
ConsumeMessageThread_4集合大小size=1 消费结果:0 batchMessage4
ConsumeMessageThread_5集合大小size=1 消费结果:0 batchMessage5
ConsumeMessageThread_6集合大小size=1 消费结果:0 batchMessage6

复制

总结:这里的list,其实每次消费是默认拉取1条消息的。

那如果拉取多条消息呢?
  • 如果需要回调时一次传递多条消息,可以通过消费者的setConsumeMessageBatchMaxSize() 指定一次最多消费的消息数量,默认是1。可能你想着我一次消费的数量越多越好,那我就定义一次消费100条。当定义了消息的最大消费数量是100时,实际上一次可消费的消息数量最多也就32条,因为broker端对此做了限制。因为Consumer默认一次最多只能拉取32条消息,可以通过setPullBatchSize()指定一次可以拉取消息的数量。

所以集合中msgs的最大大小是consumeMessageBatchMaxSize和pullBatchSize的较小值。

比如:

// 消费者组
        DefaultMQPushConsumer consumer =  new DefaultMQPushConsumer("batch_consumer_group");
        //注册nameserver
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅主题
        consumer.subscribe("batchTopic","TagA");

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //设置一次拉取的数量
        consumer.setConsumeMessageBatchMaxSize(3);
        consumer.setPullBatchSize(2);
        //监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (int i = 0; i < list.size(); i++) {
                    MessageExt messageExt = list.get(i);
                    String msg = new String(messageExt.getBody());
                    System.out.println(Thread.currentThread().getName()+"集合大小size="+list.size()+" 消费结果:"+i+" "+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动了");
        //运行结果
        消费者启动了
ConsumeMessageThread_1集合大小size=1 消费结果:0 batchMessage1
ConsumeMessageThread_2集合大小size=2 消费结果:0 batchMessage2
ConsumeMessageThread_2集合大小size=2 消费结果:1 batchMessage3
ConsumeMessageThread_3集合大小size=2 消费结果:0 batchMessage4
ConsumeMessageThread_3集合大小size=2 消费结果:1 batchMessage5
ConsumeMessageThread_4集合大小size=1 消费结果:0 batchMessage6

复制
批量消息一次发送不能超过1MB

批量消息发送消息,不能超过1MB。

那如果超过以后怎么办?

  • 需要分割进行发送。

那如何进行分割:

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            //tmpsize:是一条消息的空间大小
            //tmpSize=topic的长度和Body的长度
            int tmpSize = message.getTopic().length() + message.getBody().length;
            //properties有标签等信息
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            //日志头信息
            tmpSize = tmpSize + 20//for log overhead
            //如果一条消息就超过1MB,记录下来,
            if (tmpSize > SIZE_LIMIT) {
                
                if (nextIndex - currIndex == 0) {
                    //用于截取列表长度
                    nextIndex++;
                }
                break;
            }
            //超过1MB,就发送
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                //累加totalSize
                totalSize += tmpSize;
            }

        }
        //截取消息列表,返回
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    public static void main(String[] args) throws Exception {
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group");

        //设置nameserver
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();

        List<Message> messageList = new ArrayList<>();

        byte[] bytes1 = new byte[600*1000];//600K
        byte[] bytes2 = new byte[200*1000];//200K
        byte[] bytes3 = new byte[300*1000];//300K
        //构建消息
        Message message1 = new Message("batchTopic","TagA",bytes1);
        Message message2 = new Message("batchTopic","TagA",bytes2);
        Message message3 = new Message("batchTopic","TagA",bytes3);

        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);

        ListSplitter splitter = new ListSplitter(messageList);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                // 发送消息
                SendResult sendResult = producer.send(listItem);
                // 打印发送结果
                System.out.println("发送结果:"+sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                //handle the error
            }
        }
        // 关闭生产者
        producer.shutdown();
    }
}
//运行结果:
发送结果:SendResult [sendStatus=SEND_OK, msgId=7F000001A9AB18B4AAC2941CAD020000,7F000001A9AB18B4AAC2941CAD050001, offsetMsgId=C0A8006800002A9F00000000003BBA71,C0A8006800002A9F000000000044E2F5, messageQueue=MessageQueue [topic=batchTopic, brokerName=localhost, queueId=3], queueOffset=6]
发送结果:SendResult [sendStatus=SEND_OK, msgId=7F000001A9AB18B4AAC2941CAE410002, offsetMsgId=C0A8006800002A9F000000000047F0F9, messageQueue=MessageQueue 

复制

总结:三条消息,第1条600K,第2条200K,第三条300K。

因为批量发送不能超过1MB,因此,第1条和第2条,打包放一起,一起发送。而第3条消息,单独发送。

浅析批量消息

接下来,我们简单分析一下批量消息的源码:

进入send方法后,会发现batch方法

继续进入batch方法

简单总结一下大概的流程:

  • 1,根据list按照一定的规则,生成MessageBatch对象
  • 2,遍历list列表,校验每个消息,设置uniqID和topic
  • 3,MessageBatch设置body消息和topic

总的来说,就是组装msgBatch对象并返回。

接下来,我们深入1中:generateFromList()

批量消息总结

上面就是我们这一讲的主要内容:批量消息的使用和源码分析,由此可见,批量消息主要用于对延迟要求相对较低且对吞吐量要求较高的场景下,如日志等大数据消息传输。

后续文章

  • RocketMQ-入门已更新)

  • RocketMQ-发送消息已更新)

  • RocketMQ-集群模式和广播模式(已更新)
  • RocketMQ-顺序与批量消息源码解析与场景分析(已更新)
  • RocketMQ-延迟消息源码解析与场景分析
  • RocketMQ-事务源码解析与场景分析
  • RocketMQ-幂等性
  • RocketMQ-服务端消息过滤

  • RocketMQ-消息存储与刷盘机制
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-消息重试
  • RocketMQ-死信队列


文章转载自RocketMQ官微,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论