首页 > 编程语言 >手撕Gateway源码,今日撕工作流程、负载均衡源码

手撕Gateway源码,今日撕工作流程、负载均衡源码

时间:2022-11-08 17:02:05浏览次数:41  
标签:负载 调用 exchange Gateway routeUri 源码 response

Spring Cloud Gateway源码剖析

通过前面的学习,我们知道SpringCloud Gateway是一个微服务网关,主要实现不同功能服务路由,关于SpringCloud Gateway的实战使用我们就告一段落,我们接下来深入学习SpringCloud Gateway源码。

2.1 Gateway工作流程源码剖析

2.1.1 Gateway工作流程分析

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java

前面我们已经学习过Gateway的工作流程,如上工作流程图,我们回顾一下工作流程:

1:所有都将由ReactorHttpHandlerAdapter.apply()方法拦截处理,此时会封装请求对象和响应对象,并传递到HttpWebHandlerAdapter.handle()方法。

2:HttpWebHandlerAdapter.handle(),将request和response封装成上下文对象ServerWebExchange,方法通过getDelegate()获取全局异常处理器ExceptionHandlingWebHandler执行全局异常处理

3:ExceptionHandlingWebHandler执行完成后,调用DispatcherHandler.handle(),循环所有handlerMappings查找处理当前请求的Handler

4:找到Handler后调用DispatcherHandler.invokeHandler()执行找到的Handler,此时会调用FilteringWebHandler.handle()

5:DefaultGatewayFilterChain.filter()是关键流程,所有过滤器都会在这里执行,比如服务查找、负载均衡、远程调用等,都在这一块。

上面工作流程我们都是基于说的层面,接下来我们一层一层分析Gateway源码,深入学习Gateway。

2.1.2 Gateway工作流程源码

我们首先来看一下Gateway拦截处理所有请求的方法handle():

/****
*处理所有请求
****/
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//创建网关上下文对象
ServerWebExchange exchange = createExchange(request, response);

LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

//getDelegate()获取当前的Handler
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}

上面getDelegate()方法源码如下:

/**
* Return the wrapped delegate.
* 返回WebHandler:处理web请求的对象
*/
public WebHandler getDelegate() {
return this.delegate;
}

我们进行Debug测试如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_工作流程_02

当前返回的WebHandler是​​ExceptionHandlingWebHandler​​​,而​​ExceptionHandlingWebHandler​​​的delegate是​​FilteringWebHandler​​​,而​​FilteringWebHandler​​​的delegate是​​delegate​​​是​​DispatcherHandler​​​,所有的delegate的​​handle()​​​方法都会依次执行,我们可以把断点放到​​DispatcherHandler.handler()​​方法上:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Spring Cloud_03

handler()方法会调用所有handlerMappings的​​getHandler(exchange)​​​方法,而​​getHandler(exchange)​​​方法会调用​​getHandlerInternal(exchange)​​方法:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Spring Cloud_04

​getHandlerInternal(exchange)​​​该方法由各个​​HandlerMapping​​​自行实现,我们可以观察下断言处理的​​RoutePredicateHandlerMapping​​​的​​getHandlerInternal(exchange)​​方法会调用lookupRoute方法,该方法用于返回对应的路由信息:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java_05

这里的路由匹配其实就是我们项目中对应路由配置的一个一个服务的信息,这些服务信息可以帮我们找到我们要调用的真实服务:

手撕Gateway源码,今日撕工作流程、负载均衡源码_负载均衡_06

每个Route对象如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_数据_07

Route的DEBUG数据如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Spring Cloud_08

找到对应Route后会返回指定的FilterWebHandler,如下代码:

手撕Gateway源码,今日撕工作流程、负载均衡源码_数据_09

FilterWebHandler主要包含了所有的过滤器,过滤器按照一定顺序排序,主要是order值,越小越靠前排,过滤器中主要将请求交给指定真实服务处理了,debug测试如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_负载均衡_10

