源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(二)

源码解析-SpringCloudGateway
01 源码分析-SpringCloudGateway源码阅读准备
02 源码分析-SpringCloudGateway源码阅读-网关监控端点
03 源码分析-SpringCloudGateway源码阅读-网关配置类
04 源码分析-SpringCloudGateway源码阅读-服务发现
05 源码分析-SpringCloudGateway源码阅读-网关自定义事件
06 源码分析-SpringCloudGateway源码阅读-过滤器包类总览
07 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(一)
08 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(二)
09 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(三)
10 源码分析-SpringCloudGateway源码阅读-过滤器-路由过滤器源码解读
11 源码分析-SpringCloudGateway源码阅读-处理器源码

一 本小节全局过滤器类

  • NettyRoutingFilter Netty路由全局过滤器
  • WebClientHttpRoutingFilter 使用WebClient发起请求的全局过滤器(实验性质,不依赖Netty)
  • RouteToRequestUrlFilter 路由到请求Url的全局过滤器
  • ForwardRoutingFilter 转发路由全局过滤器
  • ForwardPathFilter 转发路径全局过滤器
  • WebsocketRoutingFilter Websocket路由全局过滤器

二 源码解读

NettyRoutingFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;
import java.time.Duration;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.config.HttpClientProperties;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter.Type;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.TimeoutException;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ResponseStatusException;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter.filterRequest;
import static org.springframework.cloud.gateway.support.RouteMetadataUtils.CONNECT_TIMEOUT_ATTR;
import static org.springframework.cloud.gateway.support.RouteMetadataUtils.RESPONSE_TIMEOUT_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.isAlreadyRouted;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.setAlreadyRouted;

/**
 * Global filter that proxies the request to the downstream service with the Reactor
 * Netty {@link HttpClient}. It is ordered at {@link Ordered#LOWEST_PRECEDENCE}.
 * <p>
 * The filter only acts when the URI stored under
 * {@code ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR} has the {@code http} or
 * {@code https} scheme and the exchange has not been marked as routed yet. The client
 * response is stored under {@code ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR} (and its
 * connection under {@code CLIENT_RESPONSE_CONN_ATTR}) so that later filters can use it.
 * <p>
 * NOTE(review): the original notes mention an experimental WebClientHttpRoutingFilter
 * that plays the same role without Netty; it is not visible in this file.
 *
 * @author Spencer Gibb
 * @author Biju Kunjummen
 */
