ActiveMQ是Apache下的一个子项目。使用Java完全支持JMS1.1(Java Message Service)和J2EE 1.4规范的 JMS Provider实现,少量代码就可以高效地实现高级应用场景,ActiveMQ是消息队列服务,是面向消息中间件(MOM)的最终实现,它为企业消息传递提供高可用、出色性能、可扩展、稳定和安全保障。可插拔的传输协议支持,比如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby等。

参考文章:

ActiveMQ介绍

ActiveMQ消息队列:从入门到Spring Boot实战

ActiveMQ 介绍

ActiveMQ 架构

在ActiveMQ中,生产者(Producer)发送消息到Queue或者Topic中,消费者(consumer)通过ActiveMQ支持的传输协议连接到ActiveMQ接受消息并做处理。

image-20250801153005242

  • 生产者(Producer):消息的生产者。

  • 消费者(Consumer):消息的消费者。

  • 队列(Queue):用于存储还未被消费者消费的消息,点对点模型以Queue作为通信载体。

  • 主题(Topic):用于存储还未被消费者消费的消息,发布订阅模式使用Topic作为通信载体。

  • 传输协议:ActiveMQ允许客户端使用多种协议来连接,包括:TCP、AMQP、STOMP、MQTT、WS。

相关概念

  • JMS:Java Message Service,JMS是一个java平台中关于面向消息中间件的API。

  • PTP:The point-to-point,可以同步或者异步的发送和接受消息,每个消息仅被发送一次,且消费一次。

  • pub/sub:publish/subscribe发布订阅,消息生产者将消息发到topic中,消息消费者从topic读取,与PTP不同的是,消息可以被重复消费。

  • 集群模式:分为单节点和主从两种。

  • 单节点:ActiveMQ只有一个实例,单独运行。

  • 主从:三个ActiveMQ组成ActiveMQ主从模式,实现高可用的架构。

  • 集群健康状态:分为健康和不健康两种。

  • 健康:集群中组件运行正常时,对应集群将处于健康状态。

  • 不健康:集群中组件运行状态异常时,对应集群将处于不健康状态。

应用场景

消息中间件ActiveMQ适用于以下场景:

  • 应用解耦:消息中间件ActiveMQ可以插入应用中间,为两边应用提供接口,实现应用通过消息中间件进行通信。

  • 异步处理:消息中间件ActiveMQ提供异步处理机制,允许应用把一些消息放入消息中间件中,并不立即处理它,在之后需要的时候再处理。

  • 流量削峰:使用消息中间件ActiveMQ能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。

  • 其他场景:可以用于日志处理、消息通讯等。

ActiveMQ安装与启动

安装启动只需从Apache ActiveMQ官方网站下载对应操作系统的发行版,解压后即可使用。以下是简要步骤:

  1. 下载ActiveMQ:访问ActiveMQ官方网站,下载最新稳定版本的ActiveMQ二进制包。
  2. 解压:将下载的压缩包解压到您选择的目录。
  3. 启动服务
    • Windows:进入解压目录下的bin文件夹,运行activemq.bat脚本。
    • Linux/macOS:进入解压目录下的bin文件夹,运行./activemq start命令。
  4. Web管理界面:ActiveMQ启动成功后,您可以通过浏览器访问其Web管理界面,默认地址通常是http://localhost:8161/admin/,默认用户名和密码均为admin。该界面提供了对队列、主题、连接等进行监控和管理的功能。

注意:在实际生产环境中,建议对ActiveMQ进行更详细的配置,例如持久化、安全性、内存限制等,以确保其稳定性和性能。但在Spring Boot集成开发阶段,默认配置通常足以满足基本需求。

SpringBoot 集成 ActiveMQ

