java实现RabbitMQ消息发送和接收功能(包含测试)
以下是一个完整的Java类,同时包含RabbitMQ消息发送和接收功能,使用纯Java实现(非Spring Boot),包含Maven依赖:
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class RabbitMQManager {private static final Logger logger = LoggerFactory.getLogger(RabbitMQManager.class);private final ConnectionFactory factory;private Connection connection;private Channel sendChannel;private Channel receiveChannel;private volatile boolean running = true;// 配置参数private final String host;private final int port;private final String username;private final String password;public RabbitMQManager(String host, int port, String username, String password) {this.host = host;this.port = port;this.username = username;this.password = password;factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}/*** 初始化RabbitMQ连接和通道*/public void initialize() throws IOException, TimeoutException {connection = factory.newConnection();logger.info("成功连接到RabbitMQ服务器: {}:{}", host, port);// 创建发送通道(用于feedback队列)sendChannel = connection.createChannel();// 声明发送队列(持久化)sendChannel.queueDeclare("feedback", true, false, false, null);logger.info("发送队列 'feedback' 已声明");// 创建接收通道(单独通道用于消费消息)receiveChannel = connection.createChannel();}/*** 发送消息到feedback队列* @param message 要发送的消息内容*/public void sendToFeedbackQueue(String message) throws IOException {if (sendChannel == null || !sendChannel.isOpen()) {throw new IllegalStateException("发送通道未初始化或已关闭");}sendChannel.basicPublish("", // 使用默认交换机"feedback", // 路由键为队列名MessageProperties.PERSISTENT_TEXT_PLAIN, // 设置消息持久化message.getBytes(StandardCharsets.UTF_8));logger.info("消息已发送到feedback队列: {}", message);}/*** 开始监听指定队列的消息* @param queueName 要监听的队列名称*/public void startListening(String queueName) throws IOException {if (receiveChannel == null || !receiveChannel.isOpen()) {throw new IllegalStateException("接收通道未初始化或已关闭");}// 声明队列(如果不存在则创建)receiveChannel.queueDeclare(queueName, true, false, false, null);logger.info("开始监听队列: {}", queueName);// 设置每次只接收一条消息(公平分发)receiveChannel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 处理消息processMessage(message, queueName);// 手动确认消息receiveChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息处理失败", e);// 处理失败时拒绝消息(不重新入队)receiveChannel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消费者回调CancelCallback cancelCallback = consumerTag -> {logger.warn("消费者被取消: {}", consumerTag);};// 开始消费消息(关闭自动确认)receiveChannel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("监听器已启动,等待消息... (按CTRL+C停止)");// 保持程序运行try {while (running) {Thread.sleep(1000); // 防止CPU空转}} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.info("监听线程被中断");}}/*** 消息处理逻辑(可根据需要自定义)*/private void processMessage(String message, String queueName) {logger.info("处理来自队列 [{}] 的消息: {}", queueName, message);// 示例:如果是feedback队列的消息,可以发送响应if ("feedback".equals(queueName)) {logger.info("处理feedback消息: {}", message);} else {// 其他队列的处理逻辑logger.info("处理来自 {} 队列的消息", queueName);}// 这里可以添加业务逻辑,例如:// - 解析JSON消息// - 调用其他服务// - 发送响应到另一个队列}/*** 停止监听并关闭资源*/public void shutdown() {running = false;logger.info("停止监听...");closeResources();}private void closeResources() {try {if (sendChannel != null && sendChannel.isOpen()) {sendChannel.close();}if (receiveChannel != null && receiveChannel.isOpen()) {receiveChannel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ连接已关闭");} catch (IOException | TimeoutException e) {logger.error("关闭资源时出错", e);}}public static void main(String[] args) {// 配置RabbitMQ连接参数String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String listenQueue = "my_queue"; // 监听的队列名称RabbitMQManager manager = new RabbitMQManager(host, port, username, password);try {// 初始化连接manager.initialize();// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(manager::shutdown));// 启动监听线程new Thread(() -> {try {manager.startListening(listenQueue);} catch (IOException e) {logger.error("监听失败", e);}}).start();// 主线程发送测试消息到feedback队列Thread.sleep(2000); // 等待监听器启动for (int i = 1; i <= 5; i++) {String message = "测试消息 #" + i;manager.sendToFeedbackQueue(message);Thread.sleep(1000);}// 保持程序运行while (manager.running) {Thread.sleep(1000);}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ操作失败", e);manager.shutdown();}}
}
Maven 依赖 (pom.xml
)
<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- SLF4J 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.7</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>
关键设计说明:
-
双通道设计:
sendChannel
:专门用于发送消息到feedback队列receiveChannel
:专门用于从其他队列接收消息- 分离通道避免发送和接收操作相互阻塞
-
消息发送功能:
public void sendToFeedbackQueue(String message) throws IOException {sendChannel.basicPublish("", // 默认交换机"feedback", MessageProperties.PERSISTENT_TEXT_PLAIN, // 持久化消息message.getBytes(StandardCharsets.UTF_8)); }
-
消息接收功能:
- 支持监听任意指定队列
- 手动消息确认机制
- 公平分发设置(basicQos(1))
-
资源管理:
- 统一的initialize()方法初始化连接
- 优雅的shutdown()方法关闭资源
- 使用Runtime shutdown hook确保程序退出时清理资源
-
线程模型:
- 主线程:发送消息到feedback队列
- 单独线程:持续监听消息队列
- 使用volatile running标志控制线程退出
使用说明:
-
运行程序:
mvn compile exec:java -Dexec.mainClass="RabbitMQManager"
-
测试流程:
- 程序启动后会自动初始化RabbitMQ连接
- 启动监听线程监听
my_queue
队列 - 主线程发送5条测试消息到
feedback
队列 - 按CTRL+C停止程序
-
自定义配置:
- 修改main方法中的连接参数
- 更改监听的队列名称
- 在processMessage()方法中添加业务逻辑
扩展建议:
-
添加消息序列化:
// 发送对象 public void sendObjectToFeedback(Object obj) throws IOException {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(obj);oos.flush();byte[] bytes = bos.toByteArray();sendChannel.basicPublish("", "feedback", MessageProperties.PERSISTENT_BASIC, bytes); }
-
添加JSON支持:
<!-- 添加Jackson依赖 --> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version> </dependency>
// 发送JSON ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(myObject); sendToFeedbackQueue(json);// 接收JSON MyObject obj = mapper.readValue(message, MyObject.class);
-
添加重连机制:
public void ensureConnected() throws IOException, TimeoutException {if (connection == null || !connection.isOpen()) {initialize();} }
这个实现提供了生产级别的RabbitMQ操作,包含:
- 连接管理
- 通道分离
- 消息持久化
- 手动确认
- 错误处理
- 优雅关闭
- 完整的日志记录
您可以根据实际需求调整队列名称、消息处理逻辑和错误处理策略。