源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(三)

源码解析-SpringCloudGateway
01 源码分析-SpringCloudGateway源码阅读准备
02 源码分析-SpringCloudGateway源码阅读-网关监控端点
03 源码分析-SpringCloudGateway源码阅读-网关配置类
04 源码分析-SpringCloudGateway源码阅读-服务发现
05 源码分析-SpringCloudGateway源码阅读-网关自定义事件
06 源码分析-SpringCloudGateway源码阅读-过滤器包类总览
07 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(一)
08 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(二)
09 源码分析-SpringCloudGateway源码阅读-过滤器-全局过滤器源码(三)
10 源码分析-SpringCloudGateway源码阅读-过滤器-路由过滤器源码解读
11 源码分析-SpringCloudGateway源码阅读-处理器源码

一 本小节全局过滤器类

  • GatewayMetricsFilter 网关度量监控全局过滤器
  • ReactiveLoadBalancerClientFilter 响应式的负载均衡全局过滤器
  • LoadBalancerClientFilter 同步(阻塞式)的负载均衡客户端全局过滤器(已弃用,由 ReactiveLoadBalancerClientFilter 取代)

二 源码解读

GatewayMetricsFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.util.Arrays;
import java.util.List;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.Timer.Sample;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.cloud.gateway.support.tagsprovider.GatewayHttpTagsProvider;
import org.springframework.cloud.gateway.support.tagsprovider.GatewayRouteTagsProvider;
import org.springframework.cloud.gateway.support.tagsprovider.GatewayTagsProvider;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;

/**
 * Global filter that collects gateway request metrics.
 *
 * <p>To enable it, add the {@code spring-boot-starter-actuator} dependency. By default
 * the filter then runs as long as the property
 * {@code spring.cloud.gateway.metrics.enabled} is not set to {@code false}. The
 * recorded metrics can afterwards be scraped from
 * {@code /actuator/metrics/gateway.requests}.
 *
 * @author Tony Clarke
 * @author Ingyu Hwang
 */
public class GatewayMetricsFilter implements GlobalFilter, Ordered {

	private static final Log log = LogFactory.getLog(GatewayMetricsFilter.class);

	/** Registry used both to start timing samples and to look up the target timer. */
	private final MeterRegistry meterRegistry;

	/** Single provider obtained by chaining every configured provider with {@code and}. */
	private GatewayTagsProvider compositeTagsProvider;

	/**
	 * Creates the filter with an explicit list of tag providers.
	 * @param meterRegistry registry that receives the {@code gateway.requests} timer
	 * @param tagsProviders providers whose tags are combined, in list order, for every
	 * recorded request; an empty list yields no tags
	 */
	public GatewayMetricsFilter(MeterRegistry meterRegistry,
			List<GatewayTagsProvider> tagsProviders) {
		this.meterRegistry = meterRegistry;
		// Identity element of the reduction: a provider that contributes no tags.
		GatewayTagsProvider noTags = exchange -> Tags.empty();
		this.compositeTagsProvider = tagsProviders.stream().reduce(noTags,
				GatewayTagsProvider::and);
	}

	/**
	 * Creates the filter with the HTTP and route tag providers.
	 * @param meterRegistry registry that receives the {@code gateway.requests} timer
	 * @deprecated use
	 * {@link #GatewayMetricsFilter(MeterRegistry, List)} and pass the providers explicitly
	 */
	@Deprecated
	public GatewayMetricsFilter(MeterRegistry meterRegistry) {
		this(meterRegistry, Arrays.asList(new GatewayHttpTagsProvider(),
				new GatewayRouteTagsProvider()));
	}

	/**
	 * Returns the position of this filter in the global filter chain.
	 * @return one more than {@code NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER}
	 */
	@Override
	public int getOrder() {
		// Start the timer as soon as possible and report the metric event before the
		// response is written to the client, hence the slot directly after
		// NettyWriteResponseFilter.
		return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
	}

	/**
	 * Starts a timing sample, delegates to the rest of the chain and finishes the sample
	 * when the chain completes, whether successfully or with an error.
	 * @param exchange the current server exchange
	 * @param chain the remaining filter chain
	 * @return the chain's completion signal
	 */
	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		final Sample timing = Timer.start(meterRegistry);

