当前位置: 首页 > news >正文

从 JMS 到 ActiveMQ:API 设计与扩展机制分析(一)

引言

{"type":"load_by_key","key":"auto_image_0_0","image_type":"search"}

在当今的分布式系统开发中,消息中间件扮演着举足轻重的角色,它为应用程序之间提供了可靠的异步通信机制。JMS(Java Message Service)作为 Java 平台上的消息服务规范,定义了一套通用的 API,使得开发者能够编写与具体消息中间件实现无关的代码,实现了更高层次的抽象和可移植性。而 ActiveMQ 则是 JMS 规范的一种流行实现,它不仅完全支持 JMS API,还提供了丰富的特性和强大的功能,如消息持久化、事务支持、集群部署等,在企业级应用中被广泛使用。

深入分析 JMS 和 ActiveMQ 的 API 设计与扩展机制,对于开发者来说具有多方面的重要意义。在 API 设计层面,理解 JMS 规范中定义的接口和类,如 ConnectionFactory、Connection、Session、Destination、MessageProducer、MessageConsumer 等,能让开发者熟练掌握如何创建连接、发送和接收消息,以及管理消息的生命周期,从而编写出简洁、高效且符合规范的消息处理代码。同时,研究 ActiveMQ 对 JMS API 的具体实现方式,能帮助开发者更好地利用 ActiveMQ 的特性,优化消息处理性能,例如在高并发场景下合理配置 ActiveMQ 的连接池和线程池。

在扩展机制方面,了解 JMS 的可插拔性和扩展性设计理念,有助于开发者根据业务需求,定制和扩展消息处理逻辑,实现诸如自定义消息序列化方式、消息过滤策略等功能。而 ActiveMQ 自身提供的丰富扩展点,如插件机制、协议扩展、存储扩展等,能让开发者灵活地对 ActiveMQ 进行二次开发,以满足不同的应用场景需求,比如在金融领域实现对消息的严格顺序控制和高可靠性传输。

本文将详细剖析 JMS 和 ActiveMQ 的 API 设计与扩展机制,通过理论阐述、代码示例和实际案例分析,帮助读者深入理解它们的工作原理和应用方法,从而在分布式系统开发中更好地运用消息中间件技术。

一、JMS 与 ActiveMQ 简介

(一)JMS 基础回顾

JMS 即 Java 消息服务(Java Message Service),是 Java 平台上关于面向消息中间件(MOM)的 API ,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它定义了一组通用的接口和类,使得 Java 应用程序能够与各种消息中间件进行交互,而无需关心具体的实现细节。这就好比 JDBC 为 Java 应用程序提供了统一访问各种关系数据库的接口,JMS 则为消息通信提供了类似的抽象层,大大提高了代码的可移植性和可维护性。

JMS 的核心概念丰富而关键。连接工厂(ConnectionFactory)是创建连接的工厂,它就像是一个生产连接的 “车间”,通过它可以获取到与消息中间件的连接。连接(Connection)则是客户端与消息中间件之间的通信链路,是消息传递的基础通道。会话(Session)是在连接之上创建的单线程上下文,用于发送和接收消息,它就像是一个消息处理的 “工作区”,在这个区域内可以进行消息的各种操作 。

目的地(Destination)是消息的发送目标和接收来源,它分为队列(Queue)和主题(Topic)两种类型,分别对应着不同的消息模型。消息生产者(MessageProducer)负责创建并发送消息,就像是消息的 “发送者”,而消息消费者(MessageConsumer)则用于接收并处理消息,是消息的 “接收者”。

JMS 定义了两种主要的消息模型:点对点(Point-to-Point,P2P)模型和发布 / 订阅(Publish/Subscribe,Pub/Sub)模型。在 P2P 模型中,消息被发送到队列中,每个消息只能被一个消费者接收。就好比寄信,一封信只能被一个收件人收到。消息生产者和消费者之间没有时间上的严格相关性,即使消费者在生产者发送消息时不在线,只要消息在队列中未被消费,消费者上线后依然可以获取到消息 。

