RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的,支持高并发处理和分布式部署。所有主要的编程语言均有与代理接口通讯的客户端库。RabbitMQ本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,更适合于企业级的开发。它还实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队,通过队列机制实现应用程序间的异步通信与数据传输,常用于流量削峰、系统解耦及异步处理场景。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

参考文章:

RabbitMQ

RabbitMQ安装Erlang安装 windows

【超级详细】RabbitMQ安装教程

【详细步骤】Springboot整合RabbitMQ

4.RabbitMQ工作模型

6.RabbitMQ死信队列

10.RabbitMQ集群

RabbitMQ 集群 - 普通集群、镜像集群、仲裁队列

RabbitMQ 介绍

2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message
RabbitMQ 是一个消息中间件:它接收消息并且转发,就类似于一个快递站,卖家把快递通过快递站,送到我们的手上,MQ也是这样,接收并存储消息,再转发。

RabbitMQ的特点

AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言灯条件的限制。

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

应用场景

抢购活动,削峰填谷,防止系统崩塌
延迟信息处理,比如 10 分钟之后给下单未付款的用户发送邮件提醒
解耦系统,对于新增的功能可以单独写模块扩展,比如用户确认评价之后,新增了给用户返积分的功能,这个时候不用在业务代码里添加新增积分的功能,只需要把新增积分的接口订阅确认评价的消息队列即可,后面再添加任何功能只需要订阅对应的消息队列即可。

流量消峰

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但比不能下单的体验要好。
简单来说: 就是在访问量剧增的情况下,但是应用仍然不能停,比如“双十一”下单的人多,但淘宝这个应用仍然要运行,所以就可以使用消息中间件采用队列的形式减少突然访问的压力

应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。

如图,把支付,库存,物流都交给MQ

image-20250729152629973

异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。
这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。

image-20250729153106900

  • 同步是阻塞的(会造成等待)

    采用同步方式,用户从注册到响应成功,需要先保存注册信息,再发送邮件通知,邮件发送成功后再发送短信通知,短信发送成功后才通知用户成功,用户体验不好

    image-20250729191446438

  • 异步是非阻塞的(不会等待)

    采用异步方式,保存用户信息后,短信通知和邮件通知消息写入MQ(耗时极短),极大的缩短了响应时间

大流量高并发请求、批量数据传递,就可以采用异步处理,提升系统吞吐量;

image-20250729191621315

日志处理

主要是用kafka这个服务器来做;日志处理是指将消息队列用于在日志处理中

  • Kafka解决大量日志传输的问题

  • loger.info(…)

  • ELK 日志处理解决方案

loger.error(…) → logstash收集消息 → 发送消息的kafka → elastic search(es) → Kibana ELK日志处理平台

四大核心概念

RabbitMQ 的重要角色

RabbitMQ 中重要的角色有:生产者、消费者和代理:

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器;
  • 消费者:消息的接收方,用于处理数据和确认消息
  • 代理(broker):就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个由交换机类型决定。交换机类型:

  1. direct Exchange(直接交换机):匹配路由键,只有完全匹配消息才会被转发
  2. Fanout Excange(扇出交换机):将消息发送至所有的队列
  3. Topic Exchange(主题交换机):将路由按模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
  4. Header Exchange:在绑定Exchange和Queue的时候指定一组键值对,header为键,根据请求消息中携带的header进行路由

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

工作原理

image-20250729153839738

RabbitMQ 的重要组件

  1. Producer(生产者):发送消息的应用;它将消息发送到 RabbitMQ 的交换器中

  2. Consumer(消费者)消费者是消息的接收方,它从 RabbitMQ 的队列中获取消息并进行处理

  3. Broker:标识消息队列服务器实体。接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

    • Broker = VHost1+Vhost2+Vhost3+…

    • Virtual Host = Exchange + Queue +Binding

  4. Message(消息):消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成。消息可以是任何数据(字符串、user对象,json串等)

    • 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange

    • 消息头:含各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储[路由模式])等。

  5. Connection(网络连接):publisher/consumer 和 broker 之间的 TCP 连接。

    断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。

  6. ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。

  7. Channel(信道):消息推送使用的通道,是多路复用连接中的一条独立的双向数据流通道。

    • 如果每次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。
    • Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,
    • AMQP method 包含了 channel id 帮助客户端和 message broker 识channel,所以 channel 之间是完全隔离的。
    • Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。
  8. Virtual host(虚拟主机):逻辑分组机制,类似nacos中的命名空间的概念(在代码中就是一个字符串)。出于多租户和安全因素设计,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同用户使用同一个RabbitMQ server提供的服务时,可划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

    • Virtual 即 VHost
    • 默认目录 /
  9. vHost(虚拟主机):每个 RabbitMQ 都能创建很多 vhost,每个虚拟主机其实都是 mini 版的RabbitMQ,它拥有自己的 “交换机exchange、绑定Binding、队列Queue”,更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。

  10. Exchange(交换机):用于接受、分配消息。message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

  11. Routing Key(路由键):用于把生成者的数据分配到交换器上。生产者将消息发送到交换机时会携带一个key,来指定路由规则。

  12. Binding(绑定):exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列。

  13. BindingKey(绑定键)用于把交换器的消息绑定到队列上。

  14. Queue(队列):用于存储生产者的消息。消息最终被送到这里等待 consumer 取走。一个message可被同时拷贝到多个queue。

