暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

三、SpringBoot整合RocketMQ的使用

941

三、Springboot整合

3.1 配置

3.1.1 pom依赖

starter包,正常使用这个就可以了

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
复制

原生依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.4</version>
</dependency>
复制

3.1.2 yml配置

rocketmq: name-server: 192.168.10.104:9876 producer: group: test-producer retry-times-when-send-failed:
复制

3.1.3 定义常量

测试代码中使用

public interface MqConstant { String TOPIC_001 = "xrj-test"; String TOPIC_002 = "xrj-reply"; String TOPIC_003 = "xrj-orderly"; String TOPIC_004 = "xrj-trans"; String TOPIC_005 = "xrj-delay"; //死信队列 String TOPIC_006 = "%DLQ%xrj-mq-004"; String TAG_001 = "xrj-test-tag-001"; String TAG_002 = "xrj-test-tag-002"; String TAG_003 = "xrj-test-tag-003"; String TAG_004 = "xrj-test-tag-004"; String TAG_005 = "xrj-test-tag-005"; String CONSUMER_GROUP_001 = "xrj-mq-001"; String CONSUMER_GROUP_002 = "xrj-mq-002"; String CONSUMER_GROUP_003 = "xrj-mq-003"; String CONSUMER_GROUP_004 = "xrj-mq-004"; String CONSUMER_GROUP_005 = "xrj-mq-005"; String CONSUMER_GROUP_006 = "xrj-mq-006"; }
复制

3.2 普通消息

普通场景的下的消息可以进行有种发送形式,同步发送、异步发送、单向发送、批量发送。

3.2.1 异步消息

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

@Component @Slf4j public class RocketmqProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送异步消息,异步消息通过回调来判断消息发送成功还是失败 */ public void sendAsyncMsg(String topic, String tag, List<String> msgList) { String destination = topic + ":" + tag; UUID uuid = UUID.randomUUID(); String key = "test" + uuid; Map<String, Object> headers = new HashMap<>(); headers.put(RocketMQHeaders.KEYS, key); int size = msgList.size(); CountDownLatch cdl = new CountDownLatch(size); msgList.forEach(msg -> rocketMQTemplate.asyncSend(destination, new GenericMessage<>(msg, headers), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { try { log.info("异步消息发送成功,返回值:{}", JSON.toJSONString(sendResult)); } finally { cdl.countDown(); } } @Override public void onException(Throwable throwable) { try { log.error("异步消息发送失败,异常:{}", throwable.getMessage(), throwable); } finally { cdl.countDown(); } } }) ); try { cdl.await(size, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
复制

3.2.2 同步消息

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

同步消息发送会返回结果SendResut。SendResut包含实际发送状态还包括

  • SEND_OK(发送成功)

    消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。

  • FLUSH_DISK_TIMEOUT(刷盘超时)

    消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。

  • FLUSH_SLAVE_TIMEOUT(同步到备超时)

    消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。

  • SLAVE_NOT_AVAILABLE(备不可用)

    消息发送成功,但是此时Slave不可用。此时消息已经进入Master服务器队列,只有Master服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。

  • 如果发送失败会抛出异常。

    • 至多重试2次(同步发送为2次,异步发送为0次)。
    • 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
    • 如果本身向broker发送消息产生超时异常,就不会再重试。
    • 如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
/** * 发送同步消息,根据返回值的状态来判断发送成功还是失败 */ public void sendSyncMsg(String topic, String tag, String msg) { String destination = topic + ":" + tag; UUID uuid = UUID.randomUUID(); String key = "test" + uuid; Map<String, Object> headers = new HashMap<>(); headers.put(RocketMQHeaders.KEYS, key); Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build(); SendResult sendResult = rocketMQTemplate.syncSend(destination, message); if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { log.info("异步消息发送成功,返回值:{}", JSON.toJSONString(sendResult)); } else { log.info("异步消息发送失败,返回值:{}", JSON.toJSONString(sendResult)); } }
复制

3.2.3单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

/** * 发送单向消息,无返回值,发了就不管了 */ public void sendOneWay(String topic, String tag, String msg) { String destination = topic + ":" + tag; UUID uuid = UUID.randomUUID(); String key = "test" + uuid; Map<String, Object> headers = new HashMap<>(); headers.put(RocketMQHeaders.KEYS, key); Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build(); rocketMQTemplate.sendOneWay(destination, message); }
复制

3.2.4 批量发送

有几个限制:

  1. 同一批 batch 中 topic 必须相同。
  2. the message body size over max value, MAX: 4194304。一批数据最大4M。需要手动拆分/修改服务端配置。
/** * 同步发送批量消息,异步和上面单条类似 */ public void syncSendMessages(String topic, String tag, List<Message<?>> messageList, long timeout) { String destination = topic + ":" + tag; SendResult sendResult = this.rocketMQTemplate.syncSend(destination, messageList, timeout); if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { log.info("同步批量消息发送成功,返回值:{}", JSON.toJSONString(sendResult)); } else { log.info("同步批量消息发送失败,返回值:{}", JSON.toJSONString(sendResult)); } }
复制

3.2.5 消费消息

/** * 消费者监听指定的topic和tag即可,默认并发消费 */ @Component @Slf4j @RocketMQMessageListener(topic = MqConstant.TOPIC_001, consumerGroup = MqConstant.CONSUMER_GROUP_001, selectorExpression = MqConstant.TAG_001, consumeMode = ConsumeMode.CONCURRENTLY) public class SimpleRocketmqConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("简单消费实例消费消息:{}", JSON.toJSONString(message)); } }
复制

3.3 顺序消息

顺序消息是一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息。

RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

