RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的,支持高并发处理和分布式部署。所有主要的编程语言均有与代理接口通讯的客户端库。RabbitMQ本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,更适合于企业级的开发。它还实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队,通过队列机制实现应用程序间的异步通信与数据传输,常用于流量削峰、系统解耦及异步处理场景。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

参考文章:

5.RabbitMQ交换机详解

7.RabbitMQ延时交换机

6.RabbitMQ死信队列

8.RabbitMQ队列详解

9.RabbitMQ消息的可靠性

RabbitMQ 交换机

Exchange(X) 可翻译成交换机/交换器/路由器

RabbitMQ交换器 (Exchange)类型

  1. Fanout Exchange(扇形交换机):广播,分发模式,把消息分发给所有订阅者。广播,将消息交给所有绑定到交换机的队列
  2. Direct Exchange(直连交换机):定向,把消息交给符合指定routing key 的队列。发送方把消息发送给订阅方,针对多个订阅者,默认采取轮询方式进行消息发送
  3. Topic Exchange(主题交换机):通配符,匹配订阅模式,使用正则匹配到消息队列,把消息交给符合routing pattern(路由模式) 的队列
  4. Headers Exchange(头部交换机):与 direct 类似,只是性能很差,此类型几乎用不到。
  5. x-local-random Exchange
  6. 自定义交换机

使用流程:引入依赖 → 配置MQ(yaml)→ 定义常量 → 定义MQ → 定义生产者 → 定义消费者 → 发送消息 → 测试结果

引入依赖,配置MQ(yaml)都一样,根据不同类型,不同情况。定义常量、MQ、生产者、消费者略有差别。

Fanout Exchange

扇形交换机会将消息投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;消息发送流程如下:

1
2
3
4
5
6
1. 可以有多个消费者
2. 每个消费者有自己的queue(队列)
3. 每个队列都要绑定到Exchange(交换机)
4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5. 交换机把消息发送给绑定过的所有队列
6. 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

定义常量

