当前位置: 首页 > news >正文

RabbitMQ入门:生产者和消费者示例

RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它允许应用程序通过消息队列进行异步通信,提高系统的解耦性和扩展性。本文将展示一个简单的RabbitMQ生产者和消费者实现。

核心组件

1. 生产者(Producer.java)

生产者负责创建消息并将其发送到RabbitMQ队列:

package com.qcby.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Produce {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1"); // RabbitMQ服务器IPfactory.setUsername("lql");     // 用户名factory.setPassword("liu20020624."); // 密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列(如果不存在则创建)channel.queueDeclare(QUEUE_NAME,   // 队列名称false,        // 是否持久化false,        // 是否独占false,        // 是否自动删除null          // 其他参数);// 4. 发送消息String message = "hello world";channel.basicPublish("",           // 使用默认交换机QUEUE_NAME,   // 路由键(队列名称)null,         // 消息属性message.getBytes() // 消息体);System.out.println("发送消息完毕");}}
}

关键点说明

  • 使用ConnectionFactory配置RabbitMQ连接

  • queueDeclare()创建队列(幂等操作)

  • basicPublish()发送消息到默认交换机

  • 使用try-with-resources自动关闭连接

2. 消费者(Consumer.java)

消费者监听队列并处理接收到的消息:

package com.qcby.rabbitmq.one;import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂(同生产者)ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");//换成你自己的ip地址factory.setUsername("lql");factory.setPassword("liu20020624.");// 2. 建立连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);};CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};// 4. 开始消费消息channel.basicConsume(QUEUE_NAME,   // 队列名称true,         // 自动确认deliverCallback, // 消息处理回调cancelCallback   // 取消回调);}
}

关键点说明

  • DeliverCallback处理接收到的消息

  • CancelCallback处理消费中断情况

  • basicConsume()启动消息监听

  • 消费者需要保持运行状态以持续接收消息

工作流程

  1. 生产者工作流

  2. 消费者工作流

运行说明

  1. 启动顺序

    • 先启动消费者(保持运行状态)

    • 再启动生产者(发送消息)

  2. 预期输出

    • 生产者控制台:发送消息完毕

    • 消费者控制台:收到消息: hello world

常见问题解决

  1. 连接失败

    • 检查RabbitMQ服务状态:rabbitmqctl status

    • 验证防火墙设置(开放5672端口)

    • 确认用户名/密码权限

  2. 消息未接收

    • 确保消费者在生产者之前启动

    • 检查队列名称是否一致

    • 验证网络连通性:telnet <IP> 5672

  3. SLF4J警告
    在pom.xml中添加日志实现依赖:

    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.36</version>
    </dependency>

这个示例展示了RabbitMQ最基本的消息传递模式。实际应用中,可以结合交换机、绑定键、不同消息确认模式等实现更复杂的消息路由和处理逻辑。


        通过这个示例,您可以快速理解RabbitMQ的核心概念和工作原理。建议从简单队列开始,逐步探索更高级的功能如发布/订阅、路由、主题匹配等。

http://www.dtcms.com/a/335675.html

相关文章:

  • Java注解学习记录
  • 什么是EDA(Exploratory Data Analysis,探索性数据分析)
  • AI出题人给出的Java后端面经(十七)(日更)
  • 第 463 场周赛(GPT-3,Me-1)
  • Foreign-Memory Access API外部内存API
  • 混沌工程(Chaos engineering):系统韧性保障之道
  • 计算机网络 HTTPS 全流程
  • p5.js 3D 形状 “预制工厂“——buildGeometry ()
  • 【位运算】查询子数组最大异或值|2693
  • 图灵完备(Turing Complete)免安装中文版
  • 关于pygsp引发的一系列问题和实例小demo
  • ​​Vue 3 开发速成手册
  • 裸机框架:按键模组
  • macos 安装nodepad++ (教程+安装包+报错后的解决方法)
  • AI证书怎么选
  • 交叉编译 手动安装 SQLite 库 移植ARM
  • 基于Vue + Node能源采购系统的设计与实现/基于express的能源管理系统#node.js
  • JavaScript 性能优化实战大纲
  • 记SpringBoot3.x + Thymeleaf 项目实现(MVC架构模式)
  • .NET 中的延迟初始化:Lazy<T> 与LazyInitializer
  • 【Java后端】MyBatis-Plus 原理解析
  • Unity进阶--C#补充知识点--【Unity跨平台的原理】了解.Net
  • Linux | i.MX6ULL网络通信-套字节 UDP(第十八章)
  • 【牛客刷题】后缀子串字母统计:O(n)高效算法详解
  • python实现梅尔频率倒谱系数(MFCC) 除了傅里叶变换和离散余弦变换
  • 数学建模 15 逻辑回归与随机森林
  • 大上墨水屏显示器Paperlike253 Mac 特别版 使用体会
  • MySQL数据库初识
  • 黑马java八股文全集
  • AUTOSAR ARXML介绍