RabbitMQ 是实现了高级消息队列 协议(AMQP )的开源消息代理软件(亦称面向消息的中间件 )。RabbitMQ服务器是用Erlang 语言编写的,而集群和故障转移 是构建在开放电信平台 框架上的,支持高并发处理和分布式部署。所有主要的编程语言 均有与代理接口通讯的客户端库。RabbitMQ本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,更适合于企业级的开发。它还实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队,通过队列机制实现应用程序间的异步通信与数据传输,常用于流量削峰、系统解耦及异步处理场景。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
参考文章:
5.RabbitMQ交换机详解 
7.RabbitMQ延时交换机 
6.RabbitMQ死信队列 
8.RabbitMQ队列详解 
9.RabbitMQ消息的可靠性 
 
RabbitMQ 交换机 Exchange(X) 可翻译成交换机/交换器/路由器
RabbitMQ交换器 (Exchange)类型 
Fanout Exchange(扇形交换机):广播,分发模式 ,把消息分发给所有订阅者 。广播,将消息交给所有绑定到交换机的队列 
Direct Exchange(直连交换机):定向,把消息交给符合指定routing key 的队列。发送方把消息发送给订阅方 ,针对多个订阅者,默认采取轮询方式进行消息发送 。 
Topic Exchange(主题交换机):通配符,匹配订阅模式 ,使用正则匹配 到消息队列,把消息交给符合routing pattern(路由模式) 的队列 
Headers Exchange(头部交换机):与 direct 类似,只是性能很差,此类型几乎用不到。 
x-local-random Exchange 
自定义交换机 
 
使用流程:引入依赖 → 配置MQ(yaml)→ 定义常量 →  定义MQ → 定义生产者 →  定义消费者 →  发送消息 → 测试结果
引入依赖,配置MQ(yaml)都一样,根据不同类型,不同情况。定义常量、MQ、生产者、消费者略有差别。
 
Fanout Exchange 扇形交换机会将消息投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;消息发送流程如下:
1 2 3 4 5 6 1. 可以有多个消费者 2. 每个消费者有自己的queue(队列) 3. 每个队列都要绑定到Exchange(交换机) 4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。 5. 交换机把消息发送给绑定过的所有队列 6. 队列的消费者都能拿到消息。实现一条消息被多个消费者消费 
 
定义常量 
1 2 3 4 5 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_FANOUT  =  "exchange.fanout" ;     public  static  final  String  QUEUE_FANOUT_A  =  "queue.fanout.a" ;     public  static  final  String  QUEUE_FANOUT_B  =  "queue.fanout.B" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Configuration public  class  RabbitConfig  {              @Bean      public  FanoutExchange fanoutExchange ()  {         return  new  FanoutExchange (RabbitMQConstant.EXCHANGE_FANOUT);     }          @Bean      public  Queue queueA ()  {         return  new  Queue (RabbitMQConstant.QUEUE_FANOUT_A);     }          @Bean      public  Queue queueB ()  {         return  new  Queue (RabbitMQConstant.QUEUE_FANOUT_B);     }          @Bean      public  Binding bingingA (FanoutExchange fanoutExchange, Queue queueA)  {                  return  BindingBuilder.bind(queueA).to(fanoutExchange);     }          @Bean      public  Binding bingingB (FanoutExchange fanoutExchange, Queue queueB)  {                  return  BindingBuilder.bind(queueB).to(fanoutExchange);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;          public  void  sendMsg ()  {                  String  msg  =  "hello world" ;                  Message  message  =  new  Message (msg.getBytes());         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_FANOUT, "" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_FANOUT_A,RabbitMQConstant.QUEUE_FANOUT_B})      public  void  receiveMsg (Message message) {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到的消息为:{}" ,msg);     } } 
 
测试结果 
1 2 3 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到的消息为:hello world 接收到的消息为:hello world 
 
Direct Exchange 在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。路由键与队列名完全匹配的交换机。
1 2 3 4 P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。 X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列 C1:消费者,其所在队列指定了需要routing key 为 error 的消息 C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息 
 
定义常量 
1 2 3 4 5 6 7 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_DIRECT  =  "exchange.direct" ;     public  static  final  String  QUEUE_DIRECT_A  =  "queue.direct.a" ;     public  static  final  String  QUEUE_DIRECT_B  =  "queue.direct.b" ;     public  static  final  String  ROUTING_KEY  =  "info" ;     public  static  final  String  ROUTING_WARNING_KEY  =  "warn" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange directExchange ()  {                  return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DIRECT).build();     }          @Bean      public  Queue queueA ()  {                  return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_A).build();     }     @Bean      public  Queue queueB ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT_B).build();     }               @Bean      public  Binding bindingA (DirectExchange directExchange, Queue queueA)  {                  return  BindingBuilder.bind(queueA).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);     }          @Bean      public  Binding bindingB1 (DirectExchange directExchange, Queue queueB)  {         return  BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);     }          @Bean      public  Binding bindingB2 (DirectExchange directExchange, Queue queueB)  {         return  BindingBuilder.bind(queueB).to(directExchange).with(RabbitMQConstant.ROUTING_WARNING_KEY);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {                           Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();                  rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT,                                        RabbitMQConstant.ROUTING_KEY, message);                           Message  error_message  =  MessageBuilder.withBody("error world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT,                                       RabbitMQConstant.ROUTING_WARNING_KEY, error_message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_A})      public  void  receiveMsgA (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到A的消息为:{}" , msg);     }          @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT_B})      public  void  receiveMsgB (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到B的消息为:{}" , msg);     } } 
 
测试结果 
1 2 3 4 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到B的消息为:hello world 接收到A的消息为:hello world 接收到B的消息为:error world 
 
Topic Exchange 1 2 Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!即direct是完全匹配,topic模式是模糊匹配。 通配符匹配(相当于模糊匹配),Topic模式中路由键通过"." 分为多个部分。 
 
通配符规则:用”.”隔开的为一个单词。
#:匹配一个或多个词
1 beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx 
 
*:匹配不多不少恰好 1 个词。必须有一个而且只有一个
1 beijing.* == beijing.queue, beijing.xyz 
 
定义常量 
1 2 3 4 5 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_TOPIC  =  "exchange.topic" ;     public  static  final  String  QUEUE_TOPIC_A  =  "queue.topic.a" ;     public  static  final  String  QUEUE_TOPIC_B  =  "queue.topic.b" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public  class  RabbitConfig  {    @Bean      public  TopicExchange topicExchange ()  {         return  ExchangeBuilder.topicExchange(RabbitMQConstant.EXCHANGE_TOPIC).build();     }     @Bean      public  Queue queueA ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_A).build();     }     @Bean      public  Queue queueB ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_TOPIC_B).build();     }     @Bean      public  Binding bindingA (TopicExchange topicExchange, Queue queueA)  {         return  BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*" );     }     @Bean      public  Binding bindingB1 (TopicExchange topicExchange, Queue queueB)  {         return  BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit" );     }     @Bean      public  Binding bindingB2 (TopicExchange topicExchange, Queue queueB)  {         return  BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#" );     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();                  rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_TOPIC, "lazy.orange.rabbit" , message);     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_A})      public  void  receiveMsgA (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到A的消息为:{}" , msg);     }          @RabbitListener(queues = {RabbitMQConstant.QUEUE_TOPIC_B})      public  void  receiveMsgB (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到B的消息为:{}" , msg);     } } 
 
测试结果 
1 2 接收到A的消息为:hello world 接收到B的消息为:hello world 
 
基于消息内容中的headers属性进行匹配,不是根据路由键匹配。headers交换器和direct交换器完全一致,但性能差很多,几乎不用了。
消费方指定的headers中必须包含一个”x-match”键,”x-match”键的值有两个
发消息时可以指定消息属性(MessageProperties),如果heanders中包含多个消息属性,则所有属性都匹配上才算匹配上。
定义常量 
1 2 3 4 5 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_HEADERS  =  "exchange.headers" ;     public  static  final  String  QUEUE_HEADERS_A  =  "queue.headers.a" ;     public  static  final  String  QUEUE_HEADERS_B  =  "queue.headers.b" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Configuration public  class  RabbitConfig  {    @Bean      public  HeadersExchange headersExchange ()  {         return  ExchangeBuilder.headersExchange(RabbitMQConstant.EXCHANGE_HEADERS).build();     }     @Bean      public  Queue queueA ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_A).build();     }     @Bean      public  Queue queueB ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_HEADERS_B).build();     }          @Bean      public  Binding bindingA (HeadersExchange headersExchange, Queue queueA)  {         Map<String, Object> headerValues = new  HashMap <>();         headerValues.put("type" , "m" );         headerValues.put("status" , 1 );         return  BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();     }     @Bean      public  Binding bindingB (HeadersExchange headersExchange, Queue queueB)  {         Map<String, Object> headerValues = new  HashMap <>();         headerValues.put("type" , "s" );         headerValues.put("status" , 0 );         return  BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {                  MessageProperties  messageProperties  =  new  MessageProperties ();         Map<String, Object> headers = new  HashMap <>();         headers.put("type" , "s" );         headers.put("status" , 0 );                  messageProperties.setHeaders(headers);                  Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .andProperties(messageProperties).build();                  rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_HEADERS, "" , message);         log.info("消息发送完毕!!!" );     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_A})      public  void  receiveMsgA (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到A的消息为:{}" , msg);     }          @RabbitListener(queues = {RabbitMQConstant.QUEUE_HEADERS_B})      public  void  receiveMsgB (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到B的消息为:{}" , msg);     } } 
 