1
2
3
4
5
public class RabbitMQConstant {
public static final String EXCHANGE_FANOUT = "exchange.fanout";
public static final String QUEUE_FANOUT_A = "queue.fanout.a";
public static final String QUEUE_FANOUT_B = "queue.fanout.B";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class RabbitConfig {
// rabbitmq 三部曲
// 1、定义交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitMQConstant.EXCHANGE_FANOUT);
}
// 2 定义队列a
@Bean
public Queue queueA() {
return new Queue(RabbitMQConstant.QUEUE_FANOUT_A);
}
// 2 定义队列b
@Bean
public Queue queueB() {
return new Queue(RabbitMQConstant.QUEUE_FANOUT_B);
}
// 3 绑定交换机和队列a
@Bean
public Binding bingingA(FanoutExchange fanoutExchange, Queue queueA) {
// 将队列A绑定到扇形交换机
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
// 3 绑定交换机和队列b
@Bean
public Binding bingingB(FanoutExchange fanoutExchange, Queue queueB) {
// 将队列B绑定到扇形交换机
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
// 生产者发送消息
public void sendMsg() {
// 定义要发送的消息
String msg = "hello world";
// 消息封装成Message对象
Message message = new Message(msg.getBytes());
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_FANOUT, "", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class ReceiveMessageService {
// 接收两个队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_FANOUT_A,RabbitMQConstant.QUEUE_FANOUT_B})
public void receiveMsg(Message message){
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到的消息为:{}",msg);
}
}

测试结果

1
2
3
消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
接收到的消息为:hello world
接收到的消息为:hello world

Direct Exchange

在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。路由键与队列名完全匹配的交换机。

1
2
3
4
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

定义常量

1
2
3
4
5
6
7
public class RabbitMQConstant {
public static final String EXCHANGE_DIRECT = "exchange.direct";
public static final String QUEUE_DIRECT_A = "queue.direct.a";
public static final String QUEUE_DIRECT_B = "queue.direct.b";
public static final String ROUTING_KEY = "info";
public static final String ROUTING_WARNING_KEY = "warn";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class RabbitConfig {
// 1、定义交换机
@Bean
public DirectExchange directExchange() {
//使用建造者模式创建
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DIRECT).build();
}
// 2、定义队列
@Bean
public Queue queueA() {
//使用建造者模式创建
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_A).build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_B).build();
}
// 3、交换机和队列进行绑定
// 队列A绑定info
@Bean
public Binding bindingA(DirectExchange directExchange, Queue queueA) {
// 使用建造者模式创建
return BindingBuilder.bind(queueA).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
// 队列B绑定info
@Bean
public Binding bindingB1(DirectExchange directExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
// 队列B绑定warn
@Bean
public Binding bindingB2(DirectExchange directExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_WARNING_KEY);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;

public void sendMsg() {
// 发送一个两个队列都能接收的消息
// 使用建造者模式创建消息
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
// 参数1 交换机, 参数2 路由key, 参数3 消息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT,
RabbitMQConstant.ROUTING_KEY, message);
// 发送一个只有队列B能接收的消息
// 使用建造者模式创建消息
Message error_message = MessageBuilder.withBody("error world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT,
RabbitMQConstant.ROUTING_WARNING_KEY, error_message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@Slf4j
public class ReceiveMessageService {
// 接收A队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_A})
public void receiveMsgA(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到A的消息为:{}", msg);
}
// 接收B队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_B})
public void receiveMsgB(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到B的消息为:{}", msg);
}
}

测试结果

1
2
3
4
消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
接收到B的消息为:hello world
接收到A的消息为:hello world
接收到B的消息为:error world

Topic Exchange

1
2
Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!即direct是完全匹配,topic模式是模糊匹配。
通配符匹配(相当于模糊匹配),Topic模式中路由键通过"."分为多个部分。

通配符规则:用”.”隔开的为一个单词。

#:匹配一个或多个词

1
beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx

*:匹配不多不少恰好 1 个词。必须有一个而且只有一个

1
beijing.* == beijing.queue, beijing.xyz

定义常量

1
2
3
4
5
public class RabbitMQConstant {
public static final String EXCHANGE_TOPIC = "exchange.topic";
public static final String QUEUE_TOPIC_A = "queue.topic.a";
public static final String QUEUE_TOPIC_B = "queue.topic.b";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange topicExchange() {
return ExchangeBuilder.topicExchange(RabbitMQConstant.EXCHANGE_TOPIC).build();
}
@Bean
public Queue queueA() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_A).build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_B).build();
}
@Bean
public Binding bindingA(TopicExchange topicExchange, Queue queueA) {
return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");
}
@Bean
public Binding bindingB1(TopicExchange topicExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");
}
@Bean
public Binding bindingB2(TopicExchange topicExchange, Queue queueB) {
return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
// 参数1 交换机,参数2 发送路由key, 参数3 消息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_TOPIC, "lazy.orange.rabbit", message);
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@Slf4j
public class ReceiveMessageService {
// 接收A队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_A})
public void receiveMsgA(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到A的消息为:{}", msg);
}
// 接收B队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_B})
public void receiveMsgB(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到B的消息为:{}", msg);
}
}

测试结果

1
2
接收到A的消息为:hello world
接收到B的消息为:hello world

Headers Exchange

基于消息内容中的headers属性进行匹配,不是根据路由键匹配。headers交换器和direct交换器完全一致,但性能差很多,几乎不用了。

消费方指定的headers中必须包含一个”x-match”键,”x-match”键的值有两个

  • x-match=all:表示所有的键值对都匹配才能接受消息

  • x-match=any:表示只要有键值对匹配就能接受消息

发消息时可以指定消息属性(MessageProperties),如果heanders中包含多个消息属性,则所有属性都匹配上才算匹配上。

定义常量

1
2
3
4
5
public class RabbitMQConstant {
public static final String EXCHANGE_HEADERS = "exchange.headers";
public static final String QUEUE_HEADERS_A = "queue.headers.a";
public static final String QUEUE_HEADERS_B = "queue.headers.b";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class RabbitConfig {
@Bean
public HeadersExchange headersExchange() {
return ExchangeBuilder.headersExchange(RabbitMQConstant.EXCHANGE_HEADERS).build();
}
@Bean
public Queue queueA() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_A).build();
}
@Bean
public Queue queueB() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_B).build();
}
// 绑定队列与交换机
@Bean
public Binding bindingA(HeadersExchange headersExchange, Queue queueA) {
Map<String, Object> headerValues = new HashMap<>();
headerValues.put("type", "m");
headerValues.put("status", 1);
return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
}
@Bean
public Binding bindingB(HeadersExchange headersExchange, Queue queueB) {
Map<String, Object> headerValues = new HashMap<>();
headerValues.put("type", "s");
headerValues.put("status", 0);
return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
// 消息属性
MessageProperties messageProperties = new MessageProperties();
Map<String, Object> headers = new HashMap<>();
headers.put("type", "s");
headers.put("status", 0);
// 设置消息头
messageProperties.setHeaders(headers);
// 添加了消息属性
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
// 头部交换机,路由key无所谓
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_HEADERS, "", message);
log.info("消息发送完毕!!!");
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
@Slf4j
public class ReceiveMessageService {
// 接收A队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_A})
public void receiveMsgA(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到A的消息为:{}", msg);
}

// 接收B队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_B})
public void receiveMsgB(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到B的消息为:{}", msg);
}
}

测试结果

1
2
消息发送完毕!
接收到B的消息为:hello world

自定义交换机

参考:延时交换机使用方式

交换机属性

属性介绍

  1. name:交换机名称,就是一个字符串

  2. Type:交换机类型(direct、topic、fanout、headers、x-local-random)五种

  3. durable:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在

  4. autoDelete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机

  5. internal:内部使用的,如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定

  6. arguments:只有一个取值alternate-exchange,表示备用交换机

  7. alternate:设置备用交换机

使用示例

定义常量

1
2
3
4
public class RabbitMQConstant {
public static final String EXCHANGE_DIRECT = "exchange.direct";
public static final String QUEUE_DIRECT = "queue.direct";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RabbitConfig {
// 1、定义交换机
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder // 默认为持久化的
.directExchange(RabbitMQConstant.EXCHANGE_DIRECT) // 交换机的名字
.autoDelete() // 是否自动删除,默认不自动删除,调用该方法则表示自动删除
.durable(false) // false表示不持久化,服务器重启会丢失;true表示持久化
// 是否是内部使用(如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定)
.internal() // 调用此方法表示设置为true
.build();
}
// 2、定义队列
@Bean
public Queue queue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT).build();
}
// 3、绑定交换机与队列
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with("info");
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "info", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "", message);
log.info("消息发送完毕!!!");
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class ReceiveMessageService {
// 接收队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT})
public void receiveMsgA(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到的消息为:{}", msg);
}
}

测试结果

  1. 不加条件

    1
    2
    3
    消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
    消息发送完毕!
    接收到的消息为:hello world
  2. 当配置属性internal时报错

    1
    2
    消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code:403 reply-text:ACCESS_REFUSED....
  3. 测试持久化按上

    设置持久化为false,重启rabbitmq-server,则交换机丢失

  4. 测试配置autoDelete

