当前位置: 首页 > wzjs >正文

做的比较好的购物网站关键词歌词打印

做的比较好的购物网站,关键词歌词打印,深圳seo优化外包公司,给宝宝做衣服网站好文章目录 RPC 通信创建相关队列客户端代码声明队列发送请求接收响应完整代码 服务端代码设置同时只能获取一个消息接收消息完整代码 运行程序启动客户端启动服务端 RPC 通信 RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而…

文章目录

    • RPC 通信
    • 创建相关队列
    • 客户端代码
      • 声明队列
      • 发送请求
      • 接收响应
      • 完整代码
    • 服务端代码
      • 设置同时只能获取一个消息
      • 接收消息
      • 完整代码
    • 运行程序
      • 启动客户端
      • 启动服务端

RPC 通信

RPC (Remote Procedure Call), 即远过程调用。它是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术

  • 类似 Http 远程调用

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程
image.png

  • 注意
    • 没有生产者和消费者,取而代之的是客户端和服务器
    • reply_to:回调队列的名称
    • correlation_id:不能重复,用来确保请求和响应是一对
  • 大概流程:
    1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段制定了一个回调队列,服务端处理之后,会把响应结果发送到这个队列
    2. 服务端收到请求后,处理请求并发送响应到 replyTo 指定的回调队列
    3. 客户端再回调队列上等待响应消息,一旦收到响应,客户端会检查消息的 correlationID 属性,以确保它是所期望的响应
      • 等待响应消息,是通过一个阻塞队列来实现
      • 如果没有响应进来,就会一直阻塞。通过一个阻塞队列,来让其等待响应完成
      • 如果阻塞队列里面没有消息,就会一直等待,等到有消息为止

大致流程

  • 客户端:
    1. 发送请求(携带 replyToCorrelationID
    2. 接收响应(校验 correlationID
  • 服务端:
    1. 接收请求,进行响应
    2. 发送响应(按照客户端指定的 replyTo,设置 correlationID

创建相关队列

public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";  public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";  
  • 涉及到两个队列
    • 请求队列
    • 响应队列

客户端代码

客户端代码主要流程如下:

  1. 声明两个队列,包含回调队列 RPC_REQUEST_QUEUE,声明本次请求的唯一标志 correlationID
  2. RPC_REQUEST_QUEUEcorrelationID 配置到要发送的消息队列中
  3. 使用阻塞队列来阻塞当前进程,监听回调队列中的消息,把请求放到阻塞队列中
  4. 使用阻塞队列有消息后,主线程被唤醒,打印返回内容

声明队列

channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

发送请求

//发送请求(使用内置交换机)  
String msg = "hello rpc...";  
//设置请求的唯一标识  
String correlationID = UUID.randomUUID().toString();  
//设置请求的相关属性  
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

接收响应

使用阻塞队列,来存储回调结果

// 接收响应  
// 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  
DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  
};  
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  // 获取回调的结果
String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  
System.out.println("[RPC Client 响应结果]: " + result);

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.UUID;  
import java.util.concurrent.*;  /**  * RPC Client * 1. 发送请求  * 2. 接收响应  */  
public class RpcClient {  public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 声明队列  channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);  channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);  //4. 发送请求(使用内置交换机)  String msg = "hello rpc...";  //设置请求的唯一标识  String correlationID = UUID.randomUUID().toString();  //设置请求的相关属性  AMQP.BasicProperties props = new AMQP.BasicProperties().builder()  .correlationId(correlationID)  .replyTo(Constants.RPC_RESPONSE_QUEUE)  .build();  channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());  //5. 接收响应  // 如果不阻塞,就会从上到下执行完了。所以要使用一个阻塞队列,完成同步机制  final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);  DefaultConsumer consumer = new DefaultConsumer(channel){  //逻辑是比对 correlationID 是否一致  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String respMsg = new String(body);  System.out.println("接收到回调信息:" + respMsg);  if (correlationID.equals(properties.getCorrelationId())) {  // 如果 correlationID 校验一致,说明就是我们要的响应  response.offer(respMsg);  }  }  };  channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);  String result = response.take(); //阻塞队列一直阻塞到这里,直到 response 有值了  System.out.println("[RPC Client 响应结果]: " + result);  }  
}

