当前位置: 首页 > news >正文

ActiveMQ 高级特性:延迟消息与优先级队列实战(一)

引言

在当今的分布式系统开发中,消息中间件扮演着至关重要的角色,而 ActiveMQ 作为一款广泛使用的开源消息中间件,凭借其丰富的特性和良好的性能,深受开发者的青睐。它支持多种消息模型,如点对点和发布 / 订阅,并且具备消息持久化、事务支持等功能,能够满足不同场景下的消息通信需求。

延迟消息和优先级队列是 ActiveMQ 的两个高级特性,在实际应用中具有重要的价值。延迟消息允许消息在指定的时间后被投递,这在许多场景中都非常有用,比如订单超时处理,如果用户下单后一段时间内未支付,系统可以通过延迟消息自动取消订单;又如任务定时执行,某些任务需要在特定的时间点执行,延迟消息就能很好地实现这一需求。优先级队列则根据消息的优先级进行排序,优先处理高优先级的消息,这在任务调度系统中尤为重要,比如系统中有一些紧急任务和普通任务,通过优先级队列可以确保紧急任务优先得到处理,从而保证系统的高效运行。

本文将深入探讨 ActiveMQ 的延迟消息与优先级队列这两个高级特性,并通过实战示例详细介绍它们的使用方法和注意事项,帮助读者更好地掌握这两个特性,提升在分布式系统开发中运用 ActiveMQ 的能力。

一、ActiveMQ 基础回顾

1.1 简介

ActiveMQ 是 Apache 软件基金会所研发的开放源代码消息中间件,作为一种纯 Java 程序,只要操作系统支持 Java 虚拟机,它便可执行。它的设计目标是在尽可能多的平台和语言上提供标准的、消息驱动的应用集成 ,具备多种强大特性。

在语言支持方面,它允许使用 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种语言编写客户端,极大地拓宽了其适用范围,让不同技术栈的开发者都能轻松使用。在协议支持上,涵盖 OpenWire、Stomp REST、WS Notification、XMPP、AMQP 等,使其能适应各种复杂的网络环境和应用场景。

ActiveMQ 完全支持 JMS1.1 和 J2EE 1.4 规范,包括持久化、XA 消息、事务等功能,这使得它在企业级应用开发中表现出色,能够满足企业对于消息处理的严格要求。同时,它对 Spring 框架有着良好的支持,可以很容易地内嵌到使用 Spring 的系统里面去,并且支持 Spring2.0 的特性,方便了基于 Spring 框架的项目集成和开发 。此外,它还通过了常见 J2EE 服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,借助 JCA 1.5 resource adaptors 的配置,能够自动部署到任何兼容 J2EE 1.4 的商业服务器上 。

ActiveMQ 支持多种传送协议,如 in-VM、TCP、SSL、NIO、UDP、Jgroups、JXTA 等,为不同的应用场景提供了灵活的选择。在消息持久化方面,支持通过 JDBC 和 journal 提供高速的消息持久化,保证了消息在系统故障等情况下的可靠性。从设计架构上,它保证了高性能的集群、客户端 - 服务器、点对点等通信模式的实现,还支持 Ajax 以及与 Axis 的整合,并且可以很容易地调用内嵌 JMS provider 进行测试。

由于这些特性,ActiveMQ 被广泛应用于各种场景。在电商系统中,像京东、淘宝这类大型电商平台,在处理订单、库存、物流等模块间的通信时,ActiveMQ 可以发挥异步处理和应用解耦的作用,提高系统的响应速度和稳定性。在分布式系统开发中,它能实现不同服务之间的可靠消息传递,确保系统的各个部分能够高效协同工作。

