首页 > 其他分享 >SpringCloudAlibaba Seata在Openfeign跨节点环境出现全局事务Xid失效原因底层探究

SpringCloudAlibaba Seata在Openfeign跨节点环境出现全局事务Xid失效原因底层探究

时间:2023-10-20 23:14:05浏览次数:42  
标签:事务 Xid Seata Openfeign RootContext 节点 全局 order ID

image

原创/朱季谦

曾经在SpringCloudAlibaba的Seata分布式事务搭建过程中,跨节点通过openfeign调用不同服务时,发现全局事务XID在当前节点也就是TM处,是正常能通过RootContext.getXID()获取到分布式全局事务XID的,但在下游节点就出现获取为NULL的情况,导致全局事务失效,出现异常时无法正常回滚。

当时看了一遍源码,才知道问题所在,故而把这个过程了解到的分布式事务XID是如何跨节点传输的原理记录下来。

本文默认是使用Seata的AT模式。

在那一次的搭建过程中,我设置了三个节点,分别是订单节点order,商品库存节点product,账户余额节点account,模拟购买下单逻辑,在分布式环境下,生成一份订单时,通过openfeign远程扣减库存,最后同样通过openfeign去扣减账户(当然,实际场景远不止这些,这里只是简单模拟这个过程)。

正常情况下,其中有一步出错,整个全局分布式事务就会进行回滚。
image

这三个节点在Seata AT模式下,流程图是这样的,order充当TM/RM角色,product和充当RM角色,按照在Linux服务器上的Seats Service就充当TC角色。
image

首先是最初调用订单节点order业务逻辑——

@Override
@Transactional
@GlobalTransactional(name = "zjq-create-order",rollbackFor = Exception.class)
public RestResponse createOrder(Orders order) {
    log.info("当前的XID:"+ RootContext.getXID());
    log.info("------>开始新建订单");
    //1、新建订单
    orderMapper.insert(order);

    //2、扣减库存
    productService.decrease(order.getProductId(),order.getCount());

    //3、扣减账户
    accountService.decrease(order.getUserId(),order.getMoney());

    ......
}

在Seata,order充当了TM角色,负责生成一个全局事务注册到TC,TC会返回一个全局事务ID给TM。

在该全局事务流程里,每一个分支模块理应都能获取到这一个共同的全局事务ID,在该全局事务ID统筹下,完成分支事务的提交或者回滚。

通过RootContext.getXID()获取到一个全局事务ID为:192.168.1.152:8091:458311058765479936
image

创建订单成功后,就会执行扣减库存操作productService.decrease(order.getProductId(),order.getCount())。

在该代码案例里,productService.decrease()内部是通过openfeign远程去调用的——

@FeignClient(contextId = "remoteProductService",value = "zjq-product",fallbackFactory = RemoteProductServiceFallbackFactory.class)
public interface RemoteProductService {
    @PostMapping(value = "/product/decrease")
    RestResponse decrease(@RequestParam("productId")Long productId, @RequestParam("count") Integer count);
}

最终decrease的服务层伪代码大概如下——

@Transactional(propagation = Propagation.REQUIRES_NEW)
public int decrease(Long productId, Integer count) {
    log.info("当前的XID:"+ RootContext.getXID());
    log.info("---------->开始查询商品是否存在");
    log.info("---------->开始扣减库存"); 
    ......
}

然而,到这一步,发现了一个问题,这里获取的全局事务ID为null——
image

这说明了一个问题,TM开启了一个全局事务后,已经从TC那里获取到了一个全局事务ID,但远程传送给product这个RM资源管理器后,没有传送成功,同理,另一个分支事务account模块的,同样获取到的全局事务ID为null。

基于这样一个现象,我就开始尝试研究了一下全局事务是如何在Openfeign跨节点环境进行传输和获取的,主要分为TM节点的全局事务ID发送和远程RM节点的接收。

一、TM节点的全局事务ID发送

通过debug代码去阅读,在调用 productService.decrease(order.getProductId(),order.getCount())时,内部做了反射调用,执行了一系列方案调用,调用核心过程如下——
image

本文只需要关注在整个HTTP调用过程,全局事务ID是如何放进来的,这个调用链涉及的类及方法,在后续学习中再进一步研究。

最终在SeataFeignClient的execute方法里,可以看到以下源码——