public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);

    // Base client; getHttpClient(...) may derive a per-route variant from it.
    private final HttpClient httpClient;

    // Source of the header filters, resolved on first use by getHeadersFilters().
    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    // Supplies the default response timeout used by getResponseTimeout(...).
    private final HttpClientProperties properties;

    // do not use this headersFilters directly, use getHeadersFilters() instead.
    private volatile List<HttpHeadersFilter> headersFilters;

    /**
     * Creates the filter.
     * @param httpClient the base Reactor Netty client used for downstream requests.
     * @param headersFiltersProvider provider of the request/response header filters.
     * @param properties HTTP client properties (read for the default response timeout).
     */
    public NettyRoutingFilter(HttpClient httpClient,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider,
            HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFiltersProvider = headersFiltersProvider;
        this.properties = properties;
    }

    /**
     * Returns the header filters, resolving them from the provider the first time the
     * cached field is found to be {@code null}.
     * @return whatever {@code headersFiltersProvider.getIfAvailable()} returned.
     */
    public List<HttpHeadersFilter> getHeadersFilters() {
        if (headersFilters == null) {
            headersFilters = headersFiltersProvider.getIfAvailable();
        }
        return headersFilters;
    }

    /**
     * @return {@link Ordered#LOWEST_PRECEDENCE}.
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * Sends the request to the URL stored in {@code GATEWAY_REQUEST_URL_ATTR} and copies
     * the downstream status and (filtered) headers onto the gateway response; the
     * client response itself is only stored as an exchange attribute here.
     * @param exchange the current exchange; must carry {@code GATEWAY_REQUEST_URL_ATTR}.
     * @param chain the remaining filter chain.
     * @return completion of the downstream exchange followed by the rest of the chain.
     */
    @Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // URL to route to (required attribute)
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        // Skip this filter when the exchange is already routed OR the scheme is
        // neither http nor https.
        if (isAlreadyRouted(exchange)
                || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        // Mark the exchange as routed
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();

        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toASCIIString();
        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        // Copy the filtered request headers into Netty's header type
        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        // Whether the incoming Host header must be kept (defaults to false);
        // presumably set by PreserveHostHeaderGatewayFilterFactory — confirm.
        boolean preserveHost = exchange
                .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
                .headers(headers -> {
                    headers.add(httpHeaders);
                    // Drop every Host header first
                    headers.remove(HttpHeaders.HOST);
                    if (preserveHost) {
                        // Re-add the first Host header of the incoming request
                        String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                        headers.add(HttpHeaders.HOST, host);
                    }
                }).request(method).uri(url).send((req, nettyOutbound) -> {
                    if (log.isTraceEnabled()) {
                        nettyOutbound
                                .withConnection(connection -> log.trace("outbound route: "
                                        + connection.channel().id().asShortText()
                                        + ", inbound: " + exchange.getLogPrefix()));
                    }
                    // Forward the request body, converted to Netty buffers
                    return nettyOutbound.send(request.getBody().map(this::getByteBuf));
                }).responseConnection((res, connection) -> {

                    // Defer committing the response until all route filters have run
                    // Put client response as ServerWebExchange attribute and write
                    // response later NettyWriteResponseFilter
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                    exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                    ServerHttpResponse response = exchange.getResponse();
                    // put headers and status so filters can modify the response
                    HttpHeaders headers = new HttpHeaders();

                    res.responseHeaders().forEach(
                            entry -> headers.add(entry.getKey(), entry.getValue()));

                    // Remember the downstream Content-Type, if it has any length
                    String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                    if (StringUtils.hasLength(contentTypeValue)) {
                        exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                                contentTypeValue);
                    }
                    // Copy the downstream status onto the gateway response
                    setResponseStatus(res, response);

                    // make sure headers filters run after setting status so it is
                    // available in response
                    HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                            getHeadersFilters(), headers, exchange, Type.RESPONSE);

                    // The check looks at the filtered downstream headers; the removal
                    // is applied to the gateway response's own headers.
                    if (!filteredResponseHeaders
                            .containsKey(HttpHeaders.TRANSFER_ENCODING)
                            && filteredResponseHeaders
                                    .containsKey(HttpHeaders.CONTENT_LENGTH)) {
                        // It is not valid to have both the transfer-encoding header and
                        // the content-length header.
                        // Remove the transfer-encoding header in the response if the
                        // content-length header is present.
                        response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                    }

                    // Record which header names came from the downstream response
                    exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                            filteredResponseHeaders.keySet());

                    response.getHeaders().putAll(filteredResponseHeaders);

                    return Mono.just(res);
                });

        // Apply the response timeout (route metadata first, then properties) and
        // surface a timeout as 504 GATEWAY_TIMEOUT.
        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout, Mono.error(new TimeoutException(
                            "Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                                    th.getMessage(), th));
        }

        return responseFlux.then(chain.filter(exchange));
    }

    /**
     * Converts a Spring {@link DataBuffer} into a Netty {@link ByteBuf}.
     * @param dataBuffer a {@code NettyDataBuffer} or {@code DefaultDataBuffer}.
     * @return the native Netty buffer, or a wrapper around the native buffer of a
     * {@code DefaultDataBuffer}.
     * @throws IllegalArgumentException for any other buffer type.
     */
    protected ByteBuf getByteBuf(DataBuffer dataBuffer) {
        if (dataBuffer instanceof NettyDataBuffer) {
            NettyDataBuffer buffer = (NettyDataBuffer) dataBuffer;
            return buffer.getNativeBuffer();
        }
        // MockServerHttpResponse creates these
        else if (dataBuffer instanceof DefaultDataBuffer) {
            DefaultDataBuffer buffer = (DefaultDataBuffer) dataBuffer;
            return Unpooled.wrappedBuffer(buffer.getNativeBuffer());
        }
        throw new IllegalArgumentException(
                "Unable to handle DataBuffer of type " + dataBuffer.getClass());
    }

    /**
     * Copies the status of the downstream response onto the gateway response.
     * @param clientResponse the downstream response whose status code is read.
     * @param response the gateway response to update.
     * @throws IllegalStateException if the code does not resolve to an
     * {@link HttpStatus} and the (unwrapped) response is not an
     * {@link AbstractServerHttpResponse}.
     */
    private void setResponseStatus(HttpClientResponse clientResponse,
            ServerHttpResponse response) {
        HttpStatus status = HttpStatus.resolve(clientResponse.status().code());
        if (status != null) {
            response.setStatusCode(status);
        }
        else {
            // No HttpStatus was resolved for this code: unwrap any decorators so the
            // raw integer code can be set on the underlying response.
            while (response instanceof ServerHttpResponseDecorator) {
                response = ((ServerHttpResponseDecorator) response).getDelegate();
            }
            if (response instanceof AbstractServerHttpResponse) {
                ((AbstractServerHttpResponse) response)
                        .setStatusCodeValue(clientResponse.status().code());
            }
            else {
                // TODO: log warning here, not throw error?
                throw new IllegalStateException("Unable to set status code "
                        + clientResponse.status().code() + " on response of type "
                        + response.getClass().getName());
            }
        }
    }

    /**
     * Creates a new HttpClient with per route timeout configuration. Sub-classes that
     * override, should call super.getHttpClient() if they want to honor the per route
     * timeout configuration.
     * @param route the current route.
     * @param exchange the current ServerWebExchange.
     * @return a client configured with the route's connect timeout when the route
     * metadata contains {@code CONNECT_TIMEOUT_ATTR}; otherwise the base client.
     */
    protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
        Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);
        if (connectTimeoutAttr != null) {
            Integer connectTimeout = getInteger(connectTimeoutAttr);
            return this.httpClient.tcpConfiguration((tcpClient) -> tcpClient
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout));
        }
        return httpClient;
    }

    /**
     * Reads a connect-timeout metadata value as an {@link Integer}.
     * @param connectTimeoutAttr an {@code Integer}, or any object whose
     * {@code toString()} is parseable by {@link Integer#parseInt(String)}.
     * @return the value as an {@code Integer}.
     */
    static Integer getInteger(Object connectTimeoutAttr) {
        Integer connectTimeout;
        if (connectTimeoutAttr instanceof Integer) {
            connectTimeout = (Integer) connectTimeoutAttr;
        }
        else {
            connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());
        }
        return connectTimeout;
    }

    /**
     * Resolves the response timeout for a route.
     * @param route the current route; its metadata is checked for
     * {@code RESPONSE_TIMEOUT_ATTR}.
     * @return the metadata value interpreted as milliseconds when present, otherwise
     * {@code properties.getResponseTimeout()}.
     */
    private Duration getResponseTimeout(Route route) {
        Object responseTimeoutAttr = route.getMetadata().get(RESPONSE_TIMEOUT_ATTR);
        Long responseTimeout = null;
        if (responseTimeoutAttr != null) {
            if (responseTimeoutAttr instanceof Number) {
                responseTimeout = ((Number) responseTimeoutAttr).longValue();
            }
            else {
                responseTimeout = Long.valueOf(responseTimeoutAttr.toString());
            }
        }
        return responseTimeout != null ? Duration.ofMillis(responseTimeout)
                : properties.getResponseTimeout();
    }

}