    自动删除为 true,从控制台上手动解绑,会发现自动删除

备用交换机

使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息。如果想要监测哪些消息被投递到没有对应的队列,可以用备用交换机来实现,然后接收备用交换机的消息从而记录日志或发送报警信息。

使用示例

注意:备用交换机一般使用fanout交换机

测试时:指定一个错误路由

重点:普通交换机设置参数绑定到备用交换机

定义常量

1
2
3
4
5
6
7
8
9
10
11
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.backup";
// 备用交换机
public static final String EXCHANGE_BACKUP_NAME = "exchange.backup";
// 正常队列
public static final String QUEUE_NORMAL_NAME = "queue.normal.backup";
// 备用队列
public static final String QUEUE_BACKUP_NAME = "queue.backup";
public static final String ROUTING_WARNING_KEY = "info";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class RabbitConfig {
// 定义正常交换机
@Bean
public DirectExchange normalExchange() {
Map<String, Object> arguments = new HashMap<>();
/*//指定当前正常的交换机的备用交换机是谁
arguments.put("alternate-exchange", RabbitMQConstant.EXCHANGE_BACKUP_NAME);
return new DirectExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME, true, false, arguments);*/

return ExchangeBuilder // 默认为持久化的,默认不自动删除
.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME) // 交换机的名字
.alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME) //设置备用交换机 alternate-exchange
.build();
}
// 定义正常队列
@Bean
public Queue queueNormal() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build();
}
// 绑定正常交换机和队列
@Bean
public Binding binding(DirectExchange normalExchange, Queue queueNormal) {
return BindingBuilder.bind(queueNormal).to(normalExchange)
.with(RabbitMQConstant.ROUTING_WARNING_KEY);
}
// 定义备用交换机
@Bean
public FanoutExchange alternateExchange() {
return ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build();
}
// 定义备用队列
@Bean
public Queue alternateQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build();
}
// 绑定备用交换机和队列
@Bean
public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {
return BindingBuilder.bind(alternateQueue).to(alternateExchange);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message= MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,"test",message);
log.info("消息发送完毕,发送时间为:{}",new Date());
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class ReceiveMessageService {
// 接收备用队列的消息
@RabbitListener(queues = {RabbitMQConstant.QUEUE_BACKUP_NAME})
public void receiveMsg(Message message) {
byte[] body = message.getBody();
String msg = new String(body);
log.info("接收到备用的消息为:{}", msg);
}
}

发送消息

故意写错路由key,由于正常交换机设置了备用交换机,所以该消息就会进入备用交换机从而进入备用队列。可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理。如果正常交换机没有设置备用交换机,则该消息会被抛弃。

测试结果

1
2
消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
接收到备用的消息为:hello world

RabbitMQ 延时交换机

延时问题

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了。

解决方式

  • 定时任务方式:每隔3秒扫描一次数据库,查询过期的订单然后进行处理;

    优点:简单,容易实现;

    缺点:

    • 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
    • 性能较差,每次扫描数据库,如果订单量很大会影响性能
  • 被动取消:当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

    优点:对服务器而言,压力小;

    缺点:

    • 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
    • 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
  • JDK延迟队列(单体应用,不能分布式下):DelayedQueue。无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素

    优点:实现简单,任务延迟低

    缺点:

    • 服务重启、宕机,数据丢失;
    • 只适合单机版,不适合集群;
    • 订单量大,可能内存不足而发生异常;
  • 采用消息中间件(rabbitmq)

    RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

    不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样。

    image-20250730224552796

延时交换机

死信队列实现

实现方式

给正常队列绑定一个死信交换机和设置死信路由key;给正常队列设置消息过期时间,过期时间用于模拟延时操作,当消息过期后没有被消费就会转到死信队列。

定义常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RabbitMQConstant {
// 正常交换机(
public static final String EXCHANGE_NAME = "exchange.delay.normal.1";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1";
// 正常队列
public static final String QUEUE_NAME = "queue.delay.normal.1";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1";
// 正常路由key
public static final String ROUTING_NAME = "order1";
// 死信路由key
public static final String ROUTING_DLX_NAME = "error1";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class RabbitConfig {
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
// 设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
.ttl(25000) // 队列的过期时间
.withArguments(arguments) // 设置对列的参数
.build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
}
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
// 死信队列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
// 死信交换机和死信队列绑定
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

测试结果

image-20250730224414884

存在的问题

如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题

  • 若队头的消息过期时间长,后面的消息过期时间短,但因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中

多队列解决过期时间问题

对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上

定义MQ

注意:正常队列不要设置过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Configuration
public class RabbitConfig {
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
// 设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)
//.ttl(5000) // 队列的过期时间
.withArguments(arguments) // 设置对列的参数
.build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);
}
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
// 死信队列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
// 死信交换机和死信队列绑定
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);
}
}

定义生产者

发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("25000"); // 第一条消息 过期时间
Message message = MessageBuilder.withBody("hello world 1".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000"); // 第二条消息 过期时间
Message message = MessageBuilder.withBody("hello world 2".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}

定义消费者

消费者监听死信队列的消息来查看消息接收的时间

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class ReceiveMessageService {
// 延迟队列一定要接收死信队列的消息
@RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)
public void receiveMsg(Message message) {
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
}
}

测试结果:先接收到的是队头的消息

1
2
3
4
消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025
接收到的消息为:hello world 1,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025
接收到的消息为:hello world 2,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025

使用延时插件

安装插件

  1. 下载

    选择对应的版本下载 rabbitmq_delayed_message_exchange 插件。官网下载地址community-plugins

  2. 插件拷贝到 RabbitMQ 服务器plugins目录下

  3. 解压缩

    1
    2
    3
    yum install unzip -y # 如果unzip没有安装,先安装一下
    cd /usr/local/rabbitmq_server-4.0.7/plugins
    unzip rabbitmq_delayed_message_exchange-v4.0.7.ez
  4. 启用插件

    1
    2
    cd /usr/local/rabbitmq_server-4.0.7/sbin/
    ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  5. 查询安装情况

    1
    ./rabbitmq-plugins list # 查询安装的所有插件
  6. 重启Rabbitmq使其生效

实现原理

消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中。延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

image-20250730225956046

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现。解决了消息过期时间不一致出现的问题。

实现延时队列

消息只要发送到延时交换机即可,延时交换机绑定死信路由的key

定义常量

1
2
3
4
5
6
public class RabbitMQConstant {
// 正常交换机(死信交换机)
public static final String EXCHANGE_NAME = "exchange.delay.4";
public static final String QUEUE_NAME = "queue.delay.4";
public static final String ROUTING_NAME = "plugin4";
}

