首页 > 其他分享 >用这4招 优雅的实现Spring Boot 异步线程间数据传递

用这4招 优雅的实现Spring Boot 异步线程间数据传递

时间:2024-01-13 11:33:14浏览次数:23  
标签:get Spring Boot static 线程 executor new public

你好,我是蜗牛!

在实际开发中需要在父子线程之间传递一些数据,比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:

/**
 * @author 公众号:woniuxgg
 * @description 用户上下文信息
 */
public class UserUtils {
    private static  final  ThreadLocal<String> userLocal=new ThreadLocal<>();

    public static  String getUserId(){
        return userLocal.get();
    }
    public static void setUserId(String userId){
        userLocal.set(userId);
    }

    public static void clear(){
        userLocal.remove();
    }}

那么子线程想要获取这个userId如何做呢?

1. 手动设置

代码如下:

public void handlerAsync() {
    //1. 获取父线程的userId
        String userId = UserUtils.getUserId();
        CompletableFuture.runAsync(()->{
            //2. 设置子线程的值,复用
            UserUtils.setUserId(userId);
            log.info("子线程的值:{}",UserUtils.getUserId());
        });
    }

这样子每次开异步线程都需要手动设置,重复代码太多,看了头疼!

2. 线程池设置TaskDecorator

TaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。

那我们该如何去使用?代码如下:

/**
 * @author 公众号:woniuxgg
 * @description 上下文装饰器
 */
public class CustomTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        String robotId = UserUtils.getUserId();
        System.out.println(robotId);
        return () -> {
            try {
                // 将主线程的请求信息,设置到子线程中
                UserUtils.setUserId(robotId);
                // 执行子线程,这一步不要忘了
                runnable.run();
            } finally {
                // 线程结束,清空这些信息,否则可能造成内存泄漏
                UserUtils.clear();
            }
        };    }
}

TaskDecorator需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:

@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
    log.info("start asyncServiceExecutor----------------");
    //ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    //使用可视化运行状态的线程池
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    //配置核心线程数
    executor.setCorePoolSize(corePoolSize);
    //配置最大线程数
    executor.setMaxPoolSize(maxPoolSize);
    //配置队列大小
    executor.setQueueCapacity(queueCapacity);
    //配置线程池中的线程的名称前缀
    executor.setThreadNamePrefix(namePrefix);

    // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    //增加线程池修饰类
    executor.setTaskDecorator(new CustomTaskDecorator());
    //增加MDC的线程池修饰类
    //executor.setTaskDecorator(new MDCTaskDecorator());
    //执行初始化
    executor.initialize();
    log.info("end asyncServiceExecutor------------");
    return executor;
}

此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:

public void handlerAsync() {
    log.info("父线程的用户信息:{}", UserUtils.getUserId());
    //执行异步任务,需要指定的线程池
    CompletableFuture.runAsync(()->     log.info("子线程的用户信息:{}", UserUtils.getUserId()),    taskExecutor);
}

来看一下结果,如下图:

用这4招 优雅的实现Spring Boot 异步线程间数据传递_子线程

这里使用的是CompletableFuture执行异步任务,使用@Async这个注解同样是可行的。

注意:无论使用何种方式,都需要指定线程池

3. InheritableThreadLocal

InheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题

这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:

/**
 * @author 公众号:程序员蜗牛g
 * @description 用户上下文信息
 */
public class UserUtils {
    private static  final  InheritableThreadLocal<String>  threadLocal = new InheritableThreadLocal<>();

    public static  String get(){
        return threadLocal.get();
    }
    public static void set(String userId){
        threadLocal.set(userId);
    }
    public static void clear(){
        threadLocal.remove();
    }
}

4. TransmittableThreadLocal

TransmittableThreadLocal是由阿里开发的一个线程变量传递工具包,解决了InheritableThreadLocal只能再new Thread的时候传递本地变量,无法应用到线程池的问题。可以应用来作链路追踪,传递变量等用途,下面我们来了解一下原理。

使用起来也是非常简单,添加依赖如下:

<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>transmittable-thread-local</artifactId>
 <version>2.14.2</version>
</dependency>

UserUtils改造代码如下:

/**
 * @author 公众号:程序员蜗牛g
 * @description 用户上下文信息
 */
public class UserUtils  {
    private static  final TransmittableThreadLocal<String> threadLocal =new TransmittableThreadLocal<>();

    public static  String get(){
        return threadLocal.get();
    }
    public static void set(String userId){
        threadLocal.set(loginVal);
    }
    public static void clear(){
        threadLocal.remove();
    }
}

TransmittableThreadLocal原理

 TransmittableThreadLocal继承自InheritableThreadLocal,因此它可以在创建线程的时候将值传递给子线程,那么怎么确保使用线程池的时候也有效呢?我们来看一下源码

1、构造方法

public TransmittableThreadLocal() {
    this(false);
 }

 public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
    // 是否忽略null值set,默认false
    this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
 }

2、set方法

/**
 * {@inheritDoc}
 */
@Override
public final void set(T value) {
    if (!disableIgnoreNullValueSemantics && value == null) {
        // may set null to remove value
        remove();
    } else {
        super.set(value);
        addThisToHolder();
    }
}

先看addThisToHolder方法

@SuppressWarnings("unchecked")
private void addThisToHolder() {
    if (!holder.get().containsKey(this)) {
        holder.get().put((TransmittableThreadLocal<Object>) this, null);
// WeakHashMap supports null value.
    }
}

属性holder又是什么呢?

1、final static修饰的变量,只会存在一份

2、使用了WeakHashMap,弱引用,方便垃圾回收

3、key就是TransmittableThreadLocal对象

    remove方法

/**
 * {@inheritDoc}
 */
@Override
public final void remove() {
    removeThisFromHolder();
    super.remove();
}

4、get方法

/**
 * {@inheritDoc}
 */
@Override
public final T get() {
    T value = super.get();
    if (disableIgnoreNullValueSemantics || value != null) addThisToHolder();
    return value;
}

5、当我们使用线程池时,需要使用TtlRunnable.get(runnable)对runnable进行包装,或者使用TtlExecutors.getTtlExecutor(executor)对执行器进行包装,才能使线程池的变量传递起效果,那么我们就接着看一下源码的执行流程

TtlExecutors.getTtlExecutor(executor)

public static Executor getTtlExecutor(@Nullable Executor executor) {
    if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
        return executor;
    }
    // 包装执行器
    return new ExecutorTtlWrapper(executor, true);
}
ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
    this.executor = executor;
    this.idempotent = idempotent;
}
public void execute(@NonNull Runnable command) {
    // 实际上也是通过TtlRunnable对原runnable进行包装
    executor.execute(TtlRunnable.get(command, false, idempotent));
}

可以看到,两种方式原理一样,我们直接看TtlRunnable.get()

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {

    if (null == runnable) return null;
    if (runnable instanceof TtlEnhanced) {
        if (idempotent) return (TtlRunnable) runnable;
        else throw new IllegalStateException("Already TtlRunnable!");
    }
    // 返回TtlRunnable
    return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}

构建TtlRunnable

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
      // 原子引用
      this.capturedRef = new AtomicReference<Object>(capture());
      this.runnable = runnable;
      this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
  }

capture捕获父线程的ttl   

public static Object capture() {
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static
HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
new HashMap<TransmittableThreadLocal<Object>, Object>();
    // 遍历了所有holder
 for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
  // copyValue实际上调用了TransmittableThreadLocal的get方法获取线程存储的变量值
    ttl2Value.put(threadLocal, threadLocal.copyValue());
 }
  return ttl2Value;
}

private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value =
new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry :
threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        final TtlCopier<Object> copier = entry.getValue();
        threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
    }
    return threadLocal2Value;
}

再看TtlRunnable的run方法

