当前位置: 首页 > news >正文

接收rabbitmq消息

以下是一个使用纯Java(非Spring Boot)接收RabbitMQ消息的完整实现,包含Maven依赖和持续监听消息的循环:

1. 首先添加Maven依赖 (pom.xml)

<dependencies><!-- RabbitMQ Java Client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><!-- 日志框架 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency>
</dependencies>

2. RabbitMQ消息接收器实现

import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQReceiver {private static final Logger logger = LoggerFactory.getLogger(RabbitMQReceiver.class);private final ConnectionFactory factory;private Connection connection;private Channel channel;private volatile boolean running = true;public RabbitMQReceiver(String host, int port, String username, String password) {factory = new ConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);}public void startListening(String queueName) {try {// 建立连接connection = factory.newConnection();channel = connection.createChannel();// 声明队列(如果不存在则创建)channel.queueDeclare(queueName, true, false, false, null);logger.info("连接到队列: {}", queueName);// 设置每次只接收一条消息(公平分发)channel.basicQos(1);// 创建消费者DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");logger.info("收到消息 [{}]: {}", delivery.getEnvelope().getDeliveryTag(), message);try {// 在这里处理你的业务逻辑processMessage(message);// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (Exception e) {logger.error("消息处理失败", e);// 处理失败时拒绝消息(不重新入队)channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);}};// 取消消费者回调CancelCallback cancelCallback = consumerTag -> {logger.warn("消费者被取消: {}", consumerTag);};// 开始消费消息channel.basicConsume(queueName, false, deliverCallback, cancelCallback);logger.info("开始监听消息... (按CTRL+C停止)");// 保持程序运行while (running) {Thread.sleep(1000); // 防止CPU空转}} catch (IOException | TimeoutException | InterruptedException e) {logger.error("RabbitMQ连接失败", e);} finally {closeResources();}}private void processMessage(String message) {// 这里是你的业务逻辑处理logger.info("处理消息: {}", message);// 示例:打印消息长度System.out.println("消息长度: " + message.length());}public void stop() {running = false;logger.info("停止监听...");}private void closeResources() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMQ连接已关闭");} catch (IOException | TimeoutException e) {logger.error("关闭资源时出错", e);}}public static void main(String[] args) {// 配置RabbitMQ连接参数String host = "localhost";int port = 5672;String username = "guest";String password = "guest";String queueName = "my_queue";RabbitMQReceiver receiver = new RabbitMQReceiver(host, port, username, password);// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(() -> {receiver.stop();receiver.closeResources();}));// 开始监听receiver.startListening(queueName);}
}

关键功能说明:

  1. 持续监听机制

    while (running) {Thread.sleep(1000); // 防止CPU空转
    }
    

    使用running标志控制循环,优雅退出

  2. 消息处理流程

    • 声明队列确保存在
    • 设置QoS为1(公平分发)
    • 使用DeliverCallback处理消息
    • 手动消息确认(ACK/NACK)
    • 异常处理与错误恢复
  3. 资源管理

    • 使用finally块确保关闭连接
    • 添加Shutdown Hook处理程序终止
    • 线程安全的状态管理(volatile running
  4. 日志记录

    • 使用SLF4J进行日志记录
    • 关键操作都有日志输出

使用说明:

  1. 启动消费者

    mvn compile exec:java -Dexec.mainClass="RabbitMQReceiver"
    
  2. 发送测试消息(使用RabbitMQ管理界面或命令行工具):

    rabbitmqadmin publish exchange=amq.default routing_key=my_queue payload="hello world"
    
  3. 停止程序

    • CTRL+C优雅停止
    • 程序会自动关闭连接

自定义配置:

  1. 修改连接参数

    String host = "your.rabbitmq.host";
    int port = 5672;
    String username = "your_user";
    String password = "your_password";
    String queueName = "your_queue_name";
    
  2. 自定义消息处理
    修改processMessage方法实现你的业务逻辑:

    private void processMessage(String message) {// 示例:解析JSON消息// JSONObject json = new JSONObject(message);// System.out.println("收到订单: " + json.getString("orderId"));// 你的实际业务逻辑
    }
    
  3. 配置调整

    • 修改channel.basicQos()调整预取数量
    • 修改basicNackrequeue参数控制是否重新入队
    • 添加交换机绑定逻辑(如果需要)

这个实现遵循了RabbitMQ最佳实践,包括:

  • 手动消息确认
  • 公平分发(QoS设置)
  • 连接和通道的异常处理
  • 资源清理
  • 优雅关闭机制

如果需要处理更复杂的场景(如多个队列、消息持久化、死信队列等),可以在channel.queueDeclarechannel.basicConsume方法中添加相应参数。

相关文章:

  • 中心化交易所(CEX)架构:高并发撮合引擎与合规安全体系
  • [蓝桥杯 2024 国 Python B] 设计
  • TripGenie:畅游济南旅行规划助手:个人工作纪实(二十四)
  • Arduino入门教程:1、Arduino硬件介绍
  • LAN、WAN、WLAN、VLAN 、VPN对比
  • Java异步编程深度解析:从基础到复杂场景的难题拆解
  • 动态多目标进化算法:VARE(Vector Autoregressive Evolution)求解DF1-DF14,提供完整MATLAB代码
  • [服务器] Amazon Lightsail SSH连接黑屏的常见原因及解决方案
  • 曼昆《经济学原理》第九版 第十七章寡头垄断
  • 【leetcode】36. 有效的数独
  • 【Axure高保真原型】中继器表格更多操作
  • API:解锁数字化协作的钥匙及开放实现路径深度剖析
  • 产品升级 | 新一代高性能数据采集平台BRICK2 X11,助力ADAS与自动驾驶开发
  • 【AI】模型vs算法(以自动驾驶为例)
  • RPA与Agent技术如何结合,以实现跨系统、跨平台的工作流程自动化?
  • 本地docker部署的dify,不用git命令如何无损升级?
  • Redis分布式缓存(RDB、AOF、主从同步)
  • ArcGIS中坐标系一致但图层无法重叠问题解决
  • JavaWeb期末速成
  • 一套基于Apple watch电话手表包含150个覆盖商务、健康、爱好、定位、时钟、挂件的移动端UI界面的psd
  • 网站tdk优化/优化大师使用方法
  • 提供常州微信网站建设/网站推广该怎么做
  • 网站管理助手v3/网络运营怎么学
  • 门户网站建设制作/自己如何制作一个网站
  • 齐大胜请于果做网站是第几集/环球资源外贸平台免费
  • dede网站改成自适应/渠道营销推广方案