定义MQ

注意延时交换机需要使用自定义类型定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RabbitConfig {
// 创建自定义交换机
@Bean
public CustomExchange customExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct"); // 放一个参数
// CustomExchange(String name, String type, boolean durable, boolean autoDelete,
// Map<String, Object> arguments)
return new CustomExchange(RabbitMQConstant.EXCHANGE_NAME,
"x-delayed-message", true, false, arguments);
}
@Bean
public Queue queue() {
return QueueBuilder
.durable(RabbitMQConstant.QUEUE_NAME) // 队列名称
.build();
}
@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {
//绑定也指定路由key,加noargs 方法
return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();
}
}

定义生产者

生产者发送消息时要在headers中添加过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", 25000);// 第一条消息 延迟时间
//messageProperties.setExpiration("25000"); // 不要用这个
Message message = MessageBuilder.withBody("hello world 1".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay", 15000); // 第二条消息 延迟时间
//messageProperties.setExpiration("15000"); // 不要用这个
Message message = MessageBuilder.withBody("hello world 2".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
@Component
@Slf4j
public class ReceiveMessageService {
// 延迟队列一定要接收死信队列的消息
@RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)
public void receiveMsg(Message message) {
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}", body, new Date());
}
}

RabbitMQ 死信队列

过期消息(Time To Live,TTL)

消息的过期时间有两种设置方式:(过期消息)

  1. 单条消息过期:单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;
  2. 队列属性设置所有消息过期:队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;

如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。

设置消息过期时间

定义常量

1
2
3
4
5
public class RabbitMQConstant {
public static final String EXCHANGE_NAME = "exchange.ttl.a";
public static final String QUEUE_TNAME = "queue.ttl.a";
public static final String ROUTING_KEY = "info";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitConfig {
// 1、定义交换机
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 2、定义队列
@Bean
public Queue queue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).build();
}
// 3、绑定交换机与队列
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
// 设置单条消息的过期时间
public void sendMsg() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("35000"); //过期的毫秒
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_KEY, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

测试结果:发送消息后查看 queue.ttl.a 有一条数据,35秒后查看发现数据丢失。

设置队列消息过期时间

定义常量

1
2
3
4
5
public class RabbitMQConstant {
public static final String EXCHANGE_NAME = "exchange.ttl.b";
public static final String QUEUE_TNAME = "queue.ttl.b";
public static final String ROUTING_KEY = "info";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class RabbitConfig {
// 1、定义交换机
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 2、定义队列
@Bean
public Queue queue() {
// 方式1 new Queue 的方式
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 15000); // 消息过期时间
return new Queue(RabbitMQConstant.QUEUE_TNAME, true, false, false, arguments);
// 方式2 建造者
//return QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).withArguments(arguments).build();
}
// 3、绑定交换机与队列
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_KEY, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

测试结果:先后发送两条消息,发送消息后查看 queue.ttl.b 有2条数据,第1条消息到期后消失,第2条消息到期后消失

死信队列(Dead Letter Exchange,DLX)

单条消息过期

消息设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。

定义常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.1";
// 正常队列,没有消费者,设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.1";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.1";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.1";
// 正常路由key
public static final String NORMAL_KEY = "order1";
// 死信路由key
public static final String DLX_KEY = "error1";
}
// 示例中,各个示例的编号分别顺次为1,2,3,4,5

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class RabbitConfig {
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();
}
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 重点:设置这两个参数
// 设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);
}
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
// 死信队列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();
}
// 死信交换机和死信队列绑定
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
// 生产者发送消息
public void sendMsg() {
// 消息属性
MessageProperties messageProperties = new MessageProperties();
// 设置单条消息的过期时间,单位为毫秒,数据类型为字符串
messageProperties.setExpiration("15000");
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,
RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());

}
}

测试结果:发送的消息先出现在正常队列,待过期后消息会出现在死信队列

队列设置过期

队列设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class RabbitConfig {
// ......
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置队列的过期时间为20秒
arguments.put("x-message-ttl", 20000);
// 设置这两个参数
// 重点:给正常队列绑定死信交换机和设置死信路由的key,也就是消息过期后发送到哪个死信交换机,发送时设置死信路由的key
// 设置对列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build();
}
// 其他同上
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,
RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}

测试结果:发送的消息先出现在正常队列,待过期后消息会出现在死信队列

队列到达最大长度

先入队的消息会被发送到DLX

定义常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.2";
// 正常队列:没有消费者、设置过期时间
public static final String QUEUE_NORMAL_NAME = "queue.normal.2";
// 死信交换机
public static final String EXCHANGE_DLX_NAME = "exchange.dlx.2";
// 死信队列
public static final String QUEUE_DLX_NAME = "queue.dlx.2";
// 正常路由key
public static final String NORMAL_KEY = "order2";
// 死信路由key
public static final String DLX_KEY = "error2";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RabbitConfig {
// ......
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置对列的最大长度
arguments.put("x-max-length", 5);
// 重点:设置这两个参数
// 设置队列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列过期时间
.build();
}
// 其他同上
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 1; i <= 8; i++) {
String str = "hello world " + i;
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,
RabbitMQConstant.NORMAL_KEY, message);
}
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}

测试结果:超过正常队列的长度后先入队的消息会放入死信队列

消费消息不进行重新投递

  • 从正常的队列接收消息,但对消息不进行确认并且不对消息进行重新投递,此时消息就进入死信队列
  • 业务处理过程中出现异常也会变成死信,因为消费者没有进行确认

配置MQ

需要开启消费者手动确认模式,否则默认是消息消费后自动确认的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 8080
spring:
application:
name: dlx-learn4
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
# 开启消费者手动确认
listener:
simple:
acknowledge-mode: manual

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class RabbitConfig {
// ......
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 15000); //设置对列的过期时间
// 重点:设置这两个参数
// 设置对列的死信交换机
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
// 设置死信路由key,要和死信交换机和死信队列绑定key一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.DLX_KEY);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)
.withArguments(arguments) // 设置对列的参数
.build();
}
// 其他同上
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,
RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}

