首页 > 编程语言 >Java跨线程传递数据方式总结

Java跨线程传递数据方式总结

时间:2022-08-20 19:47:08浏览次数:80  
标签:threadParam Java Thread get t1 线程 传递数据 new

在微服务调用链中, 需要定义一个共享变量, 在整个调用链中传递

不跨线程的ThreadLocal


  • 线程变量, 在当前线程任意地方都可共享(可理解为同一线程内部的全局变量)
public class A {
    static final ThreadLocal<String> threadParam = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            // 创建并启动线程t1
            new Thread(() -> {
                // 给线程t1的成员变量threadLocalMap_t1对象加入新Entry
                // 这个Entry的key为WeakReference<threadParam>, value为"abc"
                threadParam.set("abc");
                System.out.println("t1:" + threadParam.get());

                // 当线程结束, 其成员变量threadLocalMap也会随之销毁
                // 假如t1就此被销毁, 则此remove操作(主动将threadLocalMap_t1的key为threadParam的Entry删除)是可以省略的
                threadParam.remove();
            }).start();

            TimeUnit.SECONDS.sleep(1);

            // 创建并线程t2
            new Thread(() -> {
                // 由于线程2的成员变量threadLocalMap_t2内并没有key为threadParam的Entry, 因此获取不到value"abc"
                System.out.println("t2:" + threadParam.get()); // null
            }).start();
        }
    }
}
  • 线程复用: 为了减小创建和销毁线程的开支, 使用

    • 池化技术
    • 并发流
    • fork-join

    预先创建一定数量的备用线程, 这些线程用完后不会销毁, 而是供下一个runnable任务继续使用. 这会导致已完成任务的线程的threadLocalMap也一起被复用

public class B {
    static final ThreadLocal<String> threadParam = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        //固定池内只有存活3个线程
        ExecutorService execService = Executors.newFixedThreadPool(3);
        while (true) {
            // 创建t1线程
            Thread t1 = new Thread(()->{
                    threadParam.set("abc");
                    System.out.println("t1:" + threadParam.get());
                    // 如果不主动调用remove清除已完成任务的t1线程的threadLocalMap内的Entry, 将导致threadParam被后续线程复用
                    // threadParam.remove();
            });
            // 用线程池里的线程执行, 执行完不销毁
            execService.execute(t);

            TimeUnit.SECONDS.sleep(1);

            // 创建t2线程
            Thread t2 = new Thread(()-> {
                    System.out.println("t2:" + threadParam.get());
            });
            // 用线程池里的线程执行, 可能复用到t1里未销毁的threadLocalMap里的旧Entry
            execService.execute(t2); // 可能读到"abc"
        }
    }
}

同一进程内跨线程数据传递方案


匿名线程类/lambda引用父线程局部变量

//  匿名类中逻辑引用外部类变量时,是通过在匿名类定义成员变量,并通过生成有参构造函数传递变量值。
void foo() {
    final int i = 0;
    new Thread(() -> sout(i)).start();
}

可继承线程变量java.lang.InheritableThreadLocal

  • ITL可以跨线程传递数据

  • 原理, 注意只有在创建线程时才会执行

void init() {
    // 初始化一个线程时,获取当前线程,作为父线程
    Thread parent = currentThread();
    // 如果父线程的成员变量inheritableThreadLocals 不为空时, 则子线程复制一份作为自己的inheritableThreadLocals 
    if (parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
  • 测试
ThreadLocal<Integer> a = new ThreadLocal<>();
InheritableThreadLocal<Integer> b = new InheritableThreadLocal<>();

void main() {
    // 父线程中设置
    a.set(3);
    b.set(3);
    new Thread(() -> {
        // 子线程中读取
        sout(a.get()); // null
        sout(b.get()); // 3
    }).start();

    // 线程池
    ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5));
    for (int i = 0; i < 5; i++) {
        // 子线程中读取
        poolExecutor.submit(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + a.get()); // 5 * pool-1-thread-1:null
            System.out.println(Thread.currentThread().getName() + ":" + b.get()); // 5 * pool-1-thread-1:3
        });
    }
}
  • ITL的缺陷: 可继承线程变量是存储在Thread对象上的,当Thread对象被复用时也会有问题
