首页 > 其他分享 >聊聊在springcloud gateway如何获取请求体

聊聊在springcloud gateway如何获取请求体

时间:2024-07-30 09:18:10浏览次数:14  
标签:body return exchange springcloud gateway 聊聊 EnableBodyCachingEvent null public

前言

在我们扩展scg时,获取requestbody也是一个挺常见的需求了,比如记录日志,我们要获取请求体里面的内容。在HTTP协议中,服务器接收到客户端的请求时,请求体(RequestBody)通常是以流的形式传输的。这个流在设计上是只读且不可重复读取的。即request body只能读取一次,但我们很多时候是更希望这个requestbody可以被多次读取,那我们今天就来聊下这个话题

实现思路

通常我们会实现一个全局过滤器,并将过滤器的优先级调到最高。

该过滤器调到最高的原因是防止一些内置过滤器优先读取到requestbody,会导致我们这个过滤器读取到requestbody,就已经报body只能读取一次的异常。

异常如下

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.

在这个过滤器里面我们要实现的功能如下

  1. 将原有的request请求中的body内容读出来
  2. 使用ServerHttpRequestDecorator这个请求装饰器对request进行包装,重写getBody方法
  3. 将包装后的请求放到过滤器链中传递下去

示例

@RequiredArgsConstructor
public class RequestBodyParamsFetchGlobalFilter implements Ordered, GlobalFilter {

    private final GwCommonProperty gwCommonProperty;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        if (isSkipFetchRequestBodyParams(exchange)) {
            return chain.filter(exchange);
        } else {
            return DataBufferUtils.join(exchange.getRequest().getBody())
                    .flatMap(dataBuffer -> {
                        DataBufferUtils.retain(dataBuffer);
                        Flux<DataBuffer> cachedFlux = Flux
                                .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                         exchange.getAttributes().put(REQUEST_BODY_PARAMS_ATRR_NAME, RouteUtil.getRequestBodyParams(exchange));
                        ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(
                                exchange.getRequest()) {
                            @Override
                            public Flux<DataBuffer> getBody() {
                                return cachedFlux;
                            }
                        };
                        return chain.filter(exchange.mutate().request(mutatedRequest).build());
                    });
        }
    }

    private boolean isSkipFetchRequestBodyParams(ServerWebExchange exchange){
        if(!gwCommonProperty.isFetchRequestBodyParams()){
            return true;
        }

        if(exchange.getRequest().getHeaders().getContentType() == null && !HttpMethod.POST.name().equalsIgnoreCase(Objects.requireNonNull(exchange.getRequest().getMethod()).name())){
            return true;
        }else{
            return false;
        }
    }

@Override
public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
    }
}

大家如果搜索一下,scg获取请求体,有很大一部分都是这种写法。这种写法基本上是可以满足我们的需求。但是在请求压力比较大的情况下,可能会堆外内存溢出问题

reactor.netty.ReactorNetty$InternalNettyException: io.netty.util.internal.OutOfDirectMemoryError:failed to allocate

有没有更好的实现方式

我这边使用的springcloud版本是Hoxton.SR3,在这个版本我发现了一个挺好玩的过滤器

org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter

见名之意,这就是一个自适应的缓存body全局过滤器。这个过滤器的代码如下

public class AdaptCachedBodyGlobalFilter
		implements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {

	private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();

	/**
	 * Cached request body key.
	 */
	@Deprecated
	public static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;

	@Override
	public void onApplicationEvent(EnableBodyCachingEvent event) {
		this.routesToCache.putIfAbsent(event.getRouteId(), true);
	}

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		// the cached ServerHttpRequest is used when the ServerWebExchange can not be
		// mutated, for example, during a predicate where the body is read, but still
		// needs to be cached.
		ServerHttpRequest cachedRequest = exchange
				.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
		if (cachedRequest != null) {
			exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
			return chain.filter(exchange.mutate().request(cachedRequest).build());
		}

		//
		DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
		Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

		if (body != null || !this.routesToCache.containsKey(route.getId())) {
			return chain.filter(exchange);
		}

		return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
			// don't mutate and build if same request object
			if (serverHttpRequest == exchange.getRequest()) {
				return chain.filter(exchange);
			}
			return chain.filter(exchange.mutate().request(serverHttpRequest).build());
		});
	}

	@Override
	public int getOrder() {
		return Ordered.HIGHEST_PRECEDENCE + 1000;
	}

}

看到这个源码,是不是有种豁然开朗的感觉,它的实现套路不就是我们上文说的实现思路吗,根据源码,我们仅需发布EnableBodyCachingEvent事件,并将要监听的routeId送入EnableBodyCachingEvent,剩下缓存requestbody的事情,就交给AdaptCachedBodyGlobalFilter来帮我们处理

示例

**
 * @see AdaptCachedBodyGlobalFilter
 */
@Configuration
@AutoConfigureAfter(GatewayAutoConfiguration.class)
@RequiredArgsConstructor
public class RequestBodyCacheConfig implements ApplicationContextAware, CommandLineRunner {


    private final RouteLocator routeDefinitionRouteLocator;
    private ApplicationContext applicationContext;

    @Override
    public void run(String... args) throws Exception {
        List<Signal<Route>> routes = routeDefinitionRouteLocator.getRoutes().materialize()
                .collect(Collectors.toList()).block();

        assert routes != null;
        routes.forEach(routeSignal -> {
            if (routeSignal.get() != null) {
                Route route = routeSignal.get();
                System.out.println(route.getId());
                publishEnableBodyCachingEvent(route.getId());
            }
        });
    }


