暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ 应用

java小小小小栈 2021-03-04
199



        消息确认机制。



// 普通Confirm


/**
 * Demonstrates a plain (single-message) publisher confirm: one message is published
 * and the test waits for the broker's confirm before closing the connection.
 */
public class Publish1 {

    /**
     * Publishes one message to the default exchange and waits for its confirm.
     *
     * @throws IOException if the channel cannot be created or the publish fails
     */
    @Test
    public void testPublish() throws IOException {
        // A plain confirm only tells whether the message reached the exchange;
        // it cannot guarantee that the message was routed to a queue.
        Connection connection = RabbitConfig.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();

            // Enable publisher confirms on this channel.
            channel.confirmSelect();

            String queue = "queue";
            String msg = "消息!";

            // Encode explicitly as UTF-8 so the non-ASCII payload does not depend on the
            // platform default charset.
            channel.basicPublish("", queue, null, msg.getBytes("UTF-8"));

            // Did the message reach the exchange?
            try {
                if (channel.waitForConfirms()) {
                    System.out.println("消息到达交换机");
                } else {
                    System.out.println("消息未到达交换机");
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息未到达交换机");
            }
        } finally {
            // Release the channel and connection even when publishing throws.
            try {
                if (channel != null) {
                    channel.close();
                }
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}




// 批量Confirm
/**
 * Demonstrates batch publisher confirms: several messages are published and a single
 * {@code waitForConfirmsOrDie()} call covers all of them.
 */
public class Publish2 {

    /**
     * Publishes ten messages to the default exchange and waits for the whole batch to be confirmed.
     *
     * @throws IOException if the channel cannot be created or a publish fails
     */
    @Test
    public void testPublish() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();

            // Enable publisher confirms on this channel.
            channel.confirmSelect();

            String queue = "queue";

            for (int i = 0; i < 10; i++) {
                String msg = "消息! " + i;
                // Encode explicitly as UTF-8 so the non-ASCII payload does not depend on the
                // platform default charset.
                channel.basicPublish("", queue, null, msg.getBytes("UTF-8"));
            }

            try {
                // If any single message fails, the whole batch is treated as failed.
                channel.waitForConfirmsOrDie();
                System.out.println("消息到达交换机");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息未到达交换机");
            }
        } finally {
            // Release the channel and connection even when publishing throws.
            try {
                if (channel != null) {
                    channel.close();
                }
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


// 异步Confirm


/**
 * Demonstrates asynchronous publisher confirms: a {@code ConfirmListener} is registered
 * and the broker's ack/nack is reported through its callbacks.
 */
public class Publish3 {

    /**
     * Registers a confirm listener, publishes one message and then blocks on standard input
     * so the callbacks have a chance to run before the connection is closed.
     *
     * @throws IOException if the channel cannot be created, the publish fails, or reading stdin fails
     */
    @Test
    public void testPublish() throws IOException {
        Connection connection = RabbitConfig.getConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();

            // Enable publisher confirms on this channel.
            channel.confirmSelect();

            try {
                channel.addConfirmListener(new ConfirmListener() {
                    @Override
                    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
                    }

                    @Override
                    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                        System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息未到达交换机");
            }

            String queue = "queue";
            String msg = "消息! ";
            // Encode explicitly as UTF-8 so the non-ASCII payload does not depend on the
            // platform default charset.
            channel.basicPublish("", queue, null, msg.getBytes("UTF-8"));

            // Keep the test alive until a key is pressed so the asynchronous callbacks can fire.
            System.in.read();
        } finally {
            // Release the channel and connection even when publishing throws.
            try {
                if (channel != null) {
                    channel.close();
                }
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}




// Return机制,监听消息是否从exchange送到了指定的queue中


/**
 * Demonstrates the return mechanism: a {@code ReturnListener} is notified when a
 * mandatory message could not be delivered from the exchange to a queue.
 */
public class Publish4 {

    /**
     * Registers a return listener, publishes one mandatory message and then blocks on
     * standard input so the callback has a chance to run before the connection is closed.
     *
     * @throws IOException      if the channel cannot be created, the publish fails, or reading stdin fails
     * @throws TimeoutException if closing the channel times out
     */
    @Test
    public void testPublish() throws IOException, TimeoutException {
        Connection connection = RabbitConfig.getConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                channel.addReturnListener(new ReturnListener() {
                    @Override
                    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                             AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // Runs only when the message was not delivered to a queue.
                        System.out.println(new String(body, "UTF-8") + "没有送达到Queue中!!");
                    }
                });

                String queue = "queue";
                String msg = "消息! ";
                // mandatory=true: when no queue receives the message, the return listener is invoked.
                // The payload is encoded as UTF-8 to match the decoding done in the listener above.
                channel.basicPublish("", queue, true, null, msg.getBytes("UTF-8"));

                // Keep the test alive until a key is pressed so the asynchronous callback can fire.
                System.in.read();
            } finally {
                // Close the channel even when publishing throws.
                channel.close();
            }
        } finally {
            // Close the connection even when channel creation or publishing throws.
            connection.close();
        }
    }
}




复制


        SpringBoot 实现。



// application.yml


# Publisher confirm / return switches for the Spring Boot RabbitMQ starter.
spring:
  rabbitmq:
    publisher-confirm-type: simple # used by newer versions
    publisher-confirms: true # enable the confirm mechanism
    publisher-returns: true # enable the return mechanism


复制


// RabbitMQConfirmAndReturn.java 开启Confirm和Return


/**
 * Registers itself on the {@link RabbitTemplate} as both the confirm callback and the
 * return callback, and prints the outcome of each published message.
 */
@Component
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Hooks this bean into the template after the bean has been constructed. */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /** Prints whether the message reached the exchange, based on the ack flag. */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String outcome = ack ? "消息已经送达到Exchange" : "消息没有送达到Exchange";
        System.out.println(outcome);
    }

    /** Prints a notice that the returned message did not reach a queue. */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                                String routingKey) {
        System.out.println("消息没有送达到Queue");
    }
}


复制


        避免消息重复消费。主要用到 redis 的 setnx 存值特性。



// pom.xml


<!-- Jedis Redis client; the consumer snippet in this article uses it to de-duplicate messages by messageId. -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>


// Publish.java 生产者


// 指定messageId
// Attach a messageId so the consumer can recognise repeated deliveries.
// NOTE(review): one properties object (and therefore one messageId) is reused for all ten
// messages below — confirm this is intended for the duplicate-consumption demo.
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
        .deliveryMode(1) // AMQP delivery mode: 1 = non-persistent (transient), 2 = persistent
        .messageId(UUID.randomUUID().toString())
        .build();


for (int i = 0; i < 10; i++) {
    String queue = "queue";
    String msg = "消息! " + i;
    // mandatory=true: when no queue receives the message, the return listener is invoked.
    // Encode explicitly as UTF-8; the consumer decodes the body as UTF-8.
    channel.basicPublish("", queue, true, properties, msg.getBytes("UTF-8"));
}




// Consume.java 消费者


// 监听
// Consumer that uses a Redis key per messageId to avoid processing the same message twice.
// Key value "0" means "claimed, processing"; "1" means "processed".
DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        // try-with-resources closes the Redis connection after every delivery; previously a new
        // connection was opened per message and never released.
        try (Jedis jedis = new Jedis("192.168.199.109", 6379)) {
            String messageId = properties.getMessageId();

            // SET ... NX EX 10: only stores the value when the key does not exist yet.
            String result = jedis.set(messageId, "0", "NX", "EX", 10);
            if ("ok".equalsIgnoreCase(result)) {
                // First time this messageId is seen: process it, mark it done, then ack.
                System.out.println("接收: " + new String(body, "UTF-8"));
                jedis.set(messageId, "1");

                channel.basicAck(envelope.getDeliveryTag(), false);
            } else {
                // Key already exists: ack only when the earlier delivery finished processing.
                // NOTE(review): when the value is still "0" the delivery is neither acked nor
                // rejected here — confirm that is the intended behaviour.
                if ("1".equals(jedis.get(messageId))) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        }
    }
};


// autoAck=false: deliveries are acknowledged manually inside handleDelivery.
channel.basicConsume(queue, false, consumer);


复制