  • 生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

    • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
    • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
  • 消费顺序性指定消费模式为顺序消费,默认为并发消费。

3.3.1 发送顺序消息

@Component @Slf4j public class RocketmqOrderlyProducer { @Autowired private RocketMQTemplate rocketMQTemplate; /** * rocketMq中支持局部顺序消息,要想全局顺序只能保证同一个队列 * 每一个topic中的Queue中的消息是严格顺序的,所以需要保证严格顺序消费的消息指定QueueId进行传数据即可 * 不同Queue上的消息可以并发消费,同一个Queue上的消息仍然可以保证顺序消费 */ public void sendOrderlyMsg(String topic, String tag, String msg) { String destination = topic + ":" + tag; String hashKey = "56789"; for (int i = 0; i < 5; i++) { String lastMsg = msg + "," + i; Message<String> message = MessageBuilder.withPayload(lastMsg).build(); //hashKey一致时选定的数据broker和queue一致 SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, hashKey, 60000); log.info("消息发送成功,keys:{},msg:{},返回结果:{}", hashKey, lastMsg, JSON.toJSONString(sendResult)); } } }
复制

3.3.2 消费顺序消息

/** * 顺序消息,消费时必须消费模式为ConsumeMode.ORDERLY * 同一个topic的同一个queue中才有序 * 不同Queue上的消息可以并发消费,同一个Queue上的消息仍然可以保证顺序消费 */ @Component @Slf4j @RocketMQMessageListener(topic = MqConstant.TOPIC_003, consumerGroup = MqConstant.CONSUMER_GROUP_003, selectorExpression = MqConstant.TAG_003, consumeMode = ConsumeMode.ORDERLY) public class OrderlyRocketmqConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("顺序消费者消费了消息:{}", message); } }
复制

3.4 延迟消息

延迟消息和普通消息相比多了一个延迟等级参数,支持18个等级。最短1s,最久2小时,无法自定义延迟时间。

/** * 延迟级别 1-18 * “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h” */ @Slf4j @Component public class RocketmqDelayProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendDelayMsg(String topic, String tag, String msg, int delayLevel) { String desc = topic + ":" + tag; UUID uuid = UUID.randomUUID(); String key = "xrj" + uuid; Map<String, Object> headers = new HashMap<>(); headers.put(RocketMQHeaders.KEYS, key); Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build(); //delayLevel指定延迟等级 SendResult sendResult = rocketMQTemplate.syncSend(desc, message, 60000, delayLevel); if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { log.info("发送延迟消息成功,返回结果:{}", JSON.toJSONString(sendResult)); } else { log.error("发送延迟消息失败,返回结果:{}", JSON.toJSONString(sendResult)); } } }
复制

消费者的定义普通消息一样

@Slf4j @Component @RocketMQMessageListener(consumerGroup = MqConstant.CONSUMER_GROUP_005,topic = MqConstant.TOPIC_005) public class DelayRocketmqConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("延迟消息消费者消费了延迟消息:{}", JSON.toJSONString(message)); } }
复制

3.5 事务消息

image-20230515164202486

事务消息基于半消息实现分为三个步骤:

  1. 生产端生产消息,将消息发送至Broker
  2. 生产端同时启动监听者,监听本地事务的执行情况,Broker拿到消息后等待监听者的本地事务执行确认,确认本地事务执行完毕之后再把消息推送给消费者消费
  3. 消费者消费消费消息,如果消费失败会返回RECONSUME_LATER重试消费标识,默认可以重试指定次数之后消息进入死信队列转人工处理。

3.5.1 发送事务消息

发送事务消息的时候需要携带一个唯一的事务id

@Component @Slf4j public class RocketmqTransactionProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendTransMsg(String topic, String tag, String msg, Integer status) { String destination = topic + ":" + tag; //事务ID UUID transId = UUID.randomUUID(); String key = "test_" + transId; Map<String, Object> headers = new HashMap<>(); headers.put(RocketMQHeaders.KEYS, key); headers.put(RocketMQHeaders.TRANSACTION_ID, transId); Message<?> transMsg = MessageBuilder.withPayload(msg).copyHeaders(headers).build(); //发送事务消息 TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, transMsg, status); if (SendStatus.SEND_OK.equals(transactionSendResult.getSendStatus())) { log.info("事务消息发送成功,返回结果:{}", JSON.toJSONString(transactionSendResult)); } else { log.error("事务消息发送失败,返回结果:{}", JSON.toJSONString(transactionSendResult)); } } }
复制

3.5.2 监听本地事务

本地事务监听这里就需要执行本地事务和定义事务回查接口。