定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Service
@Slf4j
public class ReceiveMessageService {
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
public void receiveMsg(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//对消息不确认,ack单词是 确认 的意思
// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息的一个数字标签
// multiple:true表示对小于deliveryTag标签下的消息都进行Nack不确认;false表示只对当前deliveryTag标签的消息Nack
// requeue:true表示消息被Nack后重新发送到队列;false表示消息被Nack后不会重新发送到队列
// 获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
// 获取消息的唯一标识,类似身份证或者学号
long deliveryTag = messageProperties.getDeliveryTag();
try {
byte[] body = message.getBody();
String str = new String(body);
log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
//TODO 业务逻辑处理
// 这里模拟一个异常,出现异常后进行手动不确认并且不重新投递设置
int a = 1 / 0;
// 消费者的手动确认:false表示只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("接收着出现问题:{}", e.getMessage());
try {
// 消费者的手动不确认:参数3为false表示不重新入队(不重新投递),就会变成死信;为true表示是重新入队
channel.basicNack(deliveryTag, false, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}

测试结果:模拟异常,发现消息未重新入队,而是进入了死信队列

消费者拒绝消息

开启手动确认模式并拒绝消息,不重新投递,则进入死信队列

配置yaml时需要开启消费者手动确认模式,配置同上述消息不进行重新投递

定义MQ

这里的队列无需设置队列的过期时间,因为消费者拒绝后如果不重新投递就直接进入死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RabbitConfig {
// ......
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.QUEUE_DLX_NAME);
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build();
}
// 其他同上
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
String str = "hello world";
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,
RabbitMQConstant.NORMAL_KEY, message);
log.info("消息发送完毕:发送时间为:{}", new Date());
}
}

定义消费者

消费者拒绝消息后不进行重新投递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
@Service
public class ReceiveMessageService {
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})
public void receiveMsg(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
// 获取消息属性
MessageProperties messageProperties = message.getMessageProperties();
// 获取消息的唯一标识,类似身份证或者学号
long deliveryTag = messageProperties.getDeliveryTag();
try {
byte[] body = message.getBody();
String str = new String(body);
log.info("接收到的消息为:{},接收时间为:{}", str, new Date());
// TODO 业务逻辑处理
// 这里模拟一个业务异常,出现异常后进入消费者拒绝设置,不进行重新投递
int a = 1 / 0;
// 消费者的手动确认:false是只确认当前消息;改成true为批量确认标志号小于当前标志号的所有消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("接收着出现问题:{}", e.getMessage());
try {
// 拒绝消息:参数1是消息的标识,参数2是否重新入队(false表示拒绝后不重新入队),不可以批量处理
channel.basicReject(deliveryTag, false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}

测试结果:拒绝后的消息不再入队,正常和死信队列都没有了

死信应用场景

  • 监听死信消息以便查找问题
  • 可以实现延时队列业务

RabbitMQ 队列

队列属性

  1. Type:设置队列的队列类型;

  2. Name:队列名称,就是一个字符串,随便一个字符串就可以;

  3. Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;

  4. Auto delete:是否自动删除。true表示当没有消费者连接到这个队列时,队列会自动删除;

  5. Exclusive:该属性的队列只对第一个连接它的消费者可见(之后其它消费者无法访问该队列),连接断开时自动删除。基本设置成false

  6. Arguments:队列的其他属性,例如指定DLX(死信交换机等);

    1. x-expires:Number:当Queue(队列)在指定的时间未被访问则队列将被自动删除

    2. x-message-ttl:Number:发布的消息在队列中存在多长时间后被取消(单位毫秒)

    3. x-overflow:String:设置队列溢出行为,当达到队列的最大长度时消息的处理方式

      1. drop-head:删除头部消息
      2. reject-publish:超过队列长度后,后面发布的消息会被拒绝接收
      3. reject-publish-dlx:超过队列长度后,后面发布的消息会被拒绝接收并发布到死信交换机

      仲裁队列类型仅支持:drop-head and reject-publish两种

    4. x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息

    5. x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息

    6. x-single-active-consumer:表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略设置为false时消息循环分发给所有消费者(默认false)

    7. x-dead-letter-exchange:String:指定队列关联的死信交换机。队列消息达上限后溢出消息不被删掉,而保存到另一队列;

    8. x-dead-letter-routing-key:String:指定死信交换机的路由键,一般和7一起定义

    9. x-queue-mode:String:队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用。如果未设置则队列将保留内存缓存以尽可能快地传递消息;如果设置则队列消息将保存在磁盘上,消费时从磁盘上读取消费

    10. x-queue-master-locator:String(用的较少):在集群模式下设置队列分配到的主节点位置信息;

      每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;

      每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave

      基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;

      关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略

      1. min-masters:选择master queue数最少的那个服务节点host;
      2. client-local:选择与client相连接的那个服务节点host;
      3. random:随机分配;

属性测试

非持久化属性测试

定义常量

1
2
3
4
5
6
7
8
9
10
11
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.1";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.test1.1";
public static final String QUEUE_NAME2 = "queue.properties.test1.2";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.test1.1";
public static final String ROUTING_NAME2 = "key.properties.test1.2";
}
// 示例中,各个示例的编号分别顺次为1,2,3,4,5...

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 正常队列 不持久化、不设置名字
@Bean
public Queue normalQueue1() {
return QueueBuilder.nonDurable().build();
}
// 正常队列 不持久化、设置名字
@Bean
public Queue normalQueue2() {
return QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1).build();
}
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
@Bean
public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
}
}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
Message message = MessageBuilder.withBody("hello world2".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME2, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}

测试结果

1
2
发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字
不持久化的队列在服务器重启后队列会消失

持久化测试