        SpringBoot 实现。



// pom.xml


<!-- Spring Boot Redis starter; the listener in this article injects StringRedisTemplate. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>


// application.yml
# Redis connection settings.
spring:
  redis:
    host: 192.168.199.109
    port: 6379


复制


// 修改生产者
/**
 * Sends ten messages to exchange "boot-mq-exchange1" with routing key "fast.red.dog";
 * every message carries the same CorrelationData instance, created once before the loop.
 */
@Test
public void contextLoads() {
    final String exchange = "boot-mq-exchange1";
    final String routingKey = "fast.red.dog";
    CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());

    int index = 0;
    while (index < 10) {
        String payload = "快红狗 " + index;
        rabbitTemplate.convertAndSend(exchange, routingKey, payload, correlation);
        index++;
    }
}


// 修改消费者
/**
 * Listener that uses a Redis key per correlation id to avoid processing the same message twice.
 * Key value "0" means "claimed, processing"; "1" means "processed".
 */
@Component
public class RabbitMQListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Handles one delivery from "boot-mq-queue" and acknowledges it manually.
     *
     * @param msg     the message payload
     * @param channel the channel used to send the manual ack
     * @param message the raw message, used for its headers and delivery tag
     * @throws IOException if sending the ack fails
     */
    @RabbitListener(queues = "boot-mq-queue")
    public void receive(String msg, Channel channel, Message message) throws IOException {
        String messageId = (String) message.getMessageProperties().getHeaders()
                .get("spring_returned_message_correlation");
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        // setIfAbsent returns a nullable Boolean; compare against Boolean.TRUE instead of
        // auto-unboxing so a null result cannot raise a NullPointerException.
        Boolean claimed = redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS);
        if (Boolean.TRUE.equals(claimed)) {
            // First time this id is seen: process it, mark it done, then ack.
            System.out.println("接收: " + msg);

            redisTemplate.opsForValue().set(messageId, "1");

            channel.basicAck(deliveryTag, false);
        } else {
            // Key already exists: ack only when the earlier delivery finished processing.
            if ("1".equals(redisTemplate.opsForValue().get(messageId))) {
                channel.basicAck(deliveryTag, false);
            }
        }
    }
}