1.2 核心组件与工作原理

  • Broker:Broker 是 ActiveMQ 的核心组件,负责接收、存储和传递消息 ,可以将其看作是一个消息服务器。它可以运行在单机上,也可以分布在多个节点上组成集群,以实现高可用性和负载均衡。比如在一个大型的分布式电商系统中,可能会部署多个 Broker 节点,共同承担处理大量订单消息、库存更新消息等任务,防止单个节点因负载过高而出现性能瓶颈。
  • Connection:Connection 是生产者和消费者与 Broker 之间的通信链路 ,就像是一条连接客户端(生产者或消费者)和消息服务器(Broker)的桥梁。它可以是 TCP 连接、SSL 连接或者 HTTP 连接等。当一个订单系统(生产者)要向 ActiveMQ 发送订单消息时,首先需要建立一个到 Broker 的 Connection,通过这个连接来传输消息。
  • Session:Session 是生产者和消费者之间的通信会话 ,它可以是同步会话,也可以是异步会话。在 Session 中可以创建消息的生产者(MessageProducer)、消费者(MessageConsumer)以及消息的目的地(Destination,如队列 Queue 或主题 Topic)。例如,在一个物流系统(消费者)从 ActiveMQ 接收订单配送消息的过程中,会在一个 Session 内创建 MessageConsumer 来接收消息。
  • Destination:Destination 是消息的接收端 ,分为队列(Queue)和主题(Topic)两种类型。队列是一种先进先出(FIFO)的数据结构,生产者将消息发送到队列,消费者从队列中按照先进先出的顺序取消息进行处理,适用于一对一的消息通信场景,比如订单系统发送订单消息到队列,库存系统从队列中获取消息更新库存。主题则采用发布 / 订阅模式,一个生产者可以向主题发送消息,多个消费者可以订阅该主题并接收到相同的消息,适用于一对多的广播通信场景,比如系统发布公告消息到主题,多个相关的业务模块都可以订阅该主题获取公告内容。
  • MessageProducer:消息生产者,负责将消息发送到 Destination 。比如电商系统中的订单模块,在用户下单后,订单模块作为 MessageProducer 将订单相关消息发送到指定的 Queue 或 Topic。
  • MessageConsumer:消息消费者,负责从 Destination 接收消息 。如电商系统中的物流模块,作为 MessageConsumer 从对应的 Queue 或 Topic 中接收订单消息,从而安排后续的物流配送任务。

消息的生产和消费过程如下:

  1. 消息生产:生产者首先创建一个 ConnectionFactory,通过它创建与 Broker 的 Connection。接着,在这个 Connection 上创建 Session。在 Session 中创建 Destination(Queue 或 Topic)以及 MessageProducer。生产者创建消息对象(如 TextMessage、ObjectMessage 等),设置好消息的内容和属性后,通过 MessageProducer 将消息发送到指定的 Destination。例如,在一个在线教育平台中,当用户购买课程后,订单系统作为生产者,创建一个包含订单信息(课程 ID、用户 ID、购买时间等)的 TextMessage,通过 MessageProducer 发送到名为 “orderQueue” 的队列中。
  1. 消息消费:消费者同样先创建 ConnectionFactory 和 Connection,然后在 Connection 上创建 Session,并在 Session 中创建与生产者发送消息时相同的 Destination 以及 MessageConsumer。消费者通过 MessageConsumer 接收消息,可以采用同步阻塞方式(如Message message = consumer.receive();)等待接收消息,也可以设置消息监听器(consumer.setMessageListener(new MessageListener() {... });)以异步方式接收消息。当接收到消息后,消费者对消息进行处理。比如课程管理系统作为消费者,从 “orderQueue” 队列中接收到订单消息后,解析消息内容,更新课程的销售记录等。

二、延迟消息实战

2.1 延迟消息概念与应用场景

延迟消息是指消息在发送后,并不会立即被投递到消费者,而是在经过指定的时间延迟后才被投递 。这种特性在许多实际应用场景中都非常有用。

在电商系统中,订单超时处理是一个常见的场景。当用户下单后,如果在规定的时间内(如 30 分钟)未完成支付,系统需要自动取消订单并释放库存。通过发送延迟消息,在用户下单时,向消息队列发送一条延迟 30 分钟的消息,消息内容包含订单信息。30 分钟后,消息被投递,消费者接收到消息后检查订单状态,如果订单仍未支付,则执行取消订单和释放库存的操作。

