消息确认机制。
// 普通Confirm
public class Publish1 {
@Test
public void testPublish() throws IOException {
// 普通Confirm, confirm 只能确认消息有没有到达 exchange 无法保证是否到达queue
Connection connection = RabbitConfig.getConnection();
Channel channel = connection.createChannel();
// 开启消息确认机制
channel.confirmSelect();
String queue = "queue";
String msg = "消息!";
channel.basicPublish("",queue,null,msg.getBytes());
// 消息是否到达交换机
try {
if (channel.waitForConfirms()) {
System.out.println("消息到达交换机");
} else {
System.out.println("消息未到达交换机");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息未到达交换机");
}
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 批量Confirm
public class Publish2 {
@Test
public void testPublish() throws IOException {
// 批量Confirm
Connection connection = RabbitConfig.getConnection();
Channel channel = connection.createChannel();
// 开启消息确认机制
channel.confirmSelect();
String queue = "queue";
for (int i = 0; i < 10; i++) {
String msg = "消息! " + i;
channel.basicPublish("",queue,null,msg.getBytes());
}
try {
// 有一个失败的时候,就直接全部失败
channel.waitForConfirmsOrDie();
System.out.println("消息到达交换机");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息未到达交换机");
}
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 异步Confirm
public class Publish3 {
@Test
public void testPublish() throws IOException {
// 异步 Confirm
Connection connection = RabbitConfig.getConnection();
Channel channel = connection.createChannel();
// 开启消息确认机制
channel.confirmSelect();
try {
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息发送成功,标识:" + l + ",是否是批量" + b);
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息发送失败,标识:" + l + ",是否是批量" + b);
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息未到达交换机");
}
String queue = "queue";
String msg = "消息! ";
channel.basicPublish("",queue,null,msg.getBytes());
System.in.read();
try {
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Return机制,监听消息是否从exchange送到了指定的queue中
public class Publish4 {
@Test
public void testPublish() throws IOException, TimeoutException {
// return 机制
Connection connection = RabbitConfig.getConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
// 当消息没有送达到queue时,才会执行。
System.out.println(new String(bytes,"UTF-8") + "没有送达到Queue中!!");
}
});
String queue = "queue";
String msg = "消息! ";
// 在发送消息时,指定mandatory参数为true,当队列没有接收到消息时,执行returnListener回调
channel.basicPublish("",queue,true,null,msg.getBytes());
System.in.read();
channel.close();
connection.close();
}
}
复制
SpringBoot 实现。
// application.xml
spring:
rabbitmq:
publisher-confirm-type: simple #新版本用
publisher-confirms: true #开启confimr确认机制
publisher-returns: true #开启return机制
复制
// RabbitMQConfirmAndReturn.java 开启Confirm和Return
@Component
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
// 当前类创建完成后调用
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if(b){
System.out.println("消息已经送达到Exchange");
}else{
System.out.println("消息没有送达到Exchange");
}
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息没有送达到Queue");
}
}
复制
避免消息重复消费。主要用到 redis 的 setnx 存值特性。
// pom.xml
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
// Publish.java 生产者
// 指定messageId
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) ///指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
for (int i = 0; i < 10; i++) {
String queue = "queue";
String msg = "消息! " + i;
// 在发送消息时,指定mandatory参数为true,当队列没有接收到消息时,执行returnListener回调
channel.basicPublish("",queue,true,properties,msg.getBytes());
}
// Consume.java 消费者
// 监听
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("192.168.199.109", 6379);
String messageId = properties.getMessageId();
// setnx 存值,key不存在才能存值
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if (result != null && "ok".equalsIgnoreCase(result)) {
System.out.println("接收: " + new String(body, "UTF-8"));
jedis.set(messageId, "1");
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
if ("1".equals(jedis.get(messageId))) {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
};
channel.basicConsume(queue, false, consumer);
复制
SpringBoot 实现。
// pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
// application.yml
spring:
redis:
host: 192.168.199.109
port: 6379
复制
// 修改生产者
@Test
public void contextLoads() {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
for (int i = 0; i < 10; i++) {
String msg = "快红狗 " + i;
rabbitTemplate.convertAndSend("boot-mq-exchange1","fast.red.dog",msg, messageId);
}
}
// 修改消费者
@Component
public class RabbitMQListener {
@Autowired
private StringRedisTemplate redisTemplate;
// 手动ack
@RabbitListener(queues = "boot-mq-queue")
public void receive(String msg, Channel channel, Message message) throws IOException {
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
System.out.println("接收: " + msg);
redisTemplate.opsForValue().set(messageId, "1");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
if ("1".equals(redisTemplate.opsForValue().get(messageId))) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
}
复制
RabbitMQ 应用。
修改前文 SpringBoot 应用的 es 添加数据功能。客户模块作为生产者,查询模块作为消费者。
// 客户模块
// pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
// application.xml
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
// CustomerServiceImpl.java
// HttpHeaders httpHeaders = new HttpHeaders();
// httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
// HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//
// // 调用 ES 添加请求
//
// restTemplate.postForObject("http://localhost:8080/search/add", httpEntity, String.class);
rabbitTemplate.convertAndSend("customer-exchange", "open.search.add", json);
复制
// 搜索模块
// pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
// application.xml
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
// RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange getTopicExchange() {
return new TopicExchange("customer-exchange", true, false);
}
@Bean
public Queue getQueue() {
return new Queue("customer-queue", true, false, false, null);
}
@Bean
public Binding getBinding(TopicExchange exchange, Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("open.search.*");
}
}
// RabbitMQListener.java
@Component
public class RabbitMQListener {
@Autowired
private SearchService searchService;
@RabbitListener(queues = "customer-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
System.out.println("rabbitmq receive: " + msg);
System.out.println("RoutingKey: " + message.getMessageProperties().getReceivedRoutingKey());
ObjectMapper objectMapper = new ObjectMapper();
Customer customer = objectMapper.readValue(msg, Customer.class);
searchService.addCustomer(customer);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
复制
文章转载自java小小小小栈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1206次阅读
2025-04-27 16:53:22
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
687次阅读
2025-04-30 15:24:06
数据库,没有关税却有壁垒
多明戈教你玩狼人杀
586次阅读
2025-04-11 09:38:42
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
568次阅读
2025-04-14 09:40:20
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
491次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
468次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
460次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
353次阅读
2025-04-18 10:01:22
国产数据库图谱又上新|82篇精选内容全览达梦数据库
墨天轮编辑部
267次阅读
2025-04-23 12:04:21
关于征集数据库标准体系更新意见和数据库标准化需求的通知
数据库标准工作组
239次阅读
2025-04-11 11:30:08