public Response execute(Request request, Request.Options options) throws IOException {
    Request modifiedRequest = this.getModifyRequest(request);
    return this.delegate.execute(modifiedRequest, options);
}

其中,在Request modifiedRequest = this.getModifyRequest(request)这行代码里,对请求头做了一些补充操作。

private Request getModifyRequest(Request request) {
    String xid = RootContext.getXID();
    if (StringUtils.isEmpty(xid)) {
        return request;
    } else {
        Map<String, Collection<String>> headers = new HashMap(16);
        headers.putAll(request.headers());
        List<String> seataXid = new ArrayList();
        seataXid.add(xid);
        headers.put("TX_XID", seataXid);
        return Request.create(request.method(), request.url(), headers, request.body(), request.charset());
    }
}

debug到这里,可以看到,这里将一个全局事务ID存储到了headers里——
image

这个headers其实是HTTP组装的请求头,可以看到,这里是将全局事务ID放到了HTTP请求头里,传送给了远程机器。

Request(HttpMethod method, String url, Map<String, Collection<String>> headers, Body body, RequestTemplate requestTemplate) {
    this.httpMethod = (HttpMethod)Util.checkNotNull(method, "httpMethod of %s", new Object[]{method.name()});
    this.url = (String)Util.checkNotNull(url, "url", new Object[0]);
    this.headers = (Map)Util.checkNotNull(headers, "headers of %s %s", new Object[]{method, url});
    this.body = body;
    this.requestTemplate = requestTemplate;
}

通过debug,可以发现,在HTTP组装过程中,已经将全局事务ID放到了请求头里,说明在HTTP发生成功后,是会携带全局事务到远程product模块的,但是为何product模块打印RootContext.getXID()得到的是null呢?

HTTP请求传送到远程product模块后,在调用具体的Controller前,会流转到MVC进行拦截转发,在这过程当中,涉及到seata分布式事务时,理应会有这样一个叫TransactionPropagationInterceptor的拦截器,用来处理分布式事务的传播,有两个方法,分别是preHandle()和afterCompletion(),暂时只需要关注preHandle方法即可:

  • preHandle()

​ 在处理远程请求之前被调用,在该方法中,通过RootContext.getXID()获取到当前线程上下文中的全局事务ID和通过request.getHeader("TX_XID")获取HTTP请求头中的事务ID。这里的请求头里的事务ID,正是前面发送HTTP时放到请求头里的。

若RootContext.getXID()获取到当前线程上下文中的全局事务ID为空并且HTTP请求头的事务ID不为空,就会将该HTTP请求头里的事务ID绑定到该线程上下文当中,用于确保全局事务的传播和关联。

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    String xid = RootContext.getXID();
    String rpcXid = request.getHeader("TX_XID");
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
    }

    if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
        RootContext.bind(rpcXid);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("bind[{}] to RootContext", rpcXid);
        }
    }

    return true;
}

进入到bind方法当中,可以看到,这里是HTTP请求头里的事务ID缓存到了 CONTEXT_HOLDER.put("TX_XID", xid),它本质其实是一个ThreadLocal,可以存储线程隔离的变量。

public static void bind(@Nonnull String xid) {
    if (StringUtils.isBlank(xid)) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid is blank, switch to unbind operation!");
        }

        unbind();
    } else {
        MDC.put("X-TX-XID", xid);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("bind {}", xid);
        }

        CONTEXT_HOLDER.put("TX_XID", xid);
    }

}

缓存成功后,下一次通过 RootContext.getXID()就能获取到该线程缓存的全局事务ID了, RootContext.getXID()本质就是——

public static String getXID() {
    return (String)CONTEXT_HOLDER.get("TX_XID");
}

在本次搭建seata环境中,发现该TransactionPropagationInterceptor过滤器当中的preHandle方法一直没有执行,这就造成全局事务当中,远程跨环境的分支事务节点一直无法获取到全局事务ID。

于是,我尝试手动将该TransactionPropagationInterceptor拦截器加入到Spring MVC流程中——

@Configuration
public class WebMvcInterceptorsConfig extends WebMvcConfigurationSupport {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }

    @Bean
    public ServerCodecConfigurer serverCodecConfigurer() {
        return ServerCodecConfigurer.create();
    }
    
}

重新运行后,这次拦截器TransactionPropagationInterceptor终于生效里,可以debug到了preHandle方法里,将HTTP请求头的全局事务ID取出,然后通过RootContext.bind(rpcXid)缓存到线程上下文当中——
image