    @EventListener
    public void refreshRoutesEvent(RefreshRoutesEvent refreshRoutesEvent){
        if(refreshRoutesEvent.getSource() instanceof NewRouteId){
            publishEnableBodyCachingEvent(((NewRouteId) refreshRoutesEvent.getSource()).getRouteId());
        }else{
            routeDefinitionRouteLocator.getRoutes().subscribe(route -> {
                publishEnableBodyCachingEvent(route.getId());
            });
        }
    }


    private void publishEnableBodyCachingEvent(String routeId){
        EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
        applicationContext.publishEvent(enableBodyCachingEvent);
    }


    public void addRouteRouteDefinition(RouteDefinition routeDefinition){
        NewRouteId source = NewRouteId.builder().routeId(routeDefinition.getId()).source(this).build();
        applicationContext.publishEvent(new RefreshRoutesEvent(source));
    }



    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }


}

这个代码的意思就是在项目启动时,遍历一下路由,发送EnableBodyCachingEvent。并再监听RefreshRoutesEvent 事件,当有路由新增时,再次发送EnableBodyCachingEvent事件。其业务语义是让每个route都能被AdaptCachedBodyGlobalFilter处理,并缓存requestbody

发布EnableBodyCachingEvent事件的核心代码如下

  private void publishEnableBodyCachingEvent(String routeId){
        EnableBodyCachingEvent enableBodyCachingEvent = new EnableBodyCachingEvent(this, routeId);
        applicationContext.publishEvent(enableBodyCachingEvent);
    }

做完上述的事情后,我们仅需在我们需要获取requestbody的地方,写下如下代码即可

String bodyContent = null;
 DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
 if(body != null){
      bodyContent = body.toString(StandardCharsets.UTF_8);
   }

总结

框架也是不断在演进,因此对于我们日常使用的框架,要多多关注下,有现成的轮子,就使用现成的轮子,现成轮子满不足不了,先看下该轮子是否有预留扩展点,如果没有,我们再考虑自己制造轮子

标签:body,return,exchange,springcloud,gateway,聊聊,EnableBodyCachingEvent,null,public
From: https://www.cnblogs.com/linyb-geek/p/18101054

相关文章

  • OAuth2 + Gateway统一认证一步步实现(公司项目能直接使用),密码模式&授权码模式
    文章目录认证的具体实现环境的搭建基础版授权服务搭建引入依赖创建数据表yml配置配置SpringSecurity定义认证授权的配置类授权服务器存储客户端信息修改授权服务配置,支持密码模式基础版授权服务测试授权码模式测试密码模式测试**测试校验token接口**整合JWT使用jwt基......
  • 科普文:springcloud之-Ribbon
    一、SpringCloudRibbon 客户端负载均衡       Ribbon是Netflix发布的云中间层服务开源项目,其主要功能是提供客户端侧负载均衡算法。Ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。Bibbon 的架构图如下所示:        简单的说,SpringcloudRibbo......
  • 【第12章】Spring Cloud之集成 Spring Cloud Gateway
    文章目录前言一、新建项目1.项目结构2.引入依赖3.启动类4.基本配置二、新建配置三、新建服务1.提供者2.消费者四、单元测试1.启动网关服务2.提供者3.消费者总结前言SpringCloudGateway是一个基于SpringFramework5、SpringBoot2和ProjectReactor......
  • 简单聊聊JavaScript 中的原型链、null 和 undefined 的区别
    1.原型链个人观点:原型链和逻辑判断里三段论有些类似,一个大前提、一个小前提、一个结论。比如,动物会吃肉,狗是动物,所以狗会吃肉。这也是继承的思想原型和构造函数JavaScript是基于原型的面向对象编程语言,每个对象都有一个内部链接到另一个对象(即原型)。这个机制被称为原型链。原......
  • SpringCloud网关登录校验
    SpringCloud网关登录校验文章目录SpringCloud网关登录校验1、鉴权思路分析2、网关过滤器3、自定义过滤器3.1、自定义GatewayFilter3.2、自定义GlobalFilter4、登录校验5、微服务获取用户5.1、保存用户到请求头5.2、拦截器获取用户6、OpenFeign传递用户1、鉴权......
  • 【深入理解SpringCloud微服务】深入理解Ribbon原理并手写一个微服务负载均衡器
    深入理解Ribbon原理并手写一个微服务负载均衡器负载均衡器理解Ribbon原理手写一个微服务负载均衡器总体设计LoadBalanceClientHttpRequestFactorySimpleLoadBalanceClientSimpleLoadBalancerLoadBalanceRulespring.factories与LoadBalanceConfig负载均衡器在微服务......
  • SpringCloud+Vue3多对多,多表联查
    ♥️作者:小宋1021......
  • SpringCloud Alibaba - nacos服务自动注册流程
    在SpringCloudAlibaba中,Nacos服务自动注册流程大致遵循以下步骤:依赖集成:首先,通过在项目的pom.xml文件中添加spring-cloud-starter-alibaba-nacos-discovery依赖,将Nacos服务发现功能集成到SpringBoot应用中。配置文件设置:在application.properties或application.yml中......
  • 项目环境出现PHP 502 Bad Gateway 问题排查
    一、现象昨天运维人员被告知,在升级完客户集群环境后,访问管理页面偶尔会报502BadGateway。登录客户环境,发现只要请求分发到node2,就会报502,开始解决问题... 二、排查思路1、看到502第一时间想到的应该是php-fpm出问题了,先看下nginx日志,连接被对端关闭,说明php-fpm......
  • 黑马程序员2024最新SpringCloud微服务开发与实战 个人学习心得、踩坑、与bug记录Day4
    你好,我是Qiuner.为帮助别人少走弯路和记录自己编程学习过程而写博客这是我的githubhttps://github.com/Qiuner⭐️giteehttps://gitee.com/Qiuner......