RabbitMQ 的消息是怎么发送的

首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 RabbitMQ 就创建了一条 amqp 信道(channel),信道是创建在“真实” tcp 上的虚拟连接,amqp 命令都是通过信道发送出去的,每个信道都会有一个唯一的 id,不论是发布消息,订阅队列都是通过这个信道完成的。

工作模式(七种)

image-20250729180401408

simple (简单模式)

一个消费者消费一个生产者生产的信息,消息只能被消费⼀次
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

Work queues(工作模式)

或者竞争消费者模式。工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
总之:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,消息不会重复分配给不同的消费者。

适⽤场景:集群环境中做异步处理

Publish/Subscribe(发布订阅模式)

生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息

特点

  • Exchange(交换机)

    只负责转发消息,不具备存储消息的能力,因此若没有任何队列与Exchange绑定,或者没有符合路由规则的队列,消息就会丢失

  • RoutingKey(路由键)

    ⽣产者将消息发给交换器时,指定的⼀个字符串,⽤来告诉交换机应该如何处理这个消息

  • Binding Key(绑定)

    RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候⼀般会指定⼀个Binding Key,这样RabbitMQ就知道如何正确地将消息路由到队列了

适合场景:消息需要被多个消费者同时接收的场景(如: 实时通知或者⼴播消息)

Routing(路由模式)

生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列

路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key。Exchange根据RoutingKey的规则,将数据筛选后发给对应的消费者队列。

适合场景:需要根据特定规则分发消息的场景

比如系统打印日志(⽇志等级分为error, warning, info,debug)就可以通过该模式把不同⽇志发送到不同的队列, 最终输出到不同的⽂件

Topics(主题模式)

生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者。

路由模式的升级版,在routingKey的基础上,增加了通配符的功能,使之更加灵活。也称通配符模式

Topics和Routing的基本原理相同,即⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列,类似于正则表达式的⽅式来定义Routingkey的模式

适合场景:需要灵活匹配和过滤消息的场景。

image-20250729160102405

RPC模式

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程。

  • 客户端:

    1. 发送请求(携带replyTo,correlationId)

    2. 接收响应(验证correlationId)

  • 服务器:

    1. 接收请求 进行响应

    2. 发送响应(按客户端指定的replyTo,设置correlationId)

image-20250729194101402

发布确认模式

消息丢失其中一种情况是⽣产者问题:因为应⽤程序故障、⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息,可以采⽤发布确认(Publisher Confirms)机制实现。发送方确认机制好处在于它是异步的,⽣产者可以同时发布消息和等待信道返回确认消息。当消息最终得到确认之后,⽣产者可以通过回调⽅法来处理该确认消息。
如果RabbitMQ因自身内部错误导致消息丢失,就会发送⼀条nack(Basic.Nack)命令,⽣产者同样可以在回调方法中处理该nack命令使用发送确认机制必须要信道设置成confirm(确认)模式。发布确认有3种策略

  • 单独确认

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Individually {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1. 建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(Constants.HOST);
    connectionFactory.setPort(Constants.PORT); // 需要提前开放端口号
    connectionFactory.setUsername(Constants.USER_NAME); // 账号
    connectionFactory.setPassword(Constants.PASSWORD); // 密码
    connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机
    Connection connection = connectionFactory.newConnection();
    // 1.开启信道
    Channel channel = connection.createChannel();
    // 2. 设置信道为confirm模式
    channel.confirmSelect();
    // 3.声明队列
    channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
    // 4.发送消息,并等待确认
    Long start = System.currentTimeMillis();
    for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
    String msg = "hello publisher confirms" + i;
    channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
    // 等待确认
    channel.waitForConfirmsOrDie(5000);
    }
    Long end = System.currentTimeMillis();
    System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",
    Constants.MESSAGE_COUNT, end - start);
    // 5.关闭资源
    channel.close();
    connection.close();
    }
    }
  • 批量确认

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class Batches {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1. 建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    // ......

    // 4. 发送消息, 并进行确认
    long start = System.currentTimeMillis();
    int batchSize = 100;
    int outstandingMessageCount = 0;
    for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
    String msg = "hello publisher confirms" + i;
    channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
    outstandingMessageCount++;
    if (outstandingMessageCount == batchSize) {
    channel.waitForConfirmsOrDie(5000);
    outstandingMessageCount = 0;
    }
    }
    if (outstandingMessageCount > 0) {
    channel.waitForConfirmsOrDie(5000);
    }
    // ......
    }
    }
  • 异步确认

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class Asynchronously {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1. 建立连接
    ConnectionFactory connectionFactory = new ConnectionFactory();
    // ......
    // 4. 监听confirm
    // 集合中存储的是未确认的消息ID
    long start = System.currentTimeMillis();
    SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
    channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    if (multiple) {
    confirmSeqNo.headSet(deliveryTag + 1).clear();
    } else {
    confirmSeqNo.remove(deliveryTag);
    }
    }
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    if (multiple) {
    confirmSeqNo.headSet(deliveryTag + 1).clear();
    } else {
    confirmSeqNo.remove(deliveryTag);
    }
    // 业务需要根据实际场景进行处理, 比如重发, 此处代码省略
    }
    });
    for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
    String msg = "hello publisher confirms" + i;
    long seqNo = channel.getNextPublishSeqNo();
    channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
    confirmSeqNo.add(seqNo);
    }
    while (!confirmSeqNo.isEmpty()) {
    Thread.sleep(10);
    }
    // ......
    }
    }

