Sentinel规则持久化pull模式核心源码解析
文章目录
- 前言
- 一、服务端新增/修改规则
- 1.1、repository.save
- 1.2、publishRules
- 二、客户端接收规则
- 三、持久化扩展
- 3.1、持久化原理
- 3.1.1、FileRefreshableDataSource
- 3.1.1.1、super关键字
- 3.1.1.2、firstLoad()方法
- 3.1.2、FlowRuleManager.register2Property
- 3.2、持久化实现
- 总结:Sentinel 规则持久化:Pull 模式下的客户端与服务端处理流程
前言
在Sentinel的控制台(服务端)设置了流控、系统等规则后,点击保存按钮,会调用服务端FlowControllerV1
的接口。Sentinel默认会将规则在服务端的内存中保存一份,然后推送到客户端,默认情况下也是推送到客户端的内存中。因为是保存在内存中,所以应用重启后,上一次配置的规则就会消失,需要重新进行配置。
如果希望在客户端持久化一份,可以在服务端将规则推送到客户端后,自己实现扩展,或引入官方的jar包:
<!-- https://mvnrepository.com/artifact/com.alibaba.csp/sentinel-datasource-extension -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-extension</artifactId>
<version>1.8.0</version>
</dependency>
将规则更新到本地文件或数据库中。同时监控线程定期读取本地文件或数据库中的配置信息,如果发生变化,就会更新最新的逻辑到内存中:
图片来源:图灵学院
一、服务端新增/修改规则
当在服务端的控制台页面上新增流控规则时,实际会调用/v1/flow/rule
对应的方法:
该方法在FlowControllerV1
:
参数FlowRuleEntity
,也就是流控规则窗口中的选项:
最关键的是下图中的两个方法(修改规则同理,是 @PutMapping(“/save.json”) 的请求方式,最终同样会调用下图中的两个方法)
1.1、repository.save
该方法的作用,是将控制台页面上设置的规则,保存一份到服务端
的内存中:
在调用save
方法的InMemoryRuleRepositoryAdapter
类中,有三个属性:
- machineRules:保存每台机器对应的规则。
MachineInfo
表示一个具体的 Sentinel 客户端:
MachineInfo(ip=192.168.1.10, port=8719) → {
1001L → FlowRule(…),
1002L → FlowRule(…)
}
- allRules:保存所有机器对应的规则。
1001L → FlowRule(…),
1002L → DegradeRule(…),
1003L → ParamFlowRule(…)
- appRules:保存每个应用对应的规则集合
“order-service” → {
1001L → FlowRule(…),
1002L → DegradeRule(…)
}
“payment-service” → {
2001L → FlowRule(…)
}
1.2、publishRules
该方法是将服务端的规则信息推送一份到客户端,首先会获取当前机器下配置的所有规则,然后调用setFlowRuleOfMachineAsync
:
会再加一个参数,标记当前规则的类型
在该方法中发起post请求,进行推送规则,在推送规则前,还会通过toRule
方法,将参数转换为各自规则的客户端接收参数类型(例:流控规则,服务端的参数是FlowRuleEntity
,而客户端接收的参数类型是FlowRule
,所以需要转换一下)
二、客户端接收规则
Sentinel 控制台和客户端之间通过 Socket(基于 TCP)通信。当客户端启动时,会启动一个SimpleHttpCommandCenter
(服务端也有),监听一个端口( 8719),并注册各种命令处理器。
- 启动时扫描所有标注了 @CommandMapping 的类;
- 将这些处理类注册到一个 Map<String, CommandHandler> 中,键是 name(比如 “setRules”);
- 当 Socket 接收到客户端发来的请求命令:
- 解析请求命令名(如 /setRules?type=flow&data=[…]);
- 根据命令名找到对应的 handler;
- 调用该类的 handle() 方法来处理这次命令请求;
- 返回处理结果作为 socket 响应。
最终调用的是ModifyRulesCommandHandler
的handle
方法:
在该方法中,会进行请求类型的判断,进入不同的分支。例如流控,先还原出请求的实体类型,然后通过各自manager
管理器的loadRules
方法,将规则保存一份到**客户端(微服务)**的内存中:
flowRules
就是规则在服务端内存的体现:
- key:资源名,即需要被限流的业务资源,比如接口名、方法名、URL 等。
- value:针对该资源的所有流控规则的列表。
将各自的规则存入内存中后,客户端在进行对应的规则校验时,就会进行读取,例如限流:
获取该资源设置的所有流控规则
这里的flowRules就是在接收到服务端setRules请求后设置的
在这一行代码中,会去尝试将规则持久化到文件或DB,Redis等中间件中
:
如果没有自定义实现,或者引入第三方的jar包,这里的WritableDataSource
是空的。也就是不会走持久化的逻辑。
没有持久化,返回给服务端的信息中会进行标识:
三、持久化扩展
3.1、持久化原理
官方对于客户端的持久化,有一个案例,在Sentinel的sentinel-demo
模块下的sentinel-demo-dynamic-file-rule
中:
可以运行一下这个demo:
public class FileDataSourceDemo {
public static void main(String[] args) throws Exception {
FileDataSourceDemo demo = new FileDataSourceDemo();
demo.listenRules();
/*
* Start to require tokens, rate will be limited by rule in FlowRule.json
*/
FlowQpsRunner runner = new FlowQpsRunner();
runner.simulateTraffic();
runner.tick();
}
private void listenRules() throws Exception {
ClassLoader classLoader = getClass().getClassLoader();
String flowRulePath = URLDecoder.decode(classLoader.getResource("FlowRule.json").getFile(), "UTF-8");
// String degradeRulePath = URLDecoder.decode(classLoader.getResource("DegradeRule.json").getFile(), "UTF-8");
// String systemRulePath = URLDecoder.decode(classLoader.getResource("SystemRule.json").getFile(), "UTF-8");
// Data source for FlowRule
FileRefreshableDataSource<List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>(
flowRulePath, flowRuleListParser);
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
// Data source for DegradeRule
// FileRefreshableDataSource<List<DegradeRule>> degradeRuleDataSource
// = new FileRefreshableDataSource<>(
// degradeRulePath, degradeRuleListParser);
// DegradeRuleManager.register2Property(degradeRuleDataSource.getProperty());
//
// // Data source for SystemRule
// FileRefreshableDataSource<List<SystemRule>> systemRuleDataSource
// = new FileRefreshableDataSource<>(
// systemRulePath, systemRuleListParser);
// SystemRuleManager.register2Property(systemRuleDataSource.getProperty());
}
private Converter<String, List<FlowRule>> flowRuleListParser = source -> JSON.parseObject(source,
new TypeReference<List<FlowRule>>() {});
private Converter<String, List<DegradeRule>> degradeRuleListParser = source -> JSON.parseObject(source,
new TypeReference<List<DegradeRule>>() {});
private Converter<String, List<SystemRule>> systemRuleListParser = source -> JSON.parseObject(source,
new TypeReference<List<SystemRule>>() {});
}
输出的效果:
83 send qps is: 39
1744526839080, total:39, pass:20, block:19
82 send qps is: 37
1744526840080, total:37, pass:17, block:20
81 send qps is: 34
1744526841081, total:34, pass:21, block:13
因为在FlowRule.json
文件中是这样配置的:
运行时修改target目录下该文件的值:
发现放行的请求数量也随之发生改变。
1744526970761, total:38, pass:20, block:18
93 send qps is: 41
1744526971761, total:41, pass:20, block:21
92 send qps is: 37
1744526972761, total:37, pass:25, block:12
91 send qps is: 37
1744526973762, total:37, pass:37, block:0
90 send qps is: 41
1744526974762, total:41, pass:40, block:1
89 send qps is: 42
1744526975770, total:42, pass:42, block:0
88 send qps is: 46
1744526976771, total:46, pass:39, block:7
87 send qps is: 42
1744526977771, total:42, pass:38, block:4
这个Demo实际就是模拟了将规则持久化到本地文件,并且动态感知文件变化,刷新规则的操作。
那么是如何实现上面的效果的?关键代码在于FileRefreshableDataSource
和FlowRuleManager.register2Property
:
3.1.1、FileRefreshableDataSource
在实例化FileRefreshableDataSource
时,实际会调用到下面的构造。其中关键代码在于super
关键字的调用以及firstLoad()
方法:
3.1.1.1、super关键字
super
关键字会调用其父类AutoRefreshDataSource
的构造。
在startTimerService()
方法中,会利用线程池启动一个定时任务,默认是3s执行一次:
该定时任务主要完成的逻辑:
- 判断客户端持久化的规则文件是否被修改,
- 如果被修改,就重新去读取,并且利用对应监听器的
configUpdate
方法,将持久化的规则文件中的最新值,刷回到客户端内存的FlowRuleManager
的flowRules
中。
3.1.1.2、firstLoad()方法
firstLoad
则是在第一次启动时,读取持久化到本地的配置规则的值,并且刷回到客户端内存的FlowRuleManager
的flowRules
中。
所以完成图上第6步的关键,就在于FileRefreshableDataSource
3.1.2、FlowRuleManager.register2Property
该方法的参数,就是上一步得到的规则实例:
主要用于给新传进来的 property (上一步得到的规则实例)添加监听器,监听规则变化:
上面的Demo,只演示了当持久化的规则文件发生变化时,通过定时任务监控变化,并且修改内存中的值的操作,还缺少客户端接收服务端的推送,然后进行持久化的操作。也就是ModifyRulesCommandHandler
的handle
方法的:
3.2、持久化实现
对于持久化的实现,也有一个案例,只不过最终多了一个客户端接收服务端的推送,然后进行持久化的操作。
public class FileDataSourceInit implements InitFunc {
@Override
public void init() throws Exception {
// A fake path.
String flowRuleDir = System.getProperty("user.home") + File.separator + "sentinel" + File.separator + "rules";
String flowRuleFile = "flowRule.json";
String flowRulePath = flowRuleDir + File.separator + flowRuleFile;
ReadableDataSource<String, List<FlowRule>> ds = new FileRefreshableDataSource<>(
flowRulePath, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})
);
// Register to flow rule manager.
FlowRuleManager.register2Property(ds.getProperty());
WritableDataSource<List<FlowRule>> wds = new FileWritableDataSource<>(flowRulePath, this::encodeJson);
// Register to writable data source registry so that rules can be updated to file
// when there are rules pushed from the Sentinel Dashboard.
WritableDataSourceRegistry.registerFlowDataSource(wds);
}
private <T> String encodeJson(T t) {
return JSON.toJSONString(t);
}
}
上面的案例,可以直接拿来使用。因为实现了InitFunc
接口,只需要进行SPI的配置,具体是在工程的resource下加入:
文件的内容是实现了InitFunc
的包名 + 类名。
这样在调用被保护的资源时,会进入Env的静态代码块:
通过SPI机制加载文件,并且执行其中的init方法:
这里就有我们刚刚自定义的:
执行自定义的init,会向WritableDataSourceRegistry
中注册一个数据源:
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
这一行代码中,解析出的init实现类,还有一个CommandCenterInitFunc
也非常的关键,优先级也是最高的,在它的内部,会启动线程,去监听控制台开放的端口,并且还有初始化所有CommandHanlder,以及调用转发的逻辑。
总结:Sentinel 规则持久化:Pull 模式下的客户端与服务端处理流程
在 Sentinel 的规则持久化 Pull 模式 中,客户端与服务端(控制台)分别承担不同的职责与处理逻辑。该模式下,控制台主动推送规则,客户端被动接收并进行持久化。
服务端(控制台)处理逻辑
当在控制台执行新增、修改或删除规则操作时,请求将进入 FlowControllerV1
,其内部处理流程包括以下两个步骤:
- 规则同步到内存:
将页面上传递的规则进行新增、修改或删除,并同步更新服务端内存中的规则缓存。 - 规则推送至客户端:
通过 Socket 通信,将最新的规则数据推送至对应的客户端实例。
客户端处理逻辑
客户端在接收到服务端推送的规则后,会由 ModifyRulesCommandHandler
的 handle
方法负责处理,具体包括:
- 规则写入客户端内存:
解析服务端下发的规则,并更新客户端本地内存中的规则集合。 - 规则持久化(可选):
若客户端实现了自定义持久化逻辑(通过 SPI 机制,提供InitFunc
接口的实现类),则会按照配置的持久化策略,将规则保存到本地文件、数据库或其他存储介质中。
客户端规则持久化关键点
- 服务端推送后的规则持久化
- 实现方式:调用
WritableDataSourceRegistry.registerFlowDataSource(...)
方法; - 作用:将规则注册为可写数据源,便于在接收到规则更新后,触发持久化操作。
- 本地规则文件变更的自动感知
- 实现方式:使用
FileRefreshableDataSource
组件; - 作用:支持客户端实时监听本地规则文件的变化,一旦检测到更新,即可自动加载并刷新内存中的规则数据,保持运行时规则与文件内容同步。
如需支持其他数据源(如 Nacos、Zookeeper、Apollo 等),可基于 Sentinel 提供的 SPI 接口扩展 ReadableDataSource
和 WritableDataSource
,实现自定义的数据加载与保存逻辑。
如果想要在Spring Boot启动时,就直接完成init逻辑,可以自定义实现ApplicationRunner
或CommandLineRunner
接口。通过SPI机制的init实现,实际上是懒加载的,在调用目标资源方法时才会执行。