首页 > 其他分享 >Spring Boot + 事务钩子函数,打造高效支付系统!

Spring Boot + 事务钩子函数,打造高效支付系统!

时间:2024-05-06 14:11:25浏览次数:25  
标签:TransactionSynchronizationManager 事务 Spring synchronizations Boot kafka synchron

作者:avengerEug
链接:https://juejin.cn/post/6984574787511123999

前言

经过前面对Spring AOP、事务的总结,我们已经对它们有了一个比较感性的认知了。

今天,我继续安利一个独门绝技:Spring 事务的钩子函数。单纯的讲技术可能比较枯燥乏味。接下来,我将以一个实际的案例来描述Spring事务钩子函数的正确使用姿势。

一、案例背景

拿支付系统相关的业务来举例。

在支付系统中,我们需要记录每个账户的资金流水(记录用户A因为哪个操作扣了钱,因为哪个操作加了钱),这样我们才能对每个账户的做到心中有数,对于支付系统而言,资金流水的数据可谓是最重要的。

因此,为了防止支付系统的老大徇私舞弊,CTO提了一个流水存档的需求:要求支付系统对每个账户的资金流水做一份存档,要求支付系统在写流水的时候,把流水相关的信息以消息的形式推送到kafka,由存档系统消费这个消息并落地到库里(这个库只有存档系统拥有写权限)。

推荐一个开源免费的 Spring Boot 实战项目:

https://github.com/javastacks/spring-boot-best-practice

整个需求的流程如下所示:

整个需求的流程还是比较简单的,考虑到后续会有其他事业部也要进行数据存档操作,CTO建议支付系统团队内部开发一个二方库,这个二方库的主要功能就是发送消息到kafka中去。

二、确定方案

既然要求开发一个二方库,因此,我们需要考虑如下几件事情:

1、技术栈使用的springboot,因此,这里最好以starter的方式提供

2、二方库需要发送消息给kafka,最好是二方库内部基于kafka生产者的api创建生产者,不要使用Spring自带的kafkaTemplate,因为集成方有可能已经使用了kafkaTemplate。不能与集成方造成冲突。

3、减少对接方的集成难度、学习成本,最好是提供一个简单实用的api,业务侧能简单上手。

4、发送消息这个操作需要支持事务,尽量不影响主业务

在上述的几件事情中,最需要注意的应该就是第4点:发送消息这个操作需要支持事务,尽量不影响主业务

这是什么意思呢?

首先,尽量不影响主业务,这个最简单的方式就是使用异步机制。

其次,需要支持事务是指:假设我们的api是在事务方法内部调用的,那么我们需要保证事务提交后再执行这个api。那么,我们的流水落地api应该要有这样的功能:

内部可以判断当前是否存在事务,如果存在事务,则需要等事务提交后再异步发送消息给kafka。

如果不存在事务则直接异步发送消息给kafka。而且这样的判断逻辑得放在二方库内部才行。那现在摆在我们面前的问题就是:我要如何判断当前是否存在事务,以及如何在事务提交后再触发我们自定义的逻辑呢?

三、TransactionSynchronizationManager显神威

这个类内部所有的变量、方法都是static修饰的,也就是说它其实是一个工具类。是一个事务同步器。下述是流水落地API的伪代码,这段代码就解决了我们上述提到的疑问:

private final ExecutorService executor = Executors.newSingleThreadExecutor();

public void sendLog() {
    // 判断当前是否存在事务
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        // 无事务,异步发送消息给kafka

        executor.submit(() -> {
            // 发送消息给kafka
            try {
                // 发送消息给kafka
            } catch (Exception e) {
                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
            }
        });
        return;
    }

    // 有事务,则添加一个事务同步器,并重写afterCompletion方法(此方法在事务提交后会做回调)
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                // 事务提交后,再异步发送消息给kafka
                executor.submit(() -> {
                    try {
	                    // 发送消息给kafka
                    } catch (Exception e) {
    	                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
                });
            }
        }

    });

}

代码比较简单,其主要是TransactionSynchronizationManager的使用。

推荐一个开源免费的 Spring Boot 实战项目:

https://github.com/javastacks/spring-boot-best-practice