是否自动删除。如果为true,当没有消费者连接到这个队列的时候,队列会自动删除

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class RabbitConfig {
// 交换机同上

// 正常队列 持久化、不设置名字
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable().build();
}
// 正常队列 持久化、设置名字
@Bean
public Queue normalQueue2() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();
}
// 绑定同上
}

测试结果

1
2
发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字
持久化的队列在服务器重启后队列依然存在

自动删除测试

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
public class RabbitConfig {
// 交换机同上
// 正常队列 持久化、不设置名字
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.autoDelete()// 设置自动删除
.build();
}
// 正常队列 持久化、设置名字
@Bean
public Queue normalQueue2() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2)
// .autoDelete()// 设置不自动删除,默认就是不自动删除
.build();
}
// 绑定同上
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class ReceivemessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg1(Message message) {
log.info("1接收到的消息为:{}", new String(message.getBody()));
}
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2})
public void receiveMsg2(Message message) {
log.info("2接收到的消息为:{}", new String(message.getBody()));
}
}

测试结果:设置为自动删除的队列1在没有消费者连接后被自动删除

可见性测试

普通队列允许的消费者没有限制,多个消费者绑定到同一个队列时,RabbitMQ会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。

exclusive有两个作用

  • 当连接关闭时connection.close()该队列是否会自动删除;

  • 该队列是否是私有的private

如果设置为false则可以使用两个消费者都访问同一个队列,没有任何问题;如果设置为true则会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。如果强制访问会报如下异常:

1
It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.exclusive()//声明只有第一个连接的消费者可见,(之后其它消费者无法访问该队列),并且在连接断开时自动删除
//.autoDelete()// 设置自动删除
.build();
}
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

消费者

这里的消费者表示在同一个Connection中消费消息的多个消费者,测试是否同一个Connection中的多个消费者可以消费。代码同上。

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@SpringBootApplication
public class Rabbitmq08Properties04Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties04Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
// 程序一启动就会运行该方法
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();

{ // 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置工厂参数
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("longdidi");
Connection connection = factory.newConnection();

// 3.创建channel
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法 当收到信息 自动执行该方法consumerTag
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("连接1接收到信息:" + new String(body));
}
};
channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);
// 6.释放资源
channel.close();
connection.close();
}
{
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置工厂参数
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("longdidi");
Connection connection = factory.newConnection();
// 3.创建channel
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 回调方法 当收到信息 自动执行该方法 consumerTag
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("连接2接收到信息:" + new String(body));
}
};
channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);
// 6.释放资源
channel.close();
connection.close();
}
}
}

测试结果

1
2
3
4
5
6
7
1.启动程序,如果有多个Connection连接会报错:channel error;protocol method:#method...
2.注释掉主程序中的两个Connection连接代码重启测试,发现同一Connection内的多个消费者可以连接消费,输出如下
消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025
消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025
消费者1接收到的消息为:hello world2
消费者2接收到的消息为:hello world1
3.停止程序运行查看队列是否自动删除:当连接关闭时队列自动删除

Arguments测试

删除属性测试

x-expires属性:当Queue(队列)在指定的时间未被访问则队列将被自动删除。

定义常量

1
2
3
4
5
6
7
8
9
10
11
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.1";
public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.1";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.1";
public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.1";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.1";
public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.1";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class RabbitConfig {
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 正常队列
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-expires", 10000L); // 当Queue(队列)在指定的时间未被访问则队列将被自动删除
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

测试结果:启动应用查看控制台,生成了队列,等待10s之后刷新控制台,队列被删除

设置队列过期时间

发布的消息在队列中存在多长时间后被取消(单位毫秒),参考 队列设置过期

设置队列长度

x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息。参考 队列到达最大长度

设置队列溢出行为

当达到队列的最大长度时消息的处理方式:有效值为drop-head(删除头部消息)、reject-publish(拒绝发布)或reject-publish-dlx(拒绝发布到死信交换机)。仲裁队列类型仅支持drop-head and reject-publish两种。

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
public class RabbitConfig {
// 正常交换机
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
// 正常队列
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
// @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 5);// 队列的溢出行为,删除头部
// 队列的溢出行为,删除头部(默认行为)
// arguments.put("x-overflow", "drop-head");
// 队列的溢出行为,拒绝发布
//arguments.put("x-overflow", "reject-publish");
// 队列的溢出行为,拒绝接收消息,超过长度的消息会被发送到死信交换机而不是拒绝接收
arguments.put("x-overflow", "reject-publish-dlx");
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME)
.deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1)
.build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
// 死信交换机
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
// 死信队列
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();
}
// 死信交换机和死信队列绑定
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);
}
}

测试结果

1
2
3
4
5
6
1.当属性设置为drop-head时
如果超过队列长度,先入队的消息会被先删除(如果配置了死信交换机则会移至死信交换机)
2.当属性设置为reject-publish时
该模式下当队列达到最天长度后会拒绝收消息,也不会将队头的消息移至死信队列(如果配置了死信队列的话),超长的消息未被接收
3.当属性设置为reject-publish-dlx时
拒绝接收后面的消息并将拒绝的消息放到死信交换机中,后面发送的消息会存储到死信队列中

设置队列内存大小

x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length-bytes", 30); // 队列的内存大小
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();
}
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 3; i++) {
String str = "你好我好大家好" + i;
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
for (int i = 0; i < 3; i++) {
String str = "hello" + i;
Message message = MessageBuilder.withBody(str.getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}

测试结果:队列中只剩3条信息,先入队的超过内存的限制的消息被删除了

设置单一消费者

表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略;设置为false时消息循环分发给所有消费者(默认false)。

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-single-active-consumer", true);//队列的最大长度
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();
}
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class ReceivemessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg1(Message message) {
log.info("消费者1接收到的消息为:{}", new String(message.getBody()));
}
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg2(Message message) {
log.info("消费者2接收到的消息为:{}", new String(message.getBody()));
}
}

