当前位置: 首页 > news >正文

分布式系统中的 ActiveMQ:异步解耦与流量削峰(一)

一、引言

{"type":"load_by_key","key":"auto_image_0_0","image_type":"search"}

在当今数字化时代,分布式系统已成为构建大规模应用的关键架构。随着业务的快速发展和用户量的急剧增长,分布式系统面临着诸多挑战,其中异步通信、系统解耦和流量削峰是亟待解决的重要问题。

以电商系统为例,在秒杀活动中,大量用户同时涌入,瞬间产生海量订单请求。传统的同步处理方式会使系统不堪重负,导致响应缓慢甚至崩溃。而且,订单系统与库存系统、物流系统等紧密耦合,任何一个系统的故障或升级都可能影响其他系统的正常运行,大大降低了系统的稳定性和可扩展性。

ActiveMQ 作为一款强大的开源消息中间件,在分布式系统中发挥着至关重要的作用。它就像一座桥梁,连接着分布式系统中的各个组件,为异步通信、系统解耦和流量削峰提供了高效可靠的解决方案 。通过引入 ActiveMQ,系统可以将耗时的任务异步处理,使各个模块之间实现松耦合,同时有效应对突发的流量高峰,保障系统的稳定运行和高性能表现。

二、ActiveMQ 简介

ActiveMQ 是 Apache 软件基金会下的一个开源项目,是一款功能强大的消息中间件,在分布式系统中扮演着关键角色,为系统组件间的高效通信提供了可靠支持 。它遵循 JMS(Java Message Service)规范,这使得基于 Java 的应用程序能够方便地使用其提供的消息服务,实现异步通信和系统解耦。同时,ActiveMQ 支持多种语言客户端,如 Java、C、C++、C#、Ruby、Perl、Python、PHP 等 ,这意味着不同语言开发的系统组件都可以通过 ActiveMQ 进行通信,极大地拓宽了其应用范围,满足了多样化的分布式系统开发需求。

在传输协议方面,ActiveMQ 同样表现出色,支持多种协议,包括 OpenWire、Stomp、AMQP、MQTT 等 。不同的传输协议适用于不同的场景,例如,OpenWire 是 ActiveMQ 默认的高效二进制协议,适用于 Java 客户端与 ActiveMQ 之间的通信;MQTT 则是一种轻量级的协议,特别适合物联网设备等资源受限环境下的消息传输。这种多协议支持能力,使得 ActiveMQ 能够灵活地适应各种复杂的网络环境和应用场景。

ActiveMQ 主要支持两种消息模型:点对点(Point-to-Point)和发布 / 订阅(Publish/Subscribe) 。在点对点模型中,消息被发送到队列(Queue),每个消息只能被一个消费者接收,就像传统的信件投递,一封信只能被一个收件人收取。这种模型适用于需要确保消息被唯一处理的场景,比如订单处理系统,每个订单消息只需被一个处理模块处理,以保证订单处理的准确性和一致性。

而发布 / 订阅模型中,消息被发送到主题(Topic),多个订阅了该主题的消费者都可以接收到消息,类似于广播模式,一条广播消息可以被多个听众接收。这种模型适用于需要将消息广泛传播的场景,如实时行情推送系统,多个客户端都需要实时获取最新的行情信息,通过发布 / 订阅模型,行情消息可以同时被多个订阅客户端接收,实现信息的快速传播和共享。

正是由于 ActiveMQ 具备上述特性,它在众多分布式系统应用场景中都得到了广泛应用。在电商系统中,订单处理、库存管理、物流配送等模块之间可以通过 ActiveMQ 进行异步通信和解耦 。当用户下单后,订单消息被发送到 ActiveMQ 队列,订单处理模块从队列中获取消息进行处理,同时库存管理模块和物流配送模块也可以订阅相关消息,分别进行库存扣减和配送安排,各模块之间互不干扰,提高了系统的整体性能和可扩展性。

在金融系统中,ActiveMQ 可以用于实现交易消息的可靠传输和处理 。例如,股票交易系统中,买卖订单消息通过 ActiveMQ 在各个交易节点之间传递,确保交易的准确执行和数据的一致性,同时其高可靠性和性能保证了在高并发交易场景下系统的稳定运行。

三、异步解耦

(一)异步解耦原理剖析