在 Pub/Sub 模型中,消息被发布到主题,多个订阅者可以同时接收同一个消息,类似于广播的形式,一个消息可以被多个听众听到。生产者和消费者之间存在一定的时间相关性,订阅者需要先订阅主题,才能接收到后续发布到该主题的消息。不过,JMS 也允许创建持久订阅,使得订阅者即使在离线状态下,也能在重新上线后接收到离线期间发布的消息。

在企业级应用中,JMS 扮演着异步通信的关键角色。例如,在一个电商系统中,用户下单后,订单系统可以通过 JMS 发送消息给库存系统、物流系统和支付系统等。库存系统接收到消息后进行库存扣减,物流系统安排发货,支付系统处理支付流程。这样,订单系统无需等待其他系统的处理结果,可以立即返回给用户响应,提高了系统的响应速度和用户体验。同时,各个系统之间通过 JMS 进行解耦,降低了系统之间的耦合度,使得每个系统可以独立进行升级和维护 。

(二)ActiveMQ 概述

ActiveMQ 是 Apache 出品的最流行的开源消息中间件之一,它是一个完全支持 JMS 1.1 和 J2EE 1.4 规范的 JMS Provider 实现。这意味着它不仅遵循 JMS 规范,提供了标准的 JMS API 实现,还在此基础上进行了大量的功能扩展和优化,以满足不同场景下的消息通信需求。

ActiveMQ 具有众多突出的特点和优势。它支持多种协议,如 OpenWire、AMQP、MQTT、STOMP 等,这使得它能够与不同类型的应用程序和系统进行集成。无论是传统的企业级应用,还是新兴的物联网设备,都可以通过合适的协议与 ActiveMQ 进行通信。同时,ActiveMQ 支持多种语言的客户端,包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等,极大地扩大了其应用范围,开发者可以根据项目的技术栈选择合适的客户端语言进行开发。

ActiveMQ 对 Spring 框架有着良好的支持,这在基于 Spring 的企业级开发中尤为重要。通过 Spring 的配置和管理,开发者可以方便地集成 ActiveMQ,实现消息的发送和接收,并且可以利用 Spring 的事务管理、依赖注入等特性来增强消息处理的功能和可靠性。例如,在一个基于 Spring Boot 的项目中,只需要简单地添加 ActiveMQ 的依赖,并在配置文件中进行一些基本配置,就可以快速搭建起消息通信的功能。

在分布式系统中,ActiveMQ 有着广泛的应用场景。它可以用于实现系统之间的异步通信,解耦不同模块之间的依赖关系。比如在一个微服务架构中,各个微服务之间通过 ActiveMQ 进行消息传递,实现数据的同步和业务流程的协同。同时,ActiveMQ 还可以用于实现消息的持久化存储,确保在系统故障或重启的情况下,消息不会丢失。在高并发场景下,ActiveMQ 可以通过集群部署和负载均衡技术,提高系统的吞吐量和可用性,保证消息的可靠传输和处理 。

二、JMS API 设计剖析

