当前位置: 首页 > news >正文

RabbitMQ 核心概念与消息模型深度解析(二)

四、代码实战

了解了 RabbitMQ 的核心概念和消息模型后,接下来我们通过代码实战来进一步加深对它们的理解和掌握。下面将以 Java 和 Spring AMQP 为例,展示如何使用 RabbitMQ 进行消息的发送和接收。

4.1 环境准备

在开始编写代码之前,需要确保已经安装和配置好了 RabbitMQ 服务器 ,并且在项目中引入了相关的依赖。如果使用 Maven 项目,可以在pom.xml文件中添加以下依赖:

 

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

4.2 简单队列模型代码示例

4.2.1 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class Producer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String queueName, String message) {

rabbitTemplate.convertAndSend(queueName, message);

System.out.println("发送消息到队列:" + queueName + ",消息内容:" + message);

}

}

4.2.2 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

@RabbitListener(queues = "simple.queue")

public void receive(String message) {

System.out.println("接收到队列 simple.queue 的消息:" + message);

}

}

4.3 工作队列模型代码示例

4.3.1 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class WorkProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String queueName, String message) {

for (int i = 0; i < 10; i++) {

String msg = message + " " + i;

rabbitTemplate.convertAndSend(queueName, msg);

System.out.println("发送消息到队列:" + queueName + ",消息内容:" + msg);

}

}

}

4.3.2 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class WorkConsumer {

@RabbitListener(queues = "work.queue")

public void receive1(String message) {

System.out.println("消费者1接收到队列 work.queue 的消息:" + message);

}

@RabbitListener(queues = "work.queue")

public void receive2(String message) {

System.out.println("消费者2接收到队列 work.queue 的消息:" + message);

}

}

4.4 发布 / 订阅模型代码示例

4.4.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class FanoutConfig {

@Bean

public Queue fanoutQueue1() {

return new Queue("fanout.queue1");

}

@Bean

public Queue fanoutQueue2() {

return new Queue("fanout.queue2");

}

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanout.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());

}

}

4.4.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class FanoutProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String message) {

rabbitTemplate.convertAndSend(exchangeName, "", message);

System.out.println("发送消息到交换机:" + exchangeName + ",消息内容:" + message);

}

}

4.4.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class FanoutConsumer {

@RabbitListener(queues = "fanout.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 fanout.queue1 的消息:" + message);

}

@RabbitListener(queues = "fanout.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 fanout.queue2 的消息:" + message);

}

}

4.5 路由模型代码示例

4.5.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class DirectConfig {

@Bean

public Queue directQueue1() {

return new Queue("direct.queue1");

}

@Bean

public Queue directQueue2() {

return new Queue("direct.queue2");

}

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(directQueue1()).to(directExchange()).with("routing.key1");

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(directQueue2()).to(directExchange()).with("routing.key2");

}

}

4.5.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class DirectProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

System.out.println("发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message);

}

}

4.5.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class DirectConsumer {

@RabbitListener(queues = "direct.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 direct.queue1 的消息:" + message);

}

@RabbitListener(queues = "direct.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 direct.queue2 的消息:" + message);

}

}

4.6 主题模型代码示例

4.6.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class TopicConfig {

@Bean

public Queue topicQueue1() {

return new Queue("topic.queue1");

}

@Bean

public Queue topicQueue2() {

return new Queue("topic.queue2");

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange("topic.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.*.test");

}

}

4.6.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class TopicProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

System.out.println("发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message);

}

}

4.6.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class TopicConsumer {

@RabbitListener(queues = "topic.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 topic.queue1 的消息:" + message);

}

@RabbitListener(queues = "topic.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 topic.queue2 的消息:" + message);

}

}

4.7 RPC 模型代码示例

4.7.1 生产者代码
 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

import java.util.UUID;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class RPCClient {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private Connection connection;

private Channel channel;

private String replyQueueName;

private BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

private String corrId;

public RPCClient() throws IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

connection = factory.newConnection();

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

if (properties.getCorrelationId().equals(corrId)) {

responseQueue.offer(new String(body, "UTF-8"));

}

}

});

}