WebClientHttpRoutingFilter(注意:下方贴出的源码实际为 WebClientWriteResponseFilter)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR;

/**
 * Global filter that writes the {@link ClientResponse} stored under
 * {@code CLIENT_RESPONSE_ATTR} to the gateway response once the rest of the chain has
 * completed.
 *
 * @author Spencer Gibb
 */
public class WebClientWriteResponseFilter implements GlobalFilter, Ordered {

    /**
     * Order of Write Response Filter.
     */
    public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

    private static final Log log = LogFactory.getLog(WebClientWriteResponseFilter.class);

    @Override
    public int getOrder() {
        return WRITE_RESPONSE_FILTER_ORDER;
    }

    /**
     * Runs the rest of the chain first, then writes the stored client response body.
     * @param exchange the current exchange.
     * @param chain the remaining filter chain.
     * @return completion of the chain followed by the response write.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
        // until the WebHandler is run
        Mono<Void> restOfChain = chain.filter(exchange);
        Mono<Void> writeStep = Mono.defer(() -> writeClientResponse(exchange));
        return restOfChain.doOnError(throwable -> cleanup(exchange)).then(writeStep);
    }

    /**
     * Writes the body of the stored client response to the server response.
     * @param exchange the current exchange.
     * @return an empty Mono when no client response is stored, otherwise the write.
     */
    private Mono<Void> writeClientResponse(ServerWebExchange exchange) {
        ClientResponse stored = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
        if (stored == null) {
            return Mono.empty();
        }
        log.trace("WebClientWriteResponseFilter start");
        ServerHttpResponse serverResponse = exchange.getResponse();
        Mono<Void> write = serverResponse
                .writeWith(stored.body(BodyExtractors.toDataBuffers()));
        // On cancellation the client response body still has to be consumed
        return write.doOnCancel(() -> cleanup(exchange));
    }

