使用场景

在这里插入图片描述
在这里插入图片描述
目的是保证事务最终一致性

消息的TTL

在这里插入图片描述

Exchanges 死信

在这里插入图片描述

流程

在这里插入图片描述

整合使用

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.jhj.gulimall.ssoclient.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class MyMQConfig {


/**
* 容器中的Binding Queue Exchange 都会自动创建(RabbitMQ中没有的情况下)
* RabbitMQ中只要有就不会覆盖
*/

@Bean
public Queue orderDelayQueue(){

Map<String,Object> arguments = new HashMap<>();

//死信路由
arguments.put("x-dead-letter-exchange","order-event-exchange");
//死信路由键
arguments.put("x-dead-letter-routing-key","order.release.order");
//过期时间
arguments.put("x-message-ttl","60000");
Queue queue = new Queue("order.delay.queue", true, false, false,arguments);

return queue;
}

@Bean
public Queue orderReleaseQueue(){

Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}

@Bean
public Exchange orderEventExchange(){

return new TopicExchange("order-event-exchange", true, false);
}

@Bean
public Binding orderCreateOrderBinding(){

return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);
}

@Bean
public Binding orderReleaseOrderBinding(){

return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);
}
}

发送

1
2
3
4
5
6
7
8
/**
* 发送消息
*/
@Test
public void sendMessage(){

rabbitTemplate.convertAndSend("order-event-exchange","order.create.order","test");
}

接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RabbitListener(queues = {
"order.release.order.queue"})
public void listener(Message message, String s, Channel channel){

message.getBody();
System.out.println(s);
try {

// 第二个参数代表 是否批量回复 确认成功回复 收货
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// 第二个参数代表 是否批量回复
// 第三个参数代表 是否重新放到queue中 退货 确认失败回复 false 丢弃 true 重新入队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
} catch (IOException e) {

throw new RuntimeException(e);
}
}

如何保证消息可靠性

消息丢失

在这里插入图片描述

消息重复

在这里插入图片描述

消息积压

在这里插入图片描述

作者声明

1
如有问题,欢迎指正!