在任务定时执行场景中,比如每天凌晨需要执行数据备份任务,或者每周一需要发送周报提醒。可以在系统中设置定时任务,在合适的时间点发送延迟消息,消息到达消费者后触发相应的任务执行逻辑 。例如,在每天凌晨 0 点,系统发送一条延迟 1 分钟的消息,消费者接收到消息后启动数据备份任务,这样可以避免在整点时刻系统负载过高时执行备份任务,影响其他业务的正常运行。

在物流系统中,当货物到达目的地后,可能需要在一段时间后(如 2 小时)自动确认收货并更新物流状态。通过延迟消息,在货物到达目的地时发送一条延迟 2 小时的消息,消息包含物流单号等信息,2 小时后消息被投递,消费者根据消息内容确认收货并更新物流状态,提高物流处理的自动化程度 。

2.2 ActiveMQ 实现延迟消息的方式

2.2.1 使用 TimeToLive 属性

通过设置消息的 TimeToLive(TTL)属性可以实现延迟投递。TTL 属性表示消息在队列中的存活时间,单位为毫秒。当生产者发送消息时,设置 TTL 属性为一个大于 0 的值,消息并不会立即被投递,而是在经过 TTL 时间后才会被投递到消费者 。

例如,以下是使用 Java 代码通过设置 TimeToLive 属性实现延迟消息发送的生产者示例:

 

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class DelayedMessageProducer {

private static final String BROKER_URL = "tcp://localhost:61616";

private static final String QUEUE_NAME = "DelayedMessageQueue";

public static void main(String[] args) throws JMSException {

// 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

// 创建连接

Connection connection = connectionFactory.createConnection();

// 启动连接

connection.start();

// 创建会话

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建队列

Destination destination = session.createQueue(QUEUE_NAME);

// 创建消息生产者

MessageProducer producer = session.createProducer(destination);

// 创建文本消息

TextMessage message = session.createTextMessage("这是一条延迟消息");

// 设置消息的TimeToLive属性为5000毫秒,即5秒后投递

message.setJMSExpiration(System.currentTimeMillis() + 5000);

// 发送消息

producer.send(message);

System.out.println("延迟消息已发送");

// 关闭资源

producer.close();

session.close();

connection.close();

}

}

消费者代码示例如下:

 

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class DelayedMessageConsumer {

private static final String BROKER_URL = "tcp://localhost:61616";

private static final String QUEUE_NAME = "DelayedMessageQueue";

public static void main(String[] args) throws JMSException {

// 创建连接工厂

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

// 创建连接

Connection connection = connectionFactory.createConnection();

// 启动连接

connection.start();

// 创建会话

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建队列

Destination destination = session.createQueue(QUEUE_NAME);

// 创建消息消费者

MessageConsumer consumer = session.createConsumer(destination);

// 接收消息,这里采用同步阻塞方式接收消息

Message message = consumer.receive();

if (message != null && message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("接收到延迟消息:" + textMessage.getText());

}

// 关闭资源

consumer.close();

session.close();

connection.close();

}

}

在上述代码中,生产者创建了一条文本消息,并设置了消息的JMSExpiration属性(即 TTL)为当前时间加上 5000 毫秒,这样消息会在 5 秒后被投递。消费者通过receive方法接收消息,当消息被投递时,消费者可以接收到并处理该消息。

2.2.2 使用 Scheduled Message 机制

ActiveMQ 的 Scheduled Message 机制提供了更灵活的延迟消息设置方式 。它允许通过设置消息的属性来实现延迟投递、定时重复投递等功能。

主要涉及以下几个属性:

  • AMQ_SCHEDULED_DELAY:消息延迟发送的时间,单位为毫秒 。例如,设置该属性为 60000,表示消息将在 60 秒后发送。
  • AMQ_SCHEDULED_PERIOD:每次重新发送该消息的时间间隔,单位为毫秒 。如果设置了该属性,消息在延迟发送后,会按照这个时间间隔重复发送。
  • AMQ_SCHEDULED_REPEAT:重新发送该消息的次数 。结合AMQ_SCHEDULED_PERIOD属性,可以实现消息的多次重复发送。
  • AMQ_SCHEDULED_CRON:使用 Cron 表达式设置发送该消息的时机 。通过 Cron 表达式,可以实现更复杂的定时任务,比如每天凌晨 3 点发送消息。