public class B {
    static final InheritableThreadLocal<String> threadParam = new InheritableThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        //固定池内只有存活3个线程
        ExecutorService execService = Executors.newFixedThreadPool(3);
        //多循环几次, 看出各种复用效果
        while (true) {
            // 创建父线程t1,里面有两个子线程t2, t3
            Thread t1 = new Thread(() -> {
                // 父线程t1设置值
                threadParam.set("abc");
                System.out.println("t1:" + threadParam.get());

                // 子线程t2读取父线程t1
                Thread t2 = new Thread(() -> {
                    System.out.println("t2:" + threadParam.get());
                });
                execService.execute(t2);
                
                // 子线程t3读取父线程t1
                Thread t3 = new Thread(() -> {
                    System.out.println("t3:" + threadParam.get()); // 可能因为复用到t4的InheritableThreadLocal而打印CBA
                    // threadParam.remove();
                });
                execService.execute(t3);
            });
            execService.execute(t);

            TimeUnit.SECONDS.sleep(1);
            // 创建同级父线程t4, 与线程t1同级
            Thread t4 = new Thread(() -> {
                // 同级父线程t4设置值
                threadParam.set("CBA");
                System.out.println("t4:" + threadParam.get()); // 可能因为复用到t1的InheritableThreadLocal而打印abc
            });
            execService.execute(t4);
        }
    }
}

Runnable只是线程方法, Thread才是线程, 需要给Runnable加上一个线程的壳, 调用start()submit()才会使线程执行.

这里线程池只存活3个线程, 那么在线程池复用线程(壳)的时候, 壳的属性只有在创建的时候才会被重新设置值(如果有操作的话, 例如:InheritableThreadLocal, ThreadLocal).

这些壳被创建好以后提交给了线程池, 但是线程方法并没有马上执行, 然后被其他壳修改了属性.当这个线程方法开始执行的时候, 已经不是自己创建的壳了

这里线程3, 因为线程切换使用了被线程4修改以后的壳的属性.

加大线程池, 以满足每个线程方法独立使用一个线程只能保证第一次运行正确, 因为没有真正解决Thread(壳)重用的问题.

  • 解决重用:
    • 每次线程使用完主动清理线程变量, 防止壳重用
    • 自定义Runnable的实现类:
      • 添加一个类成员变量captured,专门用于存储线程变量
      • 在run()方法开头的位置设置线程变量, 即transmittable-thread-local

TtlCallable、TtlRunnable

public class B {
    static final TransmittableThreadLocal<String> threadParam = new TransmittableThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        //固定池内只有存活3个线程
        ExecutorService execService = Executors.newFixedThreadPool(3);
        //多循环几次, 看出各种复用效果
        while (true) {
            // 创建父线程t1,里面有两个子线程t2, t3
            Thread t1 = new Thread(() -> {
                // 父线程t1设置值
                threadParam.set("abc");
                System.out.println("t1:" + threadParam.get());

                // 子线程t2读取父线程t1
                Thread t2 = new Thread(TtlRunnable.get(
                    () -> System.out.println("t2:" + threadParam.get());
                ));
                execService.execute(t2);
                
                // 子线程t3读取父线程t1
                Thread t3 = new Thread(TtlRunnable.get(
                    () -> System.out.println("t3:" + threadParam.get()); // 可能复用到t4的TransmittableThreadLocal, 但由于在run()时重置为父线程t1, 所以只打印abc
                    // threadParam.remove();
                ));
                execService.execute(t3);
            });
            execService.execute(t1);

            TimeUnit.SECONDS.sleep(1);
            // 创建同级父线程t4, 与线程t1同级
            Thread t4 = new Thread(() -> {
                // 同级父线程t4设置值
                threadParam.set("CBA");
                TtlRunnable.get(() -> System.out.println("t4:" + threadParam.get())); // 可能复用到t1的TransmittableThreadLocal, 但由于在run()时重置为同级父线程t4, 所以只打印CBA
            });
            execService.execute(t4);
        }
    }
}
package com.alibaba.ttl#TtlRunnable;

public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
    // capturedRef成员变量用于存储当前时刻的线程变量
    private final AtomicReference<Object> capturedRef;
    private final Runnable runnable;
    private final boolean releaseTtlValueReferenceAfterRun;

    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        // capture()为快照方法, 扫描并将当前线程当前时刻的ThreadLocal + TTL引用复制到原子引用里
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

    // CRR操作: capture -> replay -> restore
    @Override
    public void run() {
        // 1. 抓取A线程的TTL + threadLocal值, 存为captured
        final Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }

        // 2. 用在A线程中抓取的变量, 挤掉当前线程B的线程变量
        // 同时, 保存挤掉前B的TTL + threadLocal, 用于执行完后进行恢复
        final Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            // 3. 恢复B的线程变量为挤掉前的状态
            restore(backup);
        }
    }
}

