RabbitMQ入门:生产者和消费者示例
RabbitMQ是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它允许应用程序通过消息队列进行异步通信,提高系统的解耦性和扩展性。本文将展示一个简单的RabbitMQ生产者和消费者实现。
核心组件
1. 生产者(Producer.java)
生产者负责创建消息并将其发送到RabbitMQ队列:
package com.qcby.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Produce {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1"); // RabbitMQ服务器IPfactory.setUsername("lql"); // 用户名factory.setPassword("liu20020624."); // 密码// 2. 建立连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明队列(如果不存在则创建)channel.queueDeclare(QUEUE_NAME, // 队列名称false, // 是否持久化false, // 是否独占false, // 是否自动删除null // 其他参数);// 4. 发送消息String message = "hello world";channel.basicPublish("", // 使用默认交换机QUEUE_NAME, // 路由键(队列名称)null, // 消息属性message.getBytes() // 消息体);System.out.println("发送消息完毕");}}
}
关键点说明:
使用
ConnectionFactory
配置RabbitMQ连接queueDeclare()
创建队列(幂等操作)basicPublish()
发送消息到默认交换机使用try-with-resources自动关闭连接
2. 消费者(Consumer.java)
消费者监听队列并处理接收到的消息:
package com.qcby.rabbitmq.one;import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂(同生产者)ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");//换成你自己的ip地址factory.setUsername("lql");factory.setPassword("liu20020624.");// 2. 建立连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);};CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};// 4. 开始消费消息channel.basicConsume(QUEUE_NAME, // 队列名称true, // 自动确认deliverCallback, // 消息处理回调cancelCallback // 取消回调);}
}
关键点说明:
DeliverCallback
处理接收到的消息CancelCallback
处理消费中断情况basicConsume()
启动消息监听消费者需要保持运行状态以持续接收消息
工作流程
生产者工作流:
消费者工作流:
运行说明
启动顺序:
先启动消费者(保持运行状态)
再启动生产者(发送消息)
预期输出:
生产者控制台:
发送消息完毕
消费者控制台:
收到消息: hello world
常见问题解决
连接失败:
检查RabbitMQ服务状态:
rabbitmqctl status
验证防火墙设置(开放5672端口)
确认用户名/密码权限
消息未接收:
确保消费者在生产者之前启动
检查队列名称是否一致
验证网络连通性:
telnet <IP> 5672
SLF4J警告:
在pom.xml中添加日志实现依赖:<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.36</version> </dependency>
这个示例展示了RabbitMQ最基本的消息传递模式。实际应用中,可以结合交换机、绑定键、不同消息确认模式等实现更复杂的消息路由和处理逻辑。
通过这个示例,您可以快速理解RabbitMQ的核心概念和工作原理。建议从简单队列开始,逐步探索更高级的功能如发布/订阅、路由、主题匹配等。