当前位置: 首页 > news >正文

Dubbo 通信流程 - 服务的调用

Dubbo 客户端的使用

在 Dubbo 应用中,往类成员注解 @DubboReference,服务启动后便可以调用到远端:

@Component
public class InvokeDemoFacade {

    @Autowired
    @DubboReference
    private DemoFacade demoFacade;

    public String hello(String name){
    	// 经过网络调用到服务端的 DemoFacade
        return demoFacade.sayHello(name);
    }
}

在 Dubbo 通信流程 - 客户端代理对象的创建 中讲到,Dubbo 会为注解了 @DubboReference 的 Bean 创建代理对象并注册到 Spring 容器中。对类成员进行依赖注入时,Spring 会调用工厂对象 ReferenceBean 的 getObject 方法获取 Bean,该方法返回一个懒加载的代理对象。

所以,当调用一个注解了 @DubboReference 对象的方法时,调用的实际是其代理对象的方法:

public class LazyTargetInvocationHandler implements InvocationHandler {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    	if (target == null) {
            target = lazyTargetSource.getTarget();
        }
    	...
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException exception) {
                Throwable targetException = exception.getTargetException();
                if (targetException != null) {
                    throw targetException;
                }
            }
        ...
	}
}

Dubbo 客户端的调用流程

在 Dubbo 通信流程 - 客户端代理对象的创建 中分析到,上述 target 是 InvokerInvocationHandler 类型,在这里再回忆一下:

// ReferenceBean.class
    private Object getCallProxy() throws Exception {
        if (referenceConfig == null) {
            synchronized (LockUtils.getSingletonMutex(applicationContext)) {
                if (referenceConfig == null) {
                    referenceBeanManager.initReferenceBean(this);
                    applicationContext
                            .getBean(
                                    DubboConfigApplicationListener.class.getName(),
                                    DubboConfigApplicationListener.class)
                            .init();
                    logger.warn(
                            CONFIG_DUBBO_BEAN_INITIALIZER,
                            "",
                            "",
                            "ReferenceBean is not ready yet, please make sure to "
                                    + "call reference interface method after dubbo is started.");
                }
            }
        }
        // get reference proxy
        // Subclasses should synchronize on the given Object if they perform any sort of extended singleton creation
        // phase.
        // In particular, subclasses should not have their own mutexes involved in singleton creation, to avoid the
        // potential for deadlocks in lazy-init situations.
        // The redundant type cast is to be compatible with earlier than spring-4.2
        if (referenceConfig.configInitialized()) {
            return referenceConfig.get();
        }
        synchronized (LockUtils.getSingletonMutex(applicationContext)) {
            return referenceConfig.get();
        }
    }

    private class DubboReferenceLazyInitTargetSource implements LazyTargetSource {
        @Override
        public Object getTarget() throws Exception {
            return getCallProxy();
        }
    }
}

InvokerInvocationHandler 中,使用了 invoker + rpcInvocation 传入 InvocationUtil 工具类:

public class InvokerInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ...
        // RpcInvocation 封装了这次请求的相关信息
        RpcInvocation rpcInvocation = new RpcInvocation(
                serviceModel,
                method.getName(),
                invoker.getInterface().getName(),
                protocolServiceKey,
                method.getParameterTypes(),
                args);

        if (serviceModel instanceof ConsumerModel) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);
            rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));
        }
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }
}

上述 invoker 是 InvokerInvocationHandler 的成员变量,其实际类型是 MigrationInvoker。MigrationInvoker 是流量切换的核心组件,它的核心作用是在服务迁移期间,对服务调用流量进行灵活的控制和管理,实现新旧服务提供者之间的平稳切换,避免因服务迁移给业务带来影响。

MigrationInvoker
流量切换:根据预设的规则,逐步将服务调用的流量从旧的服务提供者转移到新的服务提供者。
服务兼容:在迁移过程中,同时支持对旧服务和新服务的调用,确保业务的连续性。
数据对比:在流量切换过程中,可以对新旧服务的调用结果进行对比,以验证新服务的正确性。

MigrationInvoker 的 invoker 是 ScopeClusterInvoker,ScopeClusterInvoker 是 Dubbo 框架里的一个关键组件,其核心作用是依据服务调用的范围(Scope)对服务调用进行集群管理,实现对不同范围的服务提供者进行有效的调用和容错处理。