测试结果 
1 2 消息发送完毕! 接收到B的消息为:hello world 
 
自定义交换机 参考:延时交换机使用方式
交换机属性 属性介绍 
name:交换机名称,就是一个字符串
 
Type:交换机类型(direct、topic、fanout、headers、x-local-random)五种
 
durable:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在
 
autoDelete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机
 
internal:内部使用的,如果是yes表示客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定
 
arguments:只有一个取值alternate-exchange,表示备用交换机
 
alternate:设置备用交换机
 
 
使用示例 定义常量 
1 2 3 4 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_DIRECT  =  "exchange.direct" ;     public  static  final  String  QUEUE_DIRECT  =  "queue.direct" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange directExchange ()  {         return  ExchangeBuilder                  .directExchange(RabbitMQConstant.EXCHANGE_DIRECT)                  .autoDelete()                  .durable(false )                                   .internal()                  .build();     }          @Bean      public  Queue queue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DIRECT).build();     }          @Bean      public  Binding binding (DirectExchange directExchange, Queue queue)  {         return  BindingBuilder.bind(queue).to(directExchange).with("info" );     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "info" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_DIRECT, "" , message);         log.info("消息发送完毕!!!" );     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_DIRECT})      public  void  receiveMsgA (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到的消息为:{}" , msg);     } } 
 
测试结果 
不加条件
1 2 3 消息发送完毕,发送时间为:Wed Jul 30  22 :01 :46  GMT+08:00  2025  消息发送完毕! 接收到的消息为:hello world 
 
当配置属性internal时报错
1 2 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code:403 reply-text:ACCESS_REFUSED.... 
 
测试持久化按上
设置持久化为false,重启rabbitmq-server,则交换机丢失
 
测试配置autoDelete
自动删除为 true,从控制台上手动解绑,会发现自动删除
 
 
备用交换机 使用场景 当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息。如果想要监测哪些消息被投递到没有对应的队列,可以用备用交换机来实现,然后接收备用交换机的消息从而记录日志或发送报警信息。
使用示例 注意:备用交换机一般使用fanout交换机
测试时:指定一个错误路由
重点:普通交换机设置参数绑定到备用交换机
定义常量 
1 2 3 4 5 6 7 8 9 10 11 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NORMAL_NAME  =  "exchange.normal.backup" ;          public  static  final  String  EXCHANGE_BACKUP_NAME  =  "exchange.backup" ;          public  static  final  String  QUEUE_NORMAL_NAME  =  "queue.normal.backup" ;          public  static  final  String  QUEUE_BACKUP_NAME  =  "queue.backup" ;     public  static  final  String  ROUTING_WARNING_KEY  =  "info" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         Map<String, Object> arguments = new  HashMap <>();                  return  ExchangeBuilder                  .directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME)                  .alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME)                  .build();     }          @Bean      public  Queue queueNormal ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build();     }          @Bean      public  Binding binding (DirectExchange normalExchange, Queue queueNormal)  {         return  BindingBuilder.bind(queueNormal).to(normalExchange)             .with(RabbitMQConstant.ROUTING_WARNING_KEY);     }          @Bean      public  FanoutExchange alternateExchange ()  {         return  ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build();     }          @Bean      public  Queue alternateQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build();     }          @Bean      public  Binding bindingAlternate (FanoutExchange alternateExchange, Queue alternateQueue)  {         return  BindingBuilder.bind(alternateQueue).to(alternateExchange);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message message= MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,"test" ,message);         log.info("消息发送完毕,发送时间为:{}" ,new  Date ());     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_BACKUP_NAME})      public  void  receiveMsg (Message message)  {         byte [] body = message.getBody();         String  msg  =  new  String (body);         log.info("接收到备用的消息为:{}" , msg);     } } 
 