3.1、判断是否存在事务?

TransactionSynchronizationManager.isSynchronizationActive() 方法显神威

我们先看下这个方法的源码:

// TransactionSynchronizationManager.java类内部的部分代码

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
			new NamedThreadLocal<>("Transaction synchronizations");

public static boolean isSynchronizationActive() {
    return (synchronizations.get() != null);
}

很明显,synchronizations是一个线程变量(ThreadLocal)。那它是在什么时候set进去的呢?

这里的话,可以参考下这个方法:org.springframework.transaction.support.TransactionSynchronizationManager#initSynchronization,其源码如下所示:

/**
  * Activate transaction synchronization for the current thread.
  * Called by a transaction manager on transaction begin.
  * @throws IllegalStateException if synchronization is already active
  */
public static void initSynchronization() throws IllegalStateException {
    if (isSynchronizationActive()) {
        throw new IllegalStateException("Cannot activate transaction synchronization - already active");
    }
    logger.trace("Initializing transaction synchronization");
    synchronizations.set(new LinkedHashSet<>());
}

由源码中的注释也可以知道,它是在事务管理器开启事务时调用的。

换句话说,只要我们的程序执行到带有事务特性的方法时,就会在线程变量中放入一个LinkedHashSet,用来标识当前存在事务。只要isSynchronizationActive返回true,则代表当前有事务。

因此,结合这两个方法我们是指能解决我们最开始提出的疑问:要如何判断当前是否存在事务

3.2、如何在事务提交后触发自定义逻辑?

TransactionSynchronizationManager.registerSynchronization()方法显神威

我们来看下这个方法的源代码:

/**
  * Register a new transaction synchronization for the current thread.
  * Typically called by resource management code.
  * <p>Note that synchronizations can implement the
  * {@link org.springframework.core.Ordered} interface.
  * They will be executed in an order according to their order value (if any).
  * @param synchronization the synchronization object to register
  * @throws IllegalStateException if transaction synchronization is not active
  * @see org.springframework.core.Ordered
  */
public static void registerSynchronization(TransactionSynchronization synchronization)
    throws IllegalStateException {

    Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    if (!isSynchronizationActive()) {
        throw new IllegalStateException("Transaction synchronization is not active");
    }
    synchronizations.get().add(synchronization);
}

这里又使用到了synchronizations线程变量,我们在判断是否存在事务时,就是判断这个线程变量内部是否有值。那我们现在想在事务提交后触发自定义逻辑和这个有什么关系呢?

我们在上面构建流水落地api的伪代码中有向synchronizations内部添加了一个TransactionSynchronizationAdapter,内部并重写了afterCompletion方法,其代码如下所示:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // 事务提交后,再异步发送消息给kafka
            executor.submit(() -> {
                    try {
	                    // 发送消息给kafka
                    } catch (Exception e) {
    	                // 记录异常信息,发邮件或者进入待处理列表,让开发人员感知异常
                    }
            });
        }
    }

});

我们结合registerSynchronization的源码来看,其实这段代码主要就是向线程变量内部的LinkedHashSet添加了一个对象而已,但就是这么一个操作,让Spring在事务执行的过程中变得“有事情可做”。这是什么意思呢?

是因为Spring在执行事务方法时,对于操作事务的每一个阶段都有一个回调操作,比如:trigger系列的回调

invoke系列的回调

而我们现在的需求就是在事务提交后触发自定义的函数,那就是在invokeAfterCommit和invokeAfterCompletion这两个方法来选了。首先,这两个方法都会拿到所有TransactionSynchronization的集合(其中会包括我们上述添加的TransactionSynchronizationAdapter)。

但是要注意一点:invokeAfterCommit只能拿到集合,invokeAfterCompletion除了集合还有一个int类型的参数,而这个int类型的参数其实是当前事务的一种状态。也就是说,如果我们重写了invokeAfterCompletion方法,我们除了能拿到集合外,还能拿到当前事务的状态。

因此,此时我们可以根据这个状态来做不同的事情,比如:可以在事务提交时做自定义处理,也可以在事务回滚时做自定义处理等等。

四、总结