这里有​​RouteToRequestUrlFilter​​​和​​ForwardRoutingFilter​​​以及​​LoadBalancerClientFilter​​等多个过滤器。

2.1.3 请求处理

在上面FilterWebHandler中有2个过滤器,分别为​​RouteToRequestUrlFilter​​​和​​ForwardRoutingFilter ​​。

​RouteToRequestUrlFilter​​:用于根据匹配的 Route,计算请求地址得到 ​​lb://hailtaxi-order/order/list​

​ForwardRoutingFilter​​:转发路由网关过滤器。其根据 forward:// 前缀( Scheme )过滤处理,将请求转发到当前网关实例本地接口。

2.1.3.1 RouteToRequestUrlFilter真实服务查找

RouteToRequestUrlFilter源码如下:

/***
* 处理uri过滤器
* @param exchange
* @param chain
* @return
*/
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//获取当前的route
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
//得到uri = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();

if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}

if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
// Load balanced URIs should always have a host. If the host is null it is
// most
// likely because the host name was invalid (for example included an
// underscore)
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}

//将uri换成 lb://hailtaxi-order/order/list?token=123
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
// .uri(routeUri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}

debug调试结果如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java_11

从上面调试结果我们可以看到所选择的Route以及uri和routeUri和mergedUrl,该过滤器其实就是将用户请求的地址换成服务地址,换成服务地址可以用来做负载均衡。

2.1.3.2 NettyRoutingFilter远程调用

SpringCloud在实现对后端服务远程调用是基于Netty发送Http请求实现,核心代码在​​NettyRoutingFilter.filter()​​中,其中核心代码为send()方法,代码如下:

Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route)
// 头信息处理
.headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
// 执行发送,基于HTTP协议
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound
.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText()
+ ", inbound: " + exchange.getLogPrefix()));
}
return nettyOutbound.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).
// 响应结果
responseConnection((res, connection) -> {

// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
// 获取响应结果
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();

res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));

String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}

setResponseStatus(res, response);

// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);

if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}

exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());

response.getHeaders().putAll(filteredResponseHeaders);

return Mono.just(res);
});

Duration responseTimeout = getResponseTimeout(route);

上面send方法最终会调用​​ChannelOperations>send()​​方法,而该方法其实是基于了Netty实现数据发送,核心代码如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_数据_12

2.1.3.3 Netty特性

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,他的并发性能得到了很大提高,对比于BIO(Blocking I/O,阻塞IO),隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。Netty 是一个广泛使用的 Java 网络编程框架。

传输极快

Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。 Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java_13

良好的封装

Netty无论是性能还是封装性都远远超越传统Socket编程。

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java_14

Channel:表示一个连接,可以理解为每一个请求,就是一个Channel。

ChannelHandler:核心处理业务就在这里,用于处理业务请求。

ChannelHandlerContext:用于传输业务数据。

ChannelPipeline:用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。

ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:

手撕Gateway源码,今日撕工作流程、负载均衡源码_工作流程_15

2.2 Gateway负载均衡源码剖析

前面源码剖析主要剖析了Gateway的工作流程,我们接下来剖析Gateway的负载均衡流程。在最后的过滤器集合中有​​LoadBalancerClientFilter​​过滤器,该过滤器是用于实现负载均衡。

2.2.1 地址转换

​LoadBalancerClientFilter​​过滤器首先会将用户请求地址转换成真实服务地址,也就是IP:端口号,源码如下:

/***
* 负载均衡过滤
* @param exchange
* @param chain
* @return
*/
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//负载均衡的URL = lb://hailtaxi-order/order/list?token=123
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}

//服务选择
final ServiceInstance instance = choose(exchange);

if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
//用户提交的URI = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();

// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
//真实服务的URL =http://192.168.211.1:18182/order/list?token=123
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}

exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

2.2.2 负载均衡服务选择

上面代码的关键是​​choose(exchange)​​的调用,该方法调用其实就是选择指定服务,这里涉及到负载均衡服务轮询调用算法等,我们可以跟踪进去查看方法执行流程。

