响应式编程-基于Reactor模式WebFlux框架的Spring Gateway
背景
响应式编程在一些java生态的框架中应用很多,如Spring Cloud Gateway、Spring AI等中,Spring Cloud Gateway依靠响应式编程框架WebFlux基于Netty实现了一个非阻塞的网关,解决传统网关请求量大时候的线程耗尽问题,另外对于日常AIGC生产一般的大模型流式数据返回也是使用到了响应式编程,这样可以第一时间拿到响应内容,提升用户体验。
响应式编程
- Observer、Observable
- 发布者Flux、Mono
- 调度器Schedulers
Spring Cloud Gateway原理
- 自动配置相关组件
- 工作流程
- 定制优化动态路由配置、过滤器热加载的设计思路
响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。
为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。
在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。
Observer、Observable
在Java8的版本,可以使用Observer、Observable实现响应式编程。
Java9 已弃用 此类和 Observer 接口已弃用。Observer 和 Observable 支持的事件模型非常有限,Observable 传递的通知顺序未指定,并且状态更改与通知不是一一对应的。对于更丰富的事件模型,请考虑使用 java.beans 包。要在线程之间实现可靠和有序的消息传递,请考虑使用 java.util.concurrent 包中的并发数据结构之一。有关反应式流样式编程,请参阅 java.util.concurrent.Flow API。
发布者Flux、Mono
发布者Publisher:Publisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。
订阅者Subscriber:Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher不会做任何事情,不会耗费资源去计算、传输数据,他根据消费情况进行响应,通俗理解把整个事件比作做菜,传统的数据处理如直接向List放元素在处理可以理解为不管有无处理,菜已经做好了,而响应的发布订阅模式只是提前准备好原始原料,等到消费者订阅的时候才开始做菜。
Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono和Flux,使用他们可以发布元素值、完成信号、错误信号,错误信号是终止数据流,然后把错误信息传递给Subscriber,错误信号和完成信号都是终止信号,不能共存。
- Flux:传输0个或者多个事件,常用方法just表示创建的Flux序列在发布这些元素之后自动结束,generate、create一般创建比较复杂的Flux序列,generate方法表示同步、逐一的方式生成序列,表示往序列添加元素的next方法只能被调用一次,而create支持同步、异步的消息产生,next方法可以调用多次。
- Mono:返回0个或者1个事件
调度器Schedulers
在响应式编程中,特别是使用Reactor框架时,调度器(Scheduler)用于控制代码执行的线程模型。以下是你提到的几种调度器的特点和区别:
- Schedulers.immediate():特点:在当前线程上立即执行任务。用途:适合需要在调用线程上同步执行的操作,通常用于测试或简单的同步任务。
- Schedulers.single():特点:使用单一的可复用线程。用途:适用于需要串行执行的任务,这样可以避免多线程环境中的竞争条件。
- Schedulers.elastic():特点:使用弹性的线程池,线程可以被复用。若线程闲置时间过长,则会被销毁。用途:适合I/O操作,如文件读写或网络请求,因为这些操作通常会阻塞线程。弹性线程池可以根据需要动态调整线程数量。
- Schedulers.parallel():特点:使用为并行操作优化的线程池,线程数量通常与CPU核心数一致。用途:适用于CPU密集型任务,如复杂计算或数据处理。这种调度器可以充分利用多核CPU的性能。
- Schedulers.timer():特点:支持任务调度的调度器。用途:适合需要在特定时间点或延时后执行的任务。通常用于定时任务或周期性任务。
- Schedulers.fromExecutorService():特点:从已有的
ExecutorService
对象创建调度器。用途:适合需要自定义线程池配置的场景,允许使用已有的ExecutorService
来控制线程的创建和管理。
这些调度器提供了多种线程管理策略,可以根据任务的特性选择合适的调度器,以优化性能和资源使用。
在响应式编程中,元素发送的顺序可能会受到使用的调度器和操作符的影响。以下是一些可能影响元素发送顺序的因素:
- Schedulers.immediate() 和 Schedulers.single():通常会按照代码的顺序发送元素,因为这些调度器在单一线程上执行。
- Schedulers.elastic() 和 Schedulers.parallel():可能会改变元素的发送顺序,因为这些调度器涉及多线程操作,尤其是在并行执行的情况下。
Spring Cloud Gateway
Spring Cloud Gateway 原理介绍和应用
在Spring Cloud Gateway未发布前,Spring Cloud使用的是Zuul网关,Zuul使用的是基于Servlet的阻塞模型,每次请求需要分配专门的线程处理,所以资源开销比较大,在高并发场景需要大量的线程,线程数成为了系统的瓶颈,所以作为替代,Spring Cloud Gateway网络层使用了基于非阻塞的Netty服务,从而解决线程瓶颈提升了性能。
Spring Cloud Gateway 是基于 Spring 5 和 Spring Boot 2 搭建的,本质上是一个 Spring Boot 应用。网关作为统一的流量入口,一些请求预处理如鉴权、限流、服务保护等可以在做在网关层,这样各个微服务只需要专注自己的业务逻辑,另外网关的作用
- 请求路由
- 修改请求响应
- 权限校验
- 限流熔断
- 请求重试:可以对那些幂等性的接口网关转发到微服务失败的重试
- 响应缓存:缓存一些频繁访问不经常变动的静态资源
- 响应聚合:将不同的微服务响应聚合一块响应
- 灰度流量
- 异常响应处理
SpringMVC使用了传统的阻塞Servlet框架,Gateway使用了Spring WebFlux非阻塞Reactor框架,网络层使用的是非阻塞Netty,主要作用是处理请求进行预处理以及转发,包含的组件
- Route:路由ID + 转发URI + 多个 Predicate + 多个 Filters 组成,可以针对一个api配置多个Route,匹配的时候按照优先级
- Predicate:路由的匹配条件,一个Route可以包含多个Predicates,最终会被合并一个Predicate
- Filter:对请求、响应进行pre、post处理,分为Route Filter和全局Filter
- 全局Filter:作用域全部的Route
- Route Filter:作用域是某个Route
自动配置相关组件
在GatewayAutoConfiguration类中,相关注解
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayReactiveLoadBalancerClientAutoConfiguration.class,
GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class) // 仅在WebFlux存在的时候该配置类生效
自动配置类注册一些功能bean
- 转换器相关的bean:比如字符串和时间日期互转、键值对相关转换bean
- 路由定位器相关的bean:主要是RouteLocator的编排类CompositeRouteLocator、CompositeRouteDefinitionLocator可以设置多种加载方式编排所有的RouteLocator然后放入CachingRouteLocator组合路由定位器缓存,提高路由查找性能。
- RouteLocatorBuilder:提供一个DSL API 路由定位器构建器,用于创建和配置路由。
- RouteDefinitionLocator:通过getRoutes获取全部的路由定位器
- RouteDefinitionRouteLocator:负责从不同的数据源(如配置文件、数据库等)加载RouteDefinition,通过getRouteDefinitions获取全部的RouteDefinition
- PropertiesRouteDefinitionLocator:从配置属性中加载路由定义。
- InMemoryRouteDefinitionRepository:在内存中存储路由定义,作为默认的路由定义存储库。
- RouteDefinitionRouteLocator:负责从不同的数据源(如配置文件、数据库等)加载RouteDefinition,通过getRouteDefinitions获取全部的RouteDefinition
- 路由刷新监听相关的bean
- RouteRefreshListener:监听路由刷新事件,当路由变化时,触发响应的处理
- 处理器映射器HandlerMapping相关bean
- FilteringWebHandler:处理请求的过滤逻辑,应用全局过滤器和路由过滤器。
- RoutePredicateHandlerMapping:据路由谓词匹配请求,将请求映射到相应的Handler处理器。
- 跨域相关bean
- GlobalCorsProperties:配置全局跨域属性
- CorsGatewayFilterApplicationListener:处理跨域请求,应用跨域过滤器
- 全局过滤器相关bean:一些请求通用的过滤器,如:适配缓存的请求体、处理转发路径、处理WebSocket路由请求
- 路由谓词工厂相关的bean:一些根据请求头部、路径、方法、Cookie等信息判断请求是否匹配的类
- 相关内部配置bean
- Bucket4jConfiguration:配置Bucket4j限流相关的bean
- NettyConfiguration:配置Netty相关的bean
工作流程
当接收到一个请求,需要的处理过程
- 路由匹配 :根据请求的属性(如路径、请求头等),匹配到对应的路由规则。
- 过滤器处理 :对匹配到的路由应用一系列的过滤器,包括全局过滤器和路由特定的过滤器。
- 请求转发 :将处理后的请求转发到目标服务。
- 响应处理 :接收目标服务的响应,并应用过滤器进行处理,最后返回给客户端
调度器初始化
调度器初始化:在此过程DispatcherHadler作为调度器,在Spring启动后,会扫描响应的处理器、处理器适配器类型的bean加载到成员属性
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
// 请求处理器
@Nullable
private List<HandlerMapping> handlerMappings;
// 处理器适配器
@Nullable
private List<HandlerAdapter> handlerAdapters;
// 处理器处理结果
@Nullable
private List<HandlerResultHandler> resultHandlers;
// 省略...
// 在Spring启动之后调用initStrategies
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
initStrategies(applicationContext);
}
// 扫描相关的bean放入成员属性
protected void initStrategies(ApplicationContext context) {
Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerMapping.class, true, false);
ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
AnnotationAwareOrderComparator.sort(mappings);
this.handlerMappings = Collections.unmodifiableList(mappings);
// 这个方法是 Return all beans of the given type or subtypes
Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerAdapter.class, true, false);
this.handlerAdapters = new ArrayList<>(adapterBeans.values());
AnnotationAwareOrderComparator.sort(this.handlerAdapters);
Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
context, HandlerResultHandler.class, true, false);
this.resultHandlers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(this.resultHandlers);
}
}
路由匹配
需要匹配对应的Route信息:接收到请求之后,请求相关信息被封装到了ServerWebExchange类,DispatcherHandler会调用handler方法,会找对应的所有HandlerMapping异步处理请求,调用handlerMapping.getHandler方法拿到handler处理请求
// ServerWebExchange 接口提供了一个统一的方式来处理 HTTP 请求和响应,以及与服务器端处理相关的其他属性和特性。
// 通过实现该接口,可以方便地构建自定义的 Web 应用程序。
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 检查当前请求是否为 CORS 预检请求。如果是,则调用 handlePreFlight 方法处理预检请求,并返回处理结果
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings) // 将 handlerMappings 列表转换为一个 Flux 对象,用于异步处理每个 HandlerMapping
.concatMap(mapping -> mapping.getHandler(exchange)) // 对每个 HandlerMapping 调用 getHandler 方法,获取能够处理当前请求的处理器,并将结果合并为一个新的 Flux 对象。
.next() // 从 Flux 中获取第一个元素,如果没有元素,则返回一个空的 Mono 对象。
.switchIfEmpty(createNotFoundError())
.onErrorResume(ex -> handleResultMono(exchange, Mono.error(ex)))
// 如果成功获取到处理器webHandler(即找到了Route放在了WebServerExchange本地请求上下文并返回了FilteringWebHandler)
// 调用 handleRequestWith 方法处理请求,并返回处理结果。
.flatMap(handler -> handleRequestWith(exchange, handler));
}
// AbstractHandlerMapping#getHandler方法
@Override
public Mono<Object> getHandler(ServerWebExchange exchange)
// 调用子类的getHandlerInternal
return getHandlerInternal(exchange).map(handler -> {
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
ServerHttpRequest request = exchange.getRequest();
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
config.validateAllowPrivateNetwork();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
return handler;
});
}
找到 Route 是否意味着找到了具体的 Controller
找到 Route 并不意味着找到了具体的 Controller 。 Route 只是定义了请求的转发规则,它将客户端的请求转发到目标服务的 URI。而 Controller 是 Spring MVC 或 Spring WebFlux 中的一个组件,用于处理具体的请求并返回响应。
在 Spring Cloud Gateway 中, Route 负责将请求路由到后端服务,而后端服务中的 Controller 负责处理具体的业务逻辑。例如,一个 Route 可能将请求转发到 http://backend-service/api/users ,而后端服务中的 UserController 会处理这个请求。
为什么需要把Route存储在WebServerExchange中?
在Gateway中对应的实现类是RoutePredicateHandlerMapping,注意找到Route之后,会有一个像ServerWebExchange放入属性的动作,为什么要这样操作?
是因为ServerWebExchange 是 Spring WebFlux 中表示一次请求 - 响应交互的核心对象,它包含了请求和响应的各种属性。
- 用于路由Route信息的传递:通过 exchange.getAttributes() 可以获取一个属性映射,这个映射可以用来在不同的处理环节之间传递数据。当 RoutePredicateHandlerMapping 类根据请求匹配到一个路由 r 后,将这个路由信息存储到 ServerWebExchange 的属性中,这样后续的处理器(如 FilteringWebHandler )就可以从这个属性中获取到匹配的路由信息
- 用于后续过滤器使用:通过将路由信息存储在 ServerWebExchange 的属性中,过滤器和处理器可以方便地获取这些信息,从而实现更灵活的处理逻辑。比如某些过滤器可能需要根据路由的目标 URI 来修改请求的目标地址,或者根据路由的配置来添加请求头。
private final FilteringWebHandler webHandler;
// 获取路由信息
private final RouteLocator routeLocator;
private final Integer managementPort;
private final ManagementPortType managementPortType;
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getLocalAddress() != null
&& exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {
return Mono.empty();
}
// 将当前HandlerMapping的名称添加到请求的属性中,以便后续使用
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
// - deferContextual方法来处理请求,该方法允许在订阅时访问当前的上下文。
// - 将当前的上下文视图添加到请求的属性中。
return Mono.deferContextual(contextView -> {
exchange.getAttributes().put(GATEWAY_REACTOR_CONTEXT_ATTR, contextView);
// 来查找与当前请求匹配的路由。
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.map((Function<Route, ?>) r -> {
// 则将其映射到 webHandler ,并将路由信息添加到请求的属性中。
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
// 为什么需要放入一个属性?
// 1、路由信息传递
// 2、方便后续过滤器快速拿到路由信息作一些处理
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
// 这里返回的WebHandler起始就是FilteingWebHandler
return webHandler;
})
.switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
// 如果没有找到匹配的路由,则清除请求的缓存体,并记录日志。
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
ServerWebExchangeUtils.clearCachedRequestBody(exchange);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
});
}
自定义一个CustomGatewayFilter打印当前请求存储的Route信息demo
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
public class CustomGatewayFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// 从 ServerWebExchange 的属性中获取匹配的路由信息
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
if (route != null) {
// 根据路由信息执行自定义逻辑
System.out.println("Matched route: " + route.getId());
}
return chain.filter(exchange);
}
}
最后RoutePredicateHandlerMapping类中的lookupRoute负责匹配的理由Route信息,lookupRoute 方法会遍历所有可用的路由,对每个路由应用谓词( Predicate )来判断是否与当前请求匹配,若匹配到第一个路由则返回该路由的 Mono 对象;若未匹配到任何路由,则返回一个空的 Mono ,存储在ServerWebExchange中
// RoutePredicateHandlerMapping#lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes().filterWhen(route -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, route.getId());
try {
return route.getPredicate().apply(exchange);
}
catch (Exception e) {
logger.error("Error applying predicate for route: " + route.getId(), e);
}
return Mono.just(false);
})
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}
对应找到的的Route实体存储的信息,然后存储到了上面说的ServerWebExchange的attribute中,这一步的目的是让后续的Handler(可以理解为Controller)处理器和过滤器能够访问到匹配的路由信息。
public class Route implements Ordered {
// 路由的唯一标识符。
private final String id;
// 路由的目标 URI。
private final URI uri;
// 路由的顺序,用于排序。
private final int order;
// 异步谓词,用于判断请求是否匹配该路由。
// 匹配条件。多个 Predicates 会合并成一个聚合的条件。
private final AsyncPredicate<ServerWebExchange> predicate;
// 路由的过滤器列表。
// 这些过滤器最终会和全局过滤器一起排序处理匹配成功的请求。
private final List<GatewayFilter> gatewayFilters;
// 路由的元数据,用于存储额外信息。
private final Map<String, Object> metadata;
}
过滤器处理
RoutePredicateHandlerMapping 找到匹配的Route路由后,对应Route的信息是存在ServerWebExchange的属性中,然后会返回webHandler,这成员属性就是在构造的时候传入的FilteringWebHandler,进而将请求传递给 FilteringWebHandler 进行处理。
回到DispatcherHandler中,会调用handlerRequestWith方法,然后找出对应的HandlerAdapter调用webHandler
// DispatcherHandler#handleRequestWith
private Mono<Void> handleRequestWith(ServerWebExchange exchange, Object handler) {
if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {
return Mono.empty(); // CORS rejection
}
if (this.handlerAdapters != null) {
// 找出对应的HandlerAdapter处理器适配器,来调用handler,这里的handler就是上面返回的webHandler
for (HandlerAdapter adapter : this.handlerAdapters) {
if (adapter.supports(handler)) {
Mono<HandlerResult> resultMono = adapter.handle(exchange, handler);
// 调用handler
return handleResultMono(exchange, resultMono);
}
}
}
return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
}
为什么需要使用HandlerAdapter调用Handler而不是直接调用?
为什么找出对应的HandlerAdapter处理器适配器来调用handler(这里的handler就是上面返回的FilteringWebHandler类型的webHandler)?
在Spring WebFlux的 DispatcherHandler 中使用 HandlerAdapter 来调用 handler ,主要是为了实现代码的解耦和灵活性,以下是具体原因
- 解耦:HandlerAdapter 作为一个中间层,将 DispatcherHandler 与具体的 handler 实现进行解耦。 DispatcherHandler 负责请求的分发,而 HandlerAdapter 负责处理具体的 handler 。这样, DispatcherHandler 不需要关心 handler 的具体类型和实现细节,只需要通过 HandlerAdapter 来调用 handler 。
- 支持多种类型的Handler:在Spring WebFlux应用中,可能存在多种不同类型的 handler ,例如函数式处理方式、注解式处理方式等。 HandlerAdapter 可以根据不同的 handler 类型提供不同的处理逻辑,从而支持多种处理方式。
- 便于拓展:使用 HandlerAdapter 模式可以方便地扩展系统,添加新的 handler 类型。当需要支持新的处理方式时,只需要实现一个新的 HandlerAdapter 并注册到系统中,不需要修改DispatcherHandler
对于哪个Handler该找对应的HandlerAdapter,在supports方法中定义了适配规则,因为FilteringWebHandler implements WebHandler,所以适配器使用的是SimpleHandlerAdapter
FilteringWebHandler 是 Spring Cloud Gateway 中处理请求的核心处理器,它会应用路由上配置的过滤器,他会获取Route路由上配置的过滤器列表(包括全局过滤器和路由特定的过滤器),并按照顺序依次应用这些过滤器。过滤器可以对请求和响应进行修改,例如添加请求头、修改请求路径等。
在 FilteringWebHandler 的 handle 方法中,会将过滤器和请求传递给 DefaultGatewayFilterChain 进行处理
// SimpleHandlerAdapter#handler方法来到了FilteringWebHandler#handle 方法
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
// 1、获取存储在ServerWebExchange即本次请求中的Route路由信息
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 2、路由信息中包含了全局过滤器以及本路由特定的过滤器
List<GatewayFilter> combined = getCombinedFilters(route);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// 3、执行器过滤去链
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
FilteringWebHandler 会把所有的 GlobalFilter 实例加载进来并且使用 GatewayFilterAdapter 适配成 GatewayFilter 。在 handle 方法处理请求时,会把适配后的 GlobalFilter 以及路由本身的 GatewayFilter 全并在一个 List 里,然后按 Order 排序。排序完之后会构造一个 GatewayFilterChain ,由 GatewayFilterChain 的 filter 方法触发这些 Filter 的执行。
// DefaultGatewayFilterChain#filter
// DefaultGatewayFilterChain是FilteringWebHandler的内部类
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
请求转发
执行完Filter之后,需要将请求转发到指定的目标服务。
一般网关需要配合注册中心使用,如下面直接转发并负载均衡到service-a目标服务
spring:
cloud:
gateway:
routes:
- id: service-a-route
uri: lb://service-a
predicates:
- Path=/service/**
URI 以 lb:// 开头的路由会被 Gateway 自带的 ReactiveLoadBalancerClientFilter 处理,这个 Filter 会识别 URI 中的服务名,并为其创建一个 ReactorLoadBalancer,然后负载均衡策略从对应服务的实例中获取其中一个实例作为请求转发目标。
来到GatewayReactiveLoadBalancerClientAutoConfiguration自动配置类,定义了两个bean
- 响应式负载均衡客户端过滤器:负责获取负载均衡可用的服务表,比如使用client从注册中心fetch可以用的服务。
- 服务实例 Cookie 过滤器:
- 用于会话粘性,会话粘性是指让同一个客户端的多次请求都被路由到同一个服务实例上。在分布式系统中,服务通常会被部署多个实例以提高可用性和性能。但有些应用场景下,客户端的会话状态是存储在服务实例本地的,例如用户登录状态、购物车信息等。如果客户端的不同请求被路由到不同的服务实例上,就会导致会话状态丢失。
- 用于服务实例选择:在负载均衡的场景下,Cookie 过滤器可以根据 Cookie 中的信息来选择合适的服务实例。例如,当某个服务实例的性能出现问题时,可以通过修改 Cookie 中的信息,将客户端的请求路由到其他性能较好的服务实例上。
- 跟踪和监控:Cookie 过滤器可以在 Cookie 中记录一些有用的信息,如请求的时间、服务实例的 ID 等。这些信息可以用于跟踪和监控客户端的请求,帮助开发人员分析系统的性能和用户行为。
@Bean
// 省略注解
public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
GatewayLoadBalancerProperties properties) {
return new ReactiveLoadBalancerClientFilter(clientFactory, properties);
}
@Bean
public LoadBalancerServiceInstanceCookieFilter loadBalancerServiceInstanceCookieFilter(
LoadBalancerClientFactory loadBalancerClientFactory) {
return new LoadBalancerServiceInstanceCookieFilter(loadBalancerClientFactory);
}
网关负载均衡的路由?
ReactorLoadBalancer 需要一个 ServiceInstanceListSupplier 为其提供服务列表。ServiceInstanceListSupplier 有多种实现。
- FixedServiceInstanceListSupplier : 每次返回固定的服务实例列表。
- DiscoveryClientServiceInstanceListSupplier : 从 DiscoveryClient 动态获取服务的实例列表。
- HealthCheckServiceInstanceListSupplier : 依赖一个被代理的 Supplier 实现,为其返回的服务实例执行主动健康检查,只返回健康的实例。
- ZonePreferenceServiceInstanceListSupplier : 依赖一个被代理的 Supplier 实现,只返回同可用区的服务实例,当可用区下没有实例时,会返回全部实例。
- CachingServiceInstanceListSupplier : 依赖一个被代理的 Supplier 实现,将代理的 Supplier 返回的实例缓存起来并定时刷新缓存。
如通过注册中心选择服务进行负载均衡
处理响应
在执行完过滤器链,将请求转发到了目标服务器之后,需要处理目标服务器的响应,转发给请求客户端。
NettyRoutingFilter 类负责将请求转发到目标服务器,并且接收目标服务器的响应。通过 Netty 实现了 HTTP 请求的转发,处理了请求头、响应头、状态码和超时等问题。它是 Spring Cloud Gateway 中一个重要的过滤器,用于将请求路由到目标服务。
public class NettyRoutingFilter implements GlobalFilter, Ordered {
/**
* The order of the NettyRoutingFilter. See {@link Ordered#LOWEST_PRECEDENCE}.
*/
// 最低优先级的filter
public static final int ORDER = Ordered.LOWEST_PRECEDENCE;
private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);
private final HttpClient httpClient;
// headersFiltersProvider 是一个 ObjectProvider ,用于提供 HTTP 头过滤器列表
// List<HttpHeadersFilter> 是一个 volatile 变量,用于存储 HTTP 头过滤器列表。
private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
private final HttpClientProperties properties;
}
核心逻辑在filter方法中,用于根据Netty的HttpClient处理请求和响应。
- 首先获取请求的 URL,并检查是否已经路由过或协议是否为 http 或 https 。如果不满足条件,则直接调用下一个过滤器。
- 设置请求已经路由过的标志。
- 获取请求的方法和 URL,并过滤请求头。
- 创建 HttpClient 并发送请求,处理响应。
- 如果配置了响应超时时间,则设置超时处理逻辑。
- 最后返回响应并继续调用下一个过滤器。
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 从 ServerWebExchange 中获取 GATEWAY_REQUEST_URL_ATTR 属性,该属性存储了请求的目标 URL
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// 获取目标 URL 的协议(如 http 或 https)
String scheme = requestUrl.getScheme();
// 检查请求是否已经被路由过,或者目标 URL 的协议是否不是 http 或 https
if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
// 如果请求已经被路由过或者协议不支持,则跳过当前过滤器,继续执行下一个过滤器
return chain.filter(exchange);
}
// 标记该请求已经被路由过
setAlreadyRouted(exchange);
// 从 ServerWebExchange 中获取当前的请求对象
ServerHttpRequest request = exchange.getRequest();
// 将 Spring 的 HttpMethod 转换为 Netty 的 HttpMethod
final HttpMethod method = HttpMethod.valueOf(request.getMethod().name());
// 将目标 URL 转换为 ASCII 字符串
final String url = requestUrl.toASCIIString();
// 对请求头进行过滤,应用配置的请求头过滤器
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
// 创建 Netty 的 HttpHeaders 对象,并将过滤后的请求头添加进去
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
// 检查是否需要保留原始的 Host 头
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// 从 ServerWebExchange 中获取当前的路由信息
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// 获取配置好的 HttpClient 并发起请求,返回一个包含响应的 Flux
Flux<HttpClientResponse> responseFlux = getHttpClientMono(route, exchange)
.flatMapMany(httpClient -> httpClient.headers(headers -> {
// 添加过滤后的请求头
headers.add(httpHeaders);
// 移除原始的 Host 头
headers.remove(HttpHeaders.HOST);
// 如果需要保留原始的 Host 头,则添加到请求头中
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
})
// 设置请求方法
.request(method)
// 设置请求的目标 URL
.uri(url)
// 发送请求体
.send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
}
// 将请求体的 DataBuffer 转换为 Netty 的 ByteBuf 并发送
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
})
// 处理响应和连接
.responseConnection((res, connection) -> {
// 将客户端响应和连接信息存储到 ServerWebExchange 的属性中,以便后续处理
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
// 获取当前的响应对象
ServerHttpResponse response = exchange.getResponse();
// 创建一个新的 HttpHeaders 对象,用于存储响应头
HttpHeaders headers = new HttpHeaders();
// 将客户端响应的头信息添加到新的 HttpHeaders 对象中
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
// 获取响应的 Content-Type 头信息,并存储到 ServerWebExchange 的属性中
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
// 设置响应的状态码
setResponseStatus(res, response);
// 对响应头进行过滤,应用配置的响应头过滤器
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
Type.RESPONSE);
// 如果响应头中包含 Content-Length 头,则移除 Transfer-Encoding 头
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
// 将过滤后的响应头的键存储到 ServerWebExchange 的属性中
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
// 将过滤后的响应头添加到当前的响应对象中
response.getHeaders().addAll(filteredResponseHeaders);
// 返回一个包含客户端响应的 Mono
return Mono.just(res);
}));
// 获取路由配置的响应超时时间
Duration responseTimeout = getResponseTimeout(route);
if (responseTimeout != null) {
// 如果配置了响应超时时间,则设置超时处理逻辑
responseFlux = responseFlux
.timeout(responseTimeout,
Mono.defer(() -> Mono
.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))))
.onErrorMap(TimeoutException.class,
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
}
// 当响应处理完成后,继续执行下一个过滤器
return responseFlux.then(chain.filter(exchange));
}
定制优化动态路由配置、过滤器热加载
目前的Gateway修改路由配置或者添加新的过滤器不支持热刷新,尽管可以使用一些方式比如@RefreshScope注解配合yml动态刷新路由,但是配置方式不太好,或者修改代码需要重启服务,会降低网关的可用性,期望对Spring Cloud Gateway功能优化能够本地管理路由、过滤器,并且需要能够热刷新。
实现思路
- 路由的动态配置:定义一个DynamicRouteDefinitionLocator,内部开启定时任务监听数据库的RouteDefinition数据变化,然后动态更新给RouteDefinitionLocator的getRoutes拿到这些变化的RouteDefinition热设置给上层的 RouteLocator ,这样就能获取到最新的 RouteDefinition。因为 RouteDefinitionMonitor 是定时检查的(默认 30 秒),所以 RouteDefinition 的更新并不是实时的,会有一定的延迟。但我们其实并不需要 RouteDefinition 实时生效,所以这个延迟是可以接受的。
- 匹配规则和热过滤器热加载:想要自定义匹配规则和规律器之需要实现对应的接口AbstractRoutePredicateFactory、AbstractGatewayFilterFactory就行,但是修改代码需要重启服务,为了实现热更新,使用 Groovy 来实现自定义的匹配规则和过滤器,然后利用 JVM 对 Groovy 脚本的支持,使用 GroovyClassLoader 来解析这些 Groovy 实现的匹配规则和过滤器的类,再提供给 Gateway 相关的模块使用。
- 非注册中心服务列表配置:但对于一些没有接入注册中心 的历史服务,或者是基于外部系统搭建的服务,则还是不方便使用 注册中心的服务发现。Gateway 的路由转发 URI 支持直接配置 IP 地址或者 Host,但是只能配置一个。这些服务没有办法利用服务发现来动态管理服务实例,所以我们需要一种手动管理这些表态服务列表的方式,实现思路实现自定义StaticLoadBalancerClientFilter,类似Gateway自己的ReactiveLoadBalancerClientFilter,然后将服务的相关IP、host配置在数据库,在ReactiveLoadBalancerClientFilter过滤器执行之前执行StaticLoadBalancerClientFilter的时候,如果检查到想要访问的是非注册中心服务,是手动在数据库配置的服务,那么从数据库查询非注册中心服务信息,拿到对应的IP、host替换原始的请求路径,否则跳过自己的filter进行ReactiveLoadBalancerClientFilter保持原始逻辑。
- 管理的Admin界面:crud维护数据库中存储的动态路由、热过滤器、非注册中心服务配置等数据。
参考:Spring Cloud Gateway 原理介绍和应用
自定义路由器谓词工厂(匹配规则)demo
package org.springframework.cloud.gateway.handler.predicate;
import org.springframework.cloud.gateway.handler.predicate.AbstractRoutePredicateFactory;
import org.springframework.web.server.ServerWebExchange;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
public class CustomRoutePredicateFactory extends AbstractRoutePredicateFactory<CustomRoutePredicateFactory.Config> {
public static final String CONFIG_KEY = "contains";
public CustomRoutePredicateFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return Arrays.asList(CONFIG_KEY);
}
@Override
public Predicate<ServerWebExchange> apply(Config config) {
return exchange -> {
String path = exchange.getRequest().getURI().getPath();
return path.contains(config.getContains());
};
}
public static class Config {
private String contains;
public String getContains() {
return contains;
}
public void setContains(String contains) {
this.contains = contains;
}
}
}
修改yml配置文件
spring:
cloud:
gateway:
routes:
- id: custom_route
uri: http://example.com
predicates:
- Custom=contains=custom
filters:
- Custom=X-Custom-Header,Custom-Value
自定义过滤器demo
package org.springframework.cloud.gateway.filter.factory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
@Component
public class CustomGatewayFilterFactory extends AbstractGatewayFilterFactory<CustomGatewayFilterFactory.Config> {
public static final String HEADER_NAME_KEY = "headerName";
public static final String HEADER_VALUE_KEY = "headerValue";
public CustomGatewayFilterFactory() {
super(Config.class);
}
@Override
public List<String> shortcutFieldOrder() {
return Collections.unmodifiableList(Arrays.asList(HEADER_NAME_KEY, HEADER_VALUE_KEY));
}
@Override
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
ServerHttpRequest request = exchange.getRequest().mutate()
.header(config.getHeaderName(), config.getHeaderValue())
.build();
return chain.filter(exchange.mutate().request(request).build());
};
}
public static class Config {
private String headerName;
private String headerValue;
public String getHeaderName() {
return headerName;
}
public void setHeaderName(String headerName) {
this.headerName = headerName;
}
public String getHeaderValue() {
return headerValue;
}
public void setHeaderValue(String headerValue) {
this.headerValue = headerValue;
}
}
}
修改yml配置文件
spring:
cloud:
gateway:
routes:
- id: custom_route
uri: http://example.com
predicates:
- Custom=contains=custom
filters:
- Custom=X-Custom-Header,Custom-Value
参考
Flux 和 Mono 、reactor实战 (史上最全) - 疯狂创客圈 - 博客园
Reactor 3 参考指南
3W字吃透:微服务网关SpringCloud gateway底层原理和实操 - 疯狂创客圈 - 博客园