    /**
     * Subscribes to the stored client response body, discarding its content.
     * @param exchange the current exchange.
     */
    private void cleanup(ServerWebExchange exchange) {
        ClientResponse stored = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
        if (stored == null) {
            return;
        }
        stored.bodyToMono(Void.class).subscribe();
    }

}

RouteToRequestUrlFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.route.Route;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.containsEncodedParts;

/**
 * Global filter that computes the URL the request is going to be routed to.
 * <p>
 * It runs only when a {@link Route} is stored under
 * {@code ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR}. The new URI starts from the
 * request URI and takes scheme, host and port from the route URI; it is stored under
 * {@code ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR}.
 *
 * @author Spencer Gibb
 */
public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

    /**
     * Order of Route to URL.
     */
    public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;

    private static final Log log = LogFactory.getLog(RouteToRequestUrlFilter.class);

    // Matches text that itself begins with a URI scheme followed by ':'
    private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";

    static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);

    /**
     * Tells whether the scheme-specific part of {@code uri} is itself a URI with a
     * scheme, while {@code uri} has neither host nor raw path.
     */
    /* for testing */
    static boolean hasAnotherScheme(URI uri) {
        boolean nestedScheme = schemePattern.matcher(uri.getSchemeSpecificPart())
                .matches();
        if (!nestedScheme) {
            return false;
        }
        return uri.getHost() == null && uri.getRawPath() == null;
    }

    @Override
    public int getOrder() {
        return ROUTE_TO_URL_FILTER_ORDER;
    }

    /**
     * Stores the merged request URL on the exchange and continues the chain.
     * @param exchange the current exchange.
     * @param chain the remaining filter chain.
     * @return the result of the rest of the chain.
     * @throws IllegalStateException if the route URI has the {@code lb} scheme but no
     * host.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route matched = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (matched == null) {
            // No route on the exchange: nothing to compute
            return chain.filter(exchange);
        }
        log.trace("RouteToRequestUrlFilter start");

        URI incoming = exchange.getRequest().getURI();
        boolean keepEncoding = containsEncodedParts(incoming);
        URI target = matched.getUri();

        if (hasAnotherScheme(target)) {
            // this is a special url, save scheme to special attribute
            // replace routeUri with schemeSpecificPart
            String prefix = target.getScheme();
            exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, prefix);
            target = URI.create(target.getSchemeSpecificPart());
        }

        boolean loadBalanced = "lb".equalsIgnoreCase(target.getScheme());
        if (loadBalanced && target.getHost() == null) {
            // Load balanced URIs should always have a host. If the host is null it is
            // most likely because the host name was invalid (for example included an
            // underscore)
            throw new IllegalStateException("Invalid host: " + target.toString());
        }

        String targetScheme = target.getScheme();
        String targetHost = target.getHost();
        int targetPort = target.getPort();
        URI mergedUrl = UriComponentsBuilder.fromUri(incoming).scheme(targetScheme)
                .host(targetHost).port(targetPort).build(keepEncoding).toUri();

        exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
        return chain.filter(exchange);
    }

}

ForwardRoutingFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.core.Ordered;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.isAlreadyRouted;

/**
 * Global filter that hands the exchange to the Spring {@link DispatcherHandler} when
 * the URI stored under {@code ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR} has
 * the {@code forward} scheme and the exchange has not been routed yet.
 * <p>
 * NOTE(review): the original notes also say the request path is overridden by the
 * forward URL's path and that the original URL is appended to
 * {@code GATEWAY_ORIGINAL_REQUEST_URL_ATTR}; neither happens in this class.
 */