		Mono<Void> downstream = chain.filter(exchange);
		return downstream
				.doOnSuccess(ignored -> endTimerRespectingCommit(exchange, timing))
				.doOnError(failure -> endTimerRespectingCommit(exchange, timing));
	}

	/**
	 * Stops the timer, either right away when the response is already committed or
	 * via a before-commit action registered on the response otherwise.
	 * @param exchange the current server exchange
	 * @param sample the running timing sample
	 */
	private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {
		ServerHttpResponse httpResponse = exchange.getResponse();

		if (!httpResponse.isCommitted()) {
			// Not committed yet: defer recording until just before the commit.
			httpResponse.beforeCommit(() -> {
				endTimerInner(exchange, sample);
				return Mono.empty();
			});
			return;
		}

		// The response has already been committed; record immediately.
		endTimerInner(exchange, sample);
	}

	/**
	 * Resolves the tags for the exchange and stops the sample against the
	 * {@code gateway.requests} timer carrying those tags.
	 * @param exchange the current server exchange
	 * @param sample the running timing sample
	 */
	private void endTimerInner(ServerWebExchange exchange, Sample sample) {
		Tags requestTags = compositeTagsProvider.apply(exchange);

		if (log.isTraceEnabled()) {
			log.trace("gateway.requests tags: " + requestTags);
		}

		Timer requestTimer = meterRegistry.timer("gateway.requests", requestTags);
		sample.stop(requestTimer);
	}

}

ReactiveLoadBalancerClientFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.addOriginalRequestUrl;

/**
 * Reactive load-balancing global filter.
 *
 * <p>A {@link GlobalFilter} implementation that routes requests using reactive Spring
 * Cloud LoadBalancer. It only acts on exchanges whose request URL attribute uses the
 * {@code lb} scheme (or whose scheme-prefix attribute is {@code lb}); for those it
 * picks a service instance and replaces the request URL attribute with the URI rebuilt
 * for that instance.
 *
 * @author Spencer Gibb
 * @author Tim Ysewyn
 * @author Olga Maciaszek-Sharma
 */
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

	private static final Log log = LogFactory
			.getLog(ReactiveLoadBalancerClientFilter.class);

	private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;

	private final LoadBalancerClientFactory clientFactory;

	private LoadBalancerProperties properties;

	/**
	 * Creates the filter.
	 * @param clientFactory factory used to look up the load balancer for a service id
	 * @param properties settings consulted (via {@code isUse404()}) when no instance
	 * can be found
	 */
	public ReactiveLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
			LoadBalancerProperties properties) {
		this.clientFactory = clientFactory;
		this.properties = properties;
	}

	@Override
	public int getOrder() {
		return LOAD_BALANCER_CLIENT_FILTER_ORDER;
	}

	/**
	 * Resolves an {@code lb} request URL to a concrete service instance and continues
	 * the chain; exchanges that are not load-balanced pass through untouched.
	 * @param exchange the current server exchange
	 * @param chain the remaining filter chain
	 * @return the chain's completion signal
	 * @throws NotFoundException if no load balancer exists for the host, or (signalled
	 * through the returned {@link Mono}) if the load balancer yields no instance
	 */
	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
		// Nothing to do when there is no URL or the scheme is not "lb".
		if (url == null
				|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
			return chain.filter(exchange);
		}
		// Preserve the original url.
		addOriginalRequestUrl(exchange, url);

		if (log.isTraceEnabled()) {
			log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()
					+ " url before: " + url);
		}

		return choose(exchange).doOnNext(response -> {

			if (!response.hasServer()) {
				throw NotFoundException.create(properties.isUse404(),
						"Unable to find instance for " + url.getHost());
			}

			ServiceInstance retrievedInstance = response.getServer();

			URI uri = exchange.getRequest().getURI();

			// Default to the scheme implied by the chosen instance, exactly as the
			// blocking LoadBalancerClientFilter does. Passing null here left the
			// decision entirely to reconstructURI.
			// NOTE(review): reconstructURI appears to fall back to the incoming
			// request's scheme in that case - confirm against LoadBalancerUriTools.
			//
			// If the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
			// if the loadbalancer doesn't provide one.
			String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
			if (schemePrefix != null) {
				overrideScheme = url.getScheme();
			}

			DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(
					retrievedInstance, overrideScheme);

			URI requestUrl = reconstructURI(serviceInstance, uri);

			if (log.isTraceEnabled()) {
				log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
			}
			exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		}).then(chain.filter(exchange));
	}

	/**
	 * Rebuilds the request URI so that it targets the given service instance.
	 * @param serviceInstance the chosen instance
	 * @param original the URI of the incoming request
	 * @return the URI produced by {@link LoadBalancerUriTools#reconstructURI}
	 */
	protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
		return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
	}

	/**
	 * Asks the load balancer registered for the request URL's host to pick an instance.
	 * @param exchange the current server exchange; its request URL attribute must be set
	 * @return the load balancer's (possibly empty) response
	 * @throws NotFoundException if no load balancer is registered for the host
	 */
	private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
		URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory
				.getInstance(uri.getHost(), ReactorLoadBalancer.class,
						ServiceInstance.class);
		if (loadBalancer == null) {
			throw new NotFoundException("No loadbalancer available for " + uri.getHost());
		}
		return loadBalancer.choose(createRequest());
	}

	/**
	 * Supplies the request object handed to the load balancer.
	 * @return the shared {@code ReactiveLoadBalancer.REQUEST} constant
	 */
	private Request createRequest() {
		return ReactiveLoadBalancer.REQUEST;
	}

}

LoadBalancerClientFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.filter;

import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.core.Ordered;
import org.springframework.web.server.ServerWebExchange;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.addOriginalRequestUrl;

/**
 * Gateway load-balancer client filter (synchronous); deprecated.
 *
 * <p>For exchanges whose request URL attribute uses the {@code lb} scheme (or whose
 * scheme-prefix attribute is {@code lb}), a service instance is chosen through the
 * {@link LoadBalancerClient} and the request URL attribute is replaced with the URI
 * rebuilt for that instance.
 *
 * @deprecated in favour of {@link ReactiveLoadBalancerClientFilter}
 * @author Spencer Gibb
 * @author Tim Ysewyn
 */
@Deprecated
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

	/**
	 * Filter order for {@link LoadBalancerClientFilter}.
	 */
	public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;

	private static final Log log = LogFactory.getLog(LoadBalancerClientFilter.class);

	/** Client used to choose an instance and to rebuild the target URI. */
	protected final LoadBalancerClient loadBalancer;

	private LoadBalancerProperties properties;

	/**
	 * Creates the filter.
	 * @param loadBalancer client used to pick instances and rebuild URIs
	 * @param properties settings consulted (via {@code isUse404()}) when no instance
	 * can be found
	 */
	public LoadBalancerClientFilter(LoadBalancerClient loadBalancer,
			LoadBalancerProperties properties) {
		this.loadBalancer = loadBalancer;
		this.properties = properties;
	}

	@Override
	public int getOrder() {
		return LOAD_BALANCER_CLIENT_FILTER_ORDER;
	}

	/**
	 * Resolves an {@code lb} request URL to a concrete service instance and continues
	 * the chain; exchanges that are not load-balanced pass through untouched.
	 * @param exchange the current server exchange
	 * @param chain the remaining filter chain
	 * @return the chain's completion signal
	 * @throws NotFoundException if no instance could be chosen
	 */
	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);

		// No URL at all: nothing to balance.
		if (url == null) {
			return chain.filter(exchange);
		}
		// Neither the URL scheme nor the scheme prefix is "lb": not our concern.
		boolean loadBalanced = "lb".equals(url.getScheme())
				|| "lb".equals(schemePrefix);
		if (!loadBalanced) {
			return chain.filter(exchange);
		}

		// Preserve the original url.
		addOriginalRequestUrl(exchange, url);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url before: " + url);
		}

		final ServiceInstance chosen = choose(exchange);
		if (chosen == null) {
			throw NotFoundException.create(properties.isUse404(),
					"Unable to find instance for " + url.getHost());
		}

		URI incoming = exchange.getRequest().getURI();

		// If the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
		// if the loadbalancer doesn't provide one; otherwise derive the default from
		// whether the chosen instance is secure.
		String instanceScheme = chosen.isSecure() ? "https" : "http";
		String overrideScheme = (schemePrefix == null) ? instanceScheme
				: url.getScheme();

		ServiceInstance target = new DelegatingServiceInstance(chosen, overrideScheme);
		URI requestUrl = loadBalancer.reconstructURI(target, incoming);

		if (log.isTraceEnabled()) {
			log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
		}

		exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
		return chain.filter(exchange);
	}

	/**
	 * Picks one service instance through the load balancer, using the host of the
	 * exchange's request URL attribute as the service id.
	 * @param exchange the current server exchange; its request URL attribute must be set
	 * @return the chosen instance as returned by the load balancer client
	 */
	protected ServiceInstance choose(ServerWebExchange exchange) {
		URI lbUrl = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
		String serviceId = lbUrl.getHost();
		return loadBalancer.choose(serviceId);
	}

}