  1. 消息发送到Broker之后,消息不会直接给到消费者消费而是先执行本地事务,Broker根据本地事务的执行情况进行处理。
  2. 如果本地事务提交返回RocketMQLocalTransactionState.COMMIT,Broker就会把消息给到下游消费者消费,而且保证必须处理完成消息。
  3. 如果本地事务回滚返回RocketMQLocalTransactionState.ROLLBACK,Broker就不会把消息给到下游消费者消费直接删掉。
  4. 如果本地事务状态未知返回RocketMQLocalTransactionState.UNKNOWN或者网络波动导致Broker一直没收到消息的确认,Broker就会调用事务回查,直到事务状态确定/或回查15次。
  5. 消费者消费数据一直消费失败,失败指定次数消息会进入死信队列。
@Slf4j @RocketMQTransactionListener public class LocalTransMsgListener implements RocketMQLocalTransactionListener { /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); log.info("本地事务监听器执行本地事务消息,事务id:{},msg:{},参数:{}", transId, JSON.toJSONString(msg), arg); Integer args = (Integer) arg; log.info("开始执行本地事务"); if(args == 0){ log.info("准备提交本地事务"); return RocketMQLocalTransactionState.COMMIT; }else if(args == 1){ log.info("准备回滚本地事务"); return RocketMQLocalTransactionState.ROLLBACK; }else{ log.info("事务异常,准备执行回查"); return RocketMQLocalTransactionState.UNKNOWN; } } /** * 本地事务回查,第一次回查6秒,后续60秒回查一次,最多回查15次 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); log.info("本地事务监听器执行回查本地事务,事务id:{},msg:{}", transId, JSON.toJSONString(msg)); //一般是根据事务id查询数据库看本地事务是否执行完毕 try { int flag = LocalDateTime.now().getSecond() % 2; if (flag == 0){ log.info("事务消息回查提交..."); return RocketMQLocalTransactionState.COMMIT; } log.info("事务消息回查回滚..."); return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { log.info("事务消息回查未知.."); return RocketMQLocalTransactionState.UNKNOWN; } } }
复制

3.5.3 消费事务消息

maxReconsumeTimes设置最大重复次数,根据业务决定,并发消费模式默认16次。

并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。

并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下

顺序消费消费失败后会先在客户端本地重试直到最大重试次数,默认最大重试次数为Integer.MAX,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序。

/** * 消费的时候,如果发生异常会重新消费消息,正常消费会自己返回CONSUME_SUCCESS,发生异常则会返回RECONSUME_LATER然后重新消费该信息,超过默认的重复消费次数后,消息会被放入死信队列 */ @Slf4j @Component @RocketMQMessageListener(maxReconsumeTimes = 2, consumerGroup = MqConstant.CONSUMER_GROUP_004, topic = MqConstant.TOPIC_004, selectorExpression = MqConstant.TAG_004, consumeMode = ConsumeMode.CONCURRENTLY) public class TransRocketmqConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { //int i = 1 / 0;//模拟消费异常 log.info("事务消息消费实例消费消息:{}", JSON.toJSONString(message)); } }
复制

3.5.4 监听死信队列

当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。

如果产生了死信消息,那对应的TOPIC的死信Topic名称为%DLQ%${TOPIC_NAME},死信队列的消息将不会再被消费(也可以监听启动监听死信队列的消费者)。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信队列的信息。

需要注意DLQ队列的权限问题

perm的对应关系,2-W-只写,4-R-只读,6-RW-读写。不同版本可能默认权限不一样,可以在控制台/mqadmin工具编辑。

@Slf4j @Component @RocketMQMessageListener(consumerGroup = MqConstant.CONSUMER_GROUP_006, topic = MqConstant.TOPIC_006) public class DLQConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消费了死信队列中的消息:{}", JSON.toJSONString(message)); } }
复制
最后修改时间:2023-05-16 11:13:09
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

目录
  • 三、Springboot整合
    • 3.1 配置
      • 3.1.1 pom依赖
      • 3.1.2 yml配置
      • 3.1.3 定义常量
    • 3.2 普通消息
      • 3.2.1 异步消息
      • 3.2.2 同步消息
      • 3.2.3单向发送
      • 3.2.4 批量发送
      • 3.2.5 消费消息
    • 3.3 顺序消息
      • 3.3.1 发送顺序消息
      • 3.3.2 消费顺序消息
    • 3.4 延迟消息
    • 3.5 事务消息
      • 3.5.1 发送事务消息
      • 3.5.2 监听本地事务
      • 3.5.3 消费事务消息
      • 3.5.4 监听死信队列