RabbitMQ 保证消息的稳定性

提供了事务的功能。通过channel 设置为 confirm(确认)模式

RabbitMQ 怎么避免消息丢失

  • 把消息持久化磁盘,保证服务器重启消息不丢失。
  • 每个集群中至少有一个物理磁盘,保证消息落入磁盘

消息持久化

RabbitMQ的消息默认存在内存中的,一旦服务器意外挂掉,消息就会丢失。消息持久化需做到三点:

1.Exchange设置持久化:消息已经到达持久化交换器
2.Queue设置持久化:声明队列必须设置持久化 durable 设置为 true。消息已经到达持久化队列
3.Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

RabbitMQ 持久化的缺点

降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可**尽量使用 ssd 硬盘**来缓解吞吐量的问题。

ACK确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完成要通知服务端,服务端才将数据删除

这样就解决了,即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1.单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2.普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3.镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

消息补偿机制

持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,故未知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

RabbitMQ 如何实现延迟队列

RabbitMQ本身没有延迟队列,需要靠TTL和DLX模拟出延迟的效果。延迟队列的实现有两种方式:

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  • 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。

过期消息 TTL(Time To Live)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间。决定了在没有任何消费者的情况下,队列中的消息可以存活多久。

  2. 对消息进行单独设置,每条消息TTL可以不同。决定了在没有任何消费者消费时,消息可以存活多久。

  3. 如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。

死信队列 DLX (Dead-Letter-Exchange)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

  1. x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

  2. x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

1.消息或者队列的TTL过期。消息积压太多消费不过来;没有对应的消费者

2.队列达到最大长度,先到达的消息仍没有被消费

3.消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false,没有让消息重新入队

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

image-20250729195949777


RabbitMQ实战(SpringBoot)

Windows 环境安装

首先可以进入rabbitMQ官网上查看 RabbitMQ 版本要求 。定位到RabbitMQ and Erlang/OTP Compatibility Matrix,发现当前RabbitMQ的最新版本是4.1.2,要求Erlang版本最低是26.2,最高是27.0。安装最新版RabbitMQ也就是4.1.2。

因为RabbitMQ 用Erlang 语音开发的,所以先安装Erlang 。

下载安装Erlang

  1. 安装Erlang (因为RabbitMQ 用Erlang 语音开发的,所已我们先安装Erlang )。进入到Erlang官网下载 Erlang下载地址 。选择Erlang27.0 windows 64位的安装包,点击下载。

  2. 找到下载的Erlang安装包,右键以管理员身份运行。按照步骤next , 安装路径D:\IDEA\tools下,注意这里的路径不能包含中文

  3. 安装完成后配置环境变量

    新建系统变量ERLANG_HOME,变量值为D:\IDEA\tools\Erlang OTP。

    path中添加环境变量 %ERLANG_HOME%\bin 。

  4. 验证Erlang是否安装成功,打开cmd 输入 erl 。出现版本信息则说明安装成功

安装RabbitMQ

  1. 进入到RabbitMQ官网下载安装包,RabbitMQ下载地址 ,或者GitHUb下载:GitHub

  2. 找到下载的安装包右键以管理员身份运行。按照步骤next , 安装路径D:\IDEA\tools下

  3. 安装完成后,在win菜单下找到 RabbitMQ command prompt。

  4. 输入命令,激活rabbitmq的ui界面。

    1
    rabbitmq-plugins.bat enable rabbitmq_management 
  5. 激活完成后,重启rabbitmq。

    1
    2
    net stop RabbitMQ
    net start RabbitMQ
  6. 重启完成后,验证是否成功,登录到 localhost:15672,出现如下图登录页面即安装成功。

  7. RabbitMQ默认的登录用户和密码为 guest guest 。

  8. 登录成功后进入到页面,RabbitMQ的安装就算完成。

