源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(一)

源码解析-SpringCloudGateway
01 源码分析-SpringCloudGateway源码阅读准备
02 源码分析-SpringCloudGateway源码阅读-网关监控端点
03 源码分析-SpringCloudGateway源码阅读-网关配置类
04 源码分析-SpringCloudGateway源码阅读-服务发现
05 源码分析-SpringCloudGateway源码阅读-网关自定义事件
06 源码分析-SpringCloudGateway源码阅读-过滤器包类总览
07 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(一)
08 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(二)
09 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(三)
10 源码分析-SpringCloudGateway源码阅读-过滤器-路由过滤器源码解读
11 源码分析-SpringCloudGateway源码阅读-处理器源码

一 本小节全局过滤器类

  • AdaptCachedBodyGlobalFilter 缓存请求body的全局过滤器
  • RemoveCachedBodyFilter 清除缓存body的全局过滤器
  • NettyWriteResponseFilter Netty写响应全局过滤器
  • WebClientWriteResponseFilter 与NettyWriteResponseFilter功能类似(只是该过滤器是实验性的,不依赖netty;注:下文实际贴出的源码是WebClientHttpRoutingFilter)

二 源码解读

AdaptCachedBodyGlobalFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.event.EnableBodyCachingEvent;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

/**
 * Global filter that adapts the exchange so downstream filters see a request
 * whose body has been cached.
 *
 * <p>Routes opt in to body caching by publishing an
 * {@link EnableBodyCachingEvent}; the route ids are remembered in
 * {@code routesToCache}. The filter is ordered at
 * {@code Ordered.HIGHEST_PRECEDENCE + 1000}.
 */
public class AdaptCachedBodyGlobalFilter
        implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {

    // Ids of the routes for which body caching was requested via an event.
    private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

    /**
     * Cached request body key. Deprecated alias of
     * {@code ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR}.
     */
    @Deprecated
    public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;

    /**
     * Records the route named by the event so that requests matched to it get
     * their body cached by {@link #filter}.
     * @param event event carrying the id of the route to enable caching for
     */
    @Override
    public void onApplicationEvent(EnableBodyCachingEvent event) {
        String routeId = event.getRouteId();
        this.routesToCache.putIfAbsent(routeId, Boolean.TRUE);
    }

    /**
     * Continues the chain with a body-cached request where one is available or
     * required, otherwise passes the exchange through unchanged.
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // The cached ServerHttpRequest is used when the ServerWebExchange could not
        // be mutated at the time the body was read (for example inside a predicate)
        // but the body still needed to be cached.
        ServerHttpRequest decoratedRequest = exchange
                .getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
        if (decoratedRequest != null) {
            // The decorator is consumed exactly once: drop the attribute and carry
            // on with an exchange that wraps the decorated request.
            exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
            ServerWebExchange decoratedExchange = exchange.mutate()
                    .request(decoratedRequest).build();
            return chain.filter(decoratedExchange);
        }

        DataBuffer cachedBody = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
        // Nothing to do when a body is already cached, or when the matched route
        // never asked for caching. The route is only inspected in the latter check.
        boolean bodyAlreadyCached = (cachedBody != null);
        if (bodyAlreadyCached || !this.routesToCache.containsKey(route.getId())) {
            return chain.filter(exchange);
        }

        // Core of the filter: delegate the actual caching to ServerWebExchangeUtils
        // and resume the chain from its callback.
        // NOTE(review): the helper is not shown here; presumably it stores the body
        // under CACHED_REQUEST_BODY_ATTR - confirm against ServerWebExchangeUtils.
        return ServerWebExchangeUtils.cacheRequestBody(exchange, (bodyCachedRequest) -> {
            // don't mutate and build if same request object
            ServerWebExchange nextExchange = (bodyCachedRequest == exchange.getRequest())
                    ? exchange
                    : exchange.mutate().request(bodyCachedRequest).build();
            return chain.filter(nextExchange);
        });
    }

    /**
     * @return {@code Ordered.HIGHEST_PRECEDENCE + 1000}
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1000;
    }

}

RemoveCachedBodyFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR;

/**
 * Global filter that removes the cached request body from the exchange
 * attributes once the rest of the chain has terminated.
 *
 * <p>It is ordered at {@code HIGHEST_PRECEDENCE}. The cleanup is registered with
 * {@code doFinally} on the chain's result, so it runs when the downstream chain
 * finishes, not before it starts.
 */
public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);

    /**
     * Runs the rest of the chain and attaches the cached-body cleanup to its
     * termination.
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).doFinally(signal -> releaseCachedBody(exchange));
    }

    /**
     * Removes the {@code CACHED_REQUEST_BODY_ATTR} attribute and, when it holds a
     * pooled buffer that is still allocated, releases it.
     */
    private void releaseCachedBody(ServerWebExchange exchange) {
        // The attribute is always removed, whatever its type.
        Object cached = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
        if (!(cached instanceof PooledDataBuffer)) {
            // Absent, or not a pooled buffer: nothing to release.
            return;
        }
        PooledDataBuffer pooledBody = (PooledDataBuffer) cached;
        if (!pooledBody.isAllocated()) {
            // Already released elsewhere.
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("releasing cached body in exchange attribute");
        }
        pooledBody.release();
    }

    /**
     * @return {@code Ordered.HIGHEST_PRECEDENCE}
     */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

}

NettyWriteResponseFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.util.List;

import io.netty.buffer.ByteBuf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR;

/**
 * Global filter that writes the proxied response body back to the gateway
 * client.
 *
 * <p>It only acts when the {@code CLIENT_RESPONSE_CONN_ATTR} exchange attribute
 * holds a Reactor Netty {@link Connection}. All of its work happens after the
 * rest of the chain has completed: the connection's inbound bytes are streamed
 * into the server response.
 *
 * @author Spencer Gibb
 */
public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

    /**
     * Order for write response filter.
     */
    public static final int WRITE_RESPONSE_FILTER_ORDER = -1;

    private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);

    // Media types whose responses are flushed chunk by chunk instead of buffered.
    private final List<MediaType> streamingMediaTypes;

    /**
     * @param streamingMediaTypes media types to be written with
     * {@code writeAndFlushWith} (see {@link #filter})
     */
    public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
        this.streamingMediaTypes = streamingMediaTypes;
    }

    /**
     * @return {@link #WRITE_RESPONSE_FILTER_ORDER}
     */
    @Override
    public int getOrder() {
        return WRITE_RESPONSE_FILTER_ORDER;
    }

    /**
     * Lets the rest of the chain run, then copies the upstream response body to
     * the gateway response. The upstream connection is disposed if the chain
     * errors or the subscription is cancelled.
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return completion signal of the response write
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    // Look up the upstream connection stored on the exchange.
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
                    // No connection means there is nothing to write.
                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    // Each inbound ByteBuf is retained here, so wrap() is responsible
                    // for making sure that extra reference is eventually released.
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        // Read the content type from the response headers; parsing
                        // an invalid value throws, which is treated as "unknown".
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    // Streaming media types are flushed per chunk, everything else
                    // is written as one body publisher.
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }

    /**
     * Converts a Netty buffer into a {@link DataBuffer} suitable for the given
     * response.
     *
     * <p>With a {@link NettyDataBufferFactory} the buffer is wrapped directly.
     * With a {@link DefaultDataBufferFactory} the readable bytes are copied into
     * a newly allocated buffer and {@code byteBuf} is released, because the
     * copy does not hold a reference to the Netty buffer.
     * @param byteBuf the inbound Netty buffer; the caller has retained it
     * @param response the server response whose buffer factory decides the wrapping
     * @return a data buffer holding the bytes of {@code byteBuf}
     * @throws IllegalArgumentException if the response uses any other buffer
     * factory type
     */
    protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {
        if (response.bufferFactory() instanceof NettyDataBufferFactory) {
            NettyDataBufferFactory factory = (NettyDataBufferFactory) response
                    .bufferFactory();
            return factory.wrap(byteBuf);
        }
        // MockServerHttpResponse creates these
        else if (response.bufferFactory() instanceof DefaultDataBufferFactory) {
            DefaultDataBufferFactory factory = (DefaultDataBufferFactory) response
                    .bufferFactory();
            // Copy instead of wrapping the NIO view: a wrapped view would keep
            // pointing at the retained ByteBuf, which nothing would ever release.
            DataBuffer buffer = factory.allocateBuffer(byteBuf.readableBytes());
            try {
                buffer.write(byteBuf.nioBuffer());
            }
            finally {
                byteBuf.release();
            }
            return buffer;
        }
        throw new IllegalArgumentException(
                "Unknown DataBufferFactory type " + response.bufferFactory().getClass());
    }

    /**
     * Disposes the upstream connection stored on the exchange, if any.
     * @param exchange the current server exchange
     */
    private void cleanup(ServerWebExchange exchange) {
        // CLIENT_RESPONSE_CONN_ATTR is written by NettyRoutingFilter (see the
        // NOTICE in filter()).
        Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
        if (connection != null) {
            // Close the connection.
            connection.dispose();
        }
    }

    // TODO: use framework if possible
    // TODO: port to WebClientWriteResponseFilter
    /**
     * @param contentType response content type, may be {@code null}
     * @return {@code true} when {@code contentType} is non-null and compatible
     * with at least one configured streaming media type
     */
    private boolean isStreamingMediaType(@Nullable MediaType contentType) {
        return (contentType != null && this.streamingMediaTypes.stream()
                .anyMatch(contentType::isCompatibleWith));
    }

}

WebClientHttpRoutingFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;
import java.util.List;

import reactor.core.publisher.Mono;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter.filterRequest;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.isAlreadyRouted;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.setAlreadyRouted;

/**
 * Global filter that forwards the request to the target URL with a
 * {@link WebClient}. Described by the original author of these notes as an
 * experimental alternative that does not depend on Netty.
 *
 * @author Spencer Gibb
 */
public class WebClientHttpRoutingFilter implements GlobalFilter, Ordered {

    private final WebClient webClient;

    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    // do not use this headersFilters directly, use getHeadersFilters() instead.
    private volatile List<HttpHeadersFilter> headersFilters;

    /**
     * @param webClient client used to call the target service
     * @param headersFiltersProvider lazy source of the header filters
     */
    public WebClientHttpRoutingFilter(WebClient webClient,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webClient = webClient;
        this.headersFiltersProvider = headersFiltersProvider;
    }

    /**
     * Returns the header filters, resolving them from the provider on first use.
     * @return the header filters, or whatever {@code getIfAvailable()} yielded
     */
    public List<HttpHeadersFilter> getHeadersFilters() {
        List<HttpHeadersFilter> resolved = this.headersFilters;
        if (resolved == null) {
            resolved = this.headersFiltersProvider.getIfAvailable();
            this.headersFilters = resolved;
        }
        return resolved;
    }

    /**
     * @return {@code Ordered.LOWEST_PRECEDENCE}
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * Sends the request to the URL stored under {@code GATEWAY_REQUEST_URL_ATTR}
     * and, once the client response arrives, copies its headers and status to
     * the gateway response before continuing the chain.
     * @param exchange the current server exchange
     * @param chain the remaining gateway filter chain
     * @return completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

        String scheme = requestUrl.getScheme();
        boolean httpScheme = "http".equals(scheme) || "https".equals(scheme);
        // Pass through untouched when the exchange is already marked as routed,
        // or when the target scheme is neither http nor https.
        if (isAlreadyRouted(exchange) || !httpScheme) {
            return chain.filter(exchange);
        }
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();
        HttpMethod method = request.getMethod();
        HttpHeaders filteredHeaders = filterRequest(getHeadersFilters(), exchange);
        boolean preserveHost = exchange
                .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

        RequestBodySpec bodySpec = this.webClient.method(method).uri(requestUrl)
                .headers(outgoing -> {
                    outgoing.addAll(filteredHeaders);
                    // TODO: can this support preserviceHostHeader?
                    if (!preserveHost) {
                        outgoing.remove(HttpHeaders.HOST);
                    }
                });

        // Only PUT/POST/PATCH forward the incoming request body.
        RequestHeadersSpec<?> headersSpec = bodySpec;
        if (requiresBody(method)) {
            headersSpec = bodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        }

        return headersSpec.exchange()
                // .log("webClient route")
                .flatMap(clientResponse -> {
                    ServerHttpResponse response = exchange.getResponse();
                    response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
                    response.setStatusCode(clientResponse.statusCode());
                    // Defer committing the response until all route filters have
                    // run: store the client response as an exchange attribute so
                    // a later filter can write the body.
                    // NOTE(review): the upstream comment names
                    // NettyWriteResponseFilter here; presumably
                    // WebClientWriteResponseFilter is the consumer - confirm.
                    exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, clientResponse);
                    return chain.filter(exchange);
                });
    }

    /**
     * @param method the HTTP method of the incoming request
     * @return {@code true} for PUT, POST and PATCH, {@code false} otherwise
     */
    private boolean requiresBody(HttpMethod method) {
        boolean bodyExpected;
        switch (method) {
        case POST:
        case PUT:
        case PATCH:
            bodyExpected = true;
            break;
        default:
            bodyExpected = false;
            break;
        }
        return bodyExpected;
    }

}