在传统的分布式系统架构中,服务之间通常采用同步调用的方式进行通信 。例如,服务 A 需要调用服务 B 的某个功能,服务 A 会直接发起对服务 B 的请求,并等待服务 B 处理完成返回结果后,才继续执行后续操作。这种方式虽然简单直接,但存在诸多弊端。当服务 B 出现故障或者响应时间过长时,服务 A 会一直处于阻塞状态,无法及时处理其他任务,严重影响系统的整体性能和响应速度 。而且,服务 A 和服务 B 之间形成了紧密的耦合关系,服务 B 的任何变动,如接口升级、业务逻辑调整等,都可能需要服务 A 进行相应的修改,这大大增加了系统的维护成本和复杂性,降低了系统的可扩展性和灵活性。

ActiveMQ 的出现为解决这些问题提供了有效的方案。它基于消息队列的机制,实现了服务之间的异步通信 。在这种模式下,当服务 A 有任务需要服务 B 处理时,服务 A 并不会直接调用服务 B,而是将相关的任务信息封装成消息,发送到 ActiveMQ 的消息队列中 。然后,服务 A 可以立即返回,继续执行其他任务,无需等待服务 B 的处理结果。而服务 B 则作为消息的消费者,从消息队列中获取消息,并按照自身的节奏进行处理。

这种异步通信的方式就像是现实生活中的邮件系统。发件人(生产者)将邮件(消息)投递到邮箱(消息队列)中,然后就可以去做其他事情,无需等待收件人(消费者)接收和处理邮件。收件人在方便的时候去邮箱收取邮件并进行处理 。通过这种方式,生产者和消费者之间实现了时间和空间上的解耦 。它们不需要同时在线,也不需要直接交互,各自可以独立地进行开发、部署和升级,互不干扰,极大地提高了系统的灵活性和可维护性 。 例如,在一个电商系统中,订单服务(生产者)在用户下单后,将订单消息发送到 ActiveMQ 队列,然后可以快速响应用户的下单请求,而无需等待库存服务和物流服务(消费者)处理订单消息。库存服务和物流服务可以根据自身的负载情况,从队列中获取订单消息进行处理,实现了订单服务与库存服务、物流服务之间的异步解耦。

(二)应用场景实例

以电商下单流程为例,在传统的同步调用模式下,当用户下单时,订单系统需要依次同步调用库存系统进行库存检查和扣减、物流系统进行物流信息预分配、支付系统进行支付处理等 。这意味着订单系统与这些系统之间存在紧密的耦合关系。一旦库存系统因为高并发或者自身故障而响应缓慢,订单系统就会被阻塞,无法及时处理后续的下单请求,导致用户等待时间过长,甚至出现超时错误。而且,任何一个子系统的升级或维护,都可能需要订单系统暂停服务并进行相应的代码调整,这对整个电商系统的稳定性和可用性造成了很大的影响。

引入 ActiveMQ 后,情况得到了极大的改善。当用户下单时,订单系统将订单消息发送到 ActiveMQ 的消息队列中,然后立即返回给用户下单成功的提示 。此时,订单系统的任务已经完成,它可以继续处理其他用户的下单请求。而库存系统、物流系统和支付系统则作为消息的消费者,从消息队列中异步获取订单消息,并进行各自的业务处理 。如果某个子系统出现故障或者负载过高,消息会在队列中等待,不会影响其他系统的正常运行。例如,当物流系统因临时故障需要进行修复时,订单消息会在队列中堆积,待物流系统恢复正常后,再从队列中获取消息进行处理,整个下单流程不会因为物流系统的短暂故障而中断,用户体验得到了极大的提升。

通过这种方式,各个系统之间通过 ActiveMQ 实现了解耦,它们可以独立地进行扩展、升级和维护,而不会相互影响 。这不仅提高了系统的灵活性和可维护性,还增强了系统的整体稳定性和可靠性,使电商系统能够更好地应对高并发的业务场景。

(三)代码示例演示

下面给出使用 Java 和 ActiveMQ 实现异步解耦的代码示例 ,通过这个示例可以更直观地了解如何在实际开发中利用 ActiveMQ 实现生产者和消费者之间的异步通信。

首先,需要引入 ActiveMQ 的相关依赖。如果使用 Maven 项目管理工具,可以在pom.xml文件中添加以下依赖:

 

<dependencies>

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

<version>5.16.3</version>

</dependency>

<dependency>

<groupId>javax.jms</groupId>

<artifactId>javax.jms-api</artifactId>

<version>2.0.1</version>

</dependency>

</dependencies>

生产者发送消息的代码如下:

 

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

private static final String BROKER_URL = "tcp://localhost:61616";

private static final String QUEUE_NAME = "orderQueue";