Linux 环境安装

  1. 进入到RabbitMQ官网下载安装包,RabbitMQ下载地址

  2. 进入GitHub|rabbitmq|erlang-rpm选择RabbitMQ支持的版本下载,或者进入到Erlang官网下载 Erlang下载地址

  3. 将下载好的安装文件 上传到服务器,上传在/root目录下

  4. 安装Erlang

    1
    2
    3
    4
    # 安装
    yum -y install esl-erlang_23.0.2-1_centos_7_amd64.rpm
    # 验证是否安装成功
    erl
  5. 安装RabbitMQ

    1
    2
    3
    4
    5
    6
    # 安装
    yum -y install rabbitmq-server-3.8.5-1.el7.noarch.rpm
    # 安装RabbitMQ 可视化管理控制台 它是以插件的形式存在的。查看所有的插件列表
    rabbitmq-plugins list
    # [ ] rabbitmq_management x.x.x 为管理系统的插件,安装命令如下
    rabbitmq-plugins enable rabbitmq_management
  6. 启动RabbitMQ

    1
    2
    3
    4
    # 启动RabbitMQ
    systemctl start rabbitmq-server.service
    # 确认是否启动成功
    systemctl status rabbitmq-server.service
  7. 访问RabbitMQ

    端口为:15672,默认的用户名:guest 密码:guset。访问地址是:自己的服务器的ip+端口(如 http://192.168.10.11:15672)

    如果不是在localhost本机登录会提示错误:User can only log in via localhost。解决办法: 让它可以进行远程登录,需要回到安装RabbitMQ的服务器进行如下操作

    1
    2
    3
    4
    5
    6
    7
    [root@localhost ~]# cd /etc/rabbitmq/
    [root@localhost rabbitmq]# ll
    # 发现只有一个文件 enabled_plugins
    # 编辑rabbitmq.config 添加[{rabbit, [{loopback_users, []}]}].
    [root@localhost rabbitmq]# vi rabbitmq.config
    # 添加[{rabbit, [{loopback_users, []}]}].后保存并退出,然后重启RabbitMQ
    [root@localhost ~]# systemctl restart rabbitmq-server.service

    这时候再访问并登录就可以了

项目实战

RabiitMQ 依赖引入

首先在已经搭建好的SpringBoot项目的pom.xml 中引入RabiitMQ的依赖

1
2
3
4
5
 <!-- AMQO 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ 配置

  1. 在application.yml配置文件中进行RabbitMQ的配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    spring:
    rabbitmq:
    # 服务器
    host: 192.168.10.11
    # 用户名
    username: guest
    # 密码
    password: guest
    # 虚拟主机
    virtual-host: /
    # 端口
    port: 5672
    # 监听相关配置
    listener:
    simple:
    # 消费者的最小数量
    concurrency: 10
    # 消费者的最大数量
    max-concurrency: 10
    # 限制消费者 每次只处理一条消息,处理完毕后再处理下一条消息
    prefetch: 1
    # 启动时是否默认启动容器 默认true
    auto-startup: true
    # 当消息被拒绝时,是否重新进入队列
    default-requeue-rejected: true
    # 模板相关配置
    template:
    retry:
    # 开启重试 默认false
    enabled: true
    # 重试时间 默认1000ms
    initial-interval: 1000ms
    # 重试的次数 默认是3
    max-attempts: 3
    # 重试时间的最大间隔时间 默认10000ms
    max-interval: 10000ms
    # 重试的间隔乘数 比如配2.0 第一次就间隔10s 第二次 间隔20s 第三次间隔40s ****
    multiplier: 1
  2. 创建配置类 RabbitMQConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitMQConfig {
    @Bean
    public Queue queue() {
    return new Queue("queue",true);
    }
    }
  3. 封装发送消息和接收消息的类:对消息的发送和消息的接受我们都分别进行封装

    MqSender:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Service
    @Slf4j
    public class MqSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(Object msg) {
    log.info("发送消息"+msg);
    rabbitTemplate.convertAndSend("queue", msg);
    }
    }

    MqReceiver:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    @Service
    @Slf4j
    public class MqReceiver{
    //监听配置的队列
    @RabbitListener(queues = "queue")
    public void receive(Object msg){
    log.info("接收消息:"+msg);
    }
    }