发送消息 
故意写错路由key,由于正常交换机设置了备用交换机,所以该消息就会进入备用交换机从而进入备用队列。可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理。如果正常交换机没有设置备用交换机,则该消息会被抛弃。
测试结果 
1 2 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到备用的消息为:hello world 
 
 
RabbitMQ 延时交换机 延时问题 场景 :有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了。
解决方式 :
定时任务方式 :每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点: 简单,容易实现;
缺点: 
存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟; 
性能较差,每次扫描数据库,如果订单量很大会影响性能 
 
 
被动取消 :当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);
优点: 对服务器而言,压力小;
缺点: 
用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响; 
用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差; 
 
 
JDK延迟队列(单体应用,不能分布式下):DelayedQueue。无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点: 实现简单,任务延迟低
缺点: 
服务重启、宕机,数据丢失; 
只适合单机版,不适合集群; 
订单量大,可能内存不足而发生异常; 
 
 
采用消息中间件(rabbitmq)
RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样。
 
 
延时交换机 死信队列实现 实现方式 给正常队列绑定一个死信交换机和设置死信路由key;给正常队列设置消息过期时间,过期时间用于模拟延时操作,当消息过期后没有被消费就会转到死信队列。
定义常量 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.delay.normal.1" ;          public  static  final  String  EXCHANGE_DLX_NAME  =  "exchange.delay.dlx.1" ;          public  static  final  String  QUEUE_NAME  =  "queue.delay.normal.1" ;          public  static  final  String  QUEUE_DLX_NAME  =  "queue.delay.dlx.1" ;          public  static  final  String  ROUTING_NAME  =  "order1" ;          public  static  final  String  ROUTING_DLX_NAME  =  "error1" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();                           arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                  arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)                 .ttl(25000 )                                  .withArguments(arguments)                      .build();     }          @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);     }          @Bean      public  DirectExchange dlxExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();     }          @Bean      public  Queue dlxQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();     }          @Bean      public  Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue)  {         return  BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME, message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
测试结果 
存在的问题 如果不设置队列的过期时间,在发送消息时设置消息的过期时间会存在以下问题
若队头的消息过期时间长,后面的消息过期时间短,但因为队头的消息没有被消费,因此后面已过期的消息也无法到达死信队列中 
 
多队列解决过期时间问题 对于上面存在的问题,可以将不同过期时间的消息发送到不同的队列上,过期后再转到死信队列上
定义MQ 
注意:正常队列不要设置过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();                           arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                  arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME)                                  .withArguments(arguments)                      .build();     }             @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME);     }          @Bean      public  DirectExchange dlxExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();     }          @Bean      public  Queue dlxQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();     }          @Bean      public  Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue)  {         return  BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME);     }   } 
 
定义生产者 
发送消息时先发送一条过期时间长的,再发送一条过期时间短的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         {             MessageProperties  messageProperties  =  new  MessageProperties ();             messageProperties.setExpiration("25000" );              Message  message  =  MessageBuilder.withBody("hello world 1" .getBytes())                     .andProperties(messageProperties)                     .build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }         {             MessageProperties  messageProperties  =  new  MessageProperties ();             messageProperties.setExpiration("15000" );              Message  message  =  MessageBuilder.withBody("hello world 2" .getBytes())                     .andProperties(messageProperties)                     .build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }     } } 
 
定义消费者 
消费者监听死信队列的消息来查看消息接收的时间
1 2 3 4 5 6 7 8 9 10 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = RabbitMQConstant.QUEUE_DLX_NAME)      public  void  receiveMsg (Message message)  {         String  body  =  new  String (message.getBody());         log.info("接收到的消息为:{},接收时间为:{}" , body, new  Date ());     } } 
 
测试结果 :先接收到的是队头的消息
1 2 3 4 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 消息发送完毕,发送时间为:Wed Jul 30 22:01:46 GMT+08:00 2025 接收到的消息为:hello world 1,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025 接收到的消息为:hello world 2,接收时间为:Wed Jul 30 22:02:16 GMT+08:00 2025 
 
使用延时插件 安装插件 
下载 
选择对应的版本下载 rabbitmq_delayed_message_exchange  插件。官网下载地址community-plugins 。
 
插件拷贝到 RabbitMQ 服务器plugins目录下 
 
解压缩
1 2 3 yum install unzip -y  cd  /usr/local/rabbitmq_server-4.0.7/pluginsunzip rabbitmq_delayed_message_exchange-v4.0.7.ez 
 
启用插件 
1 2 cd  /usr/local/rabbitmq_server-4.0.7/sbin/./rabbitmq-plugins enable  rabbitmq_delayed_message_exchange  
 
查询安装情况 
 
重启Rabbitmq使其生效
 
 
实现原理 消息发送后不会直接投递到队列,而是先存储到内嵌的 Mnesia数据库中,然后会检查 x-delay 时间(消息头部),将过期的消息放到死信队列中。延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现。解决了消息过期时间不一致出现的问题。
实现延时队列 消息只要发送到延时交换机即可,延时交换机绑定死信路由的key
定义常量 
1 2 3 4 5 6 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.delay.4" ;     public  static  final  String  QUEUE_NAME  =  "queue.delay.4" ;     public  static  final  String  ROUTING_NAME  =  "plugin4" ; } 
 
定义MQ 
注意延时交换机需要使用自定义类型定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public  class  RabbitConfig  {         @Bean      public  CustomExchange customExchange ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-delayed-type" , "direct" );                            return  new  CustomExchange (RabbitMQConstant.EXCHANGE_NAME,                                    "x-delayed-message" , true , false , arguments);     }     @Bean      public  Queue queue ()  {         return  QueueBuilder                 .durable(RabbitMQConstant.QUEUE_NAME)                  .build();     }     @Bean      public  Binding binding (CustomExchange customExchange, Queue queue)  {                  return  BindingBuilder.bind(queue).to(customExchange).with(RabbitMQConstant.ROUTING_NAME).noargs();     } } 
 
定义生产者 
生产者发送消息时要在headers中添加过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         {             MessageProperties  messageProperties  =  new  MessageProperties ();             messageProperties.setHeader("x-delay" , 25000 );                          Message  message  =  MessageBuilder.withBody("hello world 1" .getBytes())                     .andProperties(messageProperties)                     .build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }         {             MessageProperties  messageProperties  =  new  MessageProperties ();             messageProperties.setHeader("x-delay" , 15000 );                           Message  message  =  MessageBuilder.withBody("hello world 2" .getBytes())                     .andProperties(messageProperties)                     .build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 @Component @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = RabbitMQConstant.QUEUE_NAME)      public  void  receiveMsg (Message message)  {         String  body  =  new  String (message.getBody());         log.info("接收到的消息为:{},接收时间为:{}" , body, new  Date ());     } } 
 
 
RabbitMQ 死信队列 过期消息(Time To Live,TTL) 消息的过期时间有两种设置方式:(过期消息)
单条消息过期 :单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久; 
队列属性设置所有消息过期 :队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久; 
 