public class ForwardRoutingFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class);

    private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;

    // do not use this dispatcherHandler directly, use getDispatcherHandler() instead.
    private volatile DispatcherHandler dispatcherHandler;

    public ForwardRoutingFilter(
            ObjectProvider<DispatcherHandler> dispatcherHandlerProvider) {
        this.dispatcherHandlerProvider = dispatcherHandlerProvider;
    }

    /**
     * Returns the dispatcher handler, asking the provider for it while the cached
     * field is still {@code null}.
     */
    private DispatcherHandler getDispatcherHandler() {
        DispatcherHandler handler = dispatcherHandler;
        if (handler == null) {
            handler = dispatcherHandlerProvider.getIfAvailable();
            dispatcherHandler = handler;
        }
        return handler;
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * Forwards the exchange to the dispatcher handler for {@code forward} URLs;
     * otherwise passes it on unchanged.
     * @param exchange the current exchange; must carry {@code GATEWAY_REQUEST_URL_ATTR}.
     * @param chain the remaining filter chain.
     * @return the dispatcher handler's result, or the rest of the chain.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI target = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
        String targetScheme = target.getScheme();

        // Handle only exchanges that are not routed yet AND use the forward scheme
        boolean shouldForward = !isAlreadyRouted(exchange)
                && "forward".equals(targetScheme);
        if (!shouldForward) {
            return chain.filter(exchange);
        }

        // TODO: translate url?

        if (log.isTraceEnabled()) {
            log.trace("Forwarding to URI: " + target);
        }
        DispatcherHandler handler = this.getDispatcherHandler();
        return handler.handle(exchange);
    }

}

ForwardPathFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;

import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.route.Route;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.isAlreadyRouted;

/**
 * Filter to set the path in the request URI if the {@link Route} URI has the scheme
 * <code>forward</code>.
 * <p>
 * The exchange is passed on unchanged when no route is stored under
 * {@code GATEWAY_ROUTE_ATTR}, when it is already routed, or when the route URI uses
 * any other scheme.
 *
 * @author Ryan Baxter
 */
public class ForwardPathFilter implements GlobalFilter, Ordered {