ScopeClusterInvoker
支持多范围的服务调用:在复杂的分布式系统里,服务可能会存在于不同的范围中,例如不同的数据中心、不同的业务单元等。ScopeClusterInvoker 可以按照预先设定的范围,对服务提供者进行分组管理。当服务消费者发起调用时,它能够精准地在特定范围的服务提供者中挑选合适的节点进行调用。
实现服务调用的隔离:借助对不同范围的服务提供者进行隔离调用,ScopeClusterInvoker 可以防止某个范围的服务故障影响到其他范围的服务。例如,当一个数据中心出现故障时,服务调用会被限制在其他正常的数据中心内进行,这样就能保证服务的可用性和稳定性。
动态调整调用范围:在系统运行过程中,ScopeClusterInvoker 能够根据实际情况动态地调整服务调用的范围。例如,当某个范围的服务提供者性能下降时,可以减少对该范围的调用,将更多的流量导向其他性能较好的范围。这种动态调整的能力可以提高系统的整体性能和资源利用率。
容错和负载均衡:ScopeClusterInvoker 集成了 Dubbo 的容错和负载均衡机制。在每个范围内,它会依据配置的负载均衡策略(如随机、轮询、最少活跃调用数等)选择合适的服务提供者进行调用。同时,当调用失败时,它会按照预设的容错策略(如失败重试、失败快速返回等)进行处理,确保服务调用的可靠性。

MigrationInvoker 的 invoker 是 MockClusterInvoker,MockClusterInvoker 是一个集群调用器,它会在服务调用时拦截调用请求,并根据配置的 Mock 策略来决定是调用实际的服务提供者还是返回 Mock 结果。

经过一系列的 Invoker 后,调用来到 AbstractCluster,AbstractCluster 调用 FilterChainBuilder 的 invoke 方法,进入过滤器链,依次经过 FutureFilter、MonitorFilter 等过滤器后,进入 FailoverClusterInvoker,FailoverClusterInvoker 在服务调用失败时,通过重试机制选择其他可用的服务提供者进行调用,以此保证服务调用的可靠性。

继续 Debug,来到 DubboInvoker:

///                  
// org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
// 按照dubbo协议发起调用实现类
///
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);
    
    // 获取发送数据的客户端
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 看看是单程发送不需要等待响应,还是发送完了后需要等待响应
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 获取超时时间
        int timeout = calculateTimeout(invocation, methodName);
        invocation.setAttachment(TIMEOUT_KEY, timeout);
        
        if (isOneway) {
        	// 单程发送,不需要等待响应
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 发送完了之后需要等待响应
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // 操作 currentClient 发送了一个 request 请求,
            // 然后接收了一个 CompletableFuture 对象,说明这里存在异步操作
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

从代码流程看,拿到了一个交换数据的客户端类,然后走两个发送数据的分支,一条分支逻辑单程调用不需要响应,一条有响应,两条分支最终都返回了一个异步结果对象。

ReferenceCountExchangeClient.request -> HeaderExchangeChannel.request -> AbstractPeer.send -> AbstractClient.send -> NettyChannel.send

在 HeaderExchangeChannel 中 new 了一个 DefaultFuture,便调用 AbstractPeer 的 send 方法,进入异步的发送流程。

调用流程回顾

在这里插入图片描述

相关文章:

  • TCP可靠传输与慢启动机制
  • 项目上传github——SSH连接配置文档
  • 无参数读文件RCE
  • STRUCTBERT:将语言结构融入预训练以提升深度语言理解
  • AWS Aurora存算分离架构
  • Java可变参数:灵活的函数调用方式
  • 前端Material-UI面试题及参考答案
  • 洛谷题单1-P1001 A+B Problem-python-流程图重构
  • 初识 spring ai 之rag、mcp、tools calling使用
  • 存储效能驱动业务价值:星飞全闪关键业务场景性能实测报告
  • 解释 Webpack 中的模块打包机制,如何配置 Webpack 进行项目构建?
  • 调用deepseek大模型时智能嵌入函数
  • 使用 Spring AI Aliabab Module RAG 构建 Web Search 应用
  • 中药材图像分类,解锁小样本高精度建模秘籍-MATLAB赋能科研:基于AlexNet的迁移学习
  • 力扣hot100——最长连续序列(哈希unordered_set)
  • 力扣 第 153 场双周赛 讲题
  • 【redis】集群 数据分片算法:哈希求余、一致性哈希、哈希槽分区算法
  • 通过万能SPI设备驱动spidev.c来操作挂接在SPI总线上的SPI设备(DAC模块)【spidev.c代码详解、SPI控制器及SPI设备的设备树语句解析】
  • 工具——(常用的软件)视频编辑器
  • UE5学习笔记 FPS游戏制作31 显示计分板
  • 找专业做网站/国内新闻最新5条
  • 15 企业网站优化方案有哪些内容/seo职位具体做什么
  • 长兴做网站/chatgpt 网站
  • 优酷wordpress建站教程/百度怎么优化网站排名
  • 吴忠建设网站/快速刷排名seo软件
  • 自己做网站卖水果/windows优化大师怎么样