如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。
设置消息过期时间 定义常量 
1 2 3 4 5 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_NAME  =  "exchange.ttl.a" ;     public  static  final  String  QUEUE_TNAME  =  "queue.ttl.a" ;     public  static  final  String  ROUTING_KEY  =  "info" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange directExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue queue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_TNAME).build();     }          @Bean      public  Binding binding (DirectExchange directExchange, Queue queue)  {         return  BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;          public  void  sendMsg ()  {         MessageProperties  messageProperties  =  new  MessageProperties ();         messageProperties.setExpiration("35000" );          Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .andProperties(messageProperties).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_KEY, message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
测试结果 :发送消息后查看 queue.ttl.a 有一条数据,35秒后查看发现数据丢失。
设置队列消息过期时间 定义常量 
1 2 3 4 5 public  class  RabbitMQConstant  {    public  static  final  String  EXCHANGE_NAME  =  "exchange.ttl.b" ;     public  static  final  String  QUEUE_TNAME  =  "queue.ttl.b" ;     public  static  final  String  ROUTING_KEY  =  "info" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange directExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue queue ()  {                  Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-message-ttl" , 15000 );          return  new  Queue (RabbitMQConstant.QUEUE_TNAME, true , false , false , arguments);                       }          @Bean      public  Binding binding (DirectExchange directExchange, Queue queue)  {         return  BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstant.ROUTING_KEY);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_KEY, message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
测试结果 :先后发送两条消息,发送消息后查看 queue.ttl.b 有2条数据,第1条消息到期后消失,第2条消息到期后消失
死信队列(Dead Letter Exchange,DLX) 单条消息过期 消息设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。
定义常量 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NORMAL_NAME  =  "exchange.normal.1" ;          public  static  final  String  QUEUE_NORMAL_NAME  =  "queue.normal.1" ;          public  static  final  String  EXCHANGE_DLX_NAME  =  "exchange.dlx.1" ;          public  static  final  String  QUEUE_DLX_NAME  =  "queue.dlx.1" ;          public  static  final  String  NORMAL_KEY  =  "order1" ;          public  static  final  String  DLX_KEY  =  "error1" ; }  
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME).build();     }          @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();                           arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                  arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)                 .withArguments(arguments)                      .build();     }          @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.NORMAL_KEY);     }          @Bean      public  DirectExchange dlxExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();     }          @Bean      public  Queue dlxQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME).build();     }          @Bean      public  Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue)  {         return  BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.DLX_KEY);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;          public  void  sendMsg ()  {                  MessageProperties  messageProperties  =  new  MessageProperties ();                  messageProperties.setExpiration("15000" );         Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .andProperties(messageProperties).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,                                        RabbitMQConstant.NORMAL_KEY, message);         log.info("消息发送完毕:发送时间为:{}" , new  Date ());     } } 
 
测试结果 :发送的消息先出现在正常队列,待过期后消息会出现在死信队列
队列设置过期 队列设置了过期时间,在到达过期时间后消息没有被消费:可能是因为消息积压太多,消息消费不过来;也可能是没有对应的消费者。
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public  class  RabbitConfig  {              @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();                  arguments.put("x-message-ttl" , 20000 );                                    arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                    arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build();     }      } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,                                        RabbitMQConstant.NORMAL_KEY, message);         log.info("消息发送完毕:发送时间为:{}" , new  Date ());     } } 
 
测试结果 :发送的消息先出现在正常队列,待过期后消息会出现在死信队列
队列到达最大长度 先入队的消息会被发送到DLX
定义常量 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NORMAL_NAME  =  "exchange.normal.2" ;          public  static  final  String  QUEUE_NORMAL_NAME  =  "queue.normal.2" ;          public  static  final  String  EXCHANGE_DLX_NAME  =  "exchange.dlx.2" ;          public  static  final  String  QUEUE_DLX_NAME  =  "queue.dlx.2" ;          public  static  final  String  NORMAL_KEY  =  "order2" ;          public  static  final  String  DLX_KEY  =  "error2" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public  class  RabbitConfig  {              @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();                  arguments.put("x-max-length" , 5 );                           arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                  arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)                 .withArguments(arguments)                  .build();     }      } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         for  (int  i  =  1 ; i <= 8 ; i++) {             String  str  =  "hello world "  + i;             Message  message  =  MessageBuilder.withBody(str.getBytes()).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,                                           RabbitMQConstant.NORMAL_KEY, message);         }         log.info("消息发送完毕:发送时间为:{}" , new  Date ());     } } 
 
测试结果 :超过正常队列的长度后先入队的消息会放入死信队列
消费消息不进行重新投递 
从正常的队列接收消息,但对消息不进行确认并且不对消息进行重新投递,此时消息就进入死信队列 
业务处理过程中出现异常也会变成死信,因为消费者没有进行确认 
 
配置MQ 
需要开启消费者手动确认模式,否则默认是消息消费后自动确认的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server:   port:  8080  spring:   application:      name:  dlx-learn4    rabbitmq:      host:  192.168 .1 .101      port:  5672      username:  admin      password:  123456      virtual-host:  longdidi           listener:        simple:          acknowledge-mode:  manual  
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public  class  RabbitConfig  {              @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-message-ttl" , 15000 );                            arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);                  arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.DLX_KEY);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME)                 .withArguments(arguments)                  .build();     }      } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         String  str  =  "hello world" ;         Message  message  =  MessageBuilder.withBody(str.getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,                                        RabbitMQConstant.NORMAL_KEY, message);         log.info("消息发送完毕:发送时间为:{}" , new  Date ());     } } 
 
定义消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Service @Slf4j public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})      public  void  receiveMsg (Message message, Channel channel)  {         System.out.println("接收到的消息:"  + message);                                                               MessageProperties  messageProperties  =  message.getMessageProperties();                  long  deliveryTag  =  messageProperties.getDeliveryTag();         try  {             byte [] body = message.getBody();             String  str  =  new  String (body);             log.info("接收到的消息为:{},接收时间为:{}" , str, new  Date ());                                       int  a  =  1  / 0 ;                          channel.basicAck(deliveryTag, false );         } catch  (Exception e) {             log.error("接收着出现问题:{}" , e.getMessage());             try  {                                  channel.basicNack(deliveryTag, false , false );             } catch  (IOException ex) {                 throw  new  RuntimeException (ex);             }             throw  new  RuntimeException (e);         }     } } 
 
测试结果 :模拟异常,发现消息未重新入队,而是进入了死信队列
消费者拒绝消息 开启手动确认模式并拒绝消息,不重新投递,则进入死信队列
配置yaml时需要开启消费者手动确认模式,配置同上述消息不进行重新投递
定义MQ 
这里的队列无需设置队列的过期时间,因为消费者拒绝后如果不重新投递就直接进入死信队列
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public  class  RabbitConfig  {              @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);         arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.QUEUE_DLX_NAME);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).withArguments(arguments).build();     }      } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         String  str  =  "hello world" ;         Message  message  =  MessageBuilder.withBody(str.getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NORMAL_NAME,                                        RabbitMQConstant.NORMAL_KEY, message);         log.info("消息发送完毕:发送时间为:{}" , new  Date ());     } } 
 