RabbitMQ 测试

  1. 随便在一个Contoller类中写一个测试方法。此处在TUserController中添加一个测试方法如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @RestController
    @RequestMapping("/user")
    public class TUserController {
    @Autowired
    private MqSender mqSender;
    @RequestMapping("/mq")
    public void mq(){
    mqSender.send("Hello");
    }
    }
  2. 启动项目进行测试

    测试路径为 http://localhost:8080/user/mq,可以看到IEAR控制台会输出我们的测试数据如下

    1
    2
    ... com.xxxxx.seckilldemo.rabbitmq.MqSender  : 发送消息Hello
    ... com.xxxxx.seckilldemo.rabbitmq.MqReceiver : 接收消息:(Body:'Hello' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1, consumerTag=amq.ctag-IrqC4b7RL3mg1rMy8NefPQ, consumerQueue=queue])

    项目启动后RabiitMQ的控制台会显示有一个连接如下图

    image-20250729180214185


RabbitMQ 集群

简介

RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;

在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器) 被归为两类:一类是磁盘节点,一类是内存节点;

磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点的数据会全部丢失,而磁盘节点的数据不会丢失;

RabbitMQ 集群主要有以下两个用途:

  • 高可用:某个服务器出现问题,整个 RabbitMQ 还可以继续使用;
  • 高容量:集群可以承载更多的消息量。

RabbitMQ 集群搭建需要注意的问题

节点之间使用“–link”连接,此属性不能忽略。
各节点使用的 erlang cookie 值必须相同,此值相当于“秘钥”的功能,用于各节点的认证
整个集群中必须包含一个磁盘节点

默认集群模式

默认集群简介

默认集群模式也叫 普通集群模式、或者 内置集群模式;普通集群,也叫做标准集群(classic cluster)

元数据

队列元数据:队列名称和属性(是否可持久化,是否自动删除)

交换器元数据:交换器名称、类型和属性

绑定元数据:交换器和队列的绑定列表

vhost元数据:vhost内的相关属性,如安全属性等;

当用户访问其中任何一个RabbitMQ节点时,查询到的queue/user/exchange/vhost等信息都是相同的;

数据同步特点

RabbitMQ默认集群模式只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步,队列的具体信息数据只在队列的拥有者节点保存,其他节点只知道队列的元数据和指向该节点的指针,所以其他节点接收到不属于该节点队列的消息时会将该消息传递给该队列的拥有者节点上;

集群不复制队列内容和状态到所有节点的原因

  • 节省存储空间;

  • 提升性能;

image-20250729180016020

若消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写磁盘,占用磁盘空间。

数据访问过程

若有一个消息生产者或者消息消费者通过amqp-client的客户端连接到节点1进行消息的发送或接收,那么此时集群中的消息收发只与节点1相关,这个没有任何问题;

若消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据(也就是指向queueowner node的指针)转发至节点1上,最终发送的消息还是会存储至节点1的队列1上;

同样,若消息消费者所连接的节点2或者节点3,那这两节点也会作为路由节点起转发作用,将会从节点1的队列1中获取消息进行消费;

默认集群特征

  1. 集群中的各个节点是可以共享数据的,比如交换机信息、队列元信息,但不包含队列中的消息。

    什么是元信息? 元信息,就是指队列的描述信息,队列名、队列在哪一个节点中,但是不包括消息本身。

    不包括消息本身是什么意思? 假设我现在两个节点 mq1 和 mq2,然后有一个消费者订阅了 mq2 中的一个队列,但该消费者在访问时,不想小心访问到了 mq1,不过由于在 mq1 上持有 mq2 的元信息,所以他知道 mq2 在哪,因此就可以通过这个元数据找到 mq2 这个节点,然后拿到我们想要的数据(这就类似于 mq1 上有 指向 mq2 的指针,通过指针就可以找到 mq2)。

  2. 基于第一个特点的元信息,可以在访问集群中的某个节点时,发现队列不在该节点,就可以通过要访问数据的所在节点的元信息,进一步的拿到这个节点的所有数据。

  3. 如果队列所在节点宕机,那么消息就会丢失。

    这里有点类似于 redis 集群中的分片处理,每一个集群上存储全集队列的一部分队列,因此这个节点挂了,消息必然会丢失.