测试结果:只有消费者1收到了消息

设置死信交换机

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class RabbitConfig {
// 正常交换机同上

@Bean
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//死信交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME1);//死信路由key

return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.ttl(5000) // 设置超时时间
.withArguments(arguments)
.build();
}
// 其他同上
}

测试结果:超时的消息放入了死信队列


RabbitMQ 消息可靠性

生产者确认

Confirm模式简介

image-20250731150818210

  1. 代表消息从生产者发送到Exchange;
  2. 代表消息从Exchange路由到Queue;
  3. 代表消息在Queue中存储;
  4. 代表消费者监听Queue并消费消息;

可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;

有两种解决方案:

  • 开启Confirm(确认)模式;(异步)
  • 开启Transaction(事务)模式;(性能低,实际项目中很少用)

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

image-20250731150944952

Confirm模式实现

  • 开启生产者确认模式

    1
    2
    3
    4
    5
    6
    7
    8
    spring:
    rabbitmq:
    host: 192.168.10.11
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi
    publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
  • 实现RabbitTemplate.ConfirmCallback交界口,重写confirm()方法

    判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理

  • 设置rabbitTemplate的确认回调方法

    1
    rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

定义常量

1
2
3
4
5
6
7
8
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.confirm.normal.1";
// 队列
public static final String QUEUE_NAME1 = "queue.confirm.normal.1";
// 路由key
public static final String ROUTING_NAME1 = "key.confirm.normal.1";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();
}
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

外部类实现

生产者:设置回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Resource
private MyConfirmCallBack confirmCallBack;
// 构造方法执行后会调用一次该方法,只调用一次,起到初始化的作用
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(confirmCallBack);
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
// 关联数据对象
CorrelationData correlationData = new CorrelationData(); //关联数据
// 比如设置一个订单ID,到时候在confirm回调里面就可以知道是哪个订单没有发送到交换机上去
correlationData.setId("order_123456"); // 发送订单信息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error",
RabbitMQConstant.ROUTING_NAME1, message, correlationData);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

实现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机收到消息后,会回调该方法
* @param correlationData 相关联的数据
* @param ack 有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机
* @param cause 消息没有正确地到达交换机的原因是什么
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("关联id为:{}", correlationData.getId() + "");
if (ack) {
log.info("消息正确的达到交换机");
return;
} else {
// ack =false 没有到达交换机
log.error("消息没有到达交换机,原因为:{}", cause);
}
}
}

生产者实现

生产者:生产者直接实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
@Slf4j
public class SendMessageService implements RabbitTemplate.ConfirmCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
CorrelationData correlationData = new CorrelationData(); //关联数据
correlationData.setId("order_123456"); // 发送订单信息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error",
RabbitMQConstant.QUEUE_NAME1, message, correlationData);
log.info("消息发送完毕,发送时间为:{}", new Date());
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("关联id为:{}", correlationData.getId() + "");
if (ack) {
log.info("消息正确的达到交换机");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}

匿名内部内实现

生产者:生产者使用匿名内部类实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(
new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("关联id为:{}", correlationData.getId() + "");
if (ack) {
log.info("消息正确的达到交换机");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
}
);
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
CorrelationData correlationData = new CorrelationData(); //关联数据
correlationData.setId("order_123456"); //发送订单信息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error",
RabbitMQConstant.QUEUE_NAME1, message, correlationData);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

Lambad 实现

生产者:生产者使用lambda实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(
// 使用lambda 表达式
(correlationData, ack, cause) -> {
log.info("关联id为:{}", correlationData.getId() + "");
if (ack) {
log.info("消息正确的达到交换机");
return;
}
log.error("消息没有到达交换机,原因为:{}", cause);
}
);
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
CorrelationData correlationData = new CorrelationData(); //关联数据
correlationData.setId("order_123456"); //发送订单信息
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error",
RabbitMQConstant.ROUTING_NAME1, message, correlationData);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

测试结果:写错交换机的名字,查看日志输出

1
2
关联id为:order_123456
消息没有到达交换机,原因为:channel error;protocol method:#method<channel

交换机确认

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败;

使用return模式可以实现消息无法路由的时候返回给生产者,消息从 exchange –> queue 投递失败则会返回一个 returnCallback,利用这个callback控制消息的可靠性投递;

image-20250731150818210

return模式实现

  • 开启return确认模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    spring:
    rabbitmq:
    host: 192.168.10.11
    port: 5672
    username: admin
    password: 123456
    virtual-host: longdidi
    publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
    publisher-returns: true #开启return模式
  • 实现RabbitTemplate.ReturnsCallback接口并重写returnedMessage()方法

  • 设置回调

    使用rabbitTemplate.setReturnCallback设置退回函数

    当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

定义常量

1
2
3
4
5
6
7
8
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.return.normal.1";
// 队列
public static final String QUEUE_NAME1 = "queue.return.normal.1";
// 路由key
public static final String ROUTING_NAME1 = "key.return.normal.1";
}

外部类实现

生产者:设置回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MyReturnCallBack myReturnCallBack;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(myReturnCallBack); //设置回调
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1 + "error", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