定义消费者 
消费者拒绝消息后不进行重新投递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j @Service public  class  ReceiveMessageService  {         @RabbitListener(queues = {RabbitMQConstant.QUEUE_NORMAL_NAME})      public  void  receiveMsg (Message message, Channel channel)  {         System.out.println("接收到的消息:"  + message);                  MessageProperties  messageProperties  =  message.getMessageProperties();                  long  deliveryTag  =  messageProperties.getDeliveryTag();         try  {             byte [] body = message.getBody();             String  str  =  new  String (body);             log.info("接收到的消息为:{},接收时间为:{}" , str, new  Date ());                                       int  a  =  1  / 0 ;                          channel.basicAck(deliveryTag, false );         } catch  (Exception e) {             log.error("接收着出现问题:{}" , e.getMessage());             try  {                                  channel.basicReject(deliveryTag, false );             } catch  (IOException ex) {                 throw  new  RuntimeException (ex);             }             throw  new  RuntimeException (e);         }     } } 
 
测试结果 :拒绝后的消息不再入队,正常和死信队列都没有了
死信应用场景 
 
RabbitMQ 队列 队列属性 
Type:设置队列的队列类型;
 
Name:队列名称,就是一个字符串,随便一个字符串就可以;
 
Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;
 
Auto delete:是否自动删除。true表示当没有消费者连接到这个队列时,队列会自动删除;
 
Exclusive:该属性的队列只对第一个连接它的消费者可见(之后其它消费者无法访问该队列),连接断开时自动删除。基本设置成false
 
Arguments:队列的其他属性,例如指定DLX(死信交换机等);
x-expires:Number:当Queue(队列)在指定的时间未被访问则队列将被自动删除
 
x-message-ttl:Number:发布的消息在队列中存在多长时间后被取消(单位毫秒)
 
x-overflow:String:设置队列溢出行为,当达到队列的最大长度时消息的处理方式
drop-head:删除头部消息 
reject-publish:超过队列长度后,后面发布的消息会被拒绝接收 
reject-publish-dlx:超过队列长度后,后面发布的消息会被拒绝接收并发布到死信交换机 
 
仲裁队列类型仅支持:drop-head and reject-publish两种
 
x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
 
x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息
 
x-single-active-consumer:表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略设置为false时消息循环分发给所有消费者(默认false)
 
x-dead-letter-exchange:String:指定队列关联的死信交换机。队列消息达上限后溢出消息不被删掉,而保存到另一队列;
 
x-dead-letter-routing-key:String:指定死信交换机的路由键,一般和7一起定义
 
x-queue-mode:String:队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用。如果未设置则队列将保留内存缓存以尽可能快地传递消息;如果设置则队列消息将保存在磁盘上,消费时从磁盘上读取消费
 
x-queue-master-locator:String(用的较少):在集群模式下设置队列分配到的主节点位置信息;
每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;
每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave
基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;
关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略
min-masters:选择master queue数最少的那个服务节点host; 
client-local:选择与client相连接的那个服务节点host; 
random:随机分配; 
 
 
 
 
 
属性测试 非持久化属性测试 定义常量 
1 2 3 4 5 6 7 8 9 10 11 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.properties.1" ;          public  static  final  String  QUEUE_NAME1  =  "queue.properties.test1.1" ;     public  static  final  String  QUEUE_NAME2  =  "queue.properties.test1.2" ;          public  static  final  String  ROUTING_NAME1  =  "key.properties.test1.1" ;     public  static  final  String  ROUTING_NAME2  =  "key.properties.test1.2" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue normalQueue1 ()  {         return  QueueBuilder.nonDurable().build();     }          @Bean      public  Queue normalQueue2 ()  {         return  QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1).build();     }     @Bean      public  Binding bindingNormal1 (DirectExchange normalExchange, Queue normalQueue1)  {         return  BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     }     @Bean      public  Binding bindingNormal2 (DirectExchange normalExchange, Queue normalQueue2)  {         return  BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);     } } 
 
定义生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         {             Message  message  =  MessageBuilder.withBody("hello world1" .getBytes()).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME1, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }         {             Message  message  =  MessageBuilder.withBody("hello world2" .getBytes()).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME2, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }     } } 
 
测试结果 
1 2 发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字 不持久化的队列在服务器重启后队列会消失 
 
持久化测试 是否自动删除。如果为true,当没有消费者连接到这个队列的时候,队列会自动删除
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public  class  RabbitConfig  {                   @Bean      public  Queue normalQueue1 ()  {         return  QueueBuilder.durable().build();     }          @Bean      public  Queue normalQueue2 ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).build();     }      } 
 
测试结果 
1 2 发行消息查看控制台发现。设置了名字的会使用自定义名字,未设置名字的队列会自动生成名字 持久化的队列在服务器重启后队列依然存在 
 
自动删除测试 定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public  class  RabbitConfig  {              @Bean      public  Queue normalQueue1 ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)             .autoDelete()             .build();     }          @Bean      public  Queue normalQueue2 ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2)                          .build();     }      } 
 
消费者 
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public  class  ReceivemessageService  {    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})      public  void  receiveMsg1 (Message message)  {         log.info("1接收到的消息为:{}" , new  String (message.getBody()));     }     @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2})      public  void  receiveMsg2 (Message message)  {         log.info("2接收到的消息为:{}" , new  String (message.getBody()));     } } 
 
测试结果 :设置为自动删除的队列1在没有消费者连接后被自动删除
可见性测试 普通队列允许的消费者没有限制,多个消费者绑定到同一个队列时,RabbitMQ会采用轮询进行投递。如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
exclusive有两个作用
如果设置为false则可以使用两个消费者都访问同一个队列,没有任何问题;如果设置为true则会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。如果强制访问会报如下异常:
1 It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20) 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }     @Bean      public  Queue normalQueue1 ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)                 .exclusive()                                  .build();     }     @Bean      public  Binding bindingNormal1 (DirectExchange normalExchange, Queue normalQueue1)  {         return  BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     } } 
 
消费者 
这里的消费者表示在同一个Connection中消费消息的多个消费者,测试是否同一个Connection中的多个消费者可以消费。代码同上。
发送消息 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @SpringBootApplication public  class  Rabbitmq08Properties04Application  implements  ApplicationRunner  {    public  static  void  main (String[] args)  {         SpringApplication.run(Rabbitmq08Properties04Application.class, args);     }     @Resource      private  SendMessageService sendMessageService;          @Override      public  void  run (ApplicationArguments args)  throws  Exception {         sendMessageService.sendMsg();         {                 ConnectionFactory  factory  =  new  ConnectionFactory ();                          factory.setHost("192.168.1.101" );             factory.setPort(5672 );             factory.setUsername("admin" );             factory.setPassword("123456" );             factory.setVirtualHost("longdidi" );             Connection  connection  =  factory.newConnection();                          Channel  channel  =  connection.createChannel();             DefaultConsumer  consumer  =  new  DefaultConsumer (channel) {                                  @Override                  public  void  handleDelivery (String consumerTag, Envelope envelope,                                              AMQP.BasicProperties properties,                                             byte [] body)  throws  IOException {                    System.out.println("连接1接收到信息:"  + new  String (body));                 }             };             channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true , consumer);                          channel.close();             connection.close();         }         {                          ConnectionFactory  factory  =  new  ConnectionFactory ();                          factory.setHost("192.168.1.101" );             factory.setPort(5672 );             factory.setUsername("admin" );             factory.setPassword("123456" );             factory.setVirtualHost("longdidi" );             Connection  connection  =  factory.newConnection();                          Channel  channel  =  connection.createChannel();             DefaultConsumer  consumer  =  new  DefaultConsumer (channel) {                                  @Override                  public  void  handleDelivery (String consumerTag, Envelope envelope,                                             AMQP.BasicProperties properties,                                             byte [] body)  throws  IOException {                    System.out.println("连接2接收到信息:"  + new  String (body));                 }             };             channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true , consumer);                          channel.close();             connection.close();         }     } } 
 