安装

  1. 安装三台RabbitMQ机器:先安装一台MQ,然后克隆两台就行

  2. 设置IP地址:启动并设置三台机器的IP

    修改配置文件方式

  3. 修改主机名
    1
    sudo hostnamectl set-hostname rabbit11
  4. 修改/etc/hosts 文件

    首先需要配置一下hosts文件,因为RabbitMQ集群节点名称是读取hosts文件得到的;注意三台机机器都需要配置

    1
    2
    3
    4
    vim /etc/hosts
    192.168.1.11 rabbit11
    192.168.1.12 rabbit12
    192.168.1.13 rabbit13
  5. 重启网络

    三台机器均重启网络,使节点名生效

    1
    2
    3
    sudo systemctl restart NetworkManager
    # 低版本CentOS使用如下命令
    systemctl restart network
  6. 重新连接xshell

    重启后三台机器的xshell均退出,然后再重新连接,这样才能刷新主机的名字

  7. 关闭防火墙

    三台机器均需关闭

    1
    2
    3
    systemctl stop firewalld  ##关闭防火墙
    systemctl disable firewalld ##开机不启动防火墙
    systemctl status firewalld ##查看防火墙状态
  8. 修改.erlang.cookie文件

    三台机器 .erlang.cookie文件保持一致。由于是clone出的三台机器,所以肯定是一样的

    • 如果使用解压缩方式安装的RabbitMQ,那么该文件会在用户名目录下,也就是{用户名}/.erlang.cookie;

    • 如果使用rpm安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq目录下;

    • 注意 .erlang.cookie的权限为400,目前已经是400

  9. 启动MQ

    分别启动三台机器上的rabbitmq

    1
    rabbitmq-server -detached
  10. 查看集群状态
    • 查看rabbitmq状态

      1
      2
      3
      4
      5
      rabbitmqctl status
      # 输出如下
      Status of node rabbit@rabbit11...
      []
      Runtime
    • 查看集群状态

      1
      2
      3
      4
      5
      6
      7
      8
      9
      rabbitmqctl cluster_status
      # 输出如下: rabbit@rabbit11集群节点名称,Disk Nodes节点类型是磁盘节点
      Cluster status of node rabbit@rabbit11
      Basics
      rabbitarabbit11Cluster name:Total CPU cores available cluster-wide: 4
      Cluster Tags
      (none)
      Disk Nodes
      rabbitarabbit11
  11. 构建集群
    • 加入节点1

      在rabbitmq12机器上执行命令,让12的rabbitmq加入集群

      注意:一定要先停止节点,将节点重置之后才能加入集群,否则数据同步会出现混乱

      1
      2
      3
      4
      5
      6
      7
      8
      # 先停止rabbitmq
      rabbitmqctl stop_app
      # 重置rabbitmq
      rabbitmqctl reset
      # 节点加入集群:rabbit@rabbit11是主节点的节点名,在集群状态中可以查看到节点名称
      rabbitmqctl join_cluster rabbit@rabbit11 --ram
      # 启动节点
      rabbitmqctl start_app
    • 添加节点2

      在rabbit13节点上也执行同样的命令,使rabbit13节点也加入到集群中。当然也可以让rabbit13作为一个磁盘节点

  12. 添加用户和权限

    操作一个节点,添加用户和权限等

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 列出用户
    rabbitmqctl list_users
    # 添加用户
    rabbitmqctl add_user admin 123456
    # 查看权限
    rabbitmqctl list_permissions
    # 设置权限
    rabbitmqctl set_permissions admin ".*" ".*" ".*"
    # 设置角色
    rabbitmqctl set_user_tags admin administrator
  13. 启动web控制台

    启动web控制台插件。注意:三台机器都要启动,因为插件不属于元数据,因此需要分别启动

    1
    2
    3
    4
    # 进入插件目录
    cd /usr/local/rabbitmq_server-4.0.7/plugins/
    # 启动web端插件
    rabbitmq-plugins enable rabbitmq_management
  14. 创建虚拟主机

    使用web浏览器添加一个虚拟主机:longdidi

    image-20250729180621236

  15. 再次查看集群状态

    当执行完操作以后在浏览器访问web管控台来看看效果;随便在哪个节点打开web管控台都能看到集群环境各节点的信息;也可以使用”rabbitmqctl cluster_status”查看集群状态;

    image-20250729180657444

  16. 验证集群
    1. 创建队列

      image-20250729180728567

    2. 创建交换机

      image-20250729180804318

    3. 绑定交换机与队列

      • 进入交换机

        image-20250729181126120

      • 绑定交换机与队列

        image-20250729181204097

    4. 发布消息

      image-20250729181229138

    5. 查看消息

      在任意节点查看消息

      image-20250729181259889

    6. 停止主节点rabbit@rabbit11节点

      1
      2
      3
      poweroff
      # Connection Closing...socket close.
      # Connection closed by foreign host.
    7. 再在其它节点查看消息

      image-20250729181458290

  17. 删除节点
    1
    2
    3
    4
    5
    rabbitmgctl stop_app
    rabbitmgctl reset
    rabbitmgctl start_app
    rabbitmgctl cluster_status
    rabbitmgctl forget_cluster_node rabbit@node2 # (node1节点上执行)

节点原理

RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证。

镜像集群模式

镜像集群模式简介

镜像模式是基于默认集群模式加上一定的配置得来的;在默认模式下的RabbitMQ集群,它会把所有节点的交换机、绑定、队列的元数据进行复制确保所有节点都有一份相同的元数据信息,但是队列数据分为两种:

  • 一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息)

  • 一种是队列里面的消息