外部实现接口

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
/**
* 当消息从交换机 没有正确地 到达队列,则会触发该方法
* 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}", returnedMessage.getReplyText());
}
}

生产者实现

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@Slf4j
public class SendMessageService implements RabbitTemplate.ReturnsCallback {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(this); // 设置回调
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1 + "error", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}", returnedMessage.getReplyText());
}
}

匿名内部类实现

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(
// 匿名内部类 实现了接口
new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}",
returnedMessage.getReplyText());
}
}
); // 设置回调
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes()).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1 + "error", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

lambda实现

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(
message -> {
log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}", message.getReplyText());
}
);
}
public void sendMsg() {
Message message = MessageBuilder.withBody("hello world".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1 + "error", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

测试结果

1
2
消息发送完毕,发送时间为:Thu Jul 31 15:32:48 GMT+08:00 2025
消息从交换机没有正确的路由到(投递到)队列,原因为:NO_ROUTE

使用备用交换机

image-20250731150818210

使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上;

定义常量

1
2
3
4
5
6
7
8
9
10
11
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.backup.1";
// 备用交换机
public static final String EXCHANGE_BACKUP_NAME = "exchange.backup.1";
// 正常队列
public static final String QUEUE_NORMAL_NAME = "queue.normal.backup.1";
// 备用队列
public static final String QUEUE_BACKUP_NAME = "queue.backup.1";
public static final String ROUTING_WARNING_KEY = "info1";
}

定义MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Configuration
public class RabbitConfig {
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder // 默认为持久化的,默认不自动删除
.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME) // 交换机的名字
.alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME) // 设置备用交换机 alternate-exchange
.build();
}
@Bean
public Queue queueNormal() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build();
}
// 注意 ROUTING_WARNING_KEY
@Bean
public Binding binding(DirectExchange normalExchange, Queue queueNormal) {
return BindingBuilder.bind(queueNormal)
.to(normalExchange)
.with(RabbitMQConstant.ROUTING_WARNING_KEY);
}
@Bean
public FanoutExchange alternateExchange() {
return ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build();
}
@Bean
public Queue alternateQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build();
}
@Bean
public Binding bindingAlternate(FanoutExchange alternateExchange, Queue alternateQueue) {
return BindingBuilder.bind(alternateQueue).to(alternateExchange);
}
}

测试结果:消息存储到了备用队列

消息持久化存储

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

image-20250731150818210

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

解决方案:

  1. 队列持久化

    1
    QueueBuilder.durable("队列名称").build();
  2. 交换机持久化

    1
    ExchangeBuilder.directExchange("交换机名称").durable(true).build();
  3. 消息持久化,默认就是持久化的

    1
    2
    3
    MessageProperties messageProperties = new MessageProperties();
    // 设置消息持久化,当然它默认就是持久化,所以可以不用设置
    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

消费者手动确认

手动确认消息

image-20250731150818210

采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
rabbitmq:
host: 192.168.10.11
port: 5672
username: admin
password: 123456
virtual-host: longdidi
publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
publisher-returns: true #开启return模式
# 开启消费者手动确认
listener:
simple:
acknowledge-mode: manual

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 开启生产者的确定模式
rabbitTemplate.setConfirmCallback(
(correlationData, ack, cause) -> {
if (!ack) {
log.error("消息没有到达交换机,原因为:{}", cause);
// TODO 重发消息或者记录错误日志
}
}
);
// 开启交换机确认模式
rabbitTemplate.setReturnsCallback(
returnedMessage -> {
log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}", returnedMessage.getReplyText());
// TODO 记录错误日志,给程序员发短信或者或者邮件
}
);
}
public void sendMsg() {
MessageProperties messageProperties = new MessageProperties();
// 设置单条消息的持久化,默认就是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody("hello world".getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1+"1", message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
@Slf4j
public class ReceiveMessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg(Message message, Channel channel) {
// 获取消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("接收到的消息为:{}", new String(message.getBody()));
// TODO 插入订单等
// int a=10/0;
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息处理出现问题");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}

消息的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性:对于一个资源,不管请求一次还是多次,对该资源本身造成的影响应该是相同的,不能因重复请求而对该资源重复造成影响;

以接口幂等性举例

接口幂等性:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;

比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;

如何避免消息的重复消费问题?(消息消费时的幂等性)

全局唯一ID + Redis:生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令(setnx(messageId, 1)),将messageId作为key放到redis中,若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

实体类

1
2
3
4
5
6
7
8
9
10
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {
private String orderId;
private String orderName;
private BigDecimal orderMoney;
private Date orderTime; // 下单时间
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
// 这个对象可以进行序列化和反序列化(json格式)
@Resource
private ObjectMapper objectMapper;

@PostConstruct
public void init() {
// 开启生产者的确定模式
rabbitTemplate.setConfirmCallback(
(correlationData, ack, cause) -> {
if (!ack) {
log.error("消息没有到达交换机,原因为:{}", cause);
// TODO 重发消息或者记录错误日志
}
}
);
rabbitTemplate.setReturnsCallback(
returnedMessage -> {
log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}", returnedMessage.getReplyText());
// TODO 记录错误日志,给程序员发短信或者或者邮件
}
);
}
public void sendMsg() throws JsonProcessingException {
{
// 创建订单
Orders orders1 = Orders.builder()
.orderId("order_12345")
.orderName("买的手机")
.orderMoney(new BigDecimal(2356))
.orderTime(new Date())
.build();
// 转成json
String strOrders1 = objectMapper.writeValueAsString(orders1);
MessageProperties messageProperties = new MessageProperties();
// 设置单条消息的持久化,默认就是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders1.getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
}
{
Orders orders2 = Orders.builder()
.orderId("order_12345")
.orderName("买的手机")
.orderMoney(new BigDecimal(2356))
.orderTime(new Date())
.build();
String strOrders2 = objectMapper.writeValueAsString(orders2);
MessageProperties messageProperties = new MessageProperties();
// 设置单条消息的持久化,默认就是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders2.getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
}

log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Component
@Slf4j
public class ReceiveMessageService {
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg(Message message, Channel channel) throws IOException {
// 获取消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 使用objectmapper把字节数组反序列化成对象
Orders orders = objectMapper.readValue(message.getBody(), Orders.class);
try {
log.info("接收到的消息为:{}", orders.toString());
// 如果不存在就在redis中存储
Boolean setResult = stringRedisTemplate.opsForValue()
.setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());
if (setResult) {
// TODO 向数据库插入订单等
log.info("向数据库插入订单");
}
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息处理出现问题");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}

测试结果

1
2
3
4
消息发送完毕,发送时间为:Thu Jul 31 16:04:07 GMT+08:00 2025
接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356...
向数据库插入订单
接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356...