要使用 Scheduled Message 机制,需要在activemq.xml配置文件中开启 schedulerSupport属性,即在broker节点上添加schedulerSupport="true",如下所示:

 

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">

<!-- 其他配置 -->

</broker>

以下是基于 Spring Boot 的代码示例,展示如何使用 Scheduled Message 机制发送延迟消息:

首先,在pom.xml文件中添加 ActiveMQ 和 Spring JMS 的依赖:

 

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-jms</artifactId>

</dependency>

</dependencies>

然后,在application.yml文件中配置 ActiveMQ 的连接信息:

 

spring:

activemq:

broker-url: tcp://localhost:61616

user: admin

password: admin

接着,创建消息发送服务类:

 

import org.apache.activemq.ScheduledMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Service;

@Service

public class ScheduledMessageService {

@Autowired

private JmsTemplate jmsTemplate;

public void sendScheduledMessage(String destinationName, String messageContent, long delay, long period, int repeat) {

jmsTemplate.send(destinationName, session -> {

// 创建文本消息

javax.jms.TextMessage textMessage = session.createTextMessage(messageContent);

// 设置延迟发送时间

textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

// 设置重复发送时间间隔

if (period > 0) {

textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);

}

// 设置重复发送次数

if (repeat > 0) {

textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

}

return textMessage;

});

}

}

最后,在测试类中调用发送方法:

 

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest

public class ScheduledMessageTest {

@Autowired

private ScheduledMessageService scheduledMessageService;

@Test

public void testSendScheduledMessage() {

// 发送延迟消息,延迟60秒发送,不重复发送

scheduledMessageService.sendScheduledMessage("ScheduledMessageQueue", "这是一条定时延迟消息", 60000, 0, 0);

}

}

在上述代码中,通过ScheduledMessageService类的sendScheduledMessage方法发送延迟消息,设置了消息的延迟时间为 60 秒,不设置重复发送时间间隔和次数。在测试类中调用该方法,即可发送延迟消息 。消费者的代码与前面使用 TimeToLive 属性时的消费者代码类似,通过监听队列接收消息并处理。

2.3 实战案例与代码实现

2.3.1 案例背景与需求分析

假设我们正在开发一个电商系统,其中订单模块需要实现订单支付超时取消的功能。当用户下单后,系统生成订单并发送到消息队列,同时设置订单的支付超时时间为 30 分钟。如果在 30 分钟内用户未完成支付,系统需要自动取消订单,并将订单状态更新为 “已取消”,同时释放订单中占用的库存 。

为了实现这个功能,我们可以利用 ActiveMQ 的延迟消息特性。在用户下单时,向 ActiveMQ 发送一条包含订单信息的延迟消息,延迟时间设置为 30 分钟。30 分钟后,消息被投递到消费者,消费者接收到消息后检查订单的支付状态,如果订单未支付,则执行取消订单和释放库存的操作 。

2.3.2 代码实现与配置

首先,创建一个 Spring Boot 项目,并在pom.xml文件中添加 ActiveMQ 和 Spring JMS 的依赖:

 

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-jms</artifactId>

</dependency>

</dependencies>

然后,在application.yml文件中配置 ActiveMQ 的连接信息:

 

spring:

activemq:

broker-url: tcp://localhost:61616

user: admin

password: admin

接下来,创建订单实体类Order:

 

import java.io.Serializable;

public class Order implements Serializable {

private static final long serialVersionUID = 1L;

private Long orderId;

private String orderNo;

private String status;

// 其他订单相关属性和getter、setter方法

}

创建消息发送服务类OrderMessageSender:

 

import org.apache.activemq.ScheduledMessage;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.stereotype.Service;

@Service

public class OrderMessageSender {

@Autowired

private JmsTemplate jmsTemplate;

public void sendOrderMessage(Order order, long delay) {

jmsTemplate.send("OrderQueue", session -> {

// 创建对象消息,因为要传递订单对象

javax.jms.ObjectMessage objectMessage = session.createObjectMessage(order);

// 设置延迟发送时间,单位毫秒

objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

return objectMessage;

});

}

}

