当前位置: 首页 > news >正文

【RabbitMQ】实现RPC通信的完整指南

文章目录

    • RPC 通信
    • 创建相关队列
    • 客户端代码
      • 声明队列
      • 发送请求
      • 接收响应
      • 完整代码
    • 服务端代码
      • 设置同时只能获取一个消息
      • 接收消息
      • 完整代码
    • 运行程序
      • 启动客户端
      • 启动服务端

RPC 通信

RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术

  • 类似 Http 远程调用

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程
image.png

  • 注意
    • 没有生产者和消费者,取而代之的是客户端和服务器
    • reply_to:回调队列的名称
    • correlation_id:不能重复,用来确保请求和响应是一对
  • 大概流程:
    1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列
    2. 服务端收到请求后,处理请求并发送响应到 replyTo 指定的回调队列
    3. 客户端再回调队列上等待响应消息,一旦收到响应,客户端会检查消息的 correlationID 属性,以确保它是所期望的响应
      • 等待响应消息,是通过一个阻塞队列来实现
      • 如果没有响应进来,就会一直阻塞。通过一个阻塞队列,来让其等待响应完成
      • 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止

大致流程

  • 客户端:
    1. 发送请求(携带 replyToCorrelationID
    2. 接收响应(校验 correlationID
  • 服务端:
    1. 接收请求,进行响应
    2. 发送响应(按照客户端指定的 replyTo,设置 correlationID

创建相关队列

public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";  public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";  
  • 涉及到两个队列
    • 请求队列
    • 响应队列

客户端代码

客户端代码主要流程如下:

  1. 声明两个队列,包含回调队列 RPC_REQUEST_QUEUE,声明本次请求的唯一标志 correlationID
  2. RPC_REQUEST_QUEUEcorrelationID 配置到要发送的消息队列中
  3. 使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 使用阻塞队列有消息后,主线程被唤醒,打印返回内容

声明队列

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

发送请求

//发送请求(使用内置交换机)  
String msg = "hello rpc...";  
//设置请求的唯一标识  
String correlationID = UUID.randomUUID().toString();  
//设置请求的相关属性  
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

接收响应

使用阻塞队列,来存储回调结果

// 接收响应  
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  
};  
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  // 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
System.out.println("[RPC Client 响应结果]: " + result);

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.UUID;  
import java.util.concurrent.*;  /**  * RPC Client * 1. 发送请求  * 2. 接收响应  */  
public class RpcClient {  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明队列  channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);  //4. 发送请求(使用内置交换机)  String msg = "hello rpc...";  //设置请求的唯一标识  String correlationID = UUID.randomUUID().toString();  //设置请求的相关属性  AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());  //5. 接收响应  // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  };  channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  System.out.println("[RPC Client 响应结果]: " + result);  }  
}

服务端代码

服务端代码主要流程如下:

  1. 接收消息
  2. 根据消息内容进行相应处理,把应答结果返回到回调队列中

设置同时只能获取一个消息

//设置一次只能接收一条消息  
channel.basicQos(1);

如果不设置 basicQosRabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0

  • prefetchCount0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者
  • 这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动

RPC 模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息

接收消息

接收消息,并做出相应处理

DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  
};  
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);

RabbitMQ 消息确定机制

  • RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息对类确认消息
    • 自动确认(autoAck=true):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
    • 手动确认(autoAck=false):消息队列在将消息发送给消费者之后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  /**  * RPC server * 1. 接收请求  * 2. 发送响应  */  
public class RpcServer {  public static void main(String[] args) throws IOException, TimeoutException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 接收请求  //设置一次只能接收一条消息  channel.basicQos(1);  DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  };  channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);  }  
}

运行程序

启动客户端

image.png

启动服务端

运行服务端

接收到请求:hello rpc...

相关文章:

  • MySQL——1、数据库基础
  • 25.5.15
  • homeassistant安装
  • 社区电商场景的 社群推广与维护系统化分析框架
  • MySQL如何查看某个表所占空间大小?(表空间大小查看方法)
  • 《教育退费那些事儿:从困境到破局》
  • 中国版 Cursor?腾讯推出 AI 编程助手 CodeBuddy,重新定义编程体验
  • 硅基计划2.0 学习总结 贰
  • 软件工程之软件产品的环境
  • 在文件检索方面doris和elasticsearch的区别
  • 最新版VSCode通过SSH远程连接Ubuntu 16.04等旧版Linux的方法
  • 中国近代史3
  • 阿里开源通义万相 Wan2.1-VACE,开启视频创作新时代
  • 【工具】metaTP:一种集成了自动化工作流程的元转录组数据分析工具包
  • git 本地提交后修改注释
  • YOLO11解决方案之距离计算探索
  • NVIDIA Omniverse 现已支持中文!
  • 1. 数字组合1
  • 实验6 电子邮件
  • nohup命令使用
  • 跨越三十年友情,61岁余隆和60岁齐默尔曼在上海再度合作
  • 银行积分大幅贬值遭质疑,涉及工行、中行、农行等
  • 国家统计局向多省份反馈统计督察意见
  • “大型翻车现场”科技满满,黄骅打造现代化港口和沿海新城典范
  • 中国证券业协会修订发布《证券纠纷调解规则》
  • 横跨万里穿越百年,《受到召唤·敦煌》中张艺兴一人分饰两角