从 JMS 到 ActiveMQ:API 设计与扩展机制分析(二)
(三)消息结构与 API 操作
- JMS 消息结构:JMS 消息主要由消息头(Header)、属性(Properties)和消息体(Body)三部分组成。消息头包含了许多预定义的字段,用于标识消息、设置优先权、失效时间等,例如 JMSMessageID 用于唯一标识一条消息,JMSDeliveryMode 用于指定消息的投递模式(持久或非持久),JMSPriority 用于设置消息的优先级,JMSTimestamp 用于记录消息发送的时间,JMSExpiration 用于设置消息的过期时间等 。这些字段由 JMS Provider 在消息发送时自动填充或根据发送者的设置进行配置 。属性部分允许开发者自定义一些键值对,用于添加额外的信息,比如消息的业务类型、处理标志等,这些属性可以在消息的发送和接收过程中被使用,方便对消息进行分类和处理 。消息体则是消息的实际内容,根据不同的消息类型,消息体的结构和内容也不同,JMS 提供了多种消息类型,如 TextMessage 用于发送文本消息,MapMessage 用于发送键值对形式的消息,BytesMessage 用于发送字节数组消息,StreamMessage 用于发送 Java 原始类型的数据流消息,ObjectMessage 用于发送可序列化的 Java 对象消息 。
- API 操作示例:通过代码示例展示创建、发送、接收不同类型消息的操作。
创建并发送 TextMessage
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TextMessageProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("textQueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("This is a text message");
// 设置自定义属性
message.setStringProperty("messageType", "text");
producer.send(message);
System.out.println("Sent text message: " + message.getText());
producer.close();
session.close();
connection.close();
}
}
接收 TextMessage
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TextMessageConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("textQueue");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received text message: " + textMessage.getText());
// 获取自定义属性
System.out.println("Message type: " + textMessage.getStringProperty("messageType"));
}
consumer.close();
session.close();
connection.close();
}
}
创建并发送 MapMessage
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.MapMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MapMessageProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("mapQueue");
MessageProducer producer = session.createProducer(queue);
MapMessage message = session.createMapMessage();
message.setString("key1", "value1");
message.setInt("key2", 123);
// 设置自定义属性
message.setStringProperty("messageType", "map");
producer.send(message);
System.out.println("Sent map message");
producer.close();
session.close();
connection.close();
}
}
接收 MapMessage
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.MapMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MapMessageConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("mapQueue");
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive();
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
System.out.println("Received map message");
System.out.println("Value of key1: " + mapMessage.getString("key1"));
System.out.println("Value of key2: " + mapMessage.getInt("key2"));
// 获取自定义属性
System.out.println("Message type: " + mapMessage.getStringProperty("messageType"));
}
consumer.close();
session.close();
connection.close();
}
}
消息结构在 API 操作中的作用至关重要。消息头中的字段为消息的传输和处理提供了基本的控制信息,例如通过设置 JMSPriority 可以让重要的消息优先被处理,设置 JMSExpiration 可以避免消息长时间占用资源。属性部分则增强了消息的灵活性和可扩展性,开发者可以根据业务需求添加各种自定义信息,方便在消息处理过程中进行判断和操作。消息体则承载了实际的业务数据,不同的消息类型适用于不同的业务场景,例如 TextMessage 适合传输文本内容,MapMessage 适合传输结构化的键值对数据 。