测试结果 
1 2 3 4 5 6 7 1.启动程序,如果有多个Connection连接会报错:channel error;protocol method:#method... 2.注释掉主程序中的两个Connection连接代码重启测试,发现同一Connection内的多个消费者可以连接消费,输出如下 消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025 消息发送完毕,发送时间为:Thu Jul 31 14:20:30 GMT+08:00 2025 消费者1接收到的消息为:hello world2 消费者2接收到的消息为:hello world1 3.停止程序运行查看队列是否自动删除:当连接关闭时队列自动删除 
 
Arguments测试 删除属性测试 x-expires属性:当Queue(队列)在指定的时间未被访问则队列将被自动删除。
定义常量 
1 2 3 4 5 6 7 8 9 10 11 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.properties.normal.1" ;     public  static  final  String  EXCHANGE_DLX_NAME  =  "exchange.properties.dlx.1" ;          public  static  final  String  QUEUE_NAME1  =  "queue.properties.normal.1" ;     public  static  final  String  QUEUE_DLX_NAME1  =  "queue.properties.dlx.1" ;          public  static  final  String  ROUTING_NAME1  =  "key.properties.normal.1" ;     public  static  final  String  ROUTING_DLX_NAME1  =  "key.properties.dlx.1" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-expires" , 10000L );          return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();     }          @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     } } 
 
测试结果 :启动应用查看控制台,生成了队列,等待10s之后刷新控制台,队列被删除
设置队列过期时间 发布的消息在队列中存在多长时间后被取消(单位毫秒),参考 队列设置过期 
设置队列长度 x-max-length:Number:队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息。参考 队列到达最大长度 
设置队列溢出行为 当达到队列的最大长度时消息的处理方式:有效值为drop-head(删除头部消息)、reject-publish(拒绝发布)或reject-publish-dlx(拒绝发布到死信交换机)。仲裁队列类型仅支持drop-head and reject-publish两种。
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Configuration public  class  RabbitConfig  {         @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }          @Bean      public  Queue normalQueue ()  {                           Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-max-length" , 5 );                                                      arguments.put("x-overflow" , "reject-publish-dlx" );         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)                 .withArguments(arguments)                 .deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME)                 .deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1)                 .build();     }          @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     }          @Bean      public  DirectExchange dlxExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();     }          @Bean      public  Queue dlxQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1).build();     }          @Bean      public  Binding bindingDlx (DirectExchange dlxExchange, Queue dlxQueue)  {         return  BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);     } } 
 
测试结果 
1 2 3 4 5 6 1.当属性设置为drop-head时 如果超过队列长度,先入队的消息会被先删除(如果配置了死信交换机则会移至死信交换机) 2.当属性设置为reject-publish时 该模式下当队列达到最天长度后会拒绝收消息,也不会将队头的消息移至死信队列(如果配置了死信队列的话),超长的消息未被接收 3.当属性设置为reject-publish-dlx时 拒绝接收后面的消息并将拒绝的消息放到死信交换机中,后面发送的消息会存储到死信队列中 
 
设置队列内存大小 x-max-length-bytes:Number:队列在开始从头部删除就绪消息之前可以包含的总正文大小。受限于内存大小,超过该阈值则从队列头部开始删除消息
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }     @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-max-length-bytes" , 30 );          return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();     }     @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     } } 
 
生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     public  void  sendMsg ()  {         for  (int  i  =  0 ; i < 3 ; i++) {             String  str  =  "你好我好大家好"  + i;             Message  message  =  MessageBuilder.withBody(str.getBytes()).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME1, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }         for  (int  i  =  0 ; i < 3 ; i++) {             String  str  =  "hello"  + i;             Message  message  =  MessageBuilder.withBody(str.getBytes()).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME1, message);             log.info("消息发送完毕,发送时间为:{}" , new  Date ());         }     } } 
 
测试结果 :队列中只剩3条信息,先入队的超过内存的限制的消息被删除了
设置单一消费者 表示队列是否是只能有一个消费者。设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略;设置为false时消息循环分发给所有消费者(默认false)。
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }     @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-single-active-consumer" , true );         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();     }     @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     } } 
 
消费者 
1 2 3 4 5 6 7 8 9 10 11 12 @Component @Slf4j public  class  ReceivemessageService  {    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})      public  void  receiveMsg1 (Message message)  {         log.info("消费者1接收到的消息为:{}" , new  String (message.getBody()));     }     @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})      public  void  receiveMsg2 (Message message)  {         log.info("消费者2接收到的消息为:{}" , new  String (message.getBody()));     } } 
 
测试结果 :只有消费者1收到了消息
设置死信交换机 定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public  class  RabbitConfig  {              @Bean          Map<String, Object> arguments = new  HashMap <>();         arguments.put("x-dead-letter-exchange" , RabbitMQConstant.EXCHANGE_DLX_NAME);         arguments.put("x-dead-letter-routing-key" , RabbitMQConstant.ROUTING_DLX_NAME1);         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)                 .ttl(5000 )                  .withArguments(arguments)                 .build();     }      } 
 
测试结果 :超时的消息放入了死信队列
 
RabbitMQ 消息可靠性 生产者确认 Confirm模式简介 
代表消息从生产者发送到Exchange; 
代表消息从Exchange路由到Queue; 
代表消息在Queue中存储; 
代表消费者监听Queue并消费消息; 
 
可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;
有两种解决方案:
开启Confirm(确认)模式;(异步) 
开启Transaction(事务)模式;(性能低,实际项目中很少用) 
 
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器 Broker里面的exchange交换机,则会给生产者一个应答生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
Confirm模式实现 
开启生产者确认模式
1 2 3 4 5 6 7 8 spring:   rabbitmq:      host:  192.168 .10 .11      port:  5672      username:  admin      password:  123456      virtual-host:  longdidi      publisher-confirm-type:  correlated   
 
实现RabbitTemplate.ConfirmCallback交界口,重写confirm()方法
判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理
 
设置rabbitTemplate的确认回调方法
1 rabbitTemplate.setConfirmCallback(messageConfirmCallBack); 
 
 
定义常量 
1 2 3 4 5 6 7 8 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.confirm.normal.1" ;          public  static  final  String  QUEUE_NAME1  =  "queue.confirm.normal.1" ;          public  static  final  String  ROUTING_NAME1  =  "key.confirm.normal.1" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();     }     @Bean      public  Queue normalQueue ()  {         Map<String, Object> arguments = new  HashMap <>();         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();     }     @Bean      public  Binding bindingNormal (DirectExchange normalExchange, Queue normalQueue)  {         return  BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);     } } 
 