创建消息接收服务类OrderMessageReceiver:

 

import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Service;

@Service

public class OrderMessageReceiver {

@JmsListener(destination = "OrderQueue")

public void receiveOrderMessage(Order order) {

// 检查订单支付状态,这里假设订单实体中有支付状态字段,实际应用中需要从数据库查询

if ("未支付".equals(order.getStatus())) {

// 执行取消订单操作,更新订单状态到数据库

order.setStatus("已取消");

// 模拟释放库存操作

System.out.println("订单 " + order.getOrderNo() + " 支付超时,已取消,库存已释放");

} else {

System.out.println("订单 " + order.getOrderNo() + " 已支付,无需取消");

}

}

}

在ActiveMQ的配置文件activemq.xml中,确保开启了 schedulerSupport属性:

 

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">

<!-- 其他配置 -->

</broker>

在订单创建的业务逻辑中,调用消息发送服务发送延迟消息:

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class OrderController {

@Autowired

private OrderMessageSender orderMessageSender;

@PostMapping("/orders")

public String createOrder(@RequestBody Order order) {

// 模拟订单创建,设置订单编号和初始状态

order.setOrderNo("20240101001");

order.setStatus("未支付");

// 发送延迟消息,延迟30分钟,单位毫秒

orderMessageSender.sendOrderMessage(order, 30 * 60 * 1000);

return "订单已创建,等待支付";

}

}

2.3.3 测试与验证

启动 ActiveMQ 服务和 Spring Boot 应用。使用 Postman 等工具向/orders接口发送 POST 请求,请求体中包含订单信息,模拟用户下单操作 。

发送请求后,查看控制台输出,确认订单消息已发送。等待 30 分钟后,再次查看控制台输出,应该能看到订单支付超时取消的相关信息,如 “订单 20240101001 支付超时,已取消,库存已释放” 。

为了更准确地验证,可以在数据库中查看订单状态是否已更新为 “已取消”,以及库存相关数据是否已恢复到订单创建前的状态 。通过以上测试,可以验证延迟消息是否按照预期时间投递和消费,以及订单支付超时取消功能是否正常工作。

相关文章:

  • Java中的线程
  • 编程题 02-线性结构3 Reversing Linked List【PAT】
  • Arduino快速入门
  • 组合数学——容斥原理
  • K8S Ingress、IngressController 快速开始
  • [数据结构高阶]并查集初识、手撕、可以解决哪类问题?
  • AI Agent开发第64课-DIFY和企业现有系统结合实现高可配置的智能零售AI Agent(上)
  • Matlab 空调温度时延模型的模糊pid控制
  • M8040A/M8199助力数据中心收发信机测试
  • 25、Tailwind:魔法速记术——React 19 样式新思路
  • 数据治理域——数据治理体系建设
  • Hive HA配置高可用
  • 多样本整合Banksy空间聚类分析(Visium HD, Xenium, CosMx)
  • AAAI-2025 | 中科院无人机导航新突破!FELA:基于细粒度对齐的无人机视觉对话导航
  • 深入浅出:Java 中的动态类加载与编译技术
  • 15.three官方示例+编辑器+AI快速学习webgl_buffergeometry_instancing
  • IOT藍牙探測 C2 架構:社會工程/節點分離防追尋
  • Windows下安装Docker Desktop到C盘以外的盘
  • DNS工作原理与报文解析
  • Python Day23 学习
  • 中国恒大:清盘人向香港高等法院申请撤回股份转让
  • 盖茨说对中国技术封锁起到反作用
  • 深一度|在亚马尔的天才面前,姆巴佩戴上“帽子”又如何
  • 5年建成强化城市核心功能新引擎,上海北外滩“风景文化都是顶流”
  • 西藏日喀则市拉孜县发生5.5级地震,震感明显部分人被晃醒
  • 山西忻州市人大常委会副主任郭建平接受审查调查