public static void main(String[] args) {

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

Connection connection = null;

Session session = null;

MessageProducer producer = null;

try {

// 创建连接

connection = connectionFactory.createConnection();

connection.start();

// 创建会话,第一个参数为是否支持事务,第二个参数为签收模式

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建队列

Queue queue = session.createQueue(QUEUE_NAME);

// 创建消息生产者

producer = session.createProducer(queue);

// 创建文本消息

TextMessage message = session.createTextMessage("这是一条新的订单消息");

// 发送消息

producer.send(message);

System.out.println("已发送消息: " + message.getText());

} catch (JMSException e) {

e.printStackTrace();

} finally {

// 关闭资源

if (producer != null) {

try {

producer.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (session != null) {

try {

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

}

在这段代码中,首先创建了一个ActiveMQConnectionFactory连接工厂,用于创建与 ActiveMQ 服务器的连接 。然后通过连接工厂创建连接,并启动连接。接着创建一个会话,会话用于创建消息生产者、消费者以及消息对象 。在创建会话时,设置了不支持事务,采用自动签收模式 。之后创建了一个队列对象,指定了队列名称为orderQueue 。再创建消息生产者,并创建一条文本消息,最后将消息发送到队列中。

消费者接收消息的代码如下:

 

import javax.jms.*;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

private static final String BROKER_URL = "tcp://localhost:61616";

private static final String QUEUE_NAME = "orderQueue";

public static void main(String[] args) {

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);

Connection connection = null;

Session session = null;

MessageConsumer consumer = null;

try {

// 创建连接

connection = connectionFactory.createConnection();

connection.start();

// 创建会话,第一个参数为是否支持事务,第二个参数为签收模式

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 创建队列

Queue queue = session.createQueue(QUEUE_NAME);

// 创建消息消费者

consumer = session.createConsumer(queue);

// 接收消息,设置超时时间为5000毫秒

Message message = consumer.receive(5000);

if (message != null && message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("已接收消息: " + textMessage.getText());

} else {

System.out.println("未接收到消息或消息类型错误");

}

} catch (JMSException e) {

e.printStackTrace();

} finally {

// 关闭资源

if (consumer != null) {

try {

consumer.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (session != null) {

try {

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

}

消费者代码与生产者代码类似,同样创建连接工厂、连接、会话和队列 。不同的是,消费者创建的是消息消费者,通过receive方法从队列中接收消息,并设置了超时时间为 5000 毫秒 。如果在规定时间内接收到消息,并且消息类型为文本消息,则打印出消息内容;否则,提示未接收到消息或消息类型错误 。通过以上代码示例,可以清晰地看到如何使用 Java 和 ActiveMQ 实现生产者与消费者之间的异步解耦,消息的发送和接收是相互独立的过程,实现了系统间的解耦和异步通信 。

相关文章:

  • 潮乎盲盒商城系统全开源多级分销推广海报奖品兑换试玩概率OSS云存储多端源码
  • 《Java高级编程:从原理到实战 - 进阶知识篇五》
  • Qt中QVector的实现与简化
  • 统计学中的p值是什么?怎么使用?
  • TS 枚举类型
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】4.2 数据类型转换(CAST函数/自定义函数)
  • WSL在D盘安装Ubuntu
  • 8.5 从零到生产:Docker+K8s+CI/CD全链路部署实战手册
  • 【SpringAI+阿里云百炼】AI对话4个Demo
  • 40. 组合总和 II
  • 洛谷 P2866 [USACO06NOV] Bad Hair Day S
  • Untiy基础学习(五)Inspector窗口中可编辑的变量
  • Linux之用户管理
  • SALOME源码分析: SolverLab
  • 大模型(LLMs)RAG 版面分析——文本分块面
  • Rust的安全卫生原则
  • Java二维码学习
  • Spark,Idea中编写Spark程序 2
  • 从入门到登峰-嵌入式Tracker定位算法全景之旅 Part 4 |IMU 死算与校正:惯性导航在资源受限环境的落地
  • 在CentOS环境中安装MySQL数据库保姆级教程
  • 立夏的野火饭
  • 一代名伶程砚秋经典影像:一箱旧影,芳华满堂
  • 山东一景区怕游客赶不到海撒三千斤蛤蜊:给游客提供情绪价值
  • “译通天下·言立寰宇”:华东师大翻译家的精神传承
  • 赵乐际主持十四届全国人大常委会第十五次会议闭幕会并作讲话
  • 上海国际咖啡文化节开幕,北外滩集结了超350个展位