public void run() {
    // 获取Snapshot对象,里面存储了父线程的值
    final Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    // 传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,
    // 这样就会把值设置到当前的线程中去,最后会把子线程之前存在的ttl返回
    final Object backup = replay(captured);
    try {
        // 调用原runnable的run
        runnable.run();
    } finally {
        // 
        restore(backup);
    }
}

总结

上述列举了4种方案,蜗牛这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。


最后说一句(求关注!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

标签:get,Spring,Boot,static,线程,executor,new,public
From: https://blog.51cto.com/u_16502039/9231014

相关文章

  • 详解Java多线程之循环栅栏技术CyclicBarrier
    第1章:引言大家好,我是小黑,工作中,咱们经常会遇到需要多个线程协同工作的情况。CyclicBarrier,直译过来就是“循环屏障”。它是Java中用于管理一组线程,并让它们在某个点上同步的工具。简单来说,咱们可以把一群线程想象成一队马拉雪橇的驯鹿,CyclicBarrier就像是一个指定的集合点,所有驯......
  • Springcloud智慧工地管理云平台源码 AI智能识别
    智慧工地解决方案一、现状描述 建筑工程建设具有明显的生产规模大宗性与生产场所固定性的特点。建筑企业70%左右的工作都发生在施工现场,施工阶段的现场管理对工程成本、进度、质量及安全等至关重要。同时随着工程建设规模不断扩大,工艺流程纷繁复杂,如何搞好现场施工现场管理,控制......
  • golang进程(主线程)与协程
    概念主线程:golang中的主线程(在go中主线程就是进程,相比与其他编程语言叫法不一样)协程:golang中协程是轻量级的线程(相比于其他语言,只有进程和线程);python中有进程和线程的概念,也有协程的概念;python中的协程通过async来实现并发与并行的概念并发:在一个cpu上有10个线程,每个线程10......
  • Java之线程的详细解析二
     2.线程同步2.1卖票【应用】案例需求某电影院目前正在上映国产大片,共有100张票,而它有3个窗口卖票,请设计一个程序模拟该电影院卖票实现步骤定义一个类SellTicket实现Runnable接口,里面定义一个成员变量:privateinttickets=100;在SellTicket类中重写run()方法实现卖票,代码步骤如下......
  • Spring -- IOC 手写
    .java反射Java反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意方法和属性;这种动态获取信息以及动态调用对象方法的功能称为Java语言的反射机制。简单来说,反射机制指的是程序在运行时能够获取自身的信息。要想解剖一......
  • 多线程(Java.Thread)学习
    多线程(Java.Thread)学习线程简介:1、线程是独立执行的路径2、程序运行时有很多后台进程比如主线程、young.gc、full.gc()3、main是主线程,系统入口,用于执行整个程序4、一个进程中、如果开辟多个线程,线程的运行由调度器安排调度、调度器的先后顺序不能人为干预5、对同一份资......
  • Spring Boot和 Spring Cloud的区别;单体架构与微服务架构的区别以及优缺点
    SpringBoot简介SpringBoot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式,SpringBoot致力于在蓬勃发展的快速应用开发领域(rapidapplicat......
  • 蚂蚁爱购--靠谱的SpringBoot项目
    ​简介这是一个靠谱的SpringBoot项目实战,名字叫蚂蚁爱购。从零开发项目,视频加文档,十天就能学会开发JavaWeb项目。教程路线是:搭建环境=>安装软件=>创建项目=>添加依赖和配置=>通过表生成代码=>编写Java代码=>代码自测=>前后端联调=>准备找工作。学完即可成为合格的Java......
  • 不雅文字过滤?springboot可以这样做!实战!
    这里主要讲敏感词过滤与替换两个功能,引入相关maven依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boo......
  • org.springframework.kafka.listener.ListenerExecutionFailedException: Listener me
    问题描述kafka在yml文件中未开启批量消费时,程序正常运行;但一开启正常消费后,就直接报错;排查问题的过程中一直觉得是配置文件里的问题,最后发现是消费者接受的参数类型错误 问题本质  消费者开启批量消费数据后,不能用单个实体类接收参数,而应该用list 解决方法  修改......