接收rabbitmq消息
以下是一个使用纯Java(非Spring Boot)接收RabbitMQ消息的完整实现,包含Maven依赖和持续监听消息的循环:
1. 首先添加Maven依赖 (pom.xml
)
<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- 日志框架 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>
2. RabbitMQ消息接收器实现
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQReceiver {private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);private final ConnectionFactory factory;private Connection connection;private Channel channel;private volatile boolean running = true;public RabbitMQReceiver(String host, int port, String username, String password) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}public void startListening(String queueName) {try {// 建立连接connection = factory.newConnection();channel = connection.createChannel();// 声明队列(如果不存在则创建)channel.queueDeclare(queueName, true, false, false, null);logger.info("连接到队列: {}", queueName);// 设置每次只接收一条消息(公平分发)channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 在这里处理你的业务逻辑processMessage(message);// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息处理失败", e);// 处理失败时拒绝消息(不重新入队)channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消费者回调CancelCallback cancelCallback = consumerTag -> {logger.warn("消费者被取消: {}", consumerTag);};// 开始消费消息channel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("开始监听消息... (按CTRL+C停止)");// 保持程序运行while (running) {Thread.sleep(1000); // 防止CPU空转}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ连接失败", e);} finally {closeResources();}}private void processMessage(String message) {// 这里是你的业务逻辑处理logger.info("处理消息: {}", message);// 示例:打印消息长度System.out.println("消息长度: " + message.length());}public void stop() {running = false;logger.info("停止监听...");}private void closeResources() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ连接已关闭");} catch (IOException | TimeoutException e) {logger.error("关闭资源时出错", e);}}public static void main(String[] args) {// 配置RabbitMQ连接参数String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String queueName = "my_queue";RabbitMQReceiver receiver = new RabbitMQReceiver(host, port, username, password);// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {receiver.stop();receiver.closeResources();}));// 开始监听receiver.startListening(queueName);}
}
关键功能说明:
-
持续监听机制:
while (running) {Thread.sleep(1000); // 防止CPU空转 }
使用
running
标志控制循环,优雅退出 -
消息处理流程:
- 声明队列确保存在
- 设置QoS为1(公平分发)
- 使用
DeliverCallback
处理消息 - 手动消息确认(ACK/NACK)
- 异常处理与错误恢复
-
资源管理:
- 使用
finally
块确保关闭连接 - 添加Shutdown Hook处理程序终止
- 线程安全的状态管理(
volatile running
)
- 使用
-
日志记录:
- 使用SLF4J进行日志记录
- 关键操作都有日志输出
使用说明:
-
启动消费者:
mvn compile exec:java -Dexec.mainClass="RabbitMQReceiver"
-
发送测试消息(使用RabbitMQ管理界面或命令行工具):
rabbitmqadmin publish exchange=amq.default routing_key=my_queue payload="hello world"
-
停止程序:
- 按
CTRL+C
优雅停止 - 程序会自动关闭连接
- 按
自定义配置:
-
修改连接参数:
String host = "your.rabbitmq.host"; int port = 5672; String username = "your_user"; String password = "your_password"; String queueName = "your_queue_name";
-
自定义消息处理:
修改processMessage
方法实现你的业务逻辑:private void processMessage(String message) {// 示例:解析JSON消息// JSONObject json = new JSONObject(message);// System.out.println("收到订单: " + json.getString("orderId"));// 你的实际业务逻辑 }
-
配置调整:
- 修改
channel.basicQos()
调整预取数量 - 修改
basicNack
的requeue
参数控制是否重新入队 - 添加交换机绑定逻辑(如果需要)
- 修改
这个实现遵循了RabbitMQ最佳实践,包括:
- 手动消息确认
- 公平分发(QoS设置)
- 连接和通道的异常处理
- 资源清理
- 优雅关闭机制
如果需要处理更复杂的场景(如多个队列、消息持久化、死信队列等),可以在channel.queueDeclare
和channel.basicConsume
方法中添加相应参数。