Spring Boot为集成消息队列提供了极大的便利,通过引入spring-boot-starter-activemq依赖,可以实现ActiveMQ的自动配置,大大简化了开发流程。

  1. 添加Maven依赖

    在您的Spring Boot项目的pom.xml文件中,添加以下Maven依赖:

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <!-- 这个Starter会自动引入ActiveMQ客户端库以及Spring JMS相关的自动配置,无需手动配置JMS连接工厂、JMS模板等 -->
  2. 配置ActiveMQ连接

    src/main/resources目录下的application.propertiesapplication.yml配置文件中,添加ActiveMQ的连接信息。以下是application.properties的示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # ActiveMQ Broker URL
    spring.activemq.broker-url=tcp://localhost:61616
    # ActiveMQ 用户名 (如果需要认证)
    spring.activemq.user=admin
    # ActiveMQ 密码 (如果需要认证)
    spring.activemq.password=admin
    # 是否启用内嵌的ActiveMQ Broker (默认为false,如果为true则不需要外部ActiveMQ服务)
    # spring.activemq.in-memory=true
    # 是否启用JMS连接池 (如果为true,需要添加activemq-pool依赖)
    # spring.activemq.pool.enabled=false

    • spring.activemq.broker-url:指定ActiveMQ服务器的连接地址和端口。如果ActiveMQ运行在本地默认端口,通常是tcp://localhost:61616
    • spring.activemq.userspring.activemq.password:如果您的ActiveMQ服务器配置了认证,则需要提供相应的用户名和密码。
    • spring.activemq.in-memory:如果设置为true,Spring Boot将启动一个内嵌的ActiveMQ Broker,这在开发和测试环境中非常方便,无需单独安装和启动ActiveMQ服务。但在生产环境中,通常会连接到独立的ActiveMQ服务器。
    • spring.activemq.pool.enabled:是否启用JMS连接池。如果设置为true,建议添加activemq-pool依赖以获得更好的性能和资源管理。
  3. ActiveMQ配置类(可选,用于自定义Destination)

在某些情况下,您可能希望通过编程方式定义JMS的目的地(Queue或Topic),或者进行更高级的JMS配置。您可以创建一个配置类来定义这些Bean。例如,定义一个名为my_queue的队列和一个名为my_topic的主题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class ActiveMqConfig {
public static final String QUEUE_NAME = "my_queue";
public static final String TOPIC_NAME = "my_topic";
// 定义点对点模式的队列
@Bean
public Queue queue() {
return new ActiveMQQueue(QUEUE_NAME);
}
// 定义发布/订阅模式的主题
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPIC_NAME);
}
}

这种方式可在Spring容器中获取到这些Destination的Bean,并在消息发送时直接引用它们,增加了代码的可维护性和灵活性。

消息的生产与消费实战

Spring Boot通过JmsTemplate简化了消息的发送,并通过@JmsListener注解实现了消息的便捷消费。下面我们将通过具体的代码示例来演示如何实现点对点和发布/订阅两种模式下的消息生产与消费。

消息生产者

将消息发送到ActiveMQ的指定目的地(队列或主题)。Spring Boot自动配置了JmsTemplate,可以直接注入并使用它来发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Component
public class MessageProducer {
@Autowired
private JmsTemplate jmsTemplate;
/**
* 发送消息到指定目的地
* @param destination 目的地(队列或主题)
* @param message 消息内容
*/
public void sendMessage(Destination destination, String message) {
jmsTemplate.convertAndSend(destination, message);
System.out.println("Message sent to " + destination + ": " + message);
}
/**
* 发送消息到指定队列名称
* @param queueName 队列名称
* @param message 消息内容
*/
public void sendQueueMessage(String queueName, String message) {
jmsTemplate.convertAndSend(queueName, message);
System.out.println("Message sent to queue " + queueName + ": " + message);
}
/**
* 发送消息到指定主题名称
* @param topicName 主题名称
* @param message 消息内容
*/
public void sendTopicMessage(String topicName, String message) {
jmsTemplate.convertAndSend(topicName, message);
System.out.println("Message sent to topic " + topicName + ": " + message);
}
}

在上述代码中,我们提供了三种发送消息的方法:

  • sendMessage(Destination destination, String message):直接使用JMS Destination对象发送消息,适用于通过@Bean定义了QueueTopic的情况。
  • sendQueueMessage(String queueName, String message):通过队列名称发送消息,JmsTemplate会自动解析为队列目的地。
  • sendTopicMessage(String topicName, String message):通过主题名称发送消息,JmsTemplate会自动解析为主题目的地。