手撕Gateway源码,今日撕工作流程、负载均衡源码_Java_16

Gateway自身已经集成Ribbon,所以看到的对象是RibbonLoadBalancerClient,我们跟踪进去接着查看:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Spring Cloud_17

上面方法会依次调用到getInstance()方法,该方法会返回所有可用实例,有可能有多个实例,如果有多个实例就涉及到负载均衡算法,方法调用如下图:

手撕Gateway源码,今日撕工作流程、负载均衡源码_Spring Cloud_18

此时调用getServer()方法,再调用​​BaseLoadBalancer.chooseServer()​​,这里是根据指定算法获取对应实例,代码如下:

手撕Gateway源码,今日撕工作流程、负载均衡源码_工作流程_19

​BaseLoadBalancer​​​是属于Ribbon的算法,我们可以通过如下依赖包了解,并且该算法默认用的是​​RoundRobinRule​​,也就是随机算法,如下代码:

手撕Gateway源码,今日撕工作流程、负载均衡源码_负载均衡_20


本文由传智教育博学谷 - 狂野架构师教研团队发布 如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力 转载请注明出处!

标签:负载,调用,exchange,Gateway,routeUri,源码,response
From: https://blog.51cto.com/boxuegu/5833765

相关文章

  • 【性能分析】平均负载
    目录一、平均负载介绍二、工具说明三、典型场景分析1、CPU密集型2、IO密集型3、多进程四、思考参考:(1)linux性能优化实战(2)LinuxLoadAverages:SolvingtheMystery一、......
  • Kubelet源码分析(一):工作原理以及启动流程分析
    一、概要kubelet是运行在每个节点上的主要的“节点代理”,每个节点都会启动kubelet进程,用来处理Master节点下发到本节点的任务,按照PodSpec描述来管理Pod和其中的容......
  • Redisson源码解读-公平锁
    前言我在上一篇文章聊了Redisson的可重入锁,这次继续来聊聊Redisson的公平锁。下面是官方原话:它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线......
  • SQLite源码安装部署
    1.下载下载地址:https://www.sqlite.org/download.html我这里下载的是:sqlite-autoconf-3390400.tar.gz 2.解压编译[root@localhostsoft]#yuminstallgcc[root@localh......
  • log4go使用及源码解析
    1.功能模块1.1配置文件日志输出支持:控制台、log文件、xml、分级日志:FINEST|FINE|DEBUG|TRACE|INFO|WARNING|ERROR 2.源码解析  2.1文件结构    2.2 ......
  • vue源码分析-v-model的本质
    双向数据绑定这个概念或者大家并不陌生,视图影响数据,数据同样影响视图,两者间有双向依赖的关系。在响应式系统构建的上,中,下篇我已经对数据影响视图的原理详细阐述清楚了。而......
  • vue源码中的渲染过程是怎样的
    4.1VirtualDOM4.1.1浏览器的渲染流程当浏览器接收到一个Html文件时,JS引擎和浏览器的渲染引擎便开始工作了。从渲染引擎的角度,它首先会将html文件解析成一个DOM树,与此......
  • vue源码分析-diff算法核心原理
    这一节,依然是深入剖析Vue源码系列,上几节内容介绍了VirtualDOM是Vue在渲染机制上做的优化,而渲染的核心在于数据变化时,如何高效的更新节点,这就是diff算法。由于源码中关于d......
  • 【深入浅出 Yarn 架构与实现】1-2 搭建 Hadoop 源码阅读环境
    本文将介绍如何使用idea搭建Hadoop源码阅读环境。(默认已安装好Java、Maven环境)一、搭建源码阅读环境一)idea导入hadoop工程从github上拉取代码。https://gith......
  • QT5-打开源码中的所有打印信息
    移植QT时,可能要查看QT的源码中的打印日志,以便确定问题。修改方法如下:/etc/profile文件exportQT_DEBUG_PLUGINS=1exportQT_LOGGING_RULES="*=false;*=true"规则如下:关闭所......