封装线程池TtlExecutors

  • 使用TtlRunnable需要改写原有的Runnable, 工程量大
  • 业务代码都是通过调用线程池java.util.concurrent.ExecutorService#submit(...)方法向线程池提交任务
  • 改写submit()方法,让TTL框架来包装Runnable,这样创建线程的代码就无需改造了
public class B {
    static final TransmittableThreadLocal<String> threadParam = new TransmittableThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        // 只需在创建线程池时封装submit方法即可
        ExecutorService execService = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(3));

        while (true) {
            Thread t1 = new Thread(() -> {
                threadParam.set("abc");
                System.out.println("t1:" + threadParam.get());

                Thread t2 = new Thread(() -> System.out.println("t2:" + threadParam.get()));
                execService.execute(t2);
                
                Thread t3 = new Thread(() -> System.out.println("t3:" + threadParam.get())); //abc
                execService.execute(t3);
            });
            execService.execute(t);

            TimeUnit.SECONDS.sleep(1);

            Thread t4 = new Thread(() -> {
                threadParam.set("CBA");
                System.out.println("t4:" + threadParam.get()); // CBA
            });
            execService.execute(t4);
        }
    }
}
  • 原理
class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
    private final Executor executor;
    protected final boolean idempotent;

    ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
        this.executor = executor;
        this.idempotent = idempotent;
    }

    @Override
    public void execute(@NonNull Runnable command) {
        // 重写execute方法, 使用TtlRunnable
        executor.execute(TtlRunnable.get(command, false, idempotent));
    }

TtlAgent

  • JDK定义原生线程池的字节码做了保护, 防止篡改
  • 但是使用线程池的地方生成的字节码不属于JDK,可以修改(UT mock框架也是类似原理)
  • 通过javaagent, 在类加载期修改字节码
  • 无侵入, 但启动时长明显加大,这主要是因为在加载class时需要对字节码的搜索、修改。

Disruptor

  • 以上方案都是基于线程复用技术(底层为BlockingQueue)的跨线程交流方案
  • Disruptor则是基于多播

跨进程跨线程数据传递方案


  • RCP
  • Kafka
  • HTTP/HTTPS

标签:threadParam,Java,Thread,get,t1,线程,传递数据,new
From: https://www.cnblogs.com/rellik96/p/16593067.html

相关文章

  • 多线程
    使用场景异步化并发化削峰填谷ThreadThreadt=newThread(()->sout("hellofrom:")+Thread.currentThread().getName());t.start();不建议使用,数量无......
  • 阿里巴巴Java开发手册 (黄山版)
    阿里巴巴Java开发手册(黄山版)1.变量命名1.1POJO类中的任何布尔类型的变量,都不要加is前缀,否则部分框架解析会引起序列化错误。说明:本文MySQL规约中的建表约定第......
  • Java基础语法
    Java基础语法一、注释注释不会被编译和执行,但是可以提高代码的可读性和可维护性。单行注释//注释内容多行注释/*注释内容*/文档注释/***注释内容*/......
  • Java集合框架
    Java集合一、定义1、对象的容器,定义了对多个对象进行操作的常用方法,可实现数组的功能集合所在包为:Java。util.*2、集合和数组区别:数组长度固定,集合长度不固定数组可......
  • JAVA基础知识和安装
    java基础知识java的类型JAVASE标准版主要用于桌面JAVAME移动版主要用于手机JAVAEE企业版主要用于服务器JDKJREJBMJDKjavaDevelopmentKitJREJavaRuntim......
  • 记Window 10 WSL 下运行hbase 本机模式的一个错误及解决: /bin/java: No such file or
    运行环境及问题描述:系统:Window10WSL(LinuxUbuntu)在window环境里安装了JDK11至目录:C:\Application\Java\jdk-11.0.16,并设置了JAVA_HOME环境变量在WSLUbuntu也......
  • Java List转 JSONObject
    JavaList转JSONObjectpom.xml<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>......
  • 多线程基础知识!!!
    目录1.线程创建的三种方式1.1、继承Thread类(重点)1.2、实现Runnable接口(重点,推荐)1.3、实现Callable接口(了解)2.线程的五大状态3.Lamda表达式4.线程初进阶5.线程同步5.1、了解......
  • JavaScript中的运动(2)
    运动swiper插件(内置css和js)概述:swiper是一个开源的免费的一个滚动的组件(他可以运用于轮播图焦点图滑动效果等)内置的Demo(演示)他里面包含对应的css(以class的形式......
  • JAVA基础--类型转换--2022年8月20日
    第一节1、为什么要进行类型转换存在不同类型的变量给赋值给其他类型的变量2、自动类型转换是什么样的类型范围小的变量,可以直接赋值给类型范围大的变量 第......