点对点消息(Queue)

点对点消息模式中,消息发送到队列,并且只能被一个消费者接收和处理。

生产者示例

通过一个简单的REST控制器来触发消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/queue")
public class QueueProducerController {
@Autowired
private MessageProducer messageProducer;
@Autowired
private Queue queue; // 注入ActiveMqConfig中定义的Queue Bean
@GetMapping("/send")
public String sendQueueMessage(@RequestParam("message") String message) {
messageProducer.sendMessage(queue, message);
return "Queue message sent: " + message;
}
}

当访问/queue/send?message=hello时,消息hello将被发送到my_queue队列。

消费者示例

消息消费者使用@JmsListener注解来监听指定目的地的消息。当有消息到达时,注解所修饰的方法将被自动调用。

1
2
3
4
5
6
7
8
9
@Component
public class QueueConsumer {
// 监听my_queue队列的消息,@param message 接收到的消息内容
@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
public void receiveQueueMessage(String message) {
System.out.println("Received queue message: " + message);
// 在这里处理接收到的消息逻辑,例如保存到数据库、调用其他服务等
}
}

@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)指定了该方法将监听ActiveMqConfig.QUEUE_NAME(即my_queue)队列的消息。当消息到达时,receiveQueueMessage方法将被调用,并传入消息内容。

发布/订阅消息(Topic)

发布/订阅消息模式中,消息发送到主题,所有订阅该主题的消费者都会收到消息的副本。

生产者示例

与队列生产者类似,主题生产者将消息发送到主题。

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("/topic")
public class TopicProducerController {
@Autowired
private MessageProducer messageProducer;
@Autowired
private Topic topic; // 注入ActiveMqConfig中定义的Topic Bean
@GetMapping("/send")
public String sendTopicMessage(@RequestParam("message") String message) {
messageProducer.sendMessage(topic, message);
return "Topic message sent: " + message;
}
}

当访问/topic/send?message=event时,消息event将被发送到my_topic主题。

消费者示例

主题消费者同样使用@JmsListener注解来监听主题。与队列不同的是,每个监听相同主题的消费者都会收到消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class TopicConsumer {
// 监听my_topic主题的消息,@param message 接收到的消息内容
@JmsListener(destination = ActiveMqConfig.TOPIC_NAME)
public void receiveTopicMessage(String message) {
System.out.println("Received topic message: " + message);
// 在这里处理接收到的消息逻辑
}
// 另一个监听my_topic主题的消费者,用于演示发布/订阅模式
@JmsListener(destination = ActiveMqConfig.TOPIC_NAME)
public void anotherReceiveTopicMessage(String message) {
System.out.println("Another consumer received topic message: " + message);
}
}

当消息发送到my_topic主题时,receiveTopicMessageanotherReceiveTopicMessage两个方法都将收到该消息。

测试Controller

为了方便测试,您可以创建一个主应用程序类,并运行Spring Boot应用。然后通过浏览器或Postman访问上述定义的REST接口来发送消息,观察控制台输出以验证消息的生产和消费。

1
2
3
4
5
6
7
@SpringBootApplication
@EnableJms // 启用JMS功能
public class ActivemqSpringBootApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqSpringBootApplication.class, args);
}
}

确保在ActivemqSpringBootApplication类上添加@EnableJms注解,以启用Spring Boot的JMS功能。

ActiveMQ与Spring Boot集成的优势在于:

  • 简化配置:Spring Boot的自动配置大大减少了手动配置JMS连接和模板的工作量。
  • 快速开发JmsTemplate@JmsListener注解使得消息的发送和消费变得极其简单直观。
  • 提高效率:通过消息队列,可以实现系统间的异步处理,提高系统吞吐量和响应速度。
  • 增强健壮性:消息队列的削峰填谷能力和消息持久化机制,提升了系统的稳定性和可靠性。

