1.1MQ概述
MQ全称 Message Queue(消息队列),存储消息的中间件,是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
rabbitmq官网地址:https://www.rabbitmq.com/
1.2MQ的优点和缺点
优点:
1.应用解耦:
使用 MQ 使得应用间解耦,提升容错性和可维护性。
2.异步:
相较于串行执行方式,提高响应速度
3.削峰填谷:
系统瞬间请求增多时,直接打到数据库压力很大,使用MQ消息并限制消息消费的速度,消息短时间内积压在MQ中,然后再由数据库能承受的范围慢慢消费。
缺点:
1.系统复杂度提高:
使用MQ异步调用之后也会随之带来一些问题,例如如何保证消息不丢失、消息的幂等性、顺序性、系统一致性问题等。
2.系统可用性降低:
引入消息队列后,一旦MQ宕机,就会对业务造成影响,需要做集群高可用
1.3常见的MQ产品:
这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
1.4 RabbitMq简介
RabbitMQ 基础架构图:
RabbitMQ 中的相关概念:
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic(publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
常用工作模式介绍:
简单模式:
上图模型中的概念:
⚫ P:生产者,也就是要发送消息的程序
⚫ C:消费者:消息的接收者,会一直等待消息到来
⚫ queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
Work queues 工作队列模式 :
⚫ Work Queues:与简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
⚫ 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Pub/Sub 发布订阅模式 :
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
Routing 路由模式:
模式说明
⚫ 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
⚫ 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
⚫ Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
⚫ P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
⚫ X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
⚫ C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
⚫ C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
工作模式小结:
1. 简单模式
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
2. 工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
3. 发布订阅模式 Publish/subscribe
需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
4. 路由模式 Routing
需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。
2. RabbitMQ 的安装和配置
2.1 Mac安装rabbitmq:
使用brew包管理工具安装rabbitmq
需先安装brew:
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
失败的话可以使用中科大下载地址:
/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"
然后执行:brew install rabbitmq
安装完成后进入安装目录,启动rabbitmq
cd opt/homebrew/Cellar/rabbitmq/3.9.14
./sbin/rabbitmq-server
访问localhost:15672,使用guest账号登录如下:
2.2 Springboot集成rabbitmq demo
2.21引入Maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.22 Yml配置:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
2.23队列配置:
@Configuration
public class DirectRabbitConfig {
//队列 TestDirectQueue
@Bean
public Queue TestDirectQueue() {
return new Queue("TestDirectQueue",true);
}
//Direct交换机 TestDirectExchange
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("TestDirectExchange",true,false);
}
//将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRoutingKey");
}
}
2.24 生产者配置:
@RestController
@RequestMapping("/test")
@Slf4j
public class DemoController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public ResponseEntity<String> sendMessage() {
log.info("sendMessage---");
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRoutingKey", "testMessage");
return ResponseEntity.ok("sendMessage");
}
}
启动服务调用接口,观察rabbitmq管理界面如下:
2.25 消费者配置:
@RestController
@Slf4j
public class ConsumerController {
@RabbitListener(queues = "TestDirectQueue")
public void process(String message) {
log.info("消费者收到消息:{}",message);
}
}
消费消息后观察管理界面如下:
3.RabbitMQ高级特性
消息可靠性:
1.exchange、queue 、message持久化
2.生产方确认Confirm
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式:
confirm 确认模式、return 退回模式
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback
消息从 exchange-->queue 投递失败则会返回一个 returnCallback
我们将利用这两个 callback 控制消息的可靠性投递 。
3. Consumer Ack消费方确认Ack
默认自动确认消息,可设置手动确认。
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,手动确认,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
4. Broker高可用
消费端限流:
设置消费端一次拉取多少消息,消费端的确认模式为手动确认。
TTL:
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
当消息到达存活时间后,还没有被消费,会被自动清除。
死信队列:
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3. 消息成为死信的三种情况:
3.1. 队列消息长度到达限制;
3.2. 消费者拒接消费消息,并且不重回队列;
3.3. 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列(TTL + DLX) :
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如下单后,30分钟未支付,取消订单,回滚库存。
在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
4.RabbitMQ应用问题
1.消息可靠性保障--消息补偿:
100%确保消息发送成功
消息补偿机制需要建立在业务数据库和MQ数据库的基础之上 , 当我们发送消息时 , 需要同时将消息数据保存在数据库中, 两者的状态必须记录。然后通过业务数据库和MQ数据库的对比检查消费是否成功,不成功,进行消息补偿措施,重新发送消息处理
消息补偿机制核心 : 发现未成功消费的消息, 并且重新发送消息
消息回调检查服务 : 发送正常消息同时发送一个延迟消息, 当监听到延迟消息的时候, 检查MDB中是否有消费记录 , 如果没有代表存在消息丢失, 重新发送消息
消息定时检查服务 : 设置定时任务, 定时比对业务DB和MDB ,中的数据是否一致, 如果不一致,存在消息丢失, 重新发送
2.消息幂等性保障 :
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任
意多次执行对资源本身所产生的影响均与一次执行的影响相同。 在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
解决方案:
可采用数据库版本号乐观锁(写数据的同时更新版本号),或分布式锁