三、Springboot整合
3.1 配置
3.1.1 pom依赖
starter包,正常使用这个就可以了
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
复制
原生依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.4</version> </dependency>
复制
3.1.2 yml配置
rocketmq: name-server: 192.168.10.104:9876 producer: group: test-producer retry-times-when-send-failed:
复制
3.1.3 定义常量
测试代码中使用
public interface MqConstant {
String TOPIC_001 = "xrj-test";
String TOPIC_002 = "xrj-reply";
String TOPIC_003 = "xrj-orderly";
String TOPIC_004 = "xrj-trans";
String TOPIC_005 = "xrj-delay";
//死信队列
String TOPIC_006 = "%DLQ%xrj-mq-004";
String TAG_001 = "xrj-test-tag-001";
String TAG_002 = "xrj-test-tag-002";
String TAG_003 = "xrj-test-tag-003";
String TAG_004 = "xrj-test-tag-004";
String TAG_005 = "xrj-test-tag-005";
String CONSUMER_GROUP_001 = "xrj-mq-001";
String CONSUMER_GROUP_002 = "xrj-mq-002";
String CONSUMER_GROUP_003 = "xrj-mq-003";
String CONSUMER_GROUP_004 = "xrj-mq-004";
String CONSUMER_GROUP_005 = "xrj-mq-005";
String CONSUMER_GROUP_006 = "xrj-mq-006";
}
复制
3.2 普通消息
普通场景的下的消息可以进行有种发送形式,同步发送、异步发送、单向发送、批量发送。
3.2.1 异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
@Component
@Slf4j
public class RocketmqProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送异步消息,异步消息通过回调来判断消息发送成功还是失败
*/
public void sendAsyncMsg(String topic, String tag, List<String> msgList) {
String destination = topic + ":" + tag;
UUID uuid = UUID.randomUUID();
String key = "test" + uuid;
Map<String, Object> headers = new HashMap<>();
headers.put(RocketMQHeaders.KEYS, key);
int size = msgList.size();
CountDownLatch cdl = new CountDownLatch(size);
msgList.forEach(msg -> rocketMQTemplate.asyncSend(destination, new GenericMessage<>(msg, headers), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
try {
log.info("异步消息发送成功,返回值:{}", JSON.toJSONString(sendResult));
} finally {
cdl.countDown();
}
}
@Override
public void onException(Throwable throwable) {
try {
log.error("异步消息发送失败,异常:{}", throwable.getMessage(), throwable);
} finally {
cdl.countDown();
}
}
})
);
try {
cdl.await(size, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
复制
3.2.2 同步消息
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
同步消息发送会返回结果SendResut。SendResut包含实际发送状态还包括
-
SEND_OK(发送成功)
消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
-
FLUSH_DISK_TIMEOUT(刷盘超时)
消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
-
FLUSH_SLAVE_TIMEOUT(同步到备超时)
消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。
-
SLAVE_NOT_AVAILABLE(备不可用)
消息发送成功,但是此时Slave不可用。此时消息已经进入Master服务器队列,只有Master服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
-
如果发送失败会抛出异常。
- 至多重试2次(同步发送为2次,异步发送为0次)。
- 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
- 如果本身向broker发送消息产生超时异常,就不会再重试。
- 如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。
/**
* 发送同步消息,根据返回值的状态来判断发送成功还是失败
*/
public void sendSyncMsg(String topic, String tag, String msg) {
String destination = topic + ":" + tag;
UUID uuid = UUID.randomUUID();
String key = "test" + uuid;
Map<String, Object> headers = new HashMap<>();
headers.put(RocketMQHeaders.KEYS, key);
Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build();
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("异步消息发送成功,返回值:{}", JSON.toJSONString(sendResult));
} else {
log.info("异步消息发送失败,返回值:{}", JSON.toJSONString(sendResult));
}
}
复制
3.2.3单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
/**
* 发送单向消息,无返回值,发了就不管了
*/
public void sendOneWay(String topic, String tag, String msg) {
String destination = topic + ":" + tag;
UUID uuid = UUID.randomUUID();
String key = "test" + uuid;
Map<String, Object> headers = new HashMap<>();
headers.put(RocketMQHeaders.KEYS, key);
Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build();
rocketMQTemplate.sendOneWay(destination, message);
}
复制
3.2.4 批量发送
有几个限制:
- 同一批 batch 中 topic 必须相同。
- the message body size over max value, MAX: 4194304。一批数据最大4M。需要手动拆分/修改服务端配置。
/**
* 同步发送批量消息,异步和上面单条类似
*/
public void syncSendMessages(String topic, String tag, List<Message<?>> messageList, long timeout) {
String destination = topic + ":" + tag;
SendResult sendResult = this.rocketMQTemplate.syncSend(destination, messageList, timeout);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("同步批量消息发送成功,返回值:{}", JSON.toJSONString(sendResult));
} else {
log.info("同步批量消息发送失败,返回值:{}", JSON.toJSONString(sendResult));
}
}
复制
3.2.5 消费消息
/**
* 消费者监听指定的topic和tag即可,默认并发消费
*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC_001,
consumerGroup = MqConstant.CONSUMER_GROUP_001,
selectorExpression = MqConstant.TAG_001,
consumeMode = ConsumeMode.CONCURRENTLY)
public class SimpleRocketmqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("简单消费实例消费消息:{}", JSON.toJSONString(message));
}
}
复制
3.3 顺序消息
顺序消息是一种对消息发送和消费顺序有严格要求的消息。对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息。
RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
-
生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
-
消费顺序性指定消费模式为顺序消费,默认为并发消费。
3.3.1 发送顺序消息
@Component
@Slf4j
public class RocketmqOrderlyProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* rocketMq中支持局部顺序消息,要想全局顺序只能保证同一个队列
* 每一个topic中的Queue中的消息是严格顺序的,所以需要保证严格顺序消费的消息指定QueueId进行传数据即可
* 不同Queue上的消息可以并发消费,同一个Queue上的消息仍然可以保证顺序消费
*/
public void sendOrderlyMsg(String topic, String tag, String msg) {
String destination = topic + ":" + tag;
String hashKey = "56789";
for (int i = 0; i < 5; i++) {
String lastMsg = msg + "," + i;
Message<String> message = MessageBuilder.withPayload(lastMsg).build();
//hashKey一致时选定的数据broker和queue一致
SendResult sendResult = rocketMQTemplate.syncSendOrderly(destination, message, hashKey, 60000);
log.info("消息发送成功,keys:{},msg:{},返回结果:{}", hashKey, lastMsg, JSON.toJSONString(sendResult));
}
}
}
复制
3.3.2 消费顺序消息
/**
* 顺序消息,消费时必须消费模式为ConsumeMode.ORDERLY
* 同一个topic的同一个queue中才有序
* 不同Queue上的消息可以并发消费,同一个Queue上的消息仍然可以保证顺序消费
*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MqConstant.TOPIC_003, consumerGroup = MqConstant.CONSUMER_GROUP_003, selectorExpression = MqConstant.TAG_003, consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketmqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("顺序消费者消费了消息:{}", message);
}
}
复制
3.4 延迟消息
延迟消息和普通消息相比多了一个延迟等级参数,支持18个等级。最短1s,最久2小时,无法自定义延迟时间。
/**
* 延迟级别 1-18
* “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”
*/
@Slf4j
@Component
public class RocketmqDelayProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMsg(String topic, String tag, String msg, int delayLevel) {
String desc = topic + ":" + tag;
UUID uuid = UUID.randomUUID();
String key = "xrj" + uuid;
Map<String, Object> headers = new HashMap<>();
headers.put(RocketMQHeaders.KEYS, key);
Message<String> message = MessageBuilder.withPayload(msg).copyHeaders(headers).build();
//delayLevel指定延迟等级
SendResult sendResult = rocketMQTemplate.syncSend(desc, message, 60000, delayLevel);
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("发送延迟消息成功,返回结果:{}", JSON.toJSONString(sendResult));
} else {
log.error("发送延迟消息失败,返回结果:{}", JSON.toJSONString(sendResult));
}
}
}
复制
消费者的定义普通消息一样
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqConstant.CONSUMER_GROUP_005,topic = MqConstant.TOPIC_005)
public class DelayRocketmqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("延迟消息消费者消费了延迟消息:{}", JSON.toJSONString(message));
}
}
复制
3.5 事务消息
事务消息基于半消息实现分为三个步骤:
- 生产端生产消息,将消息发送至Broker
- 生产端同时启动监听者,监听本地事务的执行情况,Broker拿到消息后等待监听者的本地事务执行确认,确认本地事务执行完毕之后再把消息推送给消费者消费
- 消费者消费消费消息,如果消费失败会返回RECONSUME_LATER重试消费标识,默认可以重试指定次数之后消息进入死信队列转人工处理。
3.5.1 发送事务消息
发送事务消息的时候需要携带一个唯一的事务id
@Component
@Slf4j
public class RocketmqTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransMsg(String topic, String tag, String msg, Integer status) {
String destination = topic + ":" + tag;
//事务ID
UUID transId = UUID.randomUUID();
String key = "test_" + transId;
Map<String, Object> headers = new HashMap<>();
headers.put(RocketMQHeaders.KEYS, key);
headers.put(RocketMQHeaders.TRANSACTION_ID, transId);
Message<?> transMsg = MessageBuilder.withPayload(msg).copyHeaders(headers).build();
//发送事务消息
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, transMsg, status);
if (SendStatus.SEND_OK.equals(transactionSendResult.getSendStatus())) {
log.info("事务消息发送成功,返回结果:{}", JSON.toJSONString(transactionSendResult));
} else {
log.error("事务消息发送失败,返回结果:{}", JSON.toJSONString(transactionSendResult));
}
}
}
复制
3.5.2 监听本地事务
本地事务监听这里就需要执行本地事务和定义事务回查接口。
- 消息发送到Broker之后,消息不会直接给到消费者消费而是先执行本地事务,Broker根据本地事务的执行情况进行处理。
- 如果本地事务提交返回RocketMQLocalTransactionState.COMMIT,Broker就会把消息给到下游消费者消费,而且保证必须处理完成消息。
- 如果本地事务回滚返回RocketMQLocalTransactionState.ROLLBACK,Broker就不会把消息给到下游消费者消费直接删掉。
- 如果本地事务状态未知返回RocketMQLocalTransactionState.UNKNOWN或者网络波动导致Broker一直没收到消息的确认,Broker就会调用事务回查,直到事务状态确定/或回查15次。
- 消费者消费数据一直消费失败,失败指定次数消息会进入死信队列。
@Slf4j
@RocketMQTransactionListener
public class LocalTransMsgListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("本地事务监听器执行本地事务消息,事务id:{},msg:{},参数:{}", transId, JSON.toJSONString(msg), arg);
Integer args = (Integer) arg;
log.info("开始执行本地事务");
if(args == 0){
log.info("准备提交本地事务");
return RocketMQLocalTransactionState.COMMIT;
}else if(args == 1){
log.info("准备回滚本地事务");
return RocketMQLocalTransactionState.ROLLBACK;
}else{
log.info("事务异常,准备执行回查");
return RocketMQLocalTransactionState.UNKNOWN;
}
}
/**
* 本地事务回查,第一次回查6秒,后续60秒回查一次,最多回查15次
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("本地事务监听器执行回查本地事务,事务id:{},msg:{}", transId, JSON.toJSONString(msg));
//一般是根据事务id查询数据库看本地事务是否执行完毕
try {
int flag = LocalDateTime.now().getSecond() % 2;
if (flag == 0){
log.info("事务消息回查提交...");
return RocketMQLocalTransactionState.COMMIT;
}
log.info("事务消息回查回滚...");
return RocketMQLocalTransactionState.ROLLBACK;
} catch (Exception e) {
log.info("事务消息回查未知..");
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
复制
3.5.3 消费事务消息
maxReconsumeTimes设置最大重复次数,根据业务决定,并发消费模式默认16次。
并发消费消费失败后会将消费失败的消息重新投递回服务端,再等待服务端重新投递回来,在这期间会正常消费队列后面的消息。
并发消费失败后并不是投递回原Topic,而是投递到一个特殊Topic,其命名为%RETRY%ConsumerGroupName,集群模式下并发消费每一个ConsumerGroup会对应一个特殊Topic,并会订阅该Topic。 两者参数差别如下
顺序消费消费失败后会先在客户端本地重试直到最大重试次数,默认最大重试次数为Integer.MAX,这样可以避免消费失败的消息被跳过,消费下一条消息而打乱顺序消费的顺序。
/**
* 消费的时候,如果发生异常会重新消费消息,正常消费会自己返回CONSUME_SUCCESS,发生异常则会返回RECONSUME_LATER然后重新消费该信息,超过默认的重复消费次数后,消息会被放入死信队列
*/
@Slf4j
@Component
@RocketMQMessageListener(maxReconsumeTimes = 2,
consumerGroup = MqConstant.CONSUMER_GROUP_004,
topic = MqConstant.TOPIC_004,
selectorExpression = MqConstant.TAG_004,
consumeMode = ConsumeMode.CONCURRENTLY)
public class TransRocketmqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
//int i = 1 / 0;//模拟消费异常
log.info("事务消息消费实例消费消息:{}", JSON.toJSONString(message));
}
}
复制
3.5.4 监听死信队列
当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。
如果产生了死信消息,那对应的TOPIC的死信Topic名称为%DLQ%${TOPIC_NAME}
,死信队列的消息将不会再被消费(也可以监听启动监听死信队列的消费者)。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信队列的信息。
需要注意DLQ队列的权限问题
perm的对应关系,2-W-只写,4-R-只读,6-RW-读写。不同版本可能默认权限不一样,可以在控制台/mqadmin工具编辑。
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = MqConstant.CONSUMER_GROUP_006, topic = MqConstant.TOPIC_006)
public class DLQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("消费了死信队列中的消息:{}", JSON.toJSONString(message));
}
}
复制