消息队列在实际项目中的应用场景非常广泛,例如:

  • 异步处理:用户注册后发送邮件、短信通知;订单支付成功后更新库存、生成物流信息等。
  • 应用解耦:微服务架构中,不同服务之间通过消息进行通信,避免直接依赖,提高服务独立性。
  • 流量削峰:秒杀活动、大促期间,将瞬时高并发请求放入消息队列,后端服务匀速处理,防止系统崩溃。
  • 日志处理:将系统产生的海量日志发送到消息队列,由专门的日志处理服务进行收集、分析和存储。
  • 分布式事务:通过消息队列实现最终一致性,确保分布式系统中数据的一致性。

未来学习方向:

  • ActiveMQ集群:了解ActiveMQ的Master-Slave、Broker Network等集群模式,实现高可用和负载均衡。

  • 消息持久化:深入理解消息如何存储到数据库或文件系统,确保消息在Broker重启后不丢失。

  • 事务消息:学习如何使用JMS事务或Spring的事务管理来确保消息发送和业务操作的原子性。

  • 死信队列(DLQ):处理无法被消费者正常处理的消息,避免消息丢失。

  • 消息重发与幂等性:设计健壮的消费者,处理消息重发带来的重复消费问题。

  • 其他消息中间件:Kafka、RabbitMQ、RocketMQ等其他主流消息队列。


ActiveMQ 集群

Activemq的集群


JMS

JMS(Java Message Service)是Java平台上关于面向消息中间件(MOM)的API,它定义了Java应用程序如何创建、发送、接收和读取消息。JMS是独立于具体平台的API,绝大多数MOM提供商都对JMS提供支持,ActiveMQ就是其中之一。理解JMS的核心概念对于使用ActiveMQ至关重要。

JMS模型

JMS API定义了一组标准接口和类,用于实现消息传递。以下是JMS中的几个核心对象模型:

  • 连接工厂(ConnectionFactory):这是创建JMS连接的工厂接口。客户端通过查找JNDI(Java Naming and Directory Interface)来获取ConnectionFactory的实例,然后使用它来创建与消息代理的连接。
  • 连接(Connection):表示客户端与JMS提供者(即消息代理,如ActiveMQ)之间的活动连接。它封装了客户端与消息代理之间的物理连接。
  • 会话(Session):会话是发送和接收消息的单线程上下文。它可以创建消息生产者、消息消费者和消息本身。会话支持事务,并且可以指定消息的确认模式。
  • 目的地(Destination):目的地是消息发送和接收的地点。JMS定义了两种类型的目的地:
    • 队列(Queue):用于点对点(P2P)消息传递模型。消息发送者将消息发送到队列,消息接收者从队列中获取消息。一条消息只能被一个消费者接收和处理。
    • 主题(Topic):用于发布/订阅(Pub/Sub)消息传递模型。消息发布者将消息发布到主题,所有订阅该主题的消费者都会收到消息的副本。
  • 消息生产者(MessageProducer):由会话创建,用于向目的地发送消息。生产者可以发送不同类型的消息(文本、字节、对象、映射、流)。
  • 消息消费者(MessageConsumer):由会话创建,用于从目的地接收消息。消费者可以同步接收消息(阻塞等待)或异步接收消息(通过消息监听器)。

消息传递模式

JMS支持两种主要的消息传递模式,它们对应于不同的业务场景和消息处理需求:

  • 点对点(Point-to-Point, P2P)

    • 特点:基于队列(Queue)实现。消息生产者将消息发送到特定的队列,消息消费者从该队列中接收消息。一条消息只能被一个消费者消费,即使有多个消费者监听同一个队列,消息也会被轮询分发给其中一个。

    • 应用场景:适用于任务分配、工作流处理等场景,例如订单处理、支付通知等,确保每条消息只被处理一次。

  • 发布/订阅(Publish/Subscribe, Pub/Sub)

    • 特点:基于主题(Topic)实现。消息发布者将消息发布到特定的主题,所有订阅了该主题的消费者都会收到消息的副本。这种模式允许一对多的消息分发。
    • 应用场景:适用于广播通知、事件分发等场景,例如新闻发布、股票行情更新、系统日志分发等,所有相关方都需要接收到相同的消息。