外部类实现 生产者 :设置回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Service @Slf4j public  class  SendMessageService  {    @Autowired      private  RabbitTemplate rabbitTemplate;     @Resource      private  MyConfirmCallBack confirmCallBack;          @PostConstruct      public  void  init ()  {         rabbitTemplate.setConfirmCallback(confirmCallBack);     }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();                  CorrelationData  correlationData  =  new  CorrelationData ();                   correlationData.setId("order_123456" );          rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" ,                                        RabbitMQConstant.ROUTING_NAME1, message, correlationData);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
实现接口 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Component @Slf4j public  class  MyConfirmCallBack  implements  RabbitTemplate .ConfirmCallback {         @Override      public  void  confirm (CorrelationData correlationData, boolean  ack, String cause)  {         log.info("关联id为:{}" , correlationData.getId() + "" );         if  (ack) {             log.info("消息正确的达到交换机" );             return ;         } else  {                          log.error("消息没有到达交换机,原因为:{}" , cause);         }     } } 
 
生产者实现 生产者 :生产者直接实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public  class  SendMessageService  implements  RabbitTemplate .ConfirmCallback {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setConfirmCallback(this );     }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         CorrelationData  correlationData  =  new  CorrelationData ();          correlationData.setId("order_123456" );          rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" ,                                       RabbitMQConstant.QUEUE_NAME1, message, correlationData);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     }     @Override      public  void  confirm (CorrelationData correlationData, boolean  ack, String cause)  {         log.info("关联id为:{}" , correlationData.getId() + "" );         if  (ack) {             log.info("消息正确的达到交换机" );             return ;         }         log.error("消息没有到达交换机,原因为:{}" , cause);     } } 
 
匿名内部内实现 生产者 :生产者使用匿名内部类实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setConfirmCallback(                 new  RabbitTemplate .ConfirmCallback() {                     @Override                      public  void  confirm (CorrelationData correlationData, boolean  ack, String cause)  {                         log.info("关联id为:{}" , correlationData.getId() + "" );                         if  (ack) {                             log.info("消息正确的达到交换机" );                             return ;                         }                         log.error("消息没有到达交换机,原因为:{}" , cause);                     }                 }         );     }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         CorrelationData  correlationData  =  new  CorrelationData ();          correlationData.setId("order_123456" );          rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" ,                                        RabbitMQConstant.QUEUE_NAME1, message, correlationData);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
Lambad 实现 生产者 :生产者使用lambda实现RabbitTemplate.ConfirmCallback接口,重写confirm()方法并设置回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setConfirmCallback(                          (correlationData, ack, cause) -> {                 log.info("关联id为:{}" , correlationData.getId() + "" );                 if  (ack) {                     log.info("消息正确的达到交换机" );                     return ;                 }                 log.error("消息没有到达交换机,原因为:{}" , cause);             }         );     }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         CorrelationData  correlationData  =  new  CorrelationData ();          correlationData.setId("order_123456" );          rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME + "error" ,                                       RabbitMQConstant.ROUTING_NAME1, message, correlationData);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
测试结果 :写错交换机的名字,查看日志输出
1 2 关联id为:order_123456 消息没有到达交换机,原因为:channel error;protocol method:#method<channel 
 
交换机确认 可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败;
使用return模式可以实现消息无法路由的时候返回给生产者,消息从 exchange –> queue 投递失败则会返回一个 returnCallback,利用这个callback控制消息的可靠性投递;
return模式实现 
开启return确认模式
1 2 3 4 5 6 7 8 9 spring:   rabbitmq:      host:  192.168 .10 .11      port:  5672      username:  admin      password:  123456      virtual-host:  longdidi      publisher-confirm-type:  correlated       publisher-returns:  true   
 
实现RabbitTemplate.ReturnsCallback接口并重写returnedMessage()方法
 
设置回调
使用rabbitTemplate.setReturnCallback设置退回函数
当消息从exchange路由到queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;
 
 
定义常量 
1 2 3 4 5 6 7 8 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NAME  =  "exchange.return.normal.1" ;          public  static  final  String  QUEUE_NAME1  =  "queue.return.normal.1" ;          public  static  final  String  ROUTING_NAME1  =  "key.return.normal.1" ; } 
 
外部类实现 生产者 :设置回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @Resource      private  MyReturnCallBack myReturnCallBack;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setReturnsCallback(myReturnCallBack);      }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME1 + "error" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
外部实现接口 
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @Slf4j public  class  MyReturnCallBack  implements  RabbitTemplate .ReturnsCallback {         @Override      public  void  returnedMessage (ReturnedMessage returnedMessage)  {         log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , returnedMessage.getReplyText());     } } 
 
生产者实现 生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Service @Slf4j public  class  SendMessageService  implements  RabbitTemplate .ReturnsCallback {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setReturnsCallback(this );      }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME1 + "error" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     }     @Override      public  void  returnedMessage (ReturnedMessage returnedMessage)  {         log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , returnedMessage.getReplyText());     } } 
 