镜像模式则是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式。

镜像集群模式特征

镜像集群,本质上就是主从模式。主要有以下几个特性:

  1. 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份

    这里就和之前的普通交换机不一样了,不仅仅是交换机可以共享,队列中的消息大家也可以共享了。

  2. 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点

    比如现在有三个节点,现在我再 mq1 上创建了一个队列 q1,因此 mq1 就是主节点,那么还可以给他挑一个镜像节点,比如我再 mq2 上做一个镜像,那么 mq2 就会去找 mq1 同步 q1 的所有数据。

  3. 一个队列的主节点可能是另一个队列的镜像节点

    也就是说,主节点和镜像节点是可以相互备份的。比如 q1 在 mq1 上,给 mq2 备份了一份,q2 实在 mq2 上,给 mq3 备份了一份,q3 在 mq3 上的,给 mq1 备份了一份,那么这个时候,mq1 就是 q1 的主节点,同时也是 q3 的镜像节点。

  4. 所有操作都是主节点完成,然后同步给镜像节点 。

  5. 主节点宕机后,镜像节点会替代成新的主节点

镜像模式配置 3.X版本

在默认集群模式的基础上执行如下命令就可以把一个默认的集群模式变成镜像集群模式。镜像队列配置语法:

1
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  • rabbitmqctl set_policy:固定写法

  • -p Vhost:可选参数,设置虚拟主机的名字(针对指定vhost下的queue进行设置)

  • Name::设置策略的名称(自己取个名字就可以)

  • Pattern::queue的匹配模式(正则表达式);^表示所有的队列都是镜像队列

  • Definition:镜像定义(json格式),包括三个部分ha-mode、ha-params、ha-sync-mode

    • ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes

      1
      2
      3
      all:表示在集群中所有的节点上进行镜像
      exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
    • ha-params:ha-mode模式需要用到的参数

    • ha-sync-mode:队列中消息的同步方式,有效值为automatic(自动向master同步数据)和manual(手动向master同步数据)

  • priority:可选参数,指的是policy策略的优先级;

示例:比如想配置所有名字开头为policy_的队列进行镜像,镜像数量为2,那么命令如下(在任意节点执行如下命令)

1
rabbitmqctl set_policy -p longdidi my_policy "^policy_"  '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'  
  1. 同步所有数据

    所有节点、所有虚拟主机、所有队列 都进行镜像。如果要在所有节点所有队列上进行镜像则在任意节点执行如下命令

    1
    rabbitmqctl set_policy my-all "^"  '{"ha-mode":"all"}'   
  2. 同步指定数据

    针对某个虚拟主机进行镜像

    1
    rabbitmqctl set_policy -p longdidi my-all "^" '{"ha-mode": "exactly", "ha-params": 2, "ha-sync-mode": "automatic"}'

镜像模式配置 4.X版本

image-20250729182945585

仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  1. 与镜像队列一样,都是主从模式,支持主从数据同步
  2. 使用非常简单,没有复杂的配置
  3. 主从同步基于Raft协议,强一致

在 SpringAMQP 中创建仲裁队列:

1
2
3
4
5
6
7
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}

SpringAMQP连接集群,只需要在yaml中配置即可:

1
2
3
4
5
6
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: root
password: 1111
virtual-host: /

SpringBoot集成集群

重点连接配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spring:
rabbitmq:
# 连接单台rabbitmq 服务器的地址
# host: 192.168.1.101
# 连接单台rabbitmq 服务器的端口
# port: 5672
username: admin
password: 123456
virtual-host: longdidi
publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
publisher-returns: true #开启return模式
# 开启消费者手动确认
listener:
simple:
acknowledge-mode: manual
addresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672

测试模块:rabbitmq-10-cluster-01

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

配置MQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
server:
port: 8080
spring:
application:
name: cluster-learn01
rabbitmq:
# 连接单台rabbitmq 服务器的地址
# host: 192.168.1.101
# 连接单台rabbitmq 服务器的端口
# port: 5672
username: admin
password: 123456
virtual-host: longdidi
publisher-confirm-type: correlated # 开启生产者的确认模式,设置关联模式
publisher-returns: true #开启return模式
# 开启消费者手动确认
listener:
simple:
acknowledge-mode: manual
addresses: 192.168.1.11:5672,192.168.1.12:5672,192.1.13:5672
data:
redis:
host: 192.168.1.4
port: 6379
#password: 123456
database: 0 # 0号数据库

定义MQ队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
// 正常交换机,使用durable()方法设置持久化
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).durable(true).build();
}
// 正常队列,durable()方法就是持久化
@Bean
public Queue normalQueue() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1).withArguments(arguments).build();
}
// 正常交换机和正常队列绑定
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}