public String call(String message) throws IOException, InterruptedException {

corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties

.Builder()

.correlationId(corrId)

.replyTo(replyQueueName)

.build();

channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

return responseQueue.take();

}

public void close() throws IOException {

connection.close();

}

}

4.7.2 消费者代码
 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

AMQP.BasicProperties replyProps = new AMQP.BasicProperties

.Builder()

.correlationId(properties.getCorrelationId())

.build();

String message = new String(body, "UTF-8");

int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");

String response = "" + fib(n);

channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

channel.basicAck(envelope.getDeliveryTag(), false);

}

};

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

}

private static int fib(int n) {

if (n == 0) return 0;

if (n == 1) return 1;

return fib(n - 1) + fib(n - 2);

}

}

通过以上代码示例,我们展示了如何使用 Java 和 Spring AMQP 实现 RabbitMQ 的各种消息模型 。在实际应用中,可以根据具体的业务需求选择合适的消息模型,并对代码进行相应的优化和扩展。

五、总结

RabbitMQ 作为一款强大的消息队列中间件,其核心概念和消息模型是理解和使用它的关键 。通过深入了解生产者、消费者、交换机、队列、绑定、路由键、连接、信道以及虚拟主机等核心概念,我们明白了 RabbitMQ 是如何在分布式系统中实现高效、可靠的消息传递的 。不同类型的交换机和绑定规则,使得消息的路由更加灵活多样,能够满足各种复杂的业务需求 。

而 RabbitMQ 提供的多种消息模型,如简单队列模型、工作队列模型、发布 / 订阅模型、路由模型、主题模型和 RPC 模型,为我们解决不同场景下的消息通信问题提供了丰富的选择 。每种模型都有其独特的特点和适用场景,在实际应用中,我们需要根据具体的业务需求来选择合适的消息模型,以实现系统的最优性能和扩展性 。

通过本文的介绍和代码实战,希望大家对 RabbitMQ 的核心概念和消息模型有了更深入的理解和掌握 。在实际项目中,不断地实践和探索,充分发挥 RabbitMQ 的优势,为分布式系统的开发和优化提供有力的支持 。

相关文章:

  • 关于Go语言的开发环境的搭建
  • 时间序列基础【学习记录】
  • ridecore流水线解读
  • 【人工智能】自然语言编程革命:腾讯云CodeBuddy实战5步搭建客户管理系统,效率飙升90%
  • 【Web应用】Vue 项目前端项目文件夹和文件介绍
  • 深入理解 JVM:StackOverFlow、OOM 与 GC overhead limit exceeded 的本质剖析及 Stack 与 Heap 的差异
  • 我的MCP相关配置记录
  • 自学新标日初级上二十一课
  • 高效跨平台文件传输与管理的工具
  • 推荐算法工程化:ZKmall模板商城的B2C 商城的用户分层推荐策略
  • 思维链实现 方式解析
  • [免费]微信小程序医院预约挂号管理系统(uni-app+SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
  • 2025年01月10日浙江鑫越系统科技前端面试
  • 微信小程序学习之底部导航栏
  • web 自动化之 KDT 关键字驱动详解
  • udp多点通信和心跳包
  • 格雷希尔G10和G15系列自动化快速密封连接器,适用于哪些管件的密封,以及它们相关的特性有哪些?
  • 72.编辑距离
  • langchain学习
  • 我们来学nacos -- 集群nacos2.5.1mysql8.4
  • 中东睿评|特朗普中东三国行:喧嚣的形式与空洞的实质
  • 夜读|尊重生命的棱角
  • 在笔墨金石间,看胡问遂与梅舒适的艺术对话
  • 网信部门曝光网络谣言典型案例,“AI预测彩票号码百分百中奖”等在列
  • 第12届警博会在即:一批便民利企装备亮相,规模创历史新高
  • “不为一时一事所惑,不为风高浪急所扰”——习近平主席对俄罗斯进行国事访问并出席纪念苏联伟大卫国战争胜利80周年庆典纪实