匿名内部类实现 生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setReturnsCallback(                                  new  RabbitTemplate .ReturnsCallback() {                     @Override                      public  void  returnedMessage (ReturnedMessage returnedMessage)  {                         log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" ,                                    returnedMessage.getReplyText());                     }                 }         );      }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes()).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME1 + "error" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
lambda实现 生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {         rabbitTemplate.setReturnsCallback(                 message -> {                     log.error("消息从交换机没有正确的路由到(投递到)队列,原因为:{}" , message.getReplyText());                 }         );     }     public  void  sendMsg ()  {         Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME1 + "error" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
测试结果 
1 2 消息发送完毕,发送时间为:Thu Jul 31 15:32:48 GMT+08:00 2025 消息从交换机没有正确的路由到(投递到)队列,原因为:NO_ROUTE 
 
使用备用交换机 
使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上;
定义常量 
1 2 3 4 5 6 7 8 9 10 11 public  class  RabbitMQConstant  {         public  static  final  String  EXCHANGE_NORMAL_NAME  =  "exchange.normal.backup.1" ;          public  static  final  String  EXCHANGE_BACKUP_NAME  =  "exchange.backup.1" ;          public  static  final  String  QUEUE_NORMAL_NAME  =  "queue.normal.backup.1" ;          public  static  final  String  QUEUE_BACKUP_NAME  =  "queue.backup.1" ;     public  static  final  String  ROUTING_WARNING_KEY  =  "info1" ; } 
 
定义MQ 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Configuration public  class  RabbitConfig  {    @Bean      public  DirectExchange normalExchange ()  {         return  ExchangeBuilder                  .directExchange(RabbitMQConstant.EXCHANGE_NORMAL_NAME)                  .alternate(RabbitMQConstant.EXCHANGE_BACKUP_NAME)                  .build();     }     @Bean      public  Queue queueNormal ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_NORMAL_NAME).build();     }          @Bean      public  Binding binding (DirectExchange normalExchange, Queue queueNormal)  {         return  BindingBuilder.bind(queueNormal)             .to(normalExchange)             .with(RabbitMQConstant.ROUTING_WARNING_KEY);     }     @Bean      public  FanoutExchange alternateExchange ()  {         return  ExchangeBuilder.fanoutExchange(RabbitMQConstant.EXCHANGE_BACKUP_NAME).build();     }     @Bean      public  Queue alternateQueue ()  {         return  QueueBuilder.durable(RabbitMQConstant.QUEUE_BACKUP_NAME).build();     }     @Bean      public  Binding bindingAlternate (FanoutExchange alternateExchange, Queue alternateQueue)  {         return  BindingBuilder.bind(alternateQueue).to(alternateExchange);     } } 
 
测试结果 :消息存储到了备用队列
消息持久化存储 消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
解决方案:
队列持久化
1 QueueBuilder.durable("队列名称" ).build(); 
 
交换机持久化
1 ExchangeBuilder.directExchange("交换机名称" ).durable(true ).build(); 
 
消息持久化,默认就是持久化的
1 2 3 MessageProperties  messageProperties  =  new  MessageProperties ();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
 
 
消费者手动确认 手动确认消息 
采用消息消费时的手动ack确认机制来保证;如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制 (message acknowledgement);
1 2 3 4 5 6 7 8 9 10 11 12 13 spring:   rabbitmq:      host:  192.168 .10 .11      port:  5672      username:  admin      password:  123456      virtual-host:  longdidi      publisher-confirm-type:  correlated       publisher-returns:  true            listener:        simple:          acknowledge-mode:  manual  
 
消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);
生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;     @PostConstruct      public  void  init ()  {                  rabbitTemplate.setConfirmCallback(                 (correlationData, ack, cause) -> {                     if  (!ack) {                         log.error("消息没有到达交换机,原因为:{}" , cause);                                              }                 }         );                  rabbitTemplate.setReturnsCallback(                 returnedMessage -> {                     log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}" , returnedMessage.getReplyText());                                      }         );     }     public  void  sendMsg ()  {         MessageProperties  messageProperties  =  new  MessageProperties ();                  messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);         Message  message  =  MessageBuilder.withBody("hello world" .getBytes())                 .andProperties(messageProperties).build();         rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                        RabbitMQConstant.ROUTING_NAME1+"1" , message);         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Component @Slf4j public  class  ReceiveMessageService  {    @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})      public  void  receiveMsg (Message message, Channel channel)  {                  long  deliveryTag  =  message.getMessageProperties().getDeliveryTag();         try  {             log.info("接收到的消息为:{}" , new  String (message.getBody()));                                                    channel.basicAck(deliveryTag, false );         } catch  (Exception e) {             log.error("消息处理出现问题" );             try  {                 channel.basicNack(deliveryTag, false , true );             } catch  (IOException ex) {                 throw  new  RuntimeException (ex);             }             throw  new  RuntimeException (e);         }     } } 
 
消息的幂等性(消息不被重复消费) 同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;
幂等性 :对于一个资源,不管请求一次还是多次,对该资源本身造成的影响应该是相同的,不能因重复请求而对该资源重复造成影响;
以接口幂等性举例 :
接口幂等性 :一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的;
比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的;
如何避免消息的重复消费问题?(消息消费时的幂等性) 
全局唯一ID + Redis:生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令(setnx(messageId, 1)),将messageId作为key放到redis中,若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;
实体类 
1 2 3 4 5 6 7 8 9 10 @Data @AllArgsConstructor @NoArgsConstructor @Builder public  class  Orders  implements  Serializable  {    private  String orderId;     private  String orderName;     private  BigDecimal orderMoney;     private  Date orderTime;  } 
 
生产者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Service @Slf4j public  class  SendMessageService  {    @Resource      private  RabbitTemplate rabbitTemplate;          @Resource      private  ObjectMapper objectMapper;     @PostConstruct      public  void  init ()  {                  rabbitTemplate.setConfirmCallback(                 (correlationData, ack, cause) -> {                     if  (!ack) {                         log.error("消息没有到达交换机,原因为:{}" , cause);                                              }                 }         );         rabbitTemplate.setReturnsCallback(                 returnedMessage -> {                     log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}" , returnedMessage.getReplyText());                                      }         );     }     public  void  sendMsg ()  throws  JsonProcessingException {         {                          Orders  orders1  =  Orders.builder()                     .orderId("order_12345" )                     .orderName("买的手机" )                     .orderMoney(new  BigDecimal (2356 ))                     .orderTime(new  Date ())                     .build();                          String  strOrders1  =  objectMapper.writeValueAsString(orders1);             MessageProperties  messageProperties  =  new  MessageProperties ();                          messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);             Message  message  =  MessageBuilder.withBody(strOrders1.getBytes())                     .andProperties(messageProperties).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME1, message);         }         {             Orders  orders2  =  Orders.builder()                     .orderId("order_12345" )                     .orderName("买的手机" )                     .orderMoney(new  BigDecimal (2356 ))                     .orderTime(new  Date ())                     .build();             String  strOrders2  =  objectMapper.writeValueAsString(orders2);             MessageProperties  messageProperties  =  new  MessageProperties ();                          messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);             Message  message  =  MessageBuilder.withBody(strOrders2.getBytes())                     .andProperties(messageProperties).build();             rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,                                            RabbitMQConstant.ROUTING_NAME1, message);         }         log.info("消息发送完毕,发送时间为:{}" , new  Date ());     } } 
 
消费者 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Component @Slf4j public  class  ReceiveMessageService  {    @Resource      private  ObjectMapper objectMapper;     @Resource      private  StringRedisTemplate stringRedisTemplate;     @RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})      public  void  receiveMsg (Message message, Channel channel)  throws  IOException {                  long  deliveryTag  =  message.getMessageProperties().getDeliveryTag();                  Orders  orders  =  objectMapper.readValue(message.getBody(), Orders.class);         try  {             log.info("接收到的消息为:{}" , orders.toString());                          Boolean  setResult  =  stringRedisTemplate.opsForValue()                 .setIfAbsent("idempotent:"  + orders.getOrderId(), orders.getOrderId());             if  (setResult) {                                  log.info("向数据库插入订单" );             }                          channel.basicAck(deliveryTag, false );         } catch  (Exception e) {             log.error("消息处理出现问题" );             try  {                 channel.basicNack(deliveryTag, false , true );             } catch  (IOException ex) {                 throw  new  RuntimeException (ex);             }             throw  new  RuntimeException (e);         }     } } 
 
测试结果 
1 2 3 4 消息发送完毕,发送时间为:Thu Jul 31 16:04:07 GMT+08:00 2025 接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356... 向数据库插入订单 接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,rderMoney=2356...