创建生产者、消费者

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Date;

@Service
@Slf4j
public class SendMessageService {
@Resource
private RabbitTemplate rabbitTemplate;
// 这个对象可以进行序列化和反序列化(json格式)
@Resource
private ObjectMapper objectMapper;

// 构造方法执行后自动执行
@PostConstruct
public void init() {
// 开启生产者的确定模式
rabbitTemplate.setConfirmCallback(
(correlationData, ack, cause) -> {
if (!ack) {
log.error("消息没有到达交换机,原因为:{}", cause);
// TODO 重发消息或者记录错误日志
}
}
);

rabbitTemplate.setReturnsCallback(
returnedMessage -> {
log.error("消息没有从交换机正确的投递(路由)到队列,原因为:{}", returnedMessage.getReplyText());
// TODO 记录错误日志,给程序员发短信或者或者邮件
}
);
}

public void sendMsg() throws JsonProcessingException {
{
//创建订单
Orders orders1 = Orders.builder()
.orderId("order_12345").orderName("买的手机")
.orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();
//转成json
String strOrders1 = objectMapper.writeValueAsString(orders1);
MessageProperties messageProperties = new MessageProperties();
//设置单条消息的持久化,默认就是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders1.getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
}
{
Orders orders2 = Orders.builder() .orderId("order_12345").orderName("买的手机")
.orderMoney(new BigDecimal(2356)).orderTime(new Date()).build();
String strOrders2 = objectMapper.writeValueAsString(orders2);
MessageProperties messageProperties = new MessageProperties();
//设置单条消息的持久化,默认就是持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = MessageBuilder.withBody(strOrders2.getBytes())
.andProperties(messageProperties).build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME,
RabbitMQConstant.ROUTING_NAME1, message);
}
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import com.fasterxml.jackson.databind.ObjectMapper;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.vo.Orders;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import com.rabbitmq.client.Channel;

@Component
@Slf4j
public class ReceiveMessageService {
@Resource
private ObjectMapper objectMapper;
@Resource
private StringRedisTemplate stringRedisTemplate;

@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg(Message message, Channel channel) throws IOException {
// 获取消息的唯一标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 使用objectmapper把字节数组反序列化成对象
Orders orders = objectMapper.readValue(message.getBody(), Orders.class);

try {
log.info("接收到的消息为:{}", orders.toString());
// 如果不存在就在redis中存储
Boolean setResult = stringRedisTemplate.opsForValue()
.setIfAbsent("idempotent:" + orders.getOrderId(), orders.getOrderId());
if (setResult) {
// TODO 向数据库插入订单等
log.info("向数据库插入订单");
}
// 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息处理出现问题");
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}

定义常量、实体类

定义常量

1
2
3
4
5
6
7
8
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.idempotent.normal.1";
// 队列
public static final String QUEUE_NAME1 = "queue.idempotent.normal.1";
// 路由key
public static final String ROUTING_NAME1 = "key.idempotent.normal.1";
}

定义实体类

1
2
3
4
5
6
7
8
9
10
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Orders implements Serializable {
private String orderId;
private String orderName;
private BigDecimal orderMoney;
private Date orderTime; // 下单时间
}

测试

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Rabbitmq10Cluster01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq10Cluster01Application.class, args);
}
@Resource
private SendMessageService sendMessageService;

// 程序一启动就会运行该方法
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}

测试结果

1
2
3
消息发送完毕,发送时间为:Tue Jul 29 18:41:22 GMT+08:00 2025
接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,orderMoney=2356....
接收到的消息为:0rders(orderId=order_12345,orderName=买的手机,orderMoney=2356....

RabbitMQ 节点的类型

磁盘节点:消息会存储到磁盘。
内存节点:消息都存储在内存中,重启服务器消息丢失,性能高于磁盘类型

RabbitMQ 每个节点是其他节点的完整拷贝吗?为什么?

不是,原因有以下两个:

  • 存储空间的考虑:若每个节点都拥有所有队列的完全拷贝,新增节点不但没有新增存储空间,反而增加了冗余数据
  • 性能的考虑:若每条消息都需要完整拷贝到每一个集群节点,那新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟

RabbitMQ 集群中唯一一个磁盘节点崩溃了会发生什么

如果唯一磁盘的磁盘节点崩溃了,不能进行以下操作:

  • 不能创建队列
  • 不能创建交换器
  • 不能创建绑定
  • 不能添加用户
  • 不能更改权限
  • 不能添加和删除集群节点

唯一磁盘节点崩溃了,集群可以保持运行,但不能更改任何东西

RabbitMQ 对集群节点停止顺序有要求吗

RabbitMQ 对集群的停止顺序是有要求的,应该先关闭内存节点,最后再关闭磁盘节点。若顺序恰好相反的话,可能会造成消息的丢失