    /**
     * Replaces the request path with the path of a {@code forward} route URI.
     * @param exchange the current exchange.
     * @param chain the remaining filter chain.
     * @return the result of the rest of the chain, invoked with the mutated exchange
     * when the path was rewritten and with the original exchange otherwise.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        if (route == null) {
            // getAttribute may yield null; without a route there is no path to
            // apply, so continue instead of failing with a NullPointerException.
            return chain.filter(exchange);
        }

        URI routeUri = route.getUri();
        String scheme = routeUri.getScheme();
        if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
            return chain.filter(exchange);
        }

        // Build a new exchange whose request carries the route URI's path
        ServerWebExchange forwarded = exchange.mutate()
                .request(exchange.getRequest().mutate().path(routeUri.getPath()).build())
                .build();
        return chain.filter(forwarded);
    }

    @Override
    public int getOrder() {
        return 0;
    }

}

WebsocketRoutingFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;

import static org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter.filterRequest;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.containsEncodedParts;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.isAlreadyRouted;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.setAlreadyRouted;
import static org.springframework.util.StringUtils.commaDelimitedListToStringArray;

/**
* websocket过滤器
* @author Spencer Gibb
* @author Nikita Konev
*/
public class WebsocketRoutingFilter implements GlobalFilter, Ordered {

/**
* Sec-Websocket protocol.
*/
public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

private static final Log log = LogFactory.getLog(WebsocketRoutingFilter.class);

/**
* websocket客户端
*/
private final WebSocketClient webSocketClient;

/**
* websocket服务
*/
private final WebSocketService webSocketService;

private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

// do not use this headersFilters directly, use getHeadersFilters() instead.
private volatile List<HttpHeadersFilter> headersFilters;

public WebsocketRoutingFilter(WebSocketClient webSocketClient,
WebSocketService webSocketService,
ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
this.webSocketClient = webSocketClient;
this.webSocketService = webSocketService;
this.headersFiltersProvider = headersFiltersProvider;
}

/* for testing */
static String convertHttpToWs(String scheme) {
scheme = scheme.toLowerCase();
return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;
}

@Override
public int getOrder() {
// Before NettyRoutingFilter since this routes certain http requests
return Ordered.LOWEST_PRECEDENCE - 1;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
changeSchemeIfIsWebSocketUpgrade(exchange);

URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();

// 如果已经路由 或者 并非 ws 或 wss 的 scheme
if (isAlreadyRouted(exchange)
|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {
return chain.filter(exchange);
}
// 设置为已路由
setAlreadyRouted(exchange);

HttpHeaders headers = exchange.getRequest().getHeaders();
// 后去请求头过滤器
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

// 获取请求头的协议
List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
if (protocols != null) {
protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(
header -> Arrays.stream(commaDelimitedListToStringArray(header)))
.map(String::trim).collect(Collectors.toList());
}

return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(
requestUrl, this.webSocketClient, filtered, protocols));
}

/**
* 获取请求头过滤器
* @return
*/
private List<HttpHeadersFilter> getHeadersFilters() {
if (this.headersFilters == null) {
this.headersFilters = this.headersFiltersProvider
.getIfAvailable(ArrayList::new);

headersFilters.add((headers, exchange) -> {
HttpHeaders filtered = new HttpHeaders();
headers.entrySet().stream()
.filter(entry -> !entry.getKey().toLowerCase()
.startsWith("sec-websocket"))
.forEach(header -> filtered.addAll(header.getKey(),
header.getValue()));
return filtered;
});
}

return this.headersFilters;
}

/**
* 如果Web套接字升级则更改Scheme
* @param exchange
*/
static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
// Check the Upgrade
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme().toLowerCase();
String upgrade = exchange.getRequest().getHeaders().getUpgrade();
// change the scheme if the socket client send a "http" or "https"
if ("WebSocket".equalsIgnoreCase(upgrade)
&& ("http".equals(scheme) || "https".equals(scheme))) {
String wsScheme = convertHttpToWs(scheme);
boolean encoded = containsEncodedParts(requestUrl);
URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme)
.build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
if (log.isTraceEnabled()) {
log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
}
}
}

/**
* 代理Websocket处理器
*/
private static class ProxyWebSocketHandler implements WebSocketHandler {

private final WebSocketClient client;

private final URI url;

private final HttpHeaders headers;

private final List<String> subProtocols;

ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers,
List<String> protocols) {
this.client = client;
this.url = url;
this.headers = headers;
if (protocols != null) {
this.subProtocols = protocols;
}
else {
this.subProtocols = Collections.emptyList();
}
}

@Override
public List<String> getSubProtocols() {
return this.subProtocols;
}

@Override
public Mono<Void> handle(WebSocketSession session) {
// pass headers along so custom headers can be sent through
// 传递header,以便可以通过发送自定义header
return client.execute(url, this.headers, new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));
// .log("proxySessionSend", Level.FINE);
Mono<Void> serverSessionSend = session.send(
proxySession.receive().doOnNext(WebSocketMessage::retain));
// .log("sessionSend", Level.FINE);
return Mono.zip(proxySessionSend, serverSessionSend).then();
}

/**
* Copy subProtocols so they are available downstream.
* @return
*/
@Override
public List<String> getSubProtocols() {
return ProxyWebSocketHandler.this.subProtocols;
}
});
}

}

}