【RabbitMQ】实现RPC通信的完整指南
文章目录
- RPC 通信
- 创建相关队列
- 客户端代码
- 声明队列
- 发送请求
- 接收响应
- 完整代码
- 服务端代码
- 设置同时只能获取一个消息
- 接收消息
- 完整代码
- 运行程序
- 启动客户端
- 启动服务端
RPC 通信
RPC
(Remote Procedure Call
), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术
- 类似
Http
远程调用
RabbitMQ
实现 RPC
通信的过程,大概是通过两个队列实现一个可回调的过程
- 注意
- 没有生产者和消费者,取而代之的是客户端和服务器
reply_to
:回调队列的名称correlation_id
:不能重复,用来确保请求和响应是一对
- 大概流程:
- 客户端发送消息到一个指定的队列,并在消息属性中设置
replyTo
字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列 - 服务端收到请求后,处理请求并发送响应到
replyTo
指定的回调队列 - 客户端再回调队列上等待响应消息,一旦收到响应,客户端会检查消息的
correlationID
属性,以确保它是所期望的响应- 等待响应消息,是通过一个阻塞队列来实现
- 如果没有响应进来,就会一直阻塞。通过一个阻塞队列,来让其等待响应完成
- 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止
- 客户端发送消息到一个指定的队列,并在消息属性中设置
大致流程:
- 客户端:
- 发送请求(携带
replyTo
、CorrelationID
) - 接收响应(校验
correlationID
)
- 发送请求(携带
- 服务端:
- 接收请求,进行响应
- 发送响应(按照客户端指定的
replyTo
,设置correlationID
)
创建相关队列
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue"; public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
- 涉及到两个队列
- 请求队列
- 响应队列
客户端代码
客户端代码主要流程如下:
- 声明两个队列,包含回调队列
RPC_REQUEST_QUEUE
,声明本次请求的唯一标志correlationID
- 将
RPC_REQUEST_QUEUE
和correlationID
配置到要发送的消息队列中 - 使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
- 使用阻塞队列有消息后,主线程被唤醒,打印返回内容
声明队列
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
发送请求
//发送请求(使用内置交换机)
String msg = "hello rpc...";
//设置请求的唯一标识
String correlationID = UUID.randomUUID().toString();
//设置请求的相关属性
AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constants.RPC_RESPONSE_QUEUE) .build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
接收响应
使用阻塞队列,来存储回调结果
// 接收响应
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel){ //逻辑是比对 correlationID 是否一致 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回调信息:" + respMsg); if (correlationID.equals(properties.getCorrelationId())) { // 如果 correlationID 校验一致,说明就是我们要的响应 response.offer(respMsg); } }
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer); // 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了
System.out.println("[RPC Client 响应结果]: " + result);
完整代码
package rabbitmq.rpc; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.*; /** * RPC Client * 1. 发送请求 * 2. 接收响应 */
public class RpcClient { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null); channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null); //4. 发送请求(使用内置交换机) String msg = "hello rpc..."; //设置请求的唯一标识 String correlationID = UUID.randomUUID().toString(); //设置请求的相关属性 AMQP.BasicProperties props = new AMQP.BasicProperties().builder() .correlationId(correlationID) .replyTo(Constants.RPC_RESPONSE_QUEUE) .build(); channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes()); //5. 接收响应 // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); DefaultConsumer consumer = new DefaultConsumer(channel){ //逻辑是比对 correlationID 是否一致 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String respMsg = new String(body); System.out.println("接收到回调信息:" + respMsg); if (correlationID.equals(properties.getCorrelationId())) { // 如果 correlationID 校验一致,说明就是我们要的响应 response.offer(respMsg); } } }; channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer); String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了 System.out.println("[RPC Client 响应结果]: " + result); }
}
服务端代码
服务端代码主要流程如下:
- 接收消息
- 根据消息内容进行相应处理,把应答结果返回到回调队列中
设置同时只能获取一个消息
//设置一次只能接收一条消息
channel.basicQos(1);
如果不设置 basicQos
,RabbitMQ
会使用默认的 Qos
设置,其 prefetchCount
默认值为 0
- 当
prefetchCount
为0
时,RabbitMQ
会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者 - 这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动
在 RPC
模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息
接收消息
接收消息,并做出相应处理
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body, "UTF-8"); System.out.println("接收到请求:" + request); String responses = "针对 request:" + request + ",响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); }
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
RabbitMQ
消息确定机制
- 在
RabbitMQ
中,basicConsume
方法的autoAck
参数用于指定消费者是否应该自动向消息对类确认消息- 自动确认(
autoAck=true
):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费 - 手动确认(
autoAck=false
):消息队列在将消息发送给消费者之后,需要消费者显式地调用basicAck
方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景
- 自动确认(
完整代码
package rabbitmq.rpc; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; /** * RPC server * 1. 接收请求 * 2. 发送响应 */
public class RpcServer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 接收请求 //设置一次只能接收一条消息 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String request = new String(body, "UTF-8"); System.out.println("接收到请求:" + request); String responses = "针对 request:" + request + ",响应成功"; AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder() .correlationId(properties.getCorrelationId()) .build(); channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer); }
}
运行程序
启动客户端
启动服务端
运行服务端
接收到请求:hello rpc...