有效性
Spring Cloud Gateway 2.0.0.RELEASE
調(diào)試方法
新建一個GlobalFilter,在filter中加斷點即可調(diào)試filter,通過chain參數(shù)可以查看其它的filter及執(zhí)行順序(order)
filters(按執(zhí)行順序)
1. AdaptCachedBodyGlobalFilter
核心代碼
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1000;
}
public static final String CACHED_REQUEST_BODY_KEY = "cachedRequestBody";
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Flux<DataBuffer> body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_KEY, null);
if (body != null) {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return body;
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}
return chain.filter(exchange);
}
提供替換request 的 body的能力
2.NettyWriteResponseFilter
核心代碼
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
public int getOrder() {
return WRITE_RESPONSE_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
//見 后文的 NettyRoutingFilter
HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
final Flux<NettyDataBuffer> body = clientResponse.receive()
.map(factory::wrap);
MediaType contentType = response.getHeaders().getContentType();
return (isStreamingMediaType(contentType) ?
response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
}));
}
具體的將被代理的服務的內(nèi)容返回的類,文檔
3.ForwardPathFilter
核心代碼
public int getOrder() {
return 0;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
URI routeUri = route.getUri();
String scheme = routeUri.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
exchange = exchange.mutate().request(
exchange.getRequest().mutate().path(routeUri.getPath()).build())
.build();
return chain.filter(exchange);
}
forward協(xié)議的url替換類
4.在Route中配置的各種GatewayFilter
核心代碼
/**
* RouteDefinitionRouteLocator#loadGatewayFilters GatewayFilter的order
*/
ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
根據(jù)配置不同實現(xiàn)具體的功能,詳見文檔
5.RouteToRequestUrlFilter
核心代碼
public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;
public int getOrder() {
return ROUTE_TO_URL_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
//匹配 http:http://locahost:80/a/b/c?q=1,并把第一個 http: 去掉
if (hasAnotherScheme(routeUri)) {
// uri格式 [scheme:]scheme-specific-part[#fragment]
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
URI requestUrl = UriComponentsBuilder.fromUri(uri)
.uri(routeUri)
.build(encoded)
.toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";
static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);
static boolean hasAnotherScheme(URI uri) {
return schemePattern.matcher(uri.getSchemeSpecificPart()).matches() && uri.getHost() == null
&& uri.getRawPath() == null;
}
路由功能的具體執(zhí)行類,文檔
6.LoadBalancerClientFilter(如果啟用了eureka)
核心代碼
public static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10100;
public int getOrder() {
return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
//一大波轉(zhuǎn)換操作
addOriginalRequestUrl(exchange, url);
final ServiceInstance instance = loadBalancer.choose(url.getHost());
if (instance == null) {
throw new NotFoundException("Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
String overrideScheme = null;
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
//轉(zhuǎn)換后的url填入 GATEWAY_REQUEST_URL_ATTR 屬性
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
lb協(xié)議的路由功能,文檔
7.WebsocketRoutingFilter
核心代碼
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE - 1;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//upgrade頭 見https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism
//或見 https://httpwg.org/specs/rfc7230.html#header.upgrade
changeSchemeIfIsWebSocketUpgrade(exchange);
//跳過一大波參數(shù)檢查與參數(shù)獲取
return this.webSocketService.handleRequest(exchange,
new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
filtered, protocols));
}
/**
* ProxyWebSocketHandler#handle 橋接兩個webSocket
*/
public Mono<Void> handle(WebSocketSession session) { //session為客戶端
return client.execute(url, this.headers, new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) { //proxySession為被代理的WebSocket
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));
Mono<Void> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
return Mono.zip(proxySessionSend, serverSessionSend).then();
}
//省略其它方法
});
}
WebSocket的代理功能,文檔
8.NettyRoutingFilter
核心代碼
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
//省略一大波參數(shù)獲取和參數(shù)校驗
final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
final String url = requestUrl.toString();
return this.httpClient.request(method, url, req -> {
//省略http數(shù)據(jù)發(fā)送代碼
}).doOnNext(res -> {
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
//注意,如果ContentType為null會 NPE,特別是301或302跳轉(zhuǎn)
exchange.getAttributes().put("original_response_content_type", headers.getContentType());
//省略其它http解析代碼
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); //與前面的 NettyWriteResponseFilter 對應
}).then(chain.filter(exchange));
}
}
http協(xié)議的代理功能,文檔
9.ForwardRoutingFilter
核心代碼
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
if (log.isTraceEnabled()) {
log.trace("Forwarding to URI: "+requestUrl);
}
return this.dispatcherHandler.handle(exchange);
}
將未處理的forward協(xié)議的請求交由spring來處理,文檔
其中 NettyRoutingFilter 和 NettyWriteResponseFilter 內(nèi)置有 WebClientHttpRoutingFilter和WebClientWriteResponseFilter 作為備用替換版本。