这时,product节点终于能拿到从TM远程传送过来的全局事务ID了——
image

最后总结一下,全局事务ID在SpringCloudAlibaba Seata在Openfeign跨节点环境里的传送方式,是将该全局事务ID放入到HTTP请求头当中,远程传送给分支事务节点,各分支事务节点会在TransactionPropagationInterceptor拦截器当中,取出HTTP请求头大全局事务ID,通过RootContext.bind(rpcXid)将全局事务ID缓存到线程上下文里,这样,分支事务就可以在其执行过程当中,获取到全局事务ID啦。

标签:事务,Xid,Seata,Openfeign,RootContext,节点,全局,order,ID
From: https://www.cnblogs.com/zhujiqian/p/17778205.html

相关文章

  • seata事务管理
    seata事务管理中三个重要角色:TC事务协调者,维护全局和分支事务的状态,协调全局事务的提交与回滚TM事务管理者,定义全局事务的范围,全局事务的开始,事务的提交与回滚RM资源管理者,管理分支事务处理的资源,和TC交谈以注册分支事务,报告分支事务状态,并驱动事务的提交和回滚......
  • 分布式事务 —— SpringCloud Alibaba Seata
    Seata简介传统的单体应用中,业务操作使用同一条连接操作不同的数据表,一旦出现异常就可以整体回滚。随着公司的快速发展、业务需求的变化,单体应用被拆分成微服务应用,原来的单体应用被拆分成多个独立的微服务,分别使用独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部......
  • 声明式调用 —— SpringCloud OpenFeign
    Feign简介SpringCloudFeign是一个HTTP请求调用的轻量级框架,可以以Java接口注解的方式调用HTTP请求,而不用通过封装HTTP请求报文的方式直接调用Feign通过处理注解,将请求模板化,当实际调用的时候传入参数,根据参数再应用到请求上,进而转化成真正的请求第一个Feign程......
  • postgresql xid回卷预防及排查
    监控WITHmax_ageAS(SELECT2000000000asmax_old_xid,settingASautovacuum_freeze_max_ageFROMpg_catalog.pg_settingsWHEREname='autovacuum_freeze_max_age'),per_database_statsAS(SELECTdatname......
  • openfeign开启日志Logger.Level feignLoggerLevel()中Level爆红的解决
    问题原因:引错包了!!!应该引入如下这个包importfeign.Logger;......
  • Seata XA模式一阶段为什么一直锁定资源等二阶段成功?AT模式怎么解决的这个缺陷?
    Winwin:SeataXA模式一阶段为什么一直锁定资源等二阶段成功?AT模式怎么解决的这个缺陷?兔子:Seata是一个非常强大的分布式事务解决方案,它提供了XA模式和AT模式来支持分布式事务的一致性和可靠性。关于你的问题,我们先来聊一下SeataXA模式的一阶段和二阶段,好吗?在SeataXA模式的一......
  • Seata架构实现分布式事务
    Seata架构官网地址:http://seata.io/zh-cn/Seata架构实现模型 TC(TransactionCoordinator):事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。监控和通知各个事务,包括分支事务和全局事务。TM(TransactionManager):事务管理器:定义全局事务的范围、开始全局事务、......
  • 分布式事务解决方案-Seata01
    分布式事务-使用Seata传统数据库事务A-原子性:①事务中的所有操作,要么全部成功,要么全部失败。②影响事务的操作,一般指的是增删改,也就是一个事务中,有多个增删改的SQLC-一致性:①事务开始前到事务结束后,数据状态需要一致②例如:转账增减金额和支付减去金额+修改订单状态、减库存I-......
  • Seata+naocs 使用
    1.环境seata1.5.2.  nacos2.1.0本地配置好nacos之后 新建一个seata的命名空间,seata需要使用  seata准备1:创建mysql的seata数据库 执行\seata-server-1.5.2\script\server\db\mysql.sql2:拷贝config.txt  从seata-server-1.5.2\script\config-center\confi......
  • OpenFeign
    OpenFeign底层实现上:获取到你正在运行的实例(instance(也即是注册在nacos等注册中心的applicationName))编辑请求(包括函数的url,参数,方法,返回值等)(通常是调用一些方法来构成这个请求)负载均衡:最烂的做法就是获取总共的该服务有多少instance然后在其中取个随机值即可发......