RabbitMQ 是实现了高级消息队列 协议(AMQP )的开源消息代理软件(亦称面向消息的中间件 )。RabbitMQ服务器是用Erlang 语言编写的,而集群和故障转移 是构建在开放电信平台 框架上的,支持高并发处理和分布式部署。所有主要的编程语言 均有与代理接口通讯的客户端库。RabbitMQ本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,更适合于企业级的开发。它还实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队,通过队列机制实现应用程序间的异步通信与数据传输,常用于流量削峰、系统解耦及异步处理场景。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
参考文章:
5.RabbitMQ交换机详解
7.RabbitMQ延时交换机
6.RabbitMQ死信队列
8.RabbitMQ队列详解
9.RabbitMQ消息的可靠性
RabbitMQ 交换机 Exchange(X) 可翻译成交换机/交换器/路由器
RabbitMQ交换器 (Exchange)类型
Fanout Exchange(扇形交换机):广播,分发模式 ,把消息分发给所有订阅者 。广播,将消息交给所有绑定到交换机的队列
Direct Exchange(直连交换机):定向,把消息交给符合指定routing key 的队列。发送方把消息发送给订阅方 ,针对多个订阅者,默认采取轮询方式进行消息发送 。
Topic Exchange(主题交换机):通配符,匹配订阅模式 ,使用正则匹配 到消息队列,把消息交给符合routing pattern(路由模式) 的队列
Headers Exchange(头部交换机):与 direct 类似,只是性能很差,此类型几乎用不到。
x-local-random Exchange
自定义交换机
使用流程:引入依赖 → 配置MQ(yaml)→ 定义常量 → 定义MQ → 定义生产者 → 定义消费者 → 发送消息 → 测试结果
引入依赖,配置MQ(yaml)都一样,根据不同类型,不同情况。定义常量、MQ、生产者、消费者略有差别。
Fanout Exchange 扇形交换机会将消息投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;消息发送流程如下:
1 2 3 4 5 6 1. 可以有多个消费者 2. 每个消费者有自己的queue(队列) 3. 每个队列都要绑定到Exchange(交换机) 4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。 5. 交换机把消息发送给绑定过的所有队列 6. 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
定义常量
1 2 3 4 5 public class RabbitMQConstant { public static final String EXCHANGE_FANOUT = "exchange.fanout" ; public static final String QUEUE_FANOUT_A = "queue.fanout.a" ; public static final String QUEUE_FANOUT_B = "queue.fanout.B" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Configuration public class RabbitConfig { @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange (RabbitMQConstant.EXCHANGE_FANOUT); } @Bean public Queue queueA () { return new Queue (RabbitMQConstant.QUEUE_FANOUT_A); } @Bean public Queue queueB () { return new Queue (RabbitMQConstant.QUEUE_FANOUT_B); } @Bean public Binding bingingA (FanoutExchange fanoutExchange, Queue queueA) { return BindingBuilder.bind(queueA).to(fanoutExchange); } @Bean public Binding bingingB (FanoutExchange fanoutExchange, Queue queueB) { return BindingBuilder.bind(queueB).to(fanoutExchange); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { String msg = "hello world" ; Message message = new Message (msg.getBytes()); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_FANOUT, "" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_FANOUT_A,RabbitMQConstant.QUEUE_FANOUT_B}) public void receiveMsg (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到的消息为:{}" ,msg); } }
测试结果
1 2 3 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到的消息为:hello world 接收到的消息为:hello world
Direct Exchange 在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。路由键与队列名完全匹配的交换机。
1 2 3 4 P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key 为 error 的消息 C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
定义常量
1 2 3 4 5 6 7 public class RabbitMQConstant { public static final String EXCHANGE_DIRECT = "exchange.direct" ; public static final String QUEUE_DIRECT_A = "queue.direct.a" ; public static final String QUEUE_DIRECT_B = "queue.direct.b" ; public static final String ROUTING_KEY = "info" ; public static final String ROUTING_WARNING_KEY = "warn" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public class RabbitConfig { @Bean public DirectExchange directExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DIRECT).build(); } @Bean public Queue queueA () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_A).build(); } @Bean public Queue queueB () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_B).build(); } @Bean public Binding bindingA (DirectExchange directExchange, Queue queueA) { return BindingBuilder.bind(queueA).to(directExchange).with(RabbitMQConstant.ROUTING_KEY); } @Bean public Binding bindingB1 (DirectExchange directExchange, Queue queueB) { return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_KEY); } @Bean public Binding bindingB2 (DirectExchange directExchange, Queue queueB) { return BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_WARNING_KEY); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, RabbitMQConstant.ROUTING_KEY, message); Message error_message = MessageBuilder.withBody("error world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, RabbitMQConstant.ROUTING_WARNING_KEY, error_message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_A}) public void receiveMsgA (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到A的消息为:{}" , msg); } @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_B}) public void receiveMsgB (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到B的消息为:{}" , msg); } }
测试结果
1 2 3 4 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到B的消息为:hello world 接收到A的消息为:hello world 接收到B的消息为:error world
Topic Exchange 1 2 Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!即direct是完全匹配,topic模式是模糊匹配。 通配符匹配(相当于模糊匹配),Topic模式中路由键通过"." 分为多个部分。
通配符规则:用”.”隔开的为一个单词。
#:匹配一个或多个词
1 beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
*:匹配不多不少恰好 1 个词。必须有一个而且只有一个
1 beijing.* == beijing.queue, beijing.xyz
定义常量
1 2 3 4 5 public class RabbitMQConstant { public static final String EXCHANGE_TOPIC = "exchange.topic" ; public static final String QUEUE_TOPIC_A = "queue.topic.a" ; public static final String QUEUE_TOPIC_B = "queue.topic.b" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public class RabbitConfig { @Bean public TopicExchange topicExchange () { return ExchangeBuilder.topicExchange(RabbitMQConstant.EXCHANGE_TOPIC).build(); } @Bean public Queue queueA () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_A).build(); } @Bean public Queue queueB () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_B).build(); } @Bean public Binding bindingA (TopicExchange topicExchange, Queue queueA) { return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*" ); } @Bean public Binding bindingB1 (TopicExchange topicExchange, Queue queueB) { return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit" ); } @Bean public Binding bindingB2 (TopicExchange topicExchange, Queue queueB) { return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#" ); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_TOPIC, "lazy.orange.rabbit" , message); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_A}) public void receiveMsgA (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到A的消息为:{}" , msg); } @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_B}) public void receiveMsgB (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到B的消息为:{}" , msg); } }
测试结果
1 2 接收到A的消息为:hello world 接收到B的消息为:hello world
基于消息内容中的headers属性进行匹配,不是根据路由键匹配。headers交换器和direct交换器完全一致,但性能差很多,几乎不用了。
消费方指定的headers中必须包含一个”x-match”键,”x-match”键的值有两个
发消息时可以指定消息属性(MessageProperties),如果heanders中包含多个消息属性,则所有属性都匹配上才算匹配上。
定义常量
1 2 3 4 5 public class RabbitMQConstant { public static final String EXCHANGE_HEADERS = "exchange.headers" ; public static final String QUEUE_HEADERS_A = "queue.headers.a" ; public static final String QUEUE_HEADERS_B = "queue.headers.b" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Configuration public class RabbitConfig { @Bean public HeadersExchange headersExchange () { return ExchangeBuilder.headersExchange(RabbitMQConstant.EXCHANGE_HEADERS).build(); } @Bean public Queue queueA () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_A).build(); } @Bean public Queue queueB () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_B).build(); } @Bean public Binding bindingA (HeadersExchange headersExchange, Queue queueA) { Map<String, Object> headerValues = new HashMap <>(); headerValues.put("type" , "m" ); headerValues.put("status" , 1 ); return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match(); } @Bean public Binding bindingB (HeadersExchange headersExchange, Queue queueB) { Map<String, Object> headerValues = new HashMap <>(); headerValues.put("type" , "s" ); headerValues.put("status" , 0 ); return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match(); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { MessageProperties messageProperties = new MessageProperties (); Map<String, Object> headers = new HashMap <>(); headers.put("type" , "s" ); headers.put("status" , 0 ); messageProperties.setHeaders(headers); Message message = MessageBuilder.withBody("hello world" .getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_HEADERS, "" , message); log.info("消息发送完毕!!!" ); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_A}) public void receiveMsgA (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到A的消息为:{}" , msg); } @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_B}) public void receiveMsgB (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到B的消息为:{}" , msg); } }
测试结果
1 2 消息发送完毕! 接收到B的消息为:hello world
自定义交换机 参考:延时交换机使用方式
交换机属性 属性介绍
name:交换机名称,就是一个字符串
Type:交换机类型(direct、topic、fanout、headers、x-local-random)五种
durable:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在
autoDelete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机
internal:内部使用的,如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定
arguments:只有一个取值alternate-exchange,表示备用交换机
alternate:设置备用交换机
使用示例 定义常量
1 2 3 4 public class RabbitMQConstant { public static final String EXCHANGE_DIRECT = "exchange.direct" ; public static final String QUEUE_DIRECT = "queue.direct" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public class RabbitConfig { @Bean public DirectExchange directExchange () { return ExchangeBuilder .directExchange(RabbitMQConstant.EXCHANGE_DIRECT) .autoDelete() .durable(false ) .internal() .build(); } @Bean public Queue queue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT).build(); } @Bean public Binding binding (DirectExchange directExchange, Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("info" ); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "info" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "" , message); log.info("消息发送完毕!!!" ); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT}) public void receiveMsgA (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到的消息为:{}" , msg); } }
测试结果
不加条件
1 2 3 消息发送完毕,发送时间为:Wed Jul 30 22 :01 :46 GMT+08:00 2025 消息发送完毕! 接收到的消息为:hello world
当配置属性internal时报错
1 2 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code:403 reply-text:ACCESS_REFUSED....
测试持久化按上
设置持久化为false,重启rabbitmq-server,则交换机丢失
测试配置autoDelete
自动删除为 true,从控制台上手动解绑,会发现自动删除
备用交换机 使用场景 当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息。如果想要监测哪些消息被投递到没有对应的队列,可以用备用交换机来实现,然后接收备用交换机的消息从而记录日志或发送报警信息。
使用示例 注意:备用交换机一般使用fanout交换机
测试时:指定一个错误路由
重点:普通交换机设置参数绑定到备用交换机
定义常量
1 2 3 4 5 6 7 8 9 10 11 public class RabbitMQConstant { public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.backup" ; public static final String EXCHANGE_BACKUP_NAME = "exchange.backup" ; public static final String QUEUE_NORMAL_NAME = "queue.normal.backup" ; public static final String QUEUE_BACKUP_NAME = "queue.backup" ; public static final String ROUTING_WARNING_KEY = "info" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { Map<String, Object> arguments = new HashMap <>(); return ExchangeBuilder .directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME) .alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME) .build(); } @Bean public Queue queueNormal () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build(); } @Bean public Binding binding (DirectExchange normalExchange, Queue queueNormal) { return BindingBuilder.bind(queueNormal).to(normalExchange) .with(RabbitMQConstant.ROUTING_WARNING_KEY); } @Bean public FanoutExchange alternateExchange () { return ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build(); } @Bean public Queue alternateQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build(); } @Bean public Binding bindingAlternate (FanoutExchange alternateExchange, Queue alternateQueue) { return BindingBuilder.bind(alternateQueue).to(alternateExchange); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message= MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,"test" ,message); log.info("消息发送完毕,发送时间为:{}" ,new Date ()); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_BACKUP_NAME}) public void receiveMsg (Message message) { byte [] body = message.getBody(); String msg = new String (body); log.info("接收到备用的消息为:{}" , msg); } }
发送消息
故意写错路由key,由于正常交换机设置了备用交换机,所以该消息就会进入备用交换机从而进入备用队列。可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理。如果正常交换机没有设置备用交换机,则该消息会被抛弃。
测试结果
1 2 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到备用的消息为:hello world
RabbitMQ 延时交换机 延时问题 场景 :有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了。
解决方式 :
定时任务方式 :每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点: 简单,容易实现;
缺点:
存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
性能较差,每次扫描数据库,如果订单量很大会影响性能
被动取消 :当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);
优点: 对服务器而言,压力小;
缺点:
用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
JDK延迟队列(单体应用,不能分布式下):DelayedQueue。无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点: 实现简单,任务延迟低
缺点:
服务重启、宕机,数据丢失;
只适合单机版,不适合集群;
订单量大,可能内存不足而发生异常;
采用消息中间件(rabbitmq)
RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样。
延时交换机 死信队列实现 实现方式 给正常队列绑定一个死信交换机和设置死信路由key;给正常队列设置消息过期时间,过期时间用于模拟延时操作,当消息过期后没有被消费就会转到死信队列。
定义常量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.delay.normal.1" ; public static final String EXCHANGE_DLX_NAME = "exchange.delay.dlx.1" ; public static final String QUEUE_NAME = "queue.delay.normal.1" ; public static final String QUEUE_DLX_NAME = "queue.delay.dlx.1" ; public static final String ROUTING_NAME = "order1" ; public static final String ROUTING_DLX_NAME = "error1" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME) .ttl(25000 ) .withArguments(arguments) .build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME); } @Bean public DirectExchange dlxExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build(); } @Bean public Queue dlxQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build(); } @Bean public Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
测试结果
存在的问题 如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题
若队头的消息过期时间长,后面的消息过期时间短,但因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中
多队列解决过期时间问题 对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上
定义MQ
注意:正常队列不要设置过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME) .withArguments(arguments) .build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME); } @Bean public DirectExchange dlxExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build(); } @Bean public Queue dlxQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build(); } @Bean public Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME); } }
定义生产者
发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg () { { MessageProperties messageProperties = new MessageProperties (); messageProperties.setExpiration("25000" ); Message message = MessageBuilder.withBody("hello world 1" .getBytes()) .andProperties(messageProperties) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } { MessageProperties messageProperties = new MessageProperties (); messageProperties.setExpiration("15000" ); Message message = MessageBuilder.withBody("hello world 2" .getBytes()) .andProperties(messageProperties) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } } }
定义消费者
消费者监听死信队列的消息来查看消息接收的时间
1 2 3 4 5 6 7 8 9 10 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME) public void receiveMsg (Message message) { String body = new String (message.getBody()); log.info("接收到的消息为:{},接收时间为:{}" , body, new Date ()); } }
测试结果 :先接收到的是队头的消息
1 2 3 4 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到的消息为:hello world 1,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025 接收到的消息为:hello world 2,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025
使用延时插件 安装插件
下载
选择对应的版本下载 rabbitmq_delayed_message_exchange 插件。官网下载地址community-plugins 。
插件拷贝到 RabbitMQ 服务器plugins目录下
解压缩
1 2 3 yum install unzip -y cd /usr/local/rabbitmq_server-4.0.7/pluginsunzip rabbitmq_delayed_message_exchange-v4.0.7.ez
启用插件
1 2 cd /usr/local/rabbitmq_server-4.0.7/sbin/./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查询安装情况
重启Rabbitmq使其生效
实现原理 消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中。延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现。解决了消息过期时间不一致出现的问题。
实现延时队列 消息只要发送到延时交换机即可,延时交换机绑定死信路由的key
定义常量
1 2 3 4 5 6 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.delay.4" ; public static final String QUEUE_NAME = "queue.delay.4" ; public static final String ROUTING_NAME = "plugin4" ; }
定义MQ
注意延时交换机需要使用自定义类型定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public class RabbitConfig { @Bean public CustomExchange customExchange () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-delayed-type" , "direct" ); return new CustomExchange (RabbitMQConstant.EXCHANGE_NAME, "x-delayed-message" , true , false , arguments); } @Bean public Queue queue () { return QueueBuilder .durable(RabbitMQConstant.QUEUE_NAME) .build(); } @Bean public Binding binding (CustomExchange customExchange, Queue queue) { return BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs(); } }
定义生产者
生产者发送消息时要在headers中添加过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg () { { MessageProperties messageProperties = new MessageProperties (); messageProperties.setHeader("x-delay" , 25000 ); Message message = MessageBuilder.withBody("hello world 1" .getBytes()) .andProperties(messageProperties) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } { MessageProperties messageProperties = new MessageProperties (); messageProperties.setHeader("x-delay" , 15000 ); Message message = MessageBuilder.withBody("hello world 2" .getBytes()) .andProperties(messageProperties) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } } }
定义消费者
1 2 3 4 5 6 7 8 9 10 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME) public void receiveMsg (Message message) { String body = new String (message.getBody()); log.info("接收到的消息为:{},接收时间为:{}" , body, new Date ()); } }
RabbitMQ 死信队列 过期消息(Time To Live,TTL) 消息的过期时间有两种设置方式:(过期消息)
单条消息过期 :单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;
队列属性设置所有消息过期 :队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;
如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。
设置消息过期时间 定义常量
1 2 3 4 5 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.ttl.a" ; public static final String QUEUE_TNAME = "queue.ttl.a" ; public static final String ROUTING_KEY = "info" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public class RabbitConfig { @Bean public DirectExchange directExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue queue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).build(); } @Bean public Binding binding (DirectExchange directExchange, Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { MessageProperties messageProperties = new MessageProperties (); messageProperties.setExpiration("35000" ); Message message = MessageBuilder.withBody("hello world" .getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
测试结果 :发送消息后查看 queue.ttl.a 有一条数据,35秒后查看发现数据丢失。
设置队列消息过期时间 定义常量
1 2 3 4 5 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.ttl.b" ; public static final String QUEUE_TNAME = "queue.ttl.b" ; public static final String ROUTING_KEY = "info" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class RabbitConfig { @Bean public DirectExchange directExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue queue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-message-ttl" , 15000 ); return new Queue (RabbitMQConstant.QUEUE_TNAME, true , false , false , arguments); } @Bean public Binding binding (DirectExchange directExchange, Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_KEY, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
测试结果 :先后发送两条消息,发送消息后查看 queue.ttl.b 有2条数据,第1条消息到期后消失,第2条消息到期后消失
死信队列(Dead Letter Exchange,DLX) 单条消息过期 消息设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。
定义常量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class RabbitMQConstant { public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.1" ; public static final String QUEUE_NORMAL_NAME = "queue.normal.1" ; public static final String EXCHANGE_DLX_NAME = "exchange.dlx.1" ; public static final String QUEUE_DLX_NAME = "queue.dlx.1" ; public static final String NORMAL_KEY = "order1" ; public static final String DLX_KEY = "error1" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME) .withArguments(arguments) .build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY); } @Bean public DirectExchange dlxExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build(); } @Bean public Queue dlxQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build(); } @Bean public Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { MessageProperties messageProperties = new MessageProperties (); messageProperties.setExpiration("15000" ); Message message = MessageBuilder.withBody("hello world" .getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message); log.info("消息发送完毕:发送时间为:{}" , new Date ()); } }
测试结果 :发送的消息先出现在正常队列,待过期后消息会出现在死信队列
队列设置过期 队列设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class RabbitConfig { @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-message-ttl" , 20000 ); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build(); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message); log.info("消息发送完毕:发送时间为:{}" , new Date ()); } }
测试结果 :发送的消息先出现在正常队列,待过期后消息会出现在死信队列
队列到达最大长度 先入队的消息会被发送到DLX
定义常量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class RabbitMQConstant { public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.2" ; public static final String QUEUE_NORMAL_NAME = "queue.normal.2" ; public static final String EXCHANGE_DLX_NAME = "exchange.dlx.2" ; public static final String QUEUE_DLX_NAME = "queue.dlx.2" ; public static final String NORMAL_KEY = "order2" ; public static final String DLX_KEY = "error2" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class RabbitConfig { @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-max-length" , 5 ); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME) .withArguments(arguments) .build(); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { for (int i = 1 ; i <= 8 ; i++) { String str = "hello world " + i; Message message = MessageBuilder.withBody(str.getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message); } log.info("消息发送完毕:发送时间为:{}" , new Date ()); } }
测试结果 :超过正常队列的长度后先入队的消息会放入死信队列
消费消息不进行重新投递
从正常的队列接收消息,但对消息不进行确认并且不对消息进行重新投递,此时消息就进入死信队列
业务处理过程中出现异常也会变成死信,因为消费者没有进行确认
配置MQ
需要开启消费者手动确认模式,否则默认是消息消费后自动确认的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server: port: 8080 spring: application: name: dlx-learn4 rabbitmq: host: 192.168 .1 .101 port: 5672 username: admin password: 123456 virtual-host: longdidi listener: simple: acknowledge-mode: manual
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class RabbitConfig { @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-message-ttl" , 15000 ); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME) .withArguments(arguments) .build(); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { String str = "hello world" ; Message message = MessageBuilder.withBody(str.getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message); log.info("消息发送完毕:发送时间为:{}" , new Date ()); } }
定义消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Service @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME}) public void receiveMsg (Message message, Channel channel) { System.out.println("接收到的消息:" + message); MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); try { byte [] body = message.getBody(); String str = new String (body); log.info("接收到的消息为:{},接收时间为:{}" , str, new Date ()); int a = 1 / 0 ; channel.basicAck(deliveryTag, false ); } catch (Exception e) { log.error("接收着出现问题:{}" , e.getMessage()); try { channel.basicNack(deliveryTag, false , false ); } catch (IOException ex) { throw new RuntimeException (ex); } throw new RuntimeException (e); } } }
测试结果 :模拟异常,发现消息未重新入队,而是进入了死信队列
消费者拒绝消息 开启手动确认模式并拒绝消息,不重新投递,则进入死信队列
配置yaml时需要开启消费者手动确认模式,配置同上述消息不进行重新投递
定义MQ
这里的队列无需设置队列的过期时间,因为消费者拒绝后如果不重新投递就直接进入死信队列
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public class RabbitConfig { @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.QUEUE_DLX_NAME); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build(); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; public void sendMsg () { String str = "hello world" ; Message message = MessageBuilder.withBody(str.getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME, RabbitMQConstant.NORMAL_KEY, message); log.info("消息发送完毕:发送时间为:{}" , new Date ()); } }
定义消费者
消费者拒绝消息后不进行重新投递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j @Service public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME}) public void receiveMsg (Message message, Channel channel) { System.out.println("接收到的消息:" + message); MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); try { byte [] body = message.getBody(); String str = new String (body); log.info("接收到的消息为:{},接收时间为:{}" , str, new Date ()); int a = 1 / 0 ; channel.basicAck(deliveryTag, false ); } catch (Exception e) { log.error("接收着出现问题:{}" , e.getMessage()); try { channel.basicReject(deliveryTag, false ); } catch (IOException ex) { throw new RuntimeException (ex); } throw new RuntimeException (e); } } }
测试结果 :拒绝后的消息不再入队,正常和死信队列都没有了
死信应用场景
RabbitMQ 队列 队列属性
Type:设置队列的队列类型;
Name:队列名称,就是一个字符串,随便一个字符串就可以;
Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;
Auto delete:是否自动删除。true表示当没有消费者连接到这个队列时,队列会自动删除;
Exclusive:该属性的队列只对第一个连接它的消费者可见(之后其它消费者无法访问该队列),连接断开时自动删除。基本设置成false
Arguments:队列的其他属性,例如指定DLX(死信交换机等);
x-expires:Number:当Queue(队列)在指定的时间未被访问则队列将被自动删除
x-message-ttl:Number:发布的消息在队列中存在多长时间后被取消(单位毫秒)
x-overflow:String:设置队列溢出行为,当达到队列的最大长度时消息的处理方式
drop-head:删除头部消息
reject-publish:超过队列长度后,后面发布的消息会被拒绝接收
reject-publish-dlx:超过队列长度后,后面发布的消息会被拒绝接收并发布到死信交换机
仲裁队列类型仅支持:drop-head and reject-publish两种
x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息
x-single-active-consumer:表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略设置为false时消息循环分发给所有消费者(默认false)
x-dead-letter-exchange:String:指定队列关联的死信交换机。队列消息达上限后溢出消息不被删掉,而保存到另一队列;
x-dead-letter-routing-key:String:指定死信交换机的路由键,一般和7一起定义
x-queue-mode:String:队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用。如果未设置则队列将保留内存缓存以尽可能快地传递消息;如果设置则队列消息将保存在磁盘上,消费时从磁盘上读取消费
x-queue-master-locator:String(用的较少):在集群模式下设置队列分配到的主节点位置信息;
每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;
每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave
基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;
关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略
min-masters:选择master queue数最少的那个服务节点host;
client-local:选择与client相连接的那个服务节点host;
random:随机分配;
属性测试 非持久化属性测试 定义常量
1 2 3 4 5 6 7 8 9 10 11 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.properties.1" ; public static final String QUEUE_NAME1 = "queue.properties.test1.1" ; public static final String QUEUE_NAME2 = "queue.properties.test1.2" ; public static final String ROUTING_NAME1 = "key.properties.test1.1" ; public static final String ROUTING_NAME2 = "key.properties.test1.2" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue1 () { return QueueBuilder.nonDurable().build(); } @Bean public Queue normalQueue2 () { return QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1).build(); } @Bean public Binding bindingNormal1 (DirectExchange normalExchange, Queue normalQueue1) { return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } @Bean public Binding bindingNormal2 (DirectExchange normalExchange, Queue normalQueue2) { return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2); } }
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg () { { Message message = MessageBuilder.withBody("hello world1" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } { Message message = MessageBuilder.withBody("hello world2" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } } }
测试结果
1 2 发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字 不持久化的队列在服务器重启后队列会消失
持久化测试 是否自动删除。如果为true,当没有消费者连接到这个队列的时候,队列会自动删除
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public class RabbitConfig { @Bean public Queue normalQueue1 () { return QueueBuilder.durable().build(); } @Bean public Queue normalQueue2 () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build(); } }
测试结果
1 2 发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字 持久化的队列在服务器重启后队列依然存在
自动删除测试 定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class RabbitConfig { @Bean public Queue normalQueue1 () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1) .autoDelete() .build(); } @Bean public Queue normalQueue2 () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2) .build(); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class ReceivemessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1}) public void receiveMsg1 (Message message) { log.info("1接收到的消息为:{}" , new String (message.getBody())); } @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2}) public void receiveMsg2 (Message message) { log.info("2接收到的消息为:{}" , new String (message.getBody())); } }
测试结果 :设置为自动删除的队列1在没有消费者连接后被自动删除
可见性测试 普通队列允许的消费者没有限制,多个消费者绑定到同一个队列时,RabbitMQ会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
exclusive有两个作用
如果设置为false则可以使用两个消费者都访问同一个队列,没有任何问题;如果设置为true则会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。如果强制访问会报如下异常:
1 It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue1 () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1) .exclusive() .build(); } @Bean public Binding bindingNormal1 (DirectExchange normalExchange, Queue normalQueue1) { return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } }
消费者
这里的消费者表示在同一个Connection中消费消息的多个消费者,测试是否同一个Connection中的多个消费者可以消费。代码同上。
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @SpringBootApplication public class Rabbitmq08Properties04Application implements ApplicationRunner { public static void main (String[] args) { SpringApplication.run(Rabbitmq08Properties04Application.class, args); } @Resource private SendMessageService sendMessageService; @Override public void run (ApplicationArguments args) throws Exception { sendMessageService.sendMsg(); { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.1.101" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("123456" ); factory.setVirtualHost("longdidi" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("连接1接收到信息:" + new String (body)); } }; channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true , consumer); channel.close(); connection.close(); } { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.1.101" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("123456" ); factory.setVirtualHost("longdidi" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("连接2接收到信息:" + new String (body)); } }; channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true , consumer); channel.close(); connection.close(); } } }
测试结果
1 2 3 4 5 6 7 1.启动程序,如果有多个Connection连接会报错:channel error;protocol method:#method... 2.注释掉主程序中的两个Connection连接代码重启测试,发现同一Connection内的多个消费者可以连接消费,输出如下 消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025 消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025 消费者1接收到的消息为:hello world2 消费者2接收到的消息为:hello world1 3.停止程序运行查看队列是否自动删除:当连接关闭时队列自动删除
Arguments测试 删除属性测试 x-expires属性:当Queue(队列)在指定的时间未被访问则队列将被自动删除。
定义常量
1 2 3 4 5 6 7 8 9 10 11 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.properties.normal.1" ; public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.1" ; public static final String QUEUE_NAME1 = "queue.properties.normal.1" ; public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.1" ; public static final String ROUTING_NAME1 = "key.properties.normal.1" ; public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.1" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-expires" , 10000L ); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } }
测试结果 :启动应用查看控制台,生成了队列,等待10s之后刷新控制台,队列被删除
设置队列过期时间 发布的消息在队列中存在多长时间后被取消(单位毫秒),参考 队列设置过期
设置队列长度 x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息。参考 队列到达最大长度
设置队列溢出行为 当达到队列的最大长度时消息的处理方式:有效值为drop-head(删除头部消息)、reject-publish(拒绝发布)或reject-publish-dlx(拒绝发布到死信交换机)。仲裁队列类型仅支持drop-head and reject-publish两种。
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-max-length" , 5 ); arguments.put("x-overflow" , "reject-publish-dlx" ); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1) .withArguments(arguments) .deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME) .deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1) .build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } @Bean public DirectExchange dlxExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build(); } @Bean public Queue dlxQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build(); } @Bean public Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue) { return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1); } }
测试结果
1 2 3 4 5 6 1.当属性设置为drop-head时 如果超过队列长度,先入队的消息会被先删除(如果配置了死信交换机则会移至死信交换机) 2.当属性设置为reject-publish时 该模式下当队列达到最天长度后会拒绝收消息,也不会将队头的消息移至死信队列(如果配置了死信队列的话),超长的消息未被接收 3.当属性设置为reject-publish-dlx时 拒绝接收后面的消息并将拒绝的消息放到死信交换机中,后面发送的消息会存储到死信队列中
设置队列内存大小 x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-max-length-bytes" , 30 ); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg () { for (int i = 0 ; i < 3 ; i++) { String str = "你好我好大家好" + i; Message message = MessageBuilder.withBody(str.getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } for (int i = 0 ; i < 3 ; i++) { String str = "hello" + i; Message message = MessageBuilder.withBody(str.getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } } }
测试结果 :队列中只剩3条信息,先入队的超过内存的限制的消息被删除了
设置单一消费者 表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略;设置为false时消息循环分发给所有消费者(默认false)。
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); arguments.put("x-single-active-consumer" , true ); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public class ReceivemessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1}) public void receiveMsg1 (Message message) { log.info("消费者1接收到的消息为:{}" , new String (message.getBody())); } @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1}) public void receiveMsg2 (Message message) { log.info("消费者2接收到的消息为:{}" , new String (message.getBody())); } }
测试结果 :只有消费者1收到了消息
设置死信交换机 定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public class RabbitConfig { @Bean Map<String, Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME); arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME1); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1) .ttl(5000 ) .withArguments(arguments) .build(); } }
测试结果 :超时的消息放入了死信队列
RabbitMQ 消息可靠性 生产者确认 Confirm模式简介
代表消息从生产者发送到Exchange;
代表消息从Exchange路由到Queue;
代表消息在Queue中存储;
代表消费者监听Queue并消费消息;
可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;
有两种解决方案:
开启Confirm(确认)模式;(异步)
开启Transaction(事务)模式;(性能低,实际项目中很少用)
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器 Broker里面的exchange交换机,则会给生产者一个应答生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
Confirm模式实现
开启生产者确认模式
1 2 3 4 5 6 7 8 spring: rabbitmq: host: 192.168 .10 .11 port: 5672 username: admin password: 123456 virtual-host: longdidi publisher-confirm-type: correlated
实现RabbitTemplate.ConfirmCallback交界口,重写confirm()方法
判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理
设置rabbitTemplate的确认回调方法
1 rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
定义常量
1 2 3 4 5 6 7 8 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.confirm.normal.1" ; public static final String QUEUE_NAME1 = "queue.confirm.normal.1" ; public static final String ROUTING_NAME1 = "key.confirm.normal.1" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build(); } @Bean public Queue normalQueue () { Map<String, Object> arguments = new HashMap <>(); return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build(); } @Bean public Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue) { return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1); } }
外部类实现 生产者 :设置回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Service @Slf4j public class SendMessageService { @Autowired private RabbitTemplate rabbitTemplate; @Resource private MyConfirmCallBack confirmCallBack; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(confirmCallBack); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); CorrelationData correlationData = new CorrelationData (); correlationData.setId("order_123456" ); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" , RabbitMQConstant.ROUTING_NAME1, message, correlationData); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
实现接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public class MyConfirmCallBack implements RabbitTemplate .ConfirmCallback { @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { log.info("关联id为:{}" , correlationData.getId() + "" ); if (ack) { log.info("消息正确的达到交换机" ); return ; } else { log.error("消息没有到达交换机,原因为:{}" , cause); } } }
生产者实现 生产者 :生产者直接实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public class SendMessageService implements RabbitTemplate .ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); CorrelationData correlationData = new CorrelationData (); correlationData.setId("order_123456" ); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" , RabbitMQConstant.QUEUE_NAME1, message, correlationData); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { log.info("关联id为:{}" , correlationData.getId() + "" ); if (ack) { log.info("消息正确的达到交换机" ); return ; } log.error("消息没有到达交换机,原因为:{}" , cause); } }
匿名内部内实现 生产者 :生产者使用匿名内部类实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback( new RabbitTemplate .ConfirmCallback() { @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { log.info("关联id为:{}" , correlationData.getId() + "" ); if (ack) { log.info("消息正确的达到交换机" ); return ; } log.error("消息没有到达交换机,原因为:{}" , cause); } } ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); CorrelationData correlationData = new CorrelationData (); correlationData.setId("order_123456" ); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" , RabbitMQConstant.QUEUE_NAME1, message, correlationData); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
Lambad 实现 生产者 :生产者使用lambda实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) -> { log.info("关联id为:{}" , correlationData.getId() + "" ); if (ack) { log.info("消息正确的达到交换机" ); return ; } log.error("消息没有到达交换机,原因为:{}" , cause); } ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); CorrelationData correlationData = new CorrelationData (); correlationData.setId("order_123456" ); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" , RabbitMQConstant.ROUTING_NAME1, message, correlationData); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
测试结果 :写错交换机的名字,查看日志输出
1 2 关联id为:order_123456 消息没有到达交换机,原因为:channel error;protocol method:#method<channel
交换机确认 可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败;
使用return模式可以实现消息无法路由的时候返回给生产者,消息从 exchange –> queue 投递失败则会返回一个 returnCallback,利用这个callback控制消息的可靠性投递;
return模式实现
开启return确认模式
1 2 3 4 5 6 7 8 9 spring: rabbitmq: host: 192.168 .10 .11 port: 5672 username: admin password: 123456 virtual-host: longdidi publisher-confirm-type: correlated publisher-returns: true
实现RabbitTemplate.ReturnsCallback接口并重写returnedMessage()方法
设置回调
使用rabbitTemplate.setReturnCallback设置退回函数
当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;
定义常量
1 2 3 4 5 6 7 8 public class RabbitMQConstant { public static final String EXCHANGE_NAME = "exchange.return.normal.1" ; public static final String QUEUE_NAME1 = "queue.return.normal.1" ; public static final String ROUTING_NAME1 = "key.return.normal.1" ; }
外部类实现 生产者 :设置回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private MyReturnCallBack myReturnCallBack; @PostConstruct public void init () { rabbitTemplate.setReturnsCallback(myReturnCallBack); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1 + "error" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
外部实现接口
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public class MyReturnCallBack implements RabbitTemplate .ReturnsCallback { @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , returnedMessage.getReplyText()); } }
生产者实现 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Service @Slf4j public class SendMessageService implements RabbitTemplate .ReturnsCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setReturnsCallback(this ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1 + "error" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , returnedMessage.getReplyText()); } }
匿名内部类实现 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setReturnsCallback( new RabbitTemplate .ReturnsCallback() { @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , returnedMessage.getReplyText()); } } ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1 + "error" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
lambda实现 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setReturnsCallback( message -> { log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , message.getReplyText()); } ); } public void sendMsg () { Message message = MessageBuilder.withBody("hello world" .getBytes()) .build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1 + "error" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
测试结果
1 2 消息发送完毕,发送时间为:Thu Jul 31 15:32:48 GMT+08:00 2025 消息从交换机没有正确的路由到(投递到)队列,原因为:NO_ROUTE
使用备用交换机
使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上;
定义常量
1 2 3 4 5 6 7 8 9 10 11 public class RabbitMQConstant { public static final String EXCHANGE_NORMAL_NAME = "exchange.normal.backup.1" ; public static final String EXCHANGE_BACKUP_NAME = "exchange.backup.1" ; public static final String QUEUE_NORMAL_NAME = "queue.normal.backup.1" ; public static final String QUEUE_BACKUP_NAME = "queue.backup.1" ; public static final String ROUTING_WARNING_KEY = "info1" ; }
定义MQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Configuration public class RabbitConfig { @Bean public DirectExchange normalExchange () { return ExchangeBuilder .directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME) .alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME) .build(); } @Bean public Queue queueNormal () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build(); } @Bean public Binding binding (DirectExchange normalExchange, Queue queueNormal) { return BindingBuilder.bind(queueNormal) .to(normalExchange) .with(RabbitMQConstant.ROUTING_WARNING_KEY); } @Bean public FanoutExchange alternateExchange () { return ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build(); } @Bean public Queue alternateQueue () { return QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build(); } @Bean public Binding bindingAlternate (FanoutExchange alternateExchange, Queue alternateQueue) { return BindingBuilder.bind(alternateQueue).to(alternateExchange); } }
测试结果 :消息存储到了备用队列
消息持久化存储 消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
解决方案:
队列持久化
1 QueueBuilder.durable("队列名称" ).build();
交换机持久化
1 ExchangeBuilder.directExchange("交换机名称" ).durable(true ).build();
消息持久化,默认就是持久化的
1 2 3 MessageProperties messageProperties = new MessageProperties ();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
消费者手动确认 手动确认消息
采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制 (message acknowledgement);
1 2 3 4 5 6 7 8 9 10 11 12 13 spring: rabbitmq: host: 192.168 .10 .11 port: 5672 username: admin password: 123456 virtual-host: longdidi publisher-confirm-type: correlated publisher-returns: true listener: simple: acknowledge-mode: manual
消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) -> { if (!ack) { log.error("消息没有到达交换机,原因为:{}" , cause); } } ); rabbitTemplate.setReturnsCallback( returnedMessage -> { log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}" , returnedMessage.getReplyText()); } ); } public void sendMsg () { MessageProperties messageProperties = new MessageProperties (); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody("hello world" .getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1+"1" , message); log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Component @Slf4j public class ReceiveMessageService { @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1}) public void receiveMsg (Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("接收到的消息为:{}" , new String (message.getBody())); channel.basicAck(deliveryTag, false ); } catch (Exception e) { log.error("消息处理出现问题" ); try { channel.basicNack(deliveryTag, false , true ); } catch (IOException ex) { throw new RuntimeException (ex); } throw new RuntimeException (e); } } }
消息的幂等性(消息不被重复消费) 同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;
幂等性 :对于一个资源,不管请求一次还是多次,对该资源本身造成的影响应该是相同的,不能因重复请求而对该资源重复造成影响;
以接口幂等性举例 :
接口幂等性 :一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;
比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;
如何避免消息的重复消费问题?(消息消费时的幂等性)
全局唯一ID + Redis:生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令(setnx(messageId, 1)),将messageId作为key放到redis中,若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;
实体类
1 2 3 4 5 6 7 8 9 10 @Data @AllArgsConstructor @NoArgsConstructor @Builder public class Orders implements Serializable { private String orderId; private String orderName; private BigDecimal orderMoney; private Date orderTime; }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Service @Slf4j public class SendMessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private ObjectMapper objectMapper; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback( (correlationData, ack, cause) -> { if (!ack) { log.error("消息没有到达交换机,原因为:{}" , cause); } } ); rabbitTemplate.setReturnsCallback( returnedMessage -> { log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}" , returnedMessage.getReplyText()); } ); } public void sendMsg () throws JsonProcessingException { { Orders orders1 = Orders.builder() .orderId("order_12345" ) .orderName("买的手机" ) .orderMoney(new BigDecimal (2356 )) .orderTime(new Date ()) .build(); String strOrders1 = objectMapper.writeValueAsString(orders1); MessageProperties messageProperties = new MessageProperties (); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody(strOrders1.getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message); } { Orders orders2 = Orders.builder() .orderId("order_12345" ) .orderName("买的手机" ) .orderMoney(new BigDecimal (2356 )) .orderTime(new Date ()) .build(); String strOrders2 = objectMapper.writeValueAsString(orders2); MessageProperties messageProperties = new MessageProperties (); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody(strOrders2.getBytes()) .andProperties(messageProperties).build(); rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message); } log.info("消息发送完毕,发送时间为:{}" , new Date ()); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Component @Slf4j public class ReceiveMessageService { @Resource private ObjectMapper objectMapper; @Resource private StringRedisTemplate stringRedisTemplate; @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1}) public void receiveMsg (Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); Orders orders = objectMapper.readValue(message.getBody(), Orders.class); try { log.info("接收到的消息为:{}" , orders.toString()); Boolean setResult = stringRedisTemplate.opsForValue() .setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId()); if (setResult) { log.info("向数据库插入订单" ); } channel.basicAck(deliveryTag, false ); } catch (Exception e) { log.error("消息处理出现问题" ); try { channel.basicNack(deliveryTag, false , true ); } catch (IOException ex) { throw new RuntimeException (ex); } throw new RuntimeException (e); } } }
测试结果
1 2 3 4 消息发送完毕,发送时间为:Thu Jul 31 16:04:07 GMT+08:00 2025 接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356... 向数据库插入订单 接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356...