(一)核心接口详解

  1. ConnectionFactory 接口:ConnectionFactory 是用于创建 Connection 的工厂接口,它是 JMS 客户端与 JMS Provider 之间连接的创建者。在实际应用中,不同的 JMS Provider 会提供各自的 ConnectionFactory 实现类,例如 ActiveMQ 提供的 ActiveMQConnectionFactory。通过 ConnectionFactory,开发者可以获取到与消息中间件的连接,并且可以设置一些连接相关的参数,如连接地址、用户名、密码等。它就像是一个连接的 “生产车间”,负责制造出符合要求的连接对象,为后续的消息通信搭建基础通道。
  1. Connection 接口:Connection 代表了 JMS 客户端与 JMS Provider 之间的物理连接,是消息传递的基础。它负责管理与 JMS Provider 的通信,包括建立连接、启动和停止连接等操作。在一个 Connection 中,可以创建多个 Session,每个 Session 用于执行一组消息操作。当 Connection 建立后,客户端就可以通过它与 JMS Provider 进行交互,发送和接收消息。同时,Connection 还提供了一些方法来管理事务,例如开始事务、提交事务和回滚事务等,确保消息操作的原子性和一致性 。
  1. Session 接口:Session 是在 Connection 之上创建的单线程上下文,用于发送和接收消息。它就像是一个消息处理的 “工作区”,在这个区域内可以进行消息的创建、发送、接收等各种操作。Session 还负责管理消息的确认模式,即当消费者接收消息后,如何通知 JMS Provider 消息已被成功接收。JMS 提供了几种确认模式,如自动确认(AUTO_ACKNOWLEDGE)、客户端手动确认(CLIENT_ACKNOWLEDGE)和事务性确认(SESSION_TRANSACTED) 。在自动确认模式下,当消费者接收到消息后,JMS Provider 会自动将消息标记为已接收;在客户端手动确认模式下,消费者需要调用消息的 acknowledge () 方法来手动确认消息;在事务性确认模式下,消息的确认与事务相关联,只有当事务成功提交时,消息才会被确认 。

下面通过一段代码示例来展示如何使用这些核心接口创建连接和会话:

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSExample {

public static void main(String[] args) throws Exception {

// 创建ConnectionFactory

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

// 创建Connection

Connection connection = factory.createConnection();

// 启动连接

connection.start();

// 创建Session,第一个参数为是否支持事务,第二个参数为确认模式

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 这里可以进行消息发送和接收等操作

// 关闭Session

session.close();

// 关闭Connection

connection.close();

}

}

在上述代码中,首先创建了一个 ActiveMQConnectionFactory 对象,通过它指定了连接到 ActiveMQ 的地址。然后使用该工厂创建了一个 Connection 对象,并启动了连接。接着,在连接的基础上创建了一个 Session 对象,设置为非事务模式且采用自动确认模式。这样就完成了 JMS 连接和会话的创建,后续可以在这个会话中进行消息的生产和消费操作。

在消息生产和消费流程中,这些核心接口起着关键作用。ConnectionFactory 为整个消息通信提供了连接创建的基础,就像是搭建了一条通往消息中间件的道路;Connection 建立了与消息中间件的实际连接,是消息传输的载体;而 Session 则提供了一个操作消息的环境,就像是在这条道路上划分出了一个个的 “工作区域”,使得消息的生产和消费能够有条不紊地进行 。例如,在消息生产过程中,生产者通过 Session 创建 MessageProducer,然后使用 MessageProducer 将消息发送到目的地;在消息消费过程中,消费者通过 Session 创建 MessageConsumer,从目的地接收消息并进行处理 。

(二)消息模型相关 API

  1. 点对点模型相关 API:在点对点(P2P)模型中,主要涉及 Queue、MessageProducer 和 MessageConsumer 接口。Queue 代表一个消息队列,它是消息的存储和转发中心,就像是一个 “信箱”,消息生产者将消息发送到这个 “信箱” 中,而消息消费者从这里取出消息进行处理。MessageProducer 用于创建并发送消息到 Queue,它就像是消息的 “发送者”,通过调用其 send 方法将消息发送到指定的 Queue 中。MessageConsumer 则用于从 Queue 中接收消息,是消息的 “接收者”,可以通过其 receive 方法来获取消息,receive 方法有多种重载形式,可以设置等待消息的超时时间等参数 。
  1. 发布订阅模型相关 API:在发布 / 订阅(Pub/Sub)模型中,主要涉及 Topic、MessageProducer 和 MessageConsumer 接口。Topic 代表一个主题,它类似于一个 “广播中心”,消息生产者将消息发布到这个主题上,而所有订阅了该主题的消息消费者都可以接收到消息。与 Queue 不同,Topic 的消息会被发送给所有订阅者,实现了一对多的消息传递。MessageProducer 同样用于发送消息,但这里是将消息发送到 Topic,而 MessageConsumer 则是从 Topic 订阅并接收消息。对于持久订阅,订阅者需要在创建 MessageConsumer 时设置相应的标识,这样即使订阅者在消息发布时离线,也能在重新上线后获取到离线期间发布的消息 。

