源码分析-SpringCloudGateway源码阅读-处理器源码

源码解析-SpringCloudGateway
01 源码分析-SpringCloudGateway源码阅读准备
02 源码分析-SpringCloudGateway源码阅读-网关监控端点
03 源码分析-SpringCloudGateway源码阅读-网关配置类
04 源码分析-SpringCloudGateway源码阅读-服务发现
05 源码分析-SpringCloudGateway源码阅读-网关自定义事件
06 源码分析-SpringCloudGateway源码阅读-过滤器包类总览
07 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(一)
08 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(二)
09 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(三)
10 源码分析-SpringCloudGateway源码阅读-过滤器-路由过滤器源码解读
11 源码分析-SpringCloudGateway源码阅读-处理器源码

一 处理器类与相关谓词工厂类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
.
└── org
└── springframework
└── cloud
└── gateway
└── handler
   ├── AsyncPredicate.java 异步路由谓词接口
   ├── FilteringWebHandler.java 过滤Web处理器
   ├── RoutePredicateHandlerMapping.java 路由谓词处理器映射器 - 主要用于查找路由
   └── predicate
   ├── AbstractRoutePredicateFactory.java 抽象路由谓词工厂
   ├── AfterRoutePredicateFactory.java 匹配日期之后发生的请求的路由谓词工厂
   ├── BeforeRoutePredicateFactory.java 匹配日期之前发生的请求的路由谓词工厂
   ├── BetweenRoutePredicateFactory.java 匹配时间范围内发生的请求路由谓词工厂
   ├── CloudFoundryRouteServiceRoutePredicateFactory.java CF路由服务路由谓词工厂
   ├── CookieRoutePredicateFactory.java Cookie匹配路由谓词工厂
   ├── GatewayPredicate.java 网关谓词
   ├── HeaderRoutePredicateFactory.java 请求头匹配路由谓词工厂
   ├── HostRoutePredicateFactory.java Host匹配路由谓词工厂
   ├── MethodRoutePredicateFactory.java 请求Method匹配路由谓词工厂
   ├── PathRoutePredicateFactory.java 路径匹配路由谓词工厂
   ├── PredicateDefinition.java 谓词定义
   ├── QueryRoutePredicateFactory.java 查询参数匹配路由谓词工厂
   ├── ReadBodyPredicateFactory.java 读body谓词工厂
   ├── RemoteAddrRoutePredicateFactory.java 远程地址匹配路由谓词工厂
   ├── RoutePredicateFactory.java 路由谓词工厂接口类
   └── WeightRoutePredicateFactory.java 权重路由谓词工厂

其中接口及抽象类视图如下:

RoutePredicateFactory

在包org.springframework.cloud.gateway.handler.predicate下其他的谓词工厂...PredicateFactory都是继承自AbstractRoutePredicatFactory.

二 具体类解析

RoutePredicateHandlerMapping

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.handler;

import java.util.function.Function;

import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.config.GlobalCorsProperties;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.core.env.Environment;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.reactive.handler.AbstractHandlerMapping;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT;
import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.DISABLED;
import static org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping.ManagementPortType.SAME;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

/**
* 路由谓词处理器映射器
* @author Spencer Gibb
*/
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

private final FilteringWebHandler webHandler;

private final RouteLocator routeLocator;

private final Integer managementPort;

private final ManagementPortType managementPortType;

public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
Environment environment) {
this.webHandler = webHandler;
this.routeLocator = routeLocator;

this.managementPort = getPortProperty(environment, "management.server.");
this.managementPortType = getManagementPortType(environment);
setOrder(1);
// 设置跨域配置
setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
}

/**
* 获取管理端点端口类型
* @param environment
* @return
*/
private ManagementPortType getManagementPortType(Environment environment) {
Integer serverPort = getPortProperty(environment, "server.");
if (this.managementPort != null && this.managementPort < 0) {
return DISABLED;
}
return ((this.managementPort == null
|| (serverPort == null && this.managementPort.equals(8080))
|| (this.managementPort != 0 && this.managementPort.equals(serverPort)))
? SAME : DIFFERENT);
}

/**
* 获取端口配置
* @param environment
* @param prefix
* @return
*/
private static Integer getPortProperty(Environment environment, String prefix) {
return environment.getProperty(prefix + "port", Integer.class);
}

@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

