Dubbo源码解读与实战-基础知识(上)
00 开篇词「深入掌握Dubbo原理与实现,提升你的职场竞争力
为什么要学习 Dubbo
我们在谈论任何一项技术的时候,都需要强调它所适用的业务场景,因为: 技术之所以有价值,就是因为它解决了一些业务场景难题。
一家公司由小做大,业务会不断发展,随之而来的是 DAU、订单量、数据量的不断增长,用来支撑业务的系统复杂度也会不断提高,模块之间的依赖关系也会日益复杂。这时候我们一般会从单体架构进入集群架构(如下图所示),在集群架构中通过负载均衡技术,将流量尽可能均摊到集群中的每台机器上,以此克服单台机器硬件资源的限制,做到横向扩展。
单体架构 VS 集群架构
之后,又由于业务系统本身的实现较为复杂、扩展性较差、性能也有上限,代码和功能的复用能力较弱,我们会将一个巨型业务系统拆分成多个微服务,根据不同服务对资源的不同要求,选择更合理的硬件资源。例如,有些流量较小的服务只需要几台机器构成的集群即可,而核心业务则需要成百上千的机器来支持,这样就可以最大化系统资源的利用率。
另外一个好处是,可以在服务维度进行重用,在需要某个服务的时候,直接接入即可,从而提高开发效率。拆分成独立的服务之后(如下图所示),整个服务可以最大化地实现重用,也可以更加灵活地扩展。
微服务架构图
但是在微服务架构落地的过程中,我们需要解决的问题有很多,如:
- 服务之间如何高性能地通信?
- 服务调用如何做到负载均衡、FailOver、限流?
- 如何有效地划清服务边界?
- 如何进行服务治理?
- ……
Apache Dubbo是一款高性能、轻量级的开源 Java RPC 框架,它提供了三大核心能力:
- 面向接口的远程方法调用;
- 可靠、智能的容错和负载均衡;
- 服务自动注册和发现能力。
简单地说, Dubbo 是一个分布式服务框架,致力于提供高性能、透明化的 RPC 远程服务调用方案以及服务治理方案,以帮助我们解决微服务架构落地时的问题。
Dubbo 是由阿里开源,后来加入了 Apache 基金会,目前已经从孵化器毕业,成为 Apache 的顶级项目。Apache Dubbo 目前已经有接近 32.8 K 的 Star、21.4 K 的 Fork,其热度可见一斑, 很多互联网大厂(如阿里、滴滴、去哪儿网等)都是直接使用 Dubbo 作为其 RPC 框架,也有些大厂会基于 Dubbo 进行二次开发实现自己的 RPC 框架 ,如当当网的 DubboX。
Dubbo 和 Spring Cloud 是目前主流的微服务框架,阿里、京东、小米、携程、去哪儿网等互联网公司的基础设施早已落成,并且后续的很多项目还是以 Dubbo 为主。Dubbo 重启之后,已经开始规划 3.0 版本,相信后面还会有更加惊艳的表现。
另外,RPC 框架的核心原理和设计都是相通的,阅读过 Dubbo 源码之后,你再去了解其他 RPC 框架的代码,就是一件非常简单的事情了。
阅读 Dubbo 源码的痛点
学习和掌握一项技能的时候,一般都是按照“是什么”“怎么用”“为什么”(原理)逐层深入的:
同样,你可以通过阅读官方文档或是几篇介绍性的文章,迅速了解 Dubbo 是什么;接下来,再去上手,用 Dubbo 写几个项目,从而更加全面地熟悉 Dubbo 的使用方式和特性,成为一名“熟练工”,但这也是很多开发者所处的阶段。而“有技术追求”的开发者,一般不会满足于每天只是写写业务代码,而是会开始研究 Dubbo 的源码实现以及底层原理,这就对应了上图中的核心层:“原理”。
而开始阅读源码时,不少开发者会提前去网上查找资料,或者直接埋头钻研源码,并因为这样的学习路径而普遍面临一些痛点问题:
- 网络资料不少,但大多是复制 Dubbo 官方文档,甚至干脆就是粘贴了一堆 Dubbo 源码过来,没有任何自己的个人实践和经验分享,学习花费精力不说,收获却不大。
- 相关资料讲述的 Dubbo 版本比较陈旧,没有跟上最新的设计和优化,有时候还会误导你。或者切入点很小,只针对 Dubbo 的一个流程进行介绍,看完之后,你只知道这一条调用分支上的相关内容,代码一旦运行到其他地方,还是一脸懵。
- 若抛开参考资料,自己直接去阅读 Dubbo 源码,你本身又需要具备一定的技术功底,而且要对整个开源项目有比较高的熟练度,这样你才能够循着它的核心逻辑去快速掌握它。而对于一个相对陌生的开源项目来说,这可能就是一个非常痛苦的过程了,并且最致命的是,由于对整个架构的“视野”受限,你很可能会迷失在代码迷宫中,最后虽然也花了很大力气去阅读和 Debug 源码,却在关上 IDEA 之后依然“雾里看花”。
课程设置
具体来说,在这个课程中我会:
- 从基础知识开始,通过丰富的 Demo 演示,手把手带你分析 Dubbo 涉及的核心知识点。之后再带你使用这些核心技术,通过编写一个简易版本的 RPC 框架串联所有知识点。
- 带你自底向上剖析 Dubbo 的源码,深入理解 Dubbo 的工作原理及核心实现,让你不再停留在简单使用 Dubbo 的阶段,做到知其然,也知其所以然。例如,Provider 是如何将服务发布到注册中心的、Consumer 是如何从注册中心订阅服务的,等等问题都可以在这里找到解答。
- 点名 Dubbo 源码中的设计模式,让你了解设计模式的优秀实践方式,帮助你从“纸上谈兵”变成“用兵如神”,这样在你进行架构设计以及代码编写的时候,就可以真正使用这些设计模式,让你的代码扩展性更强、可维护性更好。
- 带你领略 Dubbo 2.7.5 版本之后的最新优化和设计,让你紧跟时代潮流,更好地反馈到工作实践中。
讲师寄语
最后,我想和你说的是: 沉迷于代码,但不要只沉迷于代码。
阅读源码的目的是提升自身的技术能力,而提升技术能力的目的是更好地支持业务。阅读源码不是终点,你还需要结合实际业务,更好地体会开源项目的设计理念,并将这种设计应用到实践中。
让我们开启一次紧张刺激的 Dubbo 探秘之旅!我也希望你能在留言区与我分享你的 Dubbo 学习情况,分享你的成长心得和学习痛点,学习不是单向的输出,而是一次交流反馈的过程!加油。
为便于你更好地学习,我将整个 Dubbo 的源码(带注释的)放到 GitHub 上了,你可以按需查看:https://github.com/xxxlxy2008/dubbo。
01 Dubbo源码环境搭建:千里之行,始于足下
Dubbo 架构简介
为便于你更好理解和学习,在开始搭建 Dubbo 源码环境之前,我们先来简单介绍一下 Dubbo 架构中的核心角色,帮助你简单回顾一下 Dubbo 的架构,也帮助不熟悉 Dubbo 的小伙伴快速了解 Dubbo。下图展示了 Dubbo 核心架构:
Dubbo 核心架构图
- Registry:注册中心。 负责服务地址的注册与查找,服务的 Provider 和 Consumer 只在启动时与注册中心交互。注册中心通过长连接感知 Provider 的存在,在 Provider 出现宕机的时候,注册中心会立即推送相关事件通知 Consumer。
- Provider:服务提供者。 在它启动的时候,会向 Registry 进行注册操作,将自己服务的地址和相关配置信息封装成 URL 添加到 ZooKeeper 中。
- Consumer:服务消费者。 在它启动的时候,会向 Registry 进行订阅操作。订阅操作会从 ZooKeeper 中获取 Provider 注册的 URL,并在 ZooKeeper 中添加相应的监听器。获取到 Provider URL 之后,Consumer 会根据负载均衡算法从多个 Provider 中选择一个 Provider 并与其建立连接,最后发起对 Provider 的 RPC 调用。 如果 Provider URL 发生变更,Consumer 将会通过之前订阅过程中在注册中心添加的监听器,获取到最新的 Provider URL 信息,进行相应的调整,比如断开与宕机 Provider 的连接,并与新的 Provider 建立连接。Consumer 与 Provider 建立的是长连接,且 Consumer 会缓存 Provider 信息,所以一旦连接建立,即使注册中心宕机,也不会影响已运行的 Provider 和 Consumer。
- Monitor:监控中心。 用于统计服务的调用次数和调用时间。Provider 和 Consumer 在运行过程中,会在内存中统计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。监控中心在上面的架构图中并不是必要角色,监控中心宕机不会影响 Provider、Consumer 以及 Registry 的功能,只会丢失监控数据而已。
搭建Dubbo源码环境
当然,要搭建Dubbo 源码环境,你首先需要下载源码。这里你可以直接从官方仓库 https://github.com/apache/dubboFork 到自己的仓库,直接执行下面的命令去下载代码:
git clone git@github.com:xxxxxxxx/dubbo.git
然后切换分支,因为目前最新的是 Dubbo 2.7.7 版本,所以这里我们就用这个新版本:
git checkout -b dubbo-2.7.7 dubbo-2.7.7
接下来,执行 mvn 命令进行编译:
mvn clean install -Dmaven.test.skip=true
最后,执行下面的命令转换成 IDEA 项目:
mvn idea:idea // 要是执行报错,就执行这个 mvn idea:workspace
然后,在 IDEA 中导入源码,因为这个导入过程中会下载所需的依赖包,所以会耗费点时间。
Dubbo源码核心模块
在 IDEA 成功导入 Dubbo 源码之后,你看到的项目结构如下图所示:
下面我们就来简单介绍一下这些核心模块的功能,至于详细分析,在后面的课时中我们还会继续讲解。
- dubbo-common 模块: Dubbo 的一个公共模块,其中有很多工具类以及公共逻辑,例如课程后面紧接着要介绍的 Dubbo SPI 实现、时间轮实现、动态编译器等。
- dubbo-remoting 模块: Dubbo 的远程通信模块,其中的子模块依赖各种开源组件实现远程通信。在 dubbo-remoting-api 子模块中定义该模块的抽象概念,在其他子模块中依赖其他开源组件进行实现,例如,dubbo-remoting-netty4 子模块依赖 Netty 4 实现远程通信,dubbo-remoting-zookeeper 通过 Apache Curator 实现与 ZooKeeper 集群的交互。
- dubbo-rpc 模块: Dubbo 中对远程调用协议进行抽象的模块,其中抽象了各种协议,依赖于 dubbo-remoting 模块的远程调用功能。dubbo-rpc-api 子模块是核心抽象,其他子模块是针对具体协议的实现,例如,dubbo-rpc-dubbo 子模块是对 Dubbo 协议的实现,依赖了 dubbo-remoting-netty4 等 dubbo-remoting 子模块。 dubbo-rpc 模块的实现中只包含一对一的调用,不关心集群的相关内容。
- dubbo-cluster 模块: Dubbo 中负责管理集群的模块,提供了负载均衡、容错、路由等一系列集群相关的功能,最终的目的是将多个 Provider 伪装为一个 Provider,这样 Consumer 就可以像调用一个 Provider 那样调用 Provider 集群了。
- dubbo-registry 模块: Dubbo 中负责与多种开源注册中心进行交互的模块,提供注册中心的能力。其中, dubbo-registry-api 子模块是顶层抽象,其他子模块是针对具体开源注册中心组件的具体实现,例如,dubbo-registry-zookeeper 子模块是 Dubbo 接入 ZooKeeper 的具体实现。
- dubbo-monitor 模块: Dubbo 的监控模块,主要用于统计服务调用次数、调用时间以及实现调用链跟踪的服务。
- dubbo-config 模块: Dubbo 对外暴露的配置都是由该模块进行解析的。例如,dubbo-config-api 子模块负责处理 API 方式使用时的相关配置,dubbo-config-spring 子模块负责处理与 Spring 集成使用时的相关配置方式。有了 dubbo-config 模块,用户只需要了解 Dubbo 配置的规则即可,无须了解 Dubbo 内部的细节。
- dubbo-metadata 模块: Dubbo 的元数据模块(本课程后续会详细介绍元数据的内容)。dubbo-metadata 模块的实现套路也是有一个 api 子模块进行抽象,然后其他子模块进行具体实现。
- dubbo-configcenter 模块: Dubbo 的动态配置模块,主要负责外部化配置以及服务治理规则的存储与通知,提供了多个子模块用来接入多种开源的服务发现组件。
Dubbo 源码中的 Demo 示例
在 Dubbo 源码中我们可以看到一个 dubbo-demo 模块,共包括三个非常基础 的 Dubbo 示例项目,分别是: 使用 XML 配置的 Demo 示例、使用注解配置的 Demo 示例 以及 直接使用 API 的 Demo 示例 。下面我们将从这三个示例的角度,简单介绍 Dubbo 的基本使用。同时,这三个项目也将作为后续 Debug Dubbo 源码的入口,我们会根据需要在其之上进行修改 。不过在这儿之前,你需要先启动 ZooKeeper 作为注册中心,然后编写一个业务接口作为 Provider 和 Consumer 的公约。
启动 ZooKeeper
在前面 Dubbo 的架构图中,你可以看到 Provider 的地址以及配置信息是通过注册中心传递给 Consumer 的。 Dubbo 支持的注册中心尽管有很多, 但在生产环境中, 基本都是用 ZooKeeper 作为注册中心 。因此,在调试 Dubbo 源码时,自然需要在本地启动 ZooKeeper。
那怎么去启动 ZooKeeper 呢?
首先,你得下载 zookeeper-3.4.14.tar.gz 包(下载地址: https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/)。下载完成之后执行如下命令解压缩:
tar -zxf zookeeper-3.4.14.tar.gz
解压完成之后,进入 zookeeper-3.4.14 目录,复制 conf/zoo_sample.cfg 文件并重命名为 conf/zoo.cfg,之后执行如下命令就可以启动 ZooKeeper了。
>./bin/zkServer.sh start # 下面为输出内容
ZooKeeper JMX enabled by default
Using config: /Users/xxx/zookeeper-3.4.14/bin/../conf/zoo.cfg # 配置文件
Starting zookeeper ... STARTED # 启动成功
业务接口
在使用 Dubbo 之前,你还需要一个业务接口,这个业务接口可以认为是 Dubbo Provider 和 Dubbo Consumer 的公约,反映出很多信息:
- Provider ,如何提供服务、提供的服务名称是什么、需要接收什么参数、需要返回什么响应;
- Consumer ,如何使用服务、使用的服务名称是什么、需要传入什么参数、会得到什么响应。
dubbo-demo-interface 模块就是定义业务接口的地方,如下图所示:
其中,DemoService 接口中定义了两个方法:
public interface DemoService { String sayHello(String name); // 同步调用 // 异步调用 default CompletableFuture<String> sayHelloAsync(String name) { return CompletableFuture.completedFuture(sayHello(name)); }
}
Demo 1:基于 XML 配置
在 dubbo-demo 模块下的 dubbo-demo-xml 模块,提供了基于 Spring XML 的 Provider 和 Consumer。
我们先来看 dubbo-demo-xml-provider 模块,其结构如下图所示:
在其 pom.xml 中除了一堆 dubbo 的依赖之外,还有依赖了 DemoService 这个公共接口:
<dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-demo-interface</artifactId> <version>${project.parent.version}</version>
</dependency>
DemoServiceImpl 实现了 DemoService 接口,sayHello() 方法直接返回一个字符串,sayHelloAsync() 方法返回一个 CompletableFuture 对象。
在 dubbo-provider.xml 配置文件中,会将 DemoServiceImpl 配置成一个 Spring Bean,并作为 DemoService 服务暴露出去:
<!-- 配置为 Spring Bean -->
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/> <!-- 作为 Dubbo 服务暴露出去 -->
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
还有就是指定注册中心地址(就是前面 ZooKeeper 的地址),这样 Dubbo 才能把暴露的 DemoService 服务注册到 ZooKeeper 中:
<!-- Zookeeper 地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
最后,在 Application 中写个 main() 方法,指定 Spring 配置文件并启动 ClassPathXmlApplicationContext 即可。
接下来再看 dubbo-demo-xml-consumer 模块,结构如下图所示:
在 pom.xml 中同样依赖了 dubbo-demo-interface 这个公共模块。
在 dubbo-consumer.xml 配置文件中,会指定注册中心地址(就是前面 ZooKeeper 的地址),这样 Dubbo 才能从 ZooKeeper 中拉取到 Provider 暴露的服务列表信息:
<!-- Zookeeper地址 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
还会使用 dubbo:reference 引入 DemoService 服务,后面可以作为 Spring Bean 使用:
<!--引入DemoService服务,并配置成Spring Bean-->
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>
最后,在 Application 中写个 main() 方法,指定 Spring 配置文件并启动 ClassPathXmlApplicationContext 之后,就可以远程调用 Provider 端的 DemoService 的 sayHello() 方法了。
Demo 2:基于注解配置
dubbo-demo-annotation 模块是基于 Spring 注解配置的示例,无非就是将 XML 的那些配置信息转移到了注解上。
我们先来看 dubbo-demo-annotation-provider 这个示例模块:
public class Application { public static void main(String[] args) throws Exception { // 使用AnnotationConfigApplicationContext初始化Spring容器, // 从ProviderConfiguration这个类的注解上拿相关配置信息 AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( ProviderConfiguration.class); context.start(); System.in.read(); } @Configuration // 配置类 // @EnableDubbo注解指定包下的Bean都会被扫描,并做Dubbo服务暴露出去 @EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.provider") // @PropertySource注解指定了其他配置信息 @PropertySource("classpath:/spring/dubbo-provider.properties") static class ProviderConfiguration { @Bean public RegistryConfig registryConfig() { RegistryConfig registryConfig = new RegistryConfig(); registryConfig.setAddress("zookeeper://127.0.0.1:2181"); return registryConfig; } }
}
这里,同样会有一个 DemoServiceImpl 实现了 DemoService 接口,并且在 org.apache.dubbo.demo.provider 目录下,能被扫描到,暴露成 Dubbo 服务。
接着再来看 dubbo-demo-annotation-consumer 模块,其中 Application 中也是通过 AnnotationConfigApplicationContext 初始化 Spring 容器,也会扫描指定目录下的 Bean,会扫到 DemoServiceComponent 这个 Bean,其中就通过 @Reference 注解注入 Dubbo 服务相关的 Bean:
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService { @Reference // 注入Dubbo服务 private DemoService demoService; @Override public String sayHello(String name) { return demoService.sayHello(name); } // 其他方法
}
Demo 3:基于 API 配置
在有的场景中,不能依赖于 Spring 框架,只能使用 API 来构建 Dubbo Provider 和 Consumer,比较典型的一种场景就是在写 SDK 的时候。
先来看 dubbo-demo-api-provider 模块,其中 Application.main() 方法是入口:
// 创建一个ServiceConfig的实例,泛型参数是业务接口实现类,
// 即DemoServiceImpl
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>(); // 指定业务接口
service.setInterface(DemoService.class); // 指定业务接口的实现,由该对象来处理Consumer的请求
service.setRef(new DemoServiceImpl()); // 获取DubboBootstrap实例,这是个单例的对象
DubboBootstrap bootstrap = DubboBootstrap.getInstance(); //生成一个 ApplicationConfig 的实例、指定ZK地址以及ServiceConfig实例
bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider")) .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) .service(service) .start() .await();
这里,同样会有一个 DemoServiceImpl 实现了 DemoService 接口,并且在 org.apache.dubbo.demo.provider 目录下,能被扫描到,暴露成 Dubbo 服务。
再来看 dubbo-demo-api-consumer 模块,其中 Application 中包含一个普通的 main() 方法入口:
// 创建ReferenceConfig,其中指定了引用的接口DemoService ReferenceConfig<DemoService> reference = new ReferenceConfig<>(); reference.setInterface(DemoService.class); reference.setGeneric("true"); // 创建DubboBootstrap,指定ApplicationConfig以及RegistryConfig DubboBootstrap bootstrap = DubboBootstrap.getInstance(); bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer")) .registry(new RegistryConfig("zookeeper://127.0.0.1:2181")) .reference(reference) .start(); // 获取DemoService实例并调用其方法 DemoService demoService = ReferenceConfigCache.getCache() .get(reference); String message = demoService.sayHello("dubbo"); System.out.println(message);
总结
在本课时,我们首先介绍了 Dubbo 的核心架构以及各核心组件的功能,接下来又搭建了 Dubbo 源码环境,并详细介绍了 Dubbo 核心模块的功能,为后续分析 Dubbo 源码打下了基础。最后我们还深入分析了 Dubbo 源码中自带的三个 Demo 示例,现在你就可以以这三个 Demo 示例为入口 Debug Dubbo 源码了。
在后面的课时中,我们将解决几个问题:Dubbo 是如何与 ZooKeeper 等注册中心进行交互的?Provider 与 Consumer 之间是如何交互的?为什么我们在编写业务代码的时候,感受不到任何网络交互?Dubbo Provider 发布到注册中心的数据是什么?Consumer 为何能正确识别?两者的统一契约是什么?这个契约是如何做到可扩展的?这个契约还会用在 Dubbo 的哪些地方?
02 Dubbo的配置总线:抓住URL,就理解了半个Dubbo
在互联网领域,每个信息资源都有统一的且在网上唯一的地址,该地址就叫 URL(Uniform Resource Locator,统一资源定位符),它是互联网的统一资源定位标志,也就是指网络地址。
URL 本质上就是一个特殊格式的字符串。一个标准的 URL 格式可以包含如下的几个部分:
protocol://username:password@host:port/path?key=value&key=value
- protocol:URL 的协议。我们常见的就是 HTTP 协议和 HTTPS 协议,当然,还有其他协议,如 FTP 协议、SMTP 协议等。
- username/password:用户名/密码。 HTTP Basic Authentication 中多会使用在 URL 的协议之后直接携带用户名和密码的方式。
- host/port:主机/端口。在实践中一般会使用域名,而不是使用具体的 host 和 port。
- path:请求的路径。
- parameters:参数键值对。一般在 GET 请求中会将参数放到 URL 中,POST 请求会将参数放到请求体中。
URL 是整个 Dubbo 中非常基础,也是非常核心的一个组件,阅读源码的过程中你会发现很多方法都是以 URL 作为参数的,在方法内部解析传入的 URL 得到有用的参数,所以有人将 URL 称为Dubbo 的配置总线。
例如,在下一课时介绍的 Dubbo SPI 核心实现中,你会看到 URL 参与了扩展实现的确定;在本课程后续介绍注册中心实现的时候,你还会看到 Provider 将自身的信息封装成 URL 注册到 ZooKeeper 中,从而暴露自己的服务, Consumer 也是通过 URL 来确定自己订阅了哪些 Provider 的。
由此可见,URL 之于 Dubbo 是非常重要的,所以说“抓住 URL,就理解了半个 Dubbo”。那本文我们就来介绍 URL 在 Dubbo 中的应用,以及 URL 作为 Dubbo 统一契约的重要性,最后我们再通过示例说明 URL 在 Dubbo 中的具体应用。
Dubbo 中的 URL
Dubbo 中任意的一个实现都可以抽象为一个 URL,Dubbo 使用 URL 来统一描述了所有对象和配置信息,并贯穿在整个 Dubbo 框架之中。这里我们来看 Dubbo 中一个典型 URL 的示例,如下:
dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714dubbo://172.17.32.91:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=32508&release=&side=provider×tamp=1593253404714
这个 Demo Provider 注册到 ZooKeeper 上的 URL 信息,简单解析一下这个 URL 的各个部分:
- protocol:dubbo 协议。
- username/password:没有用户名和密码。
- host/port:172.17.32.91:20880。
- path:org.apache.dubbo.demo.DemoService。
- parameters:参数键值对,这里是问号后面的参数。
下面是 URL 的构造方法,你可以看到其核心字段与前文分析的 URL 基本一致:
public URL(String protocol, String username, String password, String host, int port, String path, Map<String, String> parameters, Map<String, Map<String, String>> methodParameters) { if (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password)) { throw new IllegalArgumentException("Invalid url"); } this.protocol = protocol; this.username = username; this.password = password; this.host = host; this.port = Math.max(port, 0); this.address = getAddress(this.host, this.port); while (path != null && path.startsWith("/")) { path = path.substring(1); } this.path = path; if (parameters == null) { parameters = new HashMap<>(); } else { parameters = new HashMap<>(parameters); } this.parameters = Collections.unmodifiableMap(parameters); this.methodParameters = Collections.unmodifiableMap(methodParameters);
}
另外,在 dubbo-common 包中还提供了 URL 的辅助类:
- URLBuilder, 辅助构造 URL;
- URLStrParser, 将字符串解析成 URL 对象。
契约的力量
对于 Dubbo 中的 URL,很多人称之为“配置总线”,也有人称之为“统一配置模型”。虽然说法不同,但都是在表达一个意思,URL 在 Dubbo 中被当作是“公共的契约”。一个 URL 可以包含非常多的扩展点参数,URL 作为上下文信息贯穿整个扩展点设计体系。
其实,一个优秀的开源产品都有一套灵活清晰的扩展契约,不仅是第三方可以按照这个契约进行扩展,其自身的内核也可以按照这个契约进行搭建。如果没有一个公共的契约,只是针对每个接口或方法进行约定,就会导致不同的接口甚至同一接口中的不同方法,以不同的参数类型进行传参,一会儿传递 Map,一会儿传递字符串,而且字符串的格式也不确定,需要你自己进行解析,这就多了一层没有明确表现出来的隐含的约定。
所以说,在 Dubbo 中使用 URL 的好处多多,增加了便捷性:
- 使用 URL 这种公共契约进行上下文信息传递,最重要的就是代码更加易读、易懂,不用花大量时间去揣测传递数据的格式和含义,进而形成一个统一的规范,使得代码易写、易读。
- 使用 URL 作为方法的入参(相当于一个 Key/Value 都是 String 的 Map),它所表达的含义比单个参数更丰富,当代码需要扩展的时候,可以将新的参数以 Key/Value 的形式追加到 URL 之中,而不需要改变入参或是返回值的结构。
- 使用 URL 这种“公共的契约”可以简化沟通,人与人之间的沟通消耗是非常大的,信息传递的效率非常低,使用统一的契约、术语、词汇范围,可以省去很多沟通成本,尽可能地提高沟通效率。
Dubbo 中的 URL 示例
了解了 URL 的结构以及 Dubbo 使用 URL 的原因之后,我们再来看 Dubbo 中的三个真实示例,进一步感受 URL 的重要性。
1. URL 在 SPI 中的应用
Dubbo SPI 中有一个依赖 URL 的重要场景——适配器方法,是被 @Adaptive 注解标注的, URL 一个很重要的作用就是与 @Adaptive 注解一起选择合适的扩展实现类。
例如,在 dubbo-registry-api 模块中我们可以看到 RegistryFactory 这个接口,其中的 getRegistry() 方法上有 @Adaptive({“protocol”}) 注解,说明这是一个适配器方法,Dubbo 在运行时会为其动态生成相应的 “$Adaptive” 类型,如下所示:
public class RegistryFactory$Adaptiveimplements RegistryFactory { public Registry getRegistry(org.apache.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("..."); org.apache.dubbo.common.URL url = arg0; // 尝试获取URL的Protocol,如果Protocol为空,则使用默认值"dubbo" String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("..."); // 根据扩展名选择相应的扩展实现,Dubbo SPI的核心原理在下一课时深入分析 RegistryFactory extension = (RegistryFactory) ExtensionLoader .getExtensionLoader(RegistryFactory.class) .getExtension(extName); return extension.getRegistry(arg0); }
}
我们会看到,在生成的 RegistryFactory$Adaptive 类中会自动实现 getRegistry() 方法,其中会根据 URL 的 Protocol 确定扩展名称,从而确定使用的具体扩展实现类。我们可以找到 RegistryProtocol 这个类,并在其 getRegistry() 方法中打一个断点, Debug 启动上一课时介绍的任意一个 Demo 示例中的 Provider,得到如下图所示的内容:
这里传入的 registryUrl 值为:
zookeeper://127.0.0.1:2181/org.apache.dubbo...
那么在 RegistryFactory$Adaptive 中得到的扩展名称为 zookeeper,此次使用的 Registry 扩展实现类就是 ZookeeperRegistryFactory。至于 Dubbo SPI 的完整内容,我们将在下一课时详细介绍,这里就不再展开了。
2. URL 在服务暴露中的应用
我们再来看另一个与 URL 相关的示例。上一课时我们在介绍 Dubbo 的简化架构时提到,Provider 在启动时,会将自身暴露的服务注册到 ZooKeeper 上,具体是注册哪些信息到 ZooKeeper 上呢?我们来看 ZookeeperRegistry.doRegister() 方法,在其中打个断点,然后 Debug 启动 Provider,会得到下图:
传入的 URL 中包含了 Provider 的地址(172.18.112.15:20880)、暴露的接口(org.apache.dubbo.demo.DemoService)等信息, toUrlPath() 方法会根据传入的 URL 参数确定在 ZooKeeper 上创建的节点路径,还会通过 URL 中的 dynamic 参数值确定创建的 ZNode 是临时节点还是持久节点。
3. URL 在服务订阅中的应用
Consumer 启动后会向注册中心进行订阅操作,并监听自己关注的 Provider。那 Consumer 是如何告诉注册中心自己关注哪些 Provider 呢?
我们来看 ZookeeperRegistry 这个实现类,它是由上面的 ZookeeperRegistryFactory 工厂类创建的 Registry 接口实现,其中的 doSubscribe() 方法是订阅操作的核心实现,在第 175 行打一个断点,并 Debug 启动 Demo 中 Consumer,会得到下图所示的内容:
我们看到传入的 URL 参数如下:
consumer://...?application=dubbo-demo-api-consumer&category=providers,configurators,routers&interface=org.apache.dubbo.demo.DemoService...
其中 Protocol 为 consumer ,表示是 Consumer 的订阅协议,其中的 category 参数表示要订阅的分类,这里要订阅 providers、configurators 以及 routers 三个分类;interface 参数表示订阅哪个服务接口,这里要订阅的是暴露 org.apache.dubbo.demo.DemoService 实现的 Provider。
通过 URL 中的上述参数,ZookeeperRegistry 会在 toCategoriesPath() 方法中将其整理成一个 ZooKeeper 路径,然后调用 zkClient 在其上添加监听。
通过上述示例,相信你已经感觉到 URL 在 Dubbo 体系中称为“总线”或是“契约”的原因了,在后面的源码分析中,我们还将看到更多关于 URL 的实现。
总结
在本课时,我们重点介绍了 Dubbo 对 URL 的封装以及相关的工具类,然后说明了统一契约的好处,当然也是 Dubbo 使用 URL 作为统一配置总线的好处,最后我们还介绍了 Dubbo SPI、Provider 注册、Consumer 订阅等场景中与 URL 相关的实现,这些都可以帮助你更好地感受 URL 在其中发挥的作用。
这里你可以想一下,在其他框架或是实际工作中,有没有类似 Dubbo URL 这种统一的契约?
03 Dubbo SPI精析,接口实现两极反转(上)
Dubbo 为了更好地达到 OCP 原则(即“对扩展开放,对修改封闭”的原则),采用了“微内核+插件”的架构。那什么是微内核架构呢?微内核架构也被称为插件化架构(Plug-in Architecture),这是一种面向功能进行拆分的可扩展性架构。内核功能是比较稳定的,只负责管理插件的生命周期,不会因为系统功能的扩展而不断进行修改。功能上的扩展全部封装到插件之中,插件模块是独立存在的模块,包含特定的功能,能拓展内核系统的功能。
微内核架构中,内核通常采用 Factory、IoC、OSGi 等方式管理插件生命周期,Dubbo 最终决定采用 SPI 机制来加载插件,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。因此,在讲解 Dubbo SPI 之前,我们有必要先来介绍一下 JDK SPI 的工作原理。
JDK SPI
SPI(Service Provider Interface)主要是被框架开发人员使用的一种技术。例如,使用 Java 语言访问数据库时我们会使用到 java.sql.Driver 接口,不同数据库产品底层的协议不同,提供的 java.sql.Driver 实现也不同,在开发 java.sql.Driver 接口时,开发人员并不清楚用户最终会使用哪个数据库,在这种情况下就可以使用 Java SPI 机制在实际运行过程中,为 java.sql.Driver 接口寻找具体的实现。
1. JDK SPI 机制
当服务的提供者提供了一种接口的实现之后,需要在 Classpath 下的 META-INF/services/ 目录里创建一个以服务接口命名的文件,此文件记录了该 jar 包提供的服务接口的具体实现类。当某个应用引入了该 jar 包且需要使用该服务时,JDK SPI 机制就可以通过查找这个 jar 包的 META-INF/services/ 中的配置文件来获得具体的实现类名,进行实现类的加载和实例化,最终使用该实现类完成业务功能。
下面我们通过一个简单的示例演示下 JDK SPI 的基本使用方式:
.png]
首先我们需要创建一个 Log 接口,来模拟日志打印的功能:
public interface Log { void log(String info);
}
接下来提供两个实现—— Logback 和 Log4j,分别代表两个不同日志框架的实现,如下所示:
public class Logback implements Log { @Override public void log(String info) { System.out.println("Logback:" + info); }
} public class Log4j implements Log { @Override public void log(String info) { System.out.println("Log4j:" + info); }
}
在项目的 resources/META-INF/services 目录下添加一个名为 com.xxx.Log 的文件,这是 JDK SPI 需要读取的配置文件,具体内容如下:
com.xxx.impl.Log4j
com.xxx.impl.Logback
最后创建 main() 方法,其中会加载上述配置文件,创建全部 Log 接口实现的实例,并执行其 log() 方法,如下所示:
public class Main { public static void main(String[] args) { ServiceLoader<Log> serviceLoader = ServiceLoader.load(Log.class); Iterator<Log> iterator = serviceLoader.iterator(); while (iterator.hasNext()) { Log log = iterator.next(); log.log("JDK SPI"); } }
} // 输出如下:
// Log4j:JDK SPI
// Logback:JDK SPI
2. JDK SPI 源码分析
通过上述示例,我们可以看到 JDK SPI 的入口方法是 ServiceLoader.load() 方法,接下来我们就对其具体实现进行深入分析。
在 ServiceLoader.load() 方法中,首先会尝试获取当前使用的 ClassLoader(获取当前线程绑定的 ClassLoader,查找失败后使用 SystemClassLoader),然后调用 reload() 方法,调用关系如下图所示:
在 reload() 方法中,首先会清理 providers 缓存(LinkedHashMap 类型的集合),该缓存用来记录 ServiceLoader 创建的实现对象,其中 Key 为实现类的完整类名,Value 为实现类的对象。之后创建 LazyIterator 迭代器,用于读取 SPI 配置文件并实例化实现类对象。
ServiceLoader.reload() 方法的具体实现,如下所示:
// 缓存,用来缓存 ServiceLoader创建的实现对象
private LinkedHashMap<String,S> providers = new LinkedHashMap<>(); public void reload() { providers.clear(); // 清空缓存 lookupIterator = new LazyIterator(service, loader); // 迭代器
}
在前面的示例中,main() 方法中使用的迭代器底层就是调用了 ServiceLoader.LazyIterator 实现的。Iterator 接口有两个关键方法:hasNext() 方法和 next() 方法。这里的 LazyIterator 中的next() 方法最终调用的是其 nextService() 方法,hasNext() 方法最终调用的是 hasNextService() 方法,调用关系如下图所示:
首先来看 LazyIterator.hasNextService() 方法,该方法主要负责查找 META-INF/services 目录下的 SPI 配置文件,并进行遍历,大致实现如下所示:
private static final String PREFIX = "META-INF/services/";
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null; private boolean hasNextService() { if (nextName != null) { return true; } if (configs == null) { // PREFIX前缀与服务接口的名称拼接起来,就是META-INF目录下定义的SPI配 // 置文件(即示例中的META-INF/services/com.xxx.Log) String fullName = PREFIX + service.getName(); // 加载配置文件 if (loader == null) configs = ClassLoader.getSystemResources(fullName); else configs = loader.getResources(fullName); } // 按行SPI遍历配置文件的内容 while ((pending == null) || !pending.hasNext()) { if (!configs.hasMoreElements()) { return false; } // 解析配置文件 pending = parse(service, configs.nextElement()); } nextName = pending.next(); // 更新 nextName字段 return true;
}
在 hasNextService() 方法中完成 SPI 配置文件的解析之后,再来看 LazyIterator.nextService() 方法,该方法负责实例化 hasNextService() 方法读取到的实现类,其中会将实例化的对象放到 providers 集合中缓存起来,核心实现如下所示:
private S nextService() { String cn = nextName; nextName = null; // 加载 nextName字段指定的类 Class<?> c = Class.forName(cn, false, loader); if (!service.isAssignableFrom(c)) { // 检测类型 fail(service, "Provider " + cn + " not a subtype"); } S p = service.cast(c.newInstance()); // 创建实现类的对象 providers.put(cn, p); // 将实现类名称以及相应实例对象添加到缓存 return p;
}
以上就是在 main() 方法中使用的迭代器的底层实现。最后,我们再来看一下 main() 方法中使用ServiceLoader.iterator() 方法拿到的迭代器是如何实现的,这个迭代器是依赖 LazyIterator 实现的一个匿名内部类,核心实现如下:
public Iterator<S> iterator() { return new Iterator<S>() { // knownProviders用来迭代providers缓存 Iterator<Map.Entry<String,S>> knownProviders = providers.entrySet().iterator(); public boolean hasNext() { // 先走查询缓存,缓存查询失败,再通过LazyIterator加载 if (knownProviders.hasNext()) return true; return lookupIterator.hasNext(); } public S next() { // 先走查询缓存,缓存查询失败,再通过 LazyIterator加载 if (knownProviders.hasNext()) return knownProviders.next().getValue(); return lookupIterator.next(); } // 省略remove()方法 };
}
3. JDK SPI 在 JDBC 中的应用
了解了 JDK SPI 实现的原理之后,我们再来看实践中 JDBC 是如何使用 JDK SPI 机制加载不同数据库厂商的实现类。
JDK 中只定义了一个 java.sql.Driver 接口,具体的实现是由不同数据库厂商来提供的。这里我们就以 MySQL 提供的 JDBC 实现包为例进行分析。
在 mysql-connector-java-*.jar 包中的 META-INF/services 目录下,有一个 java.sql.Driver 文件中只有一行内容,如下所示:
com.mysql.cj.jdbc.Driver
在使用 mysql-connector-java-*.jar 包连接 MySQL 数据库的时候,我们会用到如下语句创建数据库连接:
String url = "jdbc:xxx://xxx:xxx/xxx";
Connection conn = DriverManager.getConnection(url, username, pwd);
DriverManager 是 JDK 提供的数据库驱动管理器,其中的代码片段,如下所示:
static { loadInitialDrivers(); println("JDBC DriverManager initialized");
}
在调用 getConnection() 方法的时候,DriverManager 类会被 Java 虚拟机加载、解析并触发 static 代码块的执行;在 loadInitialDrivers() 方法中通过 JDK SPI 扫描 Classpath 下 java.sql.Driver 接口实现类并实例化,核心实现如下所示:
private static void loadInitialDrivers() { String drivers = System.getProperty("jdbc.drivers") // 使用 JDK SPI机制加载所有 java.sql.Driver实现类 ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class); Iterator<Driver> driversIterator = loadedDrivers.iterator(); while(driversIterator.hasNext()) { driversIterator.next(); } String[] driversList = drivers.split(":"); for (String aDriver : driversList) { // 初始化Driver实现类 Class.forName(aDriver, true, ClassLoader.getSystemClassLoader()); }
}
在 MySQL 提供的 com.mysql.cj.jdbc.Driver 实现类中,同样有一段 static 静态代码块,这段代码会创建一个 com.mysql.cj.jdbc.Driver 对象并注册到 DriverManager.registeredDrivers 集合中(CopyOnWriteArrayList 类型),如下所示:
static { java.sql.DriverManager.registerDriver(new Driver());
}
在 getConnection() 方法中,DriverManager 从该 registeredDrivers 集合中获取对应的 Driver 对象创建 Connection,核心实现如下所示:
private static Connection getConnection(String url, java.util.Properties info, Class<?> caller) throws SQLException { // 省略 try/catch代码块以及权限处理逻辑 for(DriverInfo aDriver : registeredDrivers) { Connection con = aDriver.driver.connect(url, info); return con; }
}
总结
本文我们通过一个示例入手,介绍了 JDK 提供的 SPI 机制的基本使用,然后深入分析了 JDK SPI 的核心原理和底层实现,对其源码进行了深入剖析,最后我们以 MySQL 提供的 JDBC 实现为例,分析了 JDK SPI 在实践中的使用方式。
JDK SPI 机制虽然简单易用,但是也存在一些小瑕疵,你可以先思考一下。
04 Dubbo SPI精析,接口实现两极反转(下)
在上一课时,我们一起学习了 JDK SPI 的基础使用以及核心原理,不过 Dubbo 并没有直接使用 JDK SPI 机制,而是借鉴其思想,实现了自身的一套 SPI 机制,这就是本课时将重点介绍的内容。
Dubbo SPI
在开始介绍 Dubbo SPI 实现之前,我们先来统一下面两个概念。
- 扩展点:通过 SPI 机制查找并加载实现的接口(又称“扩展接口”)。前文示例中介绍的 Log 接口、com.mysql.cj.jdbc.Driver 接口,都是扩展点。
- 扩展点实现:实现了扩展接口的实现类。
通过前面的分析可以发现,JDK SPI 在查找扩展实现类的过程中,需要遍历 SPI 配置文件中定义的所有实现类,该过程中会将这些实现类全部实例化。如果 SPI 配置文件中定义了多个实现类,而我们只需要使用其中一个实现类时,就会生成不必要的对象。例如,org.apache.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol、ThriftProtocol 等多个实现,如果使用 JDK SPI,就会加载全部实现类,导致资源的浪费。
Dubbo SPI 不仅解决了上述资源浪费的问题,还对 SPI 配置文件扩展和修改。
首先,Dubbo 按照 SPI 配置文件的用途,将其分成了三类目录。
- META-INF/services/ 目录:该目录下的 SPI 配置文件用来兼容 JDK SPI 。
- META-INF/dubbo/ 目录:该目录用于存放用户自定义 SPI 配置文件。
- META-INF/dubbo/internal/ 目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件。
然后,Dubbo 将 SPI 配置文件改成了 KV 格式,例如:
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
其中 key 被称为扩展名(也就是 ExtensionName),当我们在为一个接口查找具体实现类时,可以指定扩展名来选择相应的扩展实现。例如,这里指定扩展名为 dubbo,Dubbo SPI 就知道我们要使用:org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类,只实例化这一个扩展实现即可,无须实例化 SPI 配置文件中的其他扩展实现类。
使用 KV 格式的 SPI 配置文件的另一个好处是:让我们更容易定位到问题。假设我们使用的一个扩展实现类所在的 jar 包没有引入到项目中,那么 Dubbo SPI 在抛出异常的时候,会携带该扩展名信息,而不是简单地提示扩展实现类无法加载。这些更加准确的异常信息降低了排查问题的难度,提高了排查问题的效率。
下面我们正式进入 Dubbo SPI 核心实现的介绍。
1. @SPI 注解
Dubbo 中某个接口被 @SPI注解修饰时,就表示该接口是扩展接口,前文示例中的 org.apache.dubbo.rpc.Protocol 接口就是一个扩展接口:
@SPI 注解的 value 值指定了默认的扩展名称,例如,在通过 Dubbo SPI 加载 Protocol 接口实现时,如果没有明确指定扩展名,则默认会将 @SPI 注解的 value 值作为扩展名,即加载 dubbo 这个扩展名对应的 org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol 这个扩展实现类,相关的 SPI 配置文件在 dubbo-rpc-dubbo 模块中,如下图所示:
那 ExtensionLoader 是如何处理 @SPI 注解的呢?
ExtensionLoader 位于 dubbo-common 模块中的 extension 包中,功能类似于 JDK SPI 中的 java.util.ServiceLoader。Dubbo SPI 的核心逻辑几乎都封装在 ExtensionLoader 之中(其中就包括 @SPI 注解的处理逻辑),其使用方式如下所示:
Protocol protocol = ExtensionLoader .getExtensionLoader(Protocol.class).getExtension("dubbo");
这里首先来了解一下 ExtensionLoader 中三个核心的静态字段。
- strategies(LoadingStrategy[]类型): LoadingStrategy 接口有三个实现(通过 JDK SPI 方式加载的),如下图所示,分别对应前面介绍的三个 Dubbo SPI 配置文件所在的目录,且都继承了 Prioritized 这个优先级接口,默认优先级是
DubboInternalLoadingStrategy > DubboLoadingStrategy > ServicesLoadingStrateg
- EXTENSION_LOADERS(ConcurrentMap类型) :Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例。
- EXTENSION_INSTANCES(ConcurrentMap, Object>类型):该集合缓存了扩展实现类与其实例对象的映射关系。在前文示例中,Key 为 Class,Value 为 DubboProtocol 对象。
下面我们再来关注一下 ExtensionLoader 的实例字段。
- type(Class<?>类型):当前 ExtensionLoader 实例负责加载扩展接口。
- cachedDefaultName(String类型):记录了 type 这个扩展接口上 @SPI 注解的 value 值,也就是默认扩展名。
- cachedNames(ConcurrentMap, String>类型):缓存了该 ExtensionLoader 加载的扩展实现类与扩展名之间的映射关系。
- cachedClasses(Holder>>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存。
- cachedInstances(ConcurrentMap>类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系。
ExtensionLoader.getExtensionLoader() 方法会根据扩展接口从 EXTENSION_LOADERS 缓存中查找相应的 ExtensionLoader 实例,核心实现如下:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader;
}
得到接口对应的 ExtensionLoader 对象之后会调用其 getExtension() 方法,根据传入的扩展名称从 cachedInstances 缓存中查找扩展实现的实例,最终将其实例化后返回:
public T getExtension(String name) { // getOrCreateHolder()方法中封装了查找cachedInstances缓存的逻辑 Holder<Object> holder = getOrCreateHolder(name); Object instance = holder.get(); if (instance == null) { // double-check防止并发问题 synchronized (holder) { instance = holder.get(); if (instance == null) { // 根据扩展名从SPI配置文件中查找对应的扩展实现类 instance = createExtension(name); holder.set(instance); } } } return (T) instance;
}
在 createExtension() 方法中完成了 SPI 配置文件的查找以及相应扩展实现类的实例化,同时还实现了自动装配以及自动 Wrapper 包装等功能。其核心流程是这样的:
- 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。如果 cachedClasses 未初始化,则会扫描前面介绍的三个 SPI 目录获取查找相应的 SPI 配置文件,然后加载其中的扩展实现类,最后将扩展名和扩展实现类的映射关系记录到 cachedClasses 缓存中。这部分逻辑在 loadExtensionClasses() 和 loadDirectory() 方法中。
- 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射创建扩展实现对象。
- 自动装配扩展实现对象中的属性(即调用其 setter)。这里涉及 ExtensionFactory 以及自动装配的相关内容,本课时后面会进行详细介绍。
- 自动包装扩展实现对象。这里涉及 Wrapper 类以及自动包装特性的相关内容,本课时后面会进行详细介绍。
- 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始化。
private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); // --- 1 if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); // --- 2 if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); // --- 3 Set<Class<?>> wrapperClasses = cachedWrapperClasses; // --- 4 if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } initExtension(instance); // ---5return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); }
}
2. @Adaptive 注解与适配器
@Adaptive 注解用来实现 Dubbo 的适配器功能,那什么是适配器呢?这里我们通过一个示例进行说明。Dubbo 中的 ExtensionFactory 接口有三个实现类,如下图所示,ExtensionFactory 接口上有 @SPI 注解,AdaptiveExtensionFactory 实现类上有 @Adaptive 注解。
AdaptiveExtensionFactory 不实现任何具体的功能,而是用来适配 ExtensionFactory 的 SpiExtensionFactory 和 SpringExtensionFactory 这两种实现。AdaptiveExtensionFactory 会根据运行时的一些状态来选择具体调用 ExtensionFactory 的哪个实现。
@Adaptive 注解还可以加到接口方法之上,Dubbo 会动态生成适配器类。例如,Transporter接口有两个被 @Adaptive 注解修饰的方法:
@SPI("netty")
public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException; @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
Dubbo 会生成一个 Transporter$Adaptive 适配器类,该类继承了 Transporter 接口:
public class Transporter$Adaptive implements Transporter { public org.apache.dubbo.remoting.Client connect(URL arg0, ChannelHandler arg1) throws RemotingException { // 必须传递URL参数 if (arg0 == null) throw new IllegalArgumentException("url == null"); URL url = arg0; // 确定扩展名,优先从URL中的client参数获取,其次是transporter参数 // 这两个参数名称由@Adaptive注解指定,最后是@SPI注解中的默认值 String extName = url.getParameter("client", url.getParameter("transporter", "netty")); if (extName == null) throw new IllegalStateException("..."); // 通过ExtensionLoader加载Transporter接口的指定扩展实现 Transporter extension = (Transporter) ExtensionLoader .getExtensionLoader(Transporter.class) .getExtension(extName); return extension.connect(arg0, arg1); } ... // 省略bind()方法
}
生成 Transporter$Adaptive 这个类的逻辑位于 ExtensionLoader.createAdaptiveExtensionClass() 方法,若感兴趣你可以看一下相关代码,其中涉及的 javassist 等方面的知识,在后面的课时中我们会进行介绍。
明确了 @Adaptive 注解的作用之后,我们回到 ExtensionLoader.createExtension() 方法,其中在扫描 SPI 配置文件的时候,会调用 loadClass() 方法加载 SPI 配置文件中指定的类,如下图所示:
loadClass() 方法中会识别加载扩展实现类上的 @Adaptive 注解,将该扩展实现的类型缓存到 cachedAdaptiveClass 这个实例字段上(volatile修饰):
private void loadClass(){ if (clazz.isAnnotationPresent(Adaptive.class)) { // 缓存到cachedAdaptiveClass字段 cacheAdaptiveClass(clazz, overridden);} else ... // 省略其他分支
}
我们可以通过 ExtensionLoader.getAdaptiveExtension() 方法获取适配器实例,并将该实例缓存到 cachedAdaptiveInstance 字段(Holder类型)中,核心流程如下:
- 首先,检查 cachedAdaptiveInstance 字段中是否已缓存了适配器实例,如果已缓存,则直接返回该实例即可。
- 然后,调用 getExtensionClasses() 方法,其中就会触发前文介绍的 loadClass() 方法,完成 cachedAdaptiveClass 字段的填充。
- 如果存在 @Adaptive 注解修饰的扩展实现类,该类就是适配器类,通过 newInstance() 将其实例化即可。如果不存在 @Adaptive 注解修饰的扩展实现类,就需要通过 createAdaptiveExtensionClass() 方法扫描扩展接口中方法上的 @Adaptive 注解,动态生成适配器类,然后实例化。
- 接下来,调用 injectExtension() 方法进行自动装配,就能得到一个完整的适配器实例。
- 最后,将适配器实例缓存到 cachedAdaptiveInstance 字段,然后返回适配器实例。
getAdaptiveExtension() 方法的流程涉及多个方法,这里不再粘贴代码,感兴趣的同学可以参考上述流程分析相应源码。
此外,我们还可以通过 API 方式(addExtension() 方法)设置 cachedAdaptiveClass 这个字段,指定适配器类型(这个方法你知道即可)。
总之,适配器什么实际工作都不用做,就是根据参数和状态选择其他实现来完成工作。 。
3. 自动包装特性
Dubbo 中的一个扩展接口可能有多个扩展实现类,这些扩展实现类可能会包含一些相同的逻辑,如果在每个实现类中都写一遍,那么这些重复代码就会变得很难维护。Dubbo 提供的自动包装特性,就可以解决这个问题。 Dubbo 将多个扩展实现类的公共逻辑,抽象到 Wrapper 类中,Wrapper 类与普通的扩展实现类一样,也实现了扩展接口,在获取真正的扩展实现对象时,在其外面包装一层 Wrapper 对象,你可以理解成一层装饰器。
了解了 Wrapper 类的基本功能,我们回到 ExtensionLoader.loadClass() 方法中,可以看到:
private void loadClass(){ ... // 省略前面对@Adaptive注解的处理 } else if (isWrapperClass(clazz)) { // ---1 cacheWrapperClass(clazz); // ---2 } else ... // 省略其他分支
}
- 在 isWrapperClass() 方法中,会判断该扩展实现类是否包含拷贝构造函数(即构造函数只有一个参数且为扩展接口类型),如果包含,则为 Wrapper 类,这就是判断 Wrapper 类的标准。
- 将 Wrapper 类记录到 cachedWrapperClasses(Set>类型)这个实例字段中进行缓存。
前面在介绍 createExtension() 方法时的 4 处,有下面这段代码,其中会遍历全部 Wrapper 类并一层层包装到真正的扩展实例对象外层:
Set<Class<?>> wrapperClasses = cachedWrapperClasses;if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class<?> wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass .getConstructor(type).newInstance(instance)); }
}
4. 自动装配特性
在 createExtension() 方法中我们看到,Dubbo SPI 在拿到扩展实现类的对象(以及 Wrapper 类的对象)之后,还会调用 injectExtension() 方法扫描其全部 setter 方法,并根据 setter 方法的名称以及参数的类型,加载相应的扩展实现,然后调用相应的 setter 方法填充属性,这就实现了 Dubbo SPI 的自动装配特性。简单来说,自动装配属性就是在加载一个扩展点的时候,将其依赖的扩展点一并加载,并进行装配。
下面简单看一下 injectExtension() 方法的具体实现:
private T injectExtension(T instance) { if (objectFactory == null) { // 检测objectFactory字段 return instance; } for (Method method : instance.getClass().getMethods()) { ... // 如果不是setter方法,忽略该方法(略) if (method.getAnnotation(DisableInject.class) != null) { continue; // 如果方法上明确标注了@DisableInject注解,忽略该方法 } // 根据setter方法的参数,确定扩展接口 Class<?> pt = method.getParameterTypes()[0]; ... // 如果参数为简单类型,忽略该setter方法(略) // 根据setter方法的名称确定属性名称 String property = getSetterProperty(method); // 加载并实例化扩展实现类 Object object = objectFactory.getExtension(pt, property); if (object != null) { method.invoke(instance, object); // 调用setter方法进行装配 } } return instance;
}
injectExtension() 方法实现的自动装配依赖了 ExtensionFactory(即 objectFactory 字段),前面我们提到过 ExtensionFactory 有 SpringExtensionFactory 和 SpiExtensionFactory 两个真正的实现(还有一个实现是 AdaptiveExtensionFactory 是适配器)。下面我们分别介绍下这两个真正的实现。
第一个,SpiExtensionFactory。 根据扩展接口获取相应的适配器,没有到属性名称:
@Override
public <T> T getExtension(Class<T> type, String name) { if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { // 查找type对应的ExtensionLoader实例 ExtensionLoader<T> loader = ExtensionLoader .getExtensionLoader(type); if (!loader.getSupportedExtensions().isEmpty()) { return loader.getAdaptiveExtension(); // 获取适配器实现 } } return null;
}
第二个,SpringExtensionFactory。 将属性名称作为 Spring Bean 的名称,从 Spring 容器中获取 Bean:
public <T> T getExtension(Class<T> type, String name) { ... // 检查:type必须为接口且必须包含@SPI注解(略) for (ApplicationContext context : CONTEXTS) { // 从Spring容器中查找Bean T bean = BeanFactoryUtils.getOptionalBean(context,name,type); if (bean != null) { return bean; } } return null;
}
5. @Activate注解与自动激活特性
这里以 Dubbo 中的 Filter 为例说明自动激活特性的含义,org.apache.dubbo.rpc.Filter 接口有非常多的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。
@Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性。
- group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活。
- value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活。
- order 属性:用来确定扩展实现类的排序。
我们先来看 loadClass() 方法对 @Activate 的扫描,其中会将包含 @Activate 注解的实现类缓存到 cachedActivates 这个实例字段(Map类型,Key为扩展名,Value为 @Activate 注解):
private void loadClass(){ if (clazz.isAnnotationPresent(Adaptive.class)) { // 处理@Adaptive注解 cacheAdaptiveClass(clazz, overridden); } else if (isWrapperClass(clazz)) { // 处理Wrapper类 cacheWrapperClass(clazz); } else { // 处理真正的扩展实现类 clazz.getConstructor(); // 扩展实现类必须有无参构造函数 ...// 兜底:SPI配置文件中未指定扩展名称,则用类的简单名称作为扩展名(略) String[] names = NAME_SEPARATOR.split(name); if (ArrayUtils.isNotEmpty(names)) { // 将包含@Activate注解的实现类缓存到cachedActivates集合中 cacheActivateClass(clazz, names[0]); for (String n : names) { // 在cachedNames集合中缓存实现类->扩展名的映射 cacheName(clazz, n);// 在cachedClasses集合中缓存扩展名->实现类的映射 saveInExtensionClass(extensionClasses, clazz, n, overridden); } } }
}
使用 cachedActivates 这个集合的地方是 getActivateExtension() 方法。首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。下面是 getActivateExtension() 方法的核心逻辑:
- 首先,获取默认激活的扩展集合。默认激活的扩展实现类有几个条件:①在 cachedActivates 集合中存在;②@Activate 注解指定的 group 属性与当前 group 匹配;③扩展名没有出现在 values 中(即未在配置中明确指定,也未在配置中明确指定删除);④URL 中出现了 @Activate 注解中指定的 Key。
- 然后,按照 @Activate 注解中的 order 属性对默认激活的扩展集合进行排序。
- 最后,按序添加自定义扩展实现类的对象。
public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> activateExtensions = new ArrayList<>(); // values配置就是扩展名 List<String> names = values == null ?new ArrayList<>(0) : asList(values); if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {// ---1 getExtensionClasses(); // 触发cachedActivates等缓存字段的加载 for (Map.Entry<String, Object> entry :cachedActivates.entrySet()) { String name = entry.getKey(); // 扩展名 Object activate = entry.getValue(); // @Activate注解 String[] activateGroup, activateValue; if (activate instanceof Activate) { // @Activate注解中的配置 activateGroup = ((Activate) activate).group(); activateValue = ((Activate) activate).value(); } else { continue; } if (isMatchGroup(group, activateGroup) // 匹配group // 没有出现在values配置中的,即为默认激活的扩展实现 && !names.contains(name)// 通过"-"明确指定不激活该扩展实现 && !names.contains(REMOVE_VALUE_PREFIX + name)// 检测URL中是否出现了指定的Key && isActive(activateValue, url)) { // 加载扩展实现的实例对象,这些都是激活的 activateExtensions.add(getExtension(name)); } } // 排序 --- 2 activateExtensions.sort(ActivateComparator.COMPARATOR); } List<T> loadedExtensions = new ArrayList<>(); for (int i = 0; i < names.size(); i++) { // ---3 String name = names.get(i); // 通过"-"开头的配置明确指定不激活的扩展实现,直接就忽略了 if (!name.startsWith(REMOVE_VALUE_PREFIX) && !names.contains(REMOVE_VALUE_PREFIX + name)) { if (DEFAULT_KEY.equals(name)) { if (!loadedExtensions.isEmpty()) { // 按照顺序,将自定义的扩展添加到默认扩展集合前面 activateExtensions.addAll(0, loadedExtensions); loadedExtensions.clear(); } } else { loadedExtensions.add(getExtension(name)); } } } if (!loadedExtensions.isEmpty()) { // 按照顺序,将自定义的扩展添加到默认扩展集合后面 activateExtensions.addAll(loadedExtensions); } return activateExtensions;
}
最后举个简单的例子说明上述处理流程,假设 cachedActivates 集合缓存的扩展实现如下表所示:
在 Provider 端调用 getActivateExtension() 方法时传入的 values 配置为 “demoFilter3、-demoFilter2、default、demoFilter1”,那么根据上面的逻辑:
- 得到默认激活的扩展实实现集合中有 [ demoFilter4, demoFilter6 ];
- 排序后为 [ demoFilter6, demoFilter4 ];
- 按序添加自定义扩展实例之后得到 [ demoFilter3, demoFilter6, demoFilter4, demoFilter1 ]。
总结
本课时我们深入全面地讲解了 Dubbo SPI 的核心实现:首先介绍了 @SPI 注解的底层实现,这是 Dubbo SPI 最核心的基础;然后介绍了 @Adaptive 注解与动态生成适配器类的核心原理和实现;最后分析了 Dubbo SPI 中的自动包装和自动装配特性,以及 @Activate 注解的原理。
Dubbo SPI 是 Dubbo 框架实现扩展机制的核心,希望你仔细研究其实现,为后续源码分析过程打下基础。
05 海量定时任务,一个时间轮搞定
在很多开源框架中,都需要定时任务的管理功能,例如 ZooKeeper、Netty、Quartz、Kafka 以及 Linux 操作系统。
JDK 提供的 java.util.Timer 和 DelayedQueue 等工具类,可以帮助我们实现简单的定时任务管理,其底层实现使用的是堆这种数据结构,存取操作的复杂度都是 O(nlog(n)),无法支持大量的定时任务。在定时任务量比较大、性能要求比较高的场景中,为了将定时任务的存取操作以及取消操作的时间复杂度降为 O(1),一般会使用时间轮的方式。
时间轮是一种高效的、批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务;指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。
时间轮环形结构示意图
需要注意的是,单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。
那在 Dubbo 中,时间轮的具体实现方式是怎样的呢?本课时我们就重点探讨下。Dubbo 的时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中,下面我们就来分析时间轮涉及的核心接口和实现。
核心接口
在 Dubbo 中,所有的定时任务都要继承 TimerTask 接口。TimerTask 接口非常简单,只定义了一个 run() 方法,该方法的入参是一个 Timeout 接口的对象。Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对象,我们不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。Timeout 接口中的方法如下图所示:
.png
Timer 接口定义了定时器的基本行为,如下图所示,其核心是 newTimeout() 方法:提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,这有点类似于向线程池提交任务的感觉。
HashedWheelTimeout
HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:
- 第一个,时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器。
- 第二个,定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务。
HashedWheelTimeout 中的核心字段如下:
- prev、next(HashedWheelTimeout类型),分别对应当前定时任务在链表中的前驱节点和后继节点。
- task(TimerTask类型),指实际被调度的任务。
- deadline(long类型),指定时任务执行的时间。这个时间是在创建 HashedWheelTimeout 时指定的,计算公式是:currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟时间) - startTime(HashedWheelTimer 的启动时间),时间单位为纳秒。
- state(volatile int类型),指定时任务当前所处状态,可选的有三个,分别是 INIT(0)、CANCELLED(1)和 EXPIRED(2)。另外,还有一个 STATE_UPDATER 字段(AtomicIntegerFieldUpdater类型)实现 state 状态变更的原子性。
- remainingRounds(long类型),指当前任务剩余的时钟周期数。时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期。
HashedWheelTimeout 中的核心方法有:
- isCancelled()、isExpired() 、state() 方法, 主要用于检查当前 HashedWheelTimeout 状态。
- cancel() 方法, 将当前 HashedWheelTimeout 的状态设置为 CANCELLED,并将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁。
- expire() 方法, 当任务到期时,会调用该方法将当前 HashedWheelTimeout 设置为 EXPIRED 状态,然后调用其中的 TimerTask 的 run() 方法执行定时任务。
- remove() 方法, 将当前 HashedWheelTimeout 从时间轮中删除。
HashedWheelBucket
HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。
HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。
下面我们来看 HashedWheelBucket 中的核心方法。
- addTimeout() 方法:新增 HashedWheelTimeout 到双向链表的尾部。
- pollTimeout() 方法:移除双向链表中的头结点,并将其返回。
- remove() 方法:从双向链表中移除指定的 HashedWheelTimeout 节点。
- clearTimeouts() 方法:循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者未被取消的任务。
- expireTimeouts() 方法:遍历双向链表中的全部 HashedWheelTimeout 节点。 在处理到期的定时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行;对于已取消的任务,通过 remove() 方法取出后直接丢弃;对于未到期的任务,会将 remainingRounds 字段(剩余时钟周期数)减一。
HashedWheelTimer
HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作。
下面我们来看 HashedWheelTimer 的核心属性。
- workerState(volatile int类型):时间轮当前所处状态,可选值有 init、started、shutdown。同时,有相应的 AtomicIntegerFieldUpdater 实现 workerState 的原子修改。
- startTime(long类型):当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算。
- wheel(HashedWheelBucket[]类型):该数组就是时间轮的环形队列,每一个元素都是一个槽。当指定时间轮槽数为 n 时,实际上会取大于且最靠近 n 的 2 的幂次方值。
- timeouts、cancelledTimeouts(LinkedBlockingQueue类型):timeouts 队列用于缓冲外部提交时间轮中的定时任务,cancelledTimeouts 队列用于暂存取消的定时任务。HashedWheelTimer 会在处理 HashedWheelBucket 的双向链表之前,先处理这两个队列中的数据。
- tick(long类型):该字段在 HashedWheelTimer$Worker 中,是时间轮的指针,是一个步长为 1 的单调递增计数器。
- mask(int类型):掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽。
- ticksDuration(long类型):时间指针每次加 1 所代表的实际时间,单位为纳秒。
- pendingTimeouts(AtomicLong类型):当前时间轮剩余的定时任务总数。
- workerThread(Thread类型):时间轮内部真正执行定时任务的线程。
- worker(Worker类型):真正执行定时任务的逻辑封装这个 Runnable 对象中。
时间轮对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:
- 确定时间轮的 startTime 字段;
- 启动 workerThread 线程,开始执行 worker 任务。
之后根据 startTime 计算该定时任务的 deadline 字段,最后才能将定时任务封装成 HashedWheelTimeout 并添加到 timeouts 队列。
下面我们来分析时间轮指针一次转动的全流程。
- 时间轮指针转动,时间轮周期开始。
- 清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。在每次指针转动的时候,时间轮都会清理该队列。
- 将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中。
- 根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务。
- 检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:遍历时间轮中每个槽位,并调用 clearTimeouts() 方法;对 timeouts 队列中未被加入槽中循环调用 poll()。
- 最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。
上述核心逻辑在 HashedWheelTimer$Worker.run() 方法中,若你感兴趣的话,可以翻看一下源码进行分析。
Dubbo 中如何使用定时任务
在 Dubbo 中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。
Dubbo 中对时间轮的应用主要体现在如下两个方面:
- 失败重试, 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等。
- 周期性定时任务, 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制。
总结
本课时我们重点介绍了 Dubbo 中时间轮相关的内容:
- 首先介绍了 JDK 提供的 Timer 定时器以及 DelayedQueue 等工具类的问题,并说明了时间轮的解决方案;
- 然后深入讲解了 Dubbo 对时间轮的抽象,以及具体实现细节;
- 最后还说明了 Dubbo 中时间轮的应用场景,在我们后面介绍 Dubbo 其他模块的时候,你还会看到时间轮的身影。
这里再给你留个课后思考题:如果存在海量定时任务,并且这些任务的开始时间跨度非常长,例如,有的是 1 分钟之后执行,有的是 1 小时之后执行,有的是 1 年之后执行,那你该如何对时间轮进行扩展,处理这些定时任务呢?