复制


        RabbitMQ 应用。

        修改前文 SpringBoot 应用的 es 添加数据功能。客户模块作为生产者,查询模块作为消费者。



// 客户模块


// pom.xml
<!-- Spring Boot AMQP starter; the customer module sends its messages through RabbitTemplate. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>




// application.yml
# RabbitMQ connection settings for the customer module.
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test


// CustomerServiceImpl.java


// Previous implementation, kept for reference: POST the JSON to the search module over HTTP.
// HttpHeaders httpHeaders = new HttpHeaders();
// httpHeaders.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
// HttpEntity httpEntity = new HttpEntity(json, httpHeaders);
//
// // Call the ES "add" endpoint
//
// restTemplate.postForObject("http://localhost:8080/search/add", httpEntity, String.class);


// Current implementation: publish the JSON to exchange "customer-exchange" with routing key
// "open.search.add" instead of making the HTTP call above.
rabbitTemplate.convertAndSend("customer-exchange", "open.search.add", json);




复制


// 搜索模块


// pom.xml
<!-- Spring Boot AMQP starter; the search module consumes messages through @RabbitListener. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>




// application.yml
# RabbitMQ connection settings for the search module, with manual acknowledgement for listeners.
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual


// RabbitMQConfig.java


/**
 * Declares the topic exchange, the queue and the binding between them that the
 * search module listens on.
 */
@Configuration
public class RabbitMQConfig {

    /** Topic exchange "customer-exchange", created with flags (true, false). */
    @Bean
    public TopicExchange getTopicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("customer-exchange", durable, autoDelete);
    }

    /** Queue "customer-queue", created with flags (true, false, false) and no extra arguments. */
    @Bean
    public Queue getQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("customer-queue", durable, exclusive, autoDelete, null);
    }

    /** Binds the queue to the exchange for routing keys matching "open.search.*". */
    @Bean
    public Binding getBinding(TopicExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("open.search.*");
    }
}


// RabbitMQListener.java


/**
 * Consumes customer messages from "customer-queue", converts the JSON payload to a
 * {@code Customer} and hands it to the search service.
 */
@Component
public class RabbitMQListener {

    // Shared mapper: avoids building a new ObjectMapper for every delivered message.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Autowired
    private SearchService searchService;

    /**
     * Handles one delivery and acknowledges it manually after the customer has been added.
     *
     * @param msg     the JSON payload
     * @param channel the channel used to send the manual ack
     * @param message the raw message, used for its routing key and delivery tag
     * @throws IOException if the JSON cannot be parsed or sending the ack fails
     */
    @RabbitListener(queues = "customer-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("rabbitmq receive: " + msg);

        System.out.println("RoutingKey: " + message.getMessageProperties().getReceivedRoutingKey());

        Customer customer = OBJECT_MAPPER.readValue(msg, Customer.class);
        searchService.addCustomer(customer);

        // Ack only after the customer has been handed to the search service.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}


复制


文章转载自java小小小小栈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论