// 查找路由
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
// 删除我们正在测试的当前路线
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
// 缓存匹配的网关路由
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
// 删除我们正在测试的当前路线
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}

@Override
protected CorsConfiguration getCorsConfiguration(Object handler,
ServerWebExchange exchange) {
// TODO: support cors configuration via properties on a route see gh-229
// see RequestMappingHandlerMapping.initCorsConfiguration()
// also see
// https://github.com/spring-projects/spring-framework/blob/master/spring-web/src/test/java/org/springframework/web/cors/reactive/CorsWebFilterTests.java
return super.getCorsConfiguration(handler, exchange);
}

// TODO: get desc from factory?
private String getExchangeDesc(ServerWebExchange exchange) {
StringBuilder out = new StringBuilder();
out.append("Exchange: ");
out.append(exchange.getRequest().getMethod());
out.append(" ");
out.append(exchange.getRequest().getURI());
return out.toString();
}

/**
* 查找路由
* @param exchange
* @return
*/
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
// 添加我们正在测试的当前路线
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
// 校验路由 校验的方法里其实并未做什么
validateRoute(route, exchange);
return route;
});

/*
* TODO: trace logging if (logger.isTraceEnabled()) {
* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
*/
}

/**
* Validate the given handler against the current request.
* <p>
* The default implementation is empty. Can be overridden in subclasses, for example
* to enforce specific preconditions expressed in URL mappings.
* @param route the Route object to validate
* @param exchange current exchange
* @throws Exception if validation failed
*/
@SuppressWarnings("UnusedParameters")
protected void validateRoute(Route route, ServerWebExchange exchange) {
}

/**
* 路由谓词处理器映射器的名称
* @return
*/
protected String getSimpleName() {
return "RoutePredicateHandlerMapping";
}

public enum ManagementPortType {

/**
* The management port has been disabled.
* 禁用了管理端口
*/
DISABLED,

/**
* The management port is the same as the server port.
* 管理端口与服务端口相同
*/
SAME,

/**
* The management port and server port are different.
* 管理端口与服务端口不同
*/
DIFFERENT;

}

}

FilteringWebHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.handler;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.OrderedGatewayFilter;
import org.springframework.cloud.gateway.filter.factory.GatewayFilterFactory;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationAwareOrderComparator;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebHandler;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

/**
* 过滤Web处理器
*
* WebHandler that delegates to a chain of {@link GlobalFilter} instances and
* {@link GatewayFilterFactory} instances then to the target {@link WebHandler}.
*
* @author Rossen Stoyanchev
* @author Spencer Gibb
* @since 0.1
*/
public class FilteringWebHandler implements WebHandler {

protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);

private final List<GatewayFilter> globalFilters;

public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}

private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
// 把全局过滤器适配成 GatewayFilter
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
// 包装成有序的OrderedGatewayFilter
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}

/*
* TODO: relocate @EventListener(RefreshRoutesEvent.class) void handleRefresh() {
* this.combinedFiltersForRoute.clear();
*/

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
// 获取路由过滤器
List<GatewayFilter> gatewayFilters = route.getFilters();
// 全局过滤器合并路由过滤器
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
// 根据Order排序
AnnotationAwareOrderComparator.sort(combined);

if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
// 创建网关过滤器链
return new DefaultGatewayFilterChain(combined).filter(exchange);
}

/**
* 默认网关过滤器链
*/
private static class DefaultGatewayFilterChain implements GatewayFilterChain {

private final int index;

private final List<GatewayFilter> filters;

/**
* 从第0个开始的构造
* @param filters
*/
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}

/**
* 可指定调用链下标的构造
* @param parent
* @param index
*/
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}

public List<GatewayFilter> getFilters() {
return filters;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
// 递归调用每个过滤器filter方法
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}

}

/**
* 过滤器适配器 - 使用了适配器模式
* 将全局过滤器 适配成 GatewayFilter 易于与路由过滤器合并
*/
private static class GatewayFilterAdapter implements GatewayFilter {

private final GlobalFilter delegate;

GatewayFilterAdapter(GlobalFilter delegate) {
this.delegate = delegate;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return this.delegate.filter(exchange, chain);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");
sb.append("delegate=").append(delegate);
sb.append('}');
return sb.toString();
}

}

}

其他谓词工厂的代码就不在这里贴出来了,谓词工厂基本都是根据给定的值去做匹配,逻辑都相对简单,需要的朋友可以直接看我的github注释