下面通过代码示例展示不同模型下消息生产和消费的操作:

点对点模型示例
 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("myQueue");

MessageProducer producer = session.createProducer(queue);

TextMessage message = session.createTextMessage("Hello, P2P!");

producer.send(message);

System.out.println("Sent message: " + message.getText());

producer.close();

session.close();

connection.close();

}

}

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class P2PConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("myQueue");

MessageConsumer consumer = session.createConsumer(queue);

Message message = consumer.receive();

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("Received message: " + textMessage.getText());

}

consumer.close();

session.close();

connection.close();

}

}

发布订阅模型示例
 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.MessageProducer;

import javax.jms.Topic;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PubSubProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("myTopic");

MessageProducer producer = session.createProducer(topic);

TextMessage message = session.createTextMessage("Hello, Pub/Sub!");

producer.send(message);

System.out.println("Sent message: " + message.getText());

producer.close();

session.close();

connection.close();

}

}

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Topic;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PubSubConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

Connection connection = factory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("myTopic");

MessageConsumer consumer = session.createConsumer(topic);

Message message = consumer.receive();

if (message instanceof TextMessage) {

TextMessage textMessage = (TextMessage) message;

System.out.println("Received message: " + textMessage.getText());

}

consumer.close();

session.close();

connection.close();

}

}

对比两种模型的 API 差异,主要体现在目的地的类型上,P2P 模型使用 Queue,而 Pub/Sub 模型使用 Topic。这也导致了消息传递的方式不同,P2P 模型中消息只能被一个消费者接收,而 Pub/Sub 模型中消息可以被多个订阅者接收。在适用场景方面,P2P 模型适用于需要确保消息被唯一处理的场景,例如订单处理系统,每个订单消息只需要被一个处理模块接收并处理;而 Pub/Sub 模型适用于需要广播消息的场景,如实时新闻推送系统,新闻消息需要发送给所有订阅的用户 。

相关文章:

  • Uniapp app 安卓手机(红米)自定义基座进行真机调试
  • 什么是供应链关键业务指标体系,如何利用指标驱动管理闭环
  • 解决osx-arm64平台上conda默认源没有提供 python=3.7 的官方编译版本的问题
  • 数据库插入数据时自动生成
  • 智能排产破解制造业效率困局
  • React 中 useMemo 和 useEffect 的区别(计算与监听方面)
  • (三)毛子整洁架构(Infrastructure层/DapperHelper/乐观锁)
  • 分布式处理架构
  • 地图、图表的制作要领
  • 明远智睿SSD2351开发板:仪器仪表与智慧农业的创新利器
  • Unity基础学习(九)输入系统全解析:鼠标、键盘与轴控制
  • Vibe Coding: 优点与缺点
  • 通信协议选型篇:如何根据项目需求选择合适的通信协议?
  • 韩媒聚焦Lazarus攻击手段升级,CertiK联创顾荣辉详解应对之道
  • Mysql数据库进阶
  • SAF利用由Varjo和AFormX开发的VR/XR模拟器推动作战训练
  • 关于大数据的基础知识(二)——国内大数据产业链分布结构
  • Java SE(10)——抽象类接口
  • Python实例题:Python快速获取斗图表情
  • 【python】Calculate the Angle of a Triangle
  • 巴基斯坦对印度发起网络攻击,致其约70%电网瘫痪
  • “苏河超级管”调研:桥下公园“留白”很好,指引差点
  • 越怕出错越会出错,“墨菲定律”的魔咒该怎么破?
  • 全国人大常委会启动食品安全法执法检查
  • 虚假认定实质性重组、高估不良债权价值,原中国华融资产重庆分公司被罚180万元
  • 化学家、台湾地区“中研院”原学术副院长陈长谦逝世