上面有说到,我们判断当前是否存在事务、添加钩子函数都是依赖线程变量的。因此,我们在使用过程中,一定要避免切换线程。否则会出现不生效的情况。

更多文章推荐:

1.Spring Boot 3.x 教程,太全了!

2.2,000+ 道 Java面试题及答案整理(2024最新版)

3.免费获取 IDEA 激活码的 7 种方式(2024最新版)

觉得不错,别忘了随手点赞+转发哦!

标签:TransactionSynchronizationManager,事务,Spring,synchronizations,Boot,kafka,synchron
From: https://www.cnblogs.com/javastack/p/18174916

相关文章

  • Spring Bean 获取和注入
    BeanFactorybean创建1.导入Maven坐标(此处导入spring-context包即可,已经包含了springframework中核心的包) 2.定义service接口及其实现类3.创建beans.xml文件,将service信息配置到该文件中recources下新建XML配置文件,选择SpringConfig,创建beans.xml。因为我们第一......
  • springboot~CompletableFuture并行计算
    在Spring中,CompletableFuture通常用于异步编程,可以方便地处理异步任务的执行和结果处理,CompletableFuture是Java8引入的一个类,用于支持异步编程和并发操作。它基于Future和CompletionStage接口,提供了丰富的方法来处理异步任务的执行和结果处理。下面是CompletableFuture......
  • Spring学习之——Bean加载流程
    Spring IOC容器就像是一个生产产品的流水线上的机器,Spring创建出来的Bean就好像是流水线的终点生产出来的一个个精美绝伦的产品。既然是机器,总要先启动,Spring也不例外。因此Bean的加载流程总体上来说可以分为两个阶段:容器启动阶段Bean创建阶段一、容器启动阶段:容器的启动阶......
  • 手写Spring框架
    1.手写Spring框架@目录1.手写Spring框架每博一文案2.反射机制的回顾3.开始手写Spring框架3.1第一步:使用IDE创建模块myspring3.2第二步:准备好我们要管理的Bean3.3第三步:准备myspring.xml配置文件3.4第四步:编写ApplicationContext接口3.5第五步:编写ClassPathXmlApplic......
  • spring项目创建
    从springinitializer下载一个demoSpringboot 在idea中需要配置java版本和maven版本之后:mvnpackage不需要下载tomcat,Spring里面pom中包含内置tomcat<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-star......
  • 3. SpringBoot 整合第三方技术
    1.整合Junit一般来说是不需要进行处理的,因为在创建SpringBoot工程时,会自动整合junit​的要说怎么配置的话?也可以写一下相关的配置:以下就是SpringBoot整合Junit相关步骤导入相关依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-b......
  • Camunda 整合SpringBoot基本Api
    代码实现:需要接口@AutowiredprivateRuntimeServiceruntimeService;@AutowiredprivateRepositoryServicerepositoryService;@AutowiredprivateTaskServicetaskService;发布流程:@GetMapping("/deploy")publicObjectdeploy(){......
  • SpringBoot3.1.5对应新版本SpringCloud开发(2)-Eureka的负载均衡
    Eureka的负载均衡负载均衡原理负载均衡流程老版本流程介绍当order-servic发起的请求进入Ribbon后会被LoadBalancerInterceptor负载均衡拦截器拦截,拦截器获取到请求中的服务名称,交给RibbonLoadBanlancerCient,然后RibbonLoadBanlancerCient会将服务名称当作服务id交给Dynamic......
  • Spring 的容器配置
    除了XML配置外,Spring框架还提供了两种主要的方式来配置和管理应用中的bean:基于注解(Annotation-basedContainerConfiguration)和基于Java的配置(Java-basedContainerConfiguration)。这两种方式都是为了替代传统的XML配置,以更加面向代码的方式简化配置并提高可读性。下面是两者......
  • 测试 springboot 项目苍穹外卖,解决 Unable to connect to Redis 错误问题
       使用IDEA启动springboot项目苍穹外卖后台项目sky-take-out,测试“菜品批量删除”接口时,能够正常完成操作,但是服务器始终显示下面错误信息:2024-05-0320:54:24.134ERROR24360---[nio-8181-exec-3]o.a.c.c.C.[.[.[/].[dispatcherServlet]  :Servlet.service()fo......