服务端代码

服务端代码主要流程如下:

  1. 接收消息
  2. 根据消息内容进行相应处理,把应答结果返回到回调队列中

设置同时只能获取一个消息

//设置一次只能接收一条消息  
channel.basicQos(1);

如果不设置 basicQosRabbitMQ 会使用默认的 Qos 设置,其 prefetchCount 默认值为 0

  • prefetchCount0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者
  • 这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动

RPC 模式下,同上期望的是一对一的消息处理,即一个请求对应一个相应。消费者在处理完一个消息并确认之后,才会接收到下一条消息

接收消息

接收消息,并做出相应处理

DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  
};  
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);

RabbitMQ 消息确定机制

  • RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息对类确认消息
    • 自动确认(autoAck=true):消息对类在将消息发送给消费者之后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
    • 手动确认(autoAck=false):消息队列在将消息发送给消费者之后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景

完整代码

package rabbitmq.rpc;  import com.rabbitmq.client.*;  
import rabbitmq.constant.Constants;  import java.io.IOException;  
import java.util.concurrent.TimeoutException;  /**  * RPC server * 1. 接收请求  * 2. 发送响应  */  
public class RpcServer {  public static void main(String[] args) throws IOException, TimeoutException {  //1. 建立连接  ConnectionFactory connectionFactory = new ConnectionFactory();  connectionFactory.setHost(Constants.HOST);  connectionFactory.setPort(Constants.PORT);  connectionFactory.setUsername(Constants.USER_NAME);  connectionFactory.setPassword(Constants.PASSWORD);  connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);  Connection connection = connectionFactory.newConnection();  //2. 开启信道  Channel channel = connection.createChannel();  //3. 接收请求  //设置一次只能接收一条消息  channel.basicQos(1);  DefaultConsumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  String request = new String(body, "UTF-8");  System.out.println("接收到请求:" + request);  String responses = "针对 request:" + request + ",响应成功";  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()  .correlationId(properties.getCorrelationId())  .build();  channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, responses.getBytes());  channel.basicAck(envelope.getDeliveryTag(), false);  }  };  channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);  }  
}

运行程序

启动客户端

image.png

启动服务端

运行服务端

接收到请求:hello rpc...
http://www.dtcms.com/wzjs/248461.html

相关文章:

  • 网站建设网上商城优化网站的方法
  • 多少企业需要网站建设seo关键词优化软件官网
  • 网站推广营销怎么做seo优化关键词排名
  • 南昌市房产网哈尔滨关键词优化方式
  • 商丘市做1企业网站的公司热搜词排行榜关键词
  • 想调用等三方网站数据该怎么做如何点击优化神马关键词排名
  • 全国大型免费网站建设天津百度优化
  • 深圳商业网站建设哪家好网络新闻发布平台发稿
  • 婚纱摄影网站设计毕业论文网络推广员的工作内容和步骤
  • 沧州网站建设的集成商全国广告投放平台
  • dw做网站的流程百度极速版推广员怎么申请
  • 手机在线做ppt的网站seo排名点击首页
  • 织梦网站打开速度慢人工智能培训心得体会
  • 怎么用vs2010做网站seo最强
  • 扬州做企业网站哪家公司好成人职业技能培训有哪些项目
  • 做平面设计的网站新手seo入门教程
  • 台州自助建站系统免费行情网站的推荐理由
  • 韩国优秀网站培训体系
  • 我做网站编辑写文章很慢怎么办全网营销推广靠谱吗
  • 百度搜索不到任何网站电子商务网站建设与管理
  • 网站建设公众号小程序推广开发口碑营销的名词解释
  • 做php网站教程视频搜索引擎关键词优化
  • 中文wordpress主题苏州seo关键词排名
  • 合肥建设企业网站免费做网站
  • 网站收录怎么设置百度网盘下载速度慢破解方法
  • 网站开发实战课程新闻稿撰写
  • 自己做优惠劵网站赚钱吗临沂做网站推广的公司
  • 众筹网站怎么做推广站长工具seo综合查询可以访问
  • 安徽奶茶加盟网站建设成都百度搜索排名优化
  • 石家庄做网站建设的公司哪家好上海百度推广电话客服