当前位置: 首页 > news >正文

Dubbo 3.x源码(33)—Dubbo Consumer接收服务调用响应

基于Dubbo 3.1,详细介绍了Dubbo Consumer接收服务调用响应

此前我们学习了Dubbo Provider处理服务调用请求的流程,现在我们来学习Dubbo Consumer接收服务调用响应流程。

实际上接收请求和接收响应同属于接收消息,它们的流程的很多步骤是一样的。下面我们仅分析关键步骤。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
  3. Dubbo 3.x源码(31)—Dubbo消息的编码解码
  4. Dubbo 3.x源码(32)—Dubbo Provider处理服务调用请求源码
  5. Dubbo 3.x源码(33)—Dubbo Consumer接收服务调用响应

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • AllChannelHandler#received分发任务
    • getPreferredExecutorService获取首选线程池
  • ThreadlessExecutor#execute执行
  • HeaderExchangeHandler#received处理消息
  • HeaderExchangeHandler#handleResponse处理响应
  • DefaultFuture#received处理响应
  • DefaultFuture#doReceived处理响应

AllChannelHandler#received分发任务

将当前消息包装为一个ChannelEventRunnable分发给对应的线程池执行,这里的线程池就是dubbo业务线程池,到此IO线程的任务结束。

这种方式实现了线程资源的隔离,释放了IO线程,可以快速处理更多的IO操作,提升了系统吞吐量。

/*** AllChannelHandler的方法* <p>* 处理普通rpc请求请求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//获取对应的线程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//创建一个线程任务ChannelEventRunnable,通过线程池执行//这里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

getPreferredExecutorService获取首选线程池

该方法获取处理业务的首选线程池,目前,这种方法主要是为了方便消费者端的线程模型而定制的。这是Dubo2.7.5之后的线程模型的优化:

  1. 对于响应消息,那么从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFuture。请求的id就是对应的响应的id。
    1. 然后获取执DefaultFuture的执行器,对于默认的同步请求,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程,将回调直接委派给发起调用的线程。对于异步请求,则获取异步请求的线程池。
    2. 因此后续的处理包括请求体的解码都是由发起调用的线程来执行,这样减轻了业务线程池的压力。后面我们将消费者接受响应的时候,会讲解对应的源码。
  2. 对于请求消息,则使用共享executor执行后续逻辑。对于共享线程池,默认为FixedThreadPool,固定200线程,阻塞队列长度为0,拒绝策略为打印异常日志并且抛出异常。

对于同步阻塞请求的响应,这是默认的请求方式,将会获取ThreadlessExecutor执行器,对于异步请求的响应,将会获取一个多线程的线程池。

/*** WrappedChannelHandler的方法* <p>* 目前,这种方法主要是为了方便消费者端的线程模型而定制的。* 1. 使用ThreadlessExecutor,又名,将回调直接委派给发起调用的线程。* 2. 使用共享executor执行回调。** @param msg 消息* @return 执行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是响应消息if (msg instanceof Response) {Response response = (Response) msg;//从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一个典型的场景是响应超时后返回,超时后的响应可能已经完成if (responseFuture == null) {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();} else {//获取执future的执行器,对于默认的同步请i去,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//获取当前服务器或客户端的共享执行器executor = getSharedExecutorService();}return executor;}} else {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在断开通道之前,应用程序可能被销毁,避免创建新的应用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//从执行器仓库中根据url获取对应的执行器,默认INTERNAL_SERVICE_EXECUTOR对应的执行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}

ThreadlessExecutor#execute执行

还记得我们此前学习Dubbo Consumer发起请求的流程吗?对于同步请求,在发送请求之后,在AbstractInvoker#waitForResultIfSync方法中将会执行异步转同步等待,具体的等待方法就是ThreadlessExecutor#waitAndDrain方法。

waitAndDrain方法中,请求调用线程将会执行queue.take()方法尝试获取一个任务,如果没有任务,那么当前线程等待直到任务队列中有一个任务,那么获取并执行。实际上,这里等待的任务就是请求对应的响应结果。

ThreadlessExecutor#execute方法在响应返回之后会执行。首先将ChannelEventRunnable包装为RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志。然后判断如果同步调用线程没有处于等待状态,那么当前线程直接执行该线程任务。

否则将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态,那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务。

/*** 如果调用线程仍在等待回调任务,则将该任务添加到阻塞队列中以等待调度。否则,直接提交到共享回调执行器。** @param runnable 可执行的任务ChannelEventRunnable*/
@Override
public void execute(Runnable runnable) {//包装RunnableWrapper,对于run方法添加了try-catch,不会抛出异常仅仅打印日志runnable = new RunnableWrapper(runnable);synchronized (lock) {//如果同步调用线程没有处于等待状态if (!isWaiting()) {//那么当前线程直接执行该线程任务runnable.run();return;}/** 将当前任务加入到阻塞队列,如果同步调用线程还在因为调用waitAndDrain方而处于等待状态* 那么将会因为队列中添加了元素而被唤醒,进而执行该线程任务*/queue.add(runnable);}
}

HeaderExchangeHandler#received处理消息

HeaderExchangeHandler#received方法对于消息进行分类并调用不同的方法继续处理。

对于请求消息,如果是双向消息,那么调用handleRequest方法继续处理,将会创建Response对象,然后调用dubboProtocol.requestHandler完成请求处理获取结果,并将结果封装到Response中后返回给客户端。如果是单向消息则仅仅调用dubboProtocol.requestHandler完成请求处理即可。

对于响应消息,将会调用DefaultFuture#received方法处理,此时就会根据响应id获取对应的DefaultFuture,将响应结果设置进去。这里的源码我们后面讲consumer获取响应结果的时候再讲解。

/*** HeaderExchangeHandler的方法* <p>* 分类处理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//请求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//处理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//额外处理双向消息handleRequest(exchangeChannel, request);} else {//处理单向消息,直接调用下层DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//响应消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}

HeaderExchangeHandler#handleResponse处理响应

该方法用于处理响应,将会判断如果不是心跳响应,那么继续通过DefaultFuture#received处理响应。

我们在此前学习Dubbo Consumer发起请求的流程的时候就讲过,超时检查任务TimeoutCheckTask#timeoutCheck方法中,如果请求超时,那么同样会调用DefaultFuture#received方法处理超时响应。

/*** HeaderExchangeHandler的方法* <p>* 处理响应消息** @param channel  NettyChannel* @param response Response*/
static void handleResponse(Channel channel, Response response) throws RemotingException {//如果不是心跳响应,那么继续通过DefaultFuture#received处理响应if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}

DefaultFuture#received处理响应

当请求结果返回或者请求超时之后,将会通过DefaultFuture#received处理响应。

/*** DefaultFuture的方法* <p>* 处理响应结果** @param channel  NettyChannel* @param response Response*/
public static void received(Channel channel, Response response) {//调用另一个received方法,timeout参数为falsereceived(channel, response, false);
}public static void received(Channel channel, Response response, boolean timeout) {try {//由于获得了响应,那么将该请求的缓存从FUTURES中移除DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {//该请求的超时检查任务Timeout t = future.timeoutCheckTask;//如果没有超时,那么if (!timeout) {// decrease Time//取消该任务t.cancel();}//通过future处理响应,设置结果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response status is " + response.getStatus()+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");}} finally {//移除正在处理的channel缓存CHANNELS.remove(response.getId());}
}

DefaultFuture#doReceived处理响应

设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程。随后AsyncRpcResult#getAppResponse方法可以获取响应结果并返回。

/*** DefaultFuture的方法* <p>* 处理响应设置结果*/
private void doReceived(Response res) {if (res == null) {throw new IllegalStateException("response cannot be null");}//如果响应成功,则将设置响应结果并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程if (res.getStatus() == Response.OK) {this.complete(res.getResult());}//如果是客户端或者服务端超时,抛出超时异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));}//抛出远程调用异常并唤醒在AsyncRpcResult#getAppResponse方法中因为调用responseFuture.get()而阻塞的线程else {this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));}// the result is returning, but the caller thread may still wait// to avoid endless waiting for whatever reason, notify caller thread to return.//结果正在返回,但是调用者线程可能仍在等待,为了避免无休止的等待,通知调用线程返回。if (executor != null && executor instanceof ThreadlessExecutor) {ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;if (threadlessExecutor.isWaiting()) {threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +" which is not an expected state, interrupt the thread manually by returning an exception."));}}
}
http://www.dtcms.com/a/327376.html

相关文章:

  • Python day42
  • tensorrt-llm0.20.0离线部署DeepSeek-R1-Distill-Qwen-32B
  • 第六十三章:AI模型的“跨界之旅”:不同硬件架构下的兼容性方案
  • Linux NAPI 实现机制深度解析
  • 【CDA 新一级】学习笔记第1篇:数据分析的时代背景
  • 【前端八股文面试题】【JavaScript篇7】什么是JavaScript的原型、原型链? 有什么特点
  • 【设计模式精解】Java实现责任链模式(职责链模式)优雅处理多级请求(概述,使用场景,优缺点,代码示例)
  • Rust:构造函数 new() 如何进行错误处理?
  • 信号(Signal)** 是一种进程间异步通信机制,用于通知进程发生发生了某种事件(如错误、用户中断等)
  • 疯狂星期四文案网第37天运营日记
  • Apache POI中通过WorkBook写入图片后出现导出PDF文件时在不同页重复写入该图片问题,如何在通过sheet获取绘图对象清除该图片
  • 通过限制对象的内存分配位置来实现特定的设计目标
  • 【数据结构入门】堆
  • powerbi本地报表发布到web,以得到分享链接
  • C99中的变长数组(VLA)
  • 什么是 Spring MVC?
  • 中扬立库与西门子深化技术协同 共绘智能仓储创新蓝图
  • clean install 和 clean package 的区别
  • JVM学习笔记-----图解方法执行流程
  • 百胜软件×华为云联合赋能,“超级国民品牌”海澜之家新零售加速前行
  • 【力扣494】目标和
  • 【软考中级网络工程师】知识点之 IP QoS 技术
  • JVM宝典
  • 面试八股之从Java到JVM层面深入解析ReentrantLock实现原理
  • 力扣top100(day01-05)--矩阵
  • 开放原子开源生态大会:麒麟信安加入openEuler社区AI联合工作组,聚焦操作系统开源实践与行业赋能
  • Linux下的软件编程——文件IO
  • Openlayers基础教程|从前端框架到GIS开发系列课程(24)openlayers结合canva绘制矩形绘制线
  • 循环神经网络
  • THCV215一种高速视频数据收发器,采用低电压差分信号(LVDS)技术支持高速串行数据传输,支持1080p/60Hz高分辨率传输