首页 > 其他分享 >JUC工具(Exchange)

JUC工具(Exchange)

时间:2024-04-25 19:44:06浏览次数:31  
标签:slot JUC Exchange 交换 arena 线程 Exchanger 工具 null

Exchanger(交换器),顾名思义,用于两个线程之间进行数据交换

两个线程通过 exchange() 方法交换数据,如果第一个线程先执行 exchange()方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据

API方法

构造方法

  • Exchanger():创建一个交换器

常用方法

  • V exchange(V x): 交换数据,如果只有一个线程,会阻塞,直到另外一个线程也调用exchange, 支持中断
  • V exchange(V x, long timeout, TimeUnit unit): 带超时参数的交换数据

Exchanger实现机制

for (;;) {
    if (slot is empty) { // offer
        // slot为空时,将item 设置到Node 中        
        place item in a Node;
        if (can CAS slot from empty to node) {
            // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
            wait for release;
            // 被唤醒后返回node中匹配到的item
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
         // 将slot设置为空
        // 获取node中的item,将需要交换的数据设置到匹配的item
        get the item in node;
        set matching item in node;
        // 唤醒等待的线程
        release waiting thread;
    }
    // else retry on CAS failure
}

比如有2条线程A和B,A线程交换数据时,发现slot为空,则将需要交换的数据放在slot中等待其它线程进来交换数据,等线程B进来,读取A设置的数据,然后设置线程B需要交换的数据,然后唤醒A线程,原理就是这么简单。但是当多个线程之间进行交换数据时就会出现问题,所以Exchanger加入了slot数组。

Exchanger源码解析

内部类 - Participant

static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}

Participant的作用是为每个线程保留唯一的一个Node节点, 它继承ThreadLocal,说明每个线程具有不同的状态。

内部类 - Node

@sun.misc.Contended static final class Node {
     // arena的下标,多个槽位的时候利用
    int index; 
    // 上一次记录的Exchanger.bound
    int bound; 
    // 在当前bound下CAS失败的次数;
    int collides;
    // 用于自旋;
    int hash; 
    // 这个线程的当前项,也就是需要交换的数据;
    Object item; 
    //做releasing操作的线程传递的项;
    volatile Object match; 
    //挂起时设置线程值,其他情况下为null;
    volatile Thread parked;
}

在Node定义中有两个变量值得思考:bound以及collides。前面提到了数组area是为了避免竞争而产生的,如果系统不存在竞争问题,那么完全没有必要开辟一个高效的arena来徒增系统的复杂性。首先通过单个slot的exchanger来交换数据,当探测到竞争时将安排不同的位置的slot来保存线程Node,并且可以确保没有slot会在同一个缓存行上。如何来判断会有竞争呢? CAS替换slot失败,如果失败,则通过记录冲突次数来扩展arena的尺寸,我们在记录冲突的过程中会跟踪“bound”的值,以及会重新计算冲突次数在bound的值被改变时。

核心属性

private final Participant participant;
private volatile Node[] arena;
private volatile Node slot;
  • 为什么会有 arena数组槽?

slot为单个槽,arena为数组槽, 他们都是Node类型。在这里可能会感觉到疑惑,slot作为Exchanger交换数据的场景,应该只需要一个就可以了啊? 为何还多了一个Participant 和数组类型的arena呢? 一个slot交换场所原则上来说应该是可以的,但实际情况却不是如此,多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组

  • 那么怎么将Node与当前线程绑定呢?

Participant,Participant 的作用就是为每个线程保留唯一的一个Node节点,它继承ThreadLocal,同时在Node节点中记录在arena中的下标index。

构造函数

/**
* Creates a new Exchanger.
*/
public Exchanger() {
    participant = new Participant();
}

初始化participant对象。

核心方法 - exchange(V x)

等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

public V exchange(V x) throws InterruptedException {
    Object v;
    // 当参数为null时需要将item设置为空的对象
    Object item = (x == null) ? NULL_ITEM : x; // translate null args
    // 注意到这里的这个表达式是整个方法的核心
    if ((arena != null ||
            (v = slotExchange(item, false, 0 L)) == null) &&
        ((Thread.interrupted() || // disambiguates null return
            (v = arenaExchange(item, false, 0 L)) == null)))
        throw new InterruptedException();
    return (v == NULL_ITEM) ? null : (V) v;
}

这个方法比较好理解:arena为数组槽,如果为null,则执行slotExchange()方法,否则判断线程是否中断,如果中断值抛出InterruptedException异常,没有中断则执行arenaExchange()方法。整套逻辑就是:如果slotExchange(Object item, boolean timed, long ns)方法执行失败了就执行arenaExchange(Object item, boolean timed, long ns)方法,最后返回结果V。

NULL_ITEM 为一个空节点,其实就是一个Object对象而已,slotExchange()为单个slot交换。

slotExchange(Object item, boolean timed, long ns)

private final Object slotExchange(Object item, boolean timed, long ns) {
    // 获取当前线程node对象
    Node p = participant.get();
    // 当前线程
    Thread t = Thread.currentThread();
    // 若果线程被中断,就直接返回null
    if (t.isInterrupted()) // preserve interrupt status so caller can recheck
        return null;
	// 自旋
    for (Node q;;) {
        // 将slot值赋给q
        if ((q = slot) != null) {
             // slot 不为null,即表示已有线程已经把需要交换的数据设置在slot中了
			// 通过CAS将slot设置成null
            if (U.compareAndSwapObject(this, SLOT, q, null)) {
                // CAS操作成功后,将slot中的item赋值给对象v,以便返回。
                // 这里也是就读取之前线程要交换的数据
                Object v = q.item;
                // 将当前线程需要交给的数据设置在q中的match
                q.match = item;
                 // 获取被挂起的线程
                Thread w = q.parked;
                if (w != null)
                    // 如果线程不为null,唤醒它
                    U.unpark(w);
                // 返回其他线程给的V
                return v;
            }
            // create arena on contention, but continue until slot null
            // CAS 操作失败,表示有其它线程竞争,在此线程之前将数据已取走
            // NCPU:CPU的核数
            // bound == 0 表示arena数组未初始化过,CAS操作bound将其增加SEQ
            if (NCPU > 1 && bound == 0 &&
                U.compareAndSwapInt(this, BOUND, 0, SEQ))
                // 初始化arena数组
                arena = new Node[(FULL + 2) << ASHIFT];
        }
        // 上面分析过,只有当arena不为空才会执行slotExchange方法的
		// 所以表示刚好已有其它线程加入进来将arena初始化
        else if (arena != null)
            // 这里就需要去执行arenaExchange
            return null; // caller must reroute to arenaExchange
        else {
            // 这里表示当前线程是以第一个线程进来交换数据
            // 或者表示之前的数据交换已进行完毕,这里可以看作是第一个线程
            // 将需要交换的数据先存放在当前线程变量p中
            p.item = item;
            // 将需要交换的数据通过CAS设置到交换区slot
            if (U.compareAndSwapObject(this, SLOT, null, p))
                // 交换成功后跳出自旋
                break;
            // CAS操作失败,表示有其它线程刚好先于当前线程将数据设置到交换区slot
            // 将当前线程变量中的item设置为null,然后自旋获取其它线程存放在交换区slot的数据
            p.item = null;
        }
    }

    // await release
    // 执行到这里表示当前线程已将需要的交换的数据放置于交换区slot中了,
    // 等待其它线程交换数据然后唤醒当前线程
    int h = p.hash;
    long end = timed ? System.nanoTime() + ns : 0 L;
    // 自旋次数
    int spins = (NCPU > 1) ? SPINS : 1;
    Object v;
    // 自旋等待直到p.match不为null,也就是说等待其它线程将需要交换的数据放置于交换区slot
    while ((v = p.match) == null) {
        // 下面的逻辑主要是自旋等待,直到spins递减到0为止
        if (spins > 0) {
            h ^= h << 1;
            h ^= h >>> 3;
            h ^= h << 10;
            if (h == 0)
                h = SPINS | (int) t.getId();
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                Thread.yield();
        } else if (slot != p)
            spins = SPINS;
        // 此处表示未设置超时或者时间未超时
        else if (!t.isInterrupted() && arena == null &&
            (!timed || (ns = end - System.nanoTime()) > 0 L)) {
            // 设置线程t被当前对象阻塞
            U.putObject(t, BLOCKER, this);
            // 给p挂机线程的值赋值
            p.parked = t;
            if (slot == p)
                // 如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起
                U.park(false, ns);
            // 线程被唤醒,将被挂起的线程设置为null
            p.parked = null;
            // 设置线程t未被任何对象阻塞
            U.putObject(t, BLOCKER, null);
        // 不是以上条件时(可能是arena已不为null或者超时)    
        } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
             // arena不为null则v为null,其它为超时则v为超市对象TIMED_OUT,并且跳出循环
            v = timed && ns <= 0 L && !t.isInterrupted() ? TIMED_OUT : null;
            break;
        }
    }
    // 取走match值,并将p中的match置为null
    U.putOrderedObject(p, MATCH, null);
    // 设置item为null
    p.item = null;
    p.hash = h;
    // 返回交换值
    return v;
}

程序首先通过participant获取当前线程节点Node。检测是否中断,如果中断return null,等待后续抛出InterruptedException异常。

  • 如果slot不为null,则进行slot消除,成功直接返回数据V,否则失败,则创建arena消除数组。
  • 如果slot为null,但arena不为null,则返回null,进入arenaExchange逻辑。
  • 如果slot为null,且arena也为null,则尝试占领该slot,失败重试,成功则跳出循环进入spin+block(自旋+阻塞)模式。

在自旋+阻塞模式中,首先取得结束时间和自旋次数。如果match(做releasing操作的线程传递的项)为null,其首先尝试spins+随机次自旋(改自旋使用当前节点中的hash,并改变之)和退让。当自旋数为0后,假如slot发生了改变(slot != p)则重置自旋数并重试。否则假如:当前未中断&arena为null&(当前不是限时版本或者限时版本+当前时间未结束):阻塞或者限时阻塞。假如:当前中断或者arena不为null或者当前为限时版本+时间已经结束:不限时版本:置v为null;限时版本:如果时间结束以及未中断则TIMED_OUT;否则给出null(原因是探测到arena非空或者当前线程中断)。

match不为空时跳出循环。

image.png

当 slot 被别是线程使用了,那么就需要创建一个 arena 的数组了。通过操纵数组里面的元素来实现数据交换。

用@sun.misc.Contended来规避伪共享?

伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。

我们再看Node节点的定义, 在Java 8 中我们是可以利用sun.misc.Contended来规避伪共享的。所以说通过 << ASHIFT方式加上sun.misc.Contended,所以使得任意两个可用Node不会再同一个缓存行中。

@sun.misc.Contended static final class Node{
....
}

我们再次回到arenaExchange()。取得arena中的node节点后,如果定位的节点q 不为空,且CAS操作成功,则交换数据,返回交换的数据,唤醒等待的线程。

  • 如果q等于null且下标在bound & MMASK范围之内,则尝试占领该位置,如果成功,则采用自旋 + 阻塞的方式进行等待交换数据。
  • 如果下标不在bound & MMASK范围之内获取由于q不为null但是竞争失败的时候:消除p。加入bound 不等于当前节点的bond(b != p.bound),则更新p.bound = b,collides = 0 ,i = m或者m - 1。如果冲突的次数不到m 获取m 已经为最大值或者修改当前bound的值失败,则通过增加一次collides以及循环递减下标i的值;否则更新当前bound的值成功:我们令i为m+1即为此时最大的下标。最后更新当前index的值。

更深入理解

  • SynchronousQueue对比?

Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。

  • 不同JDK实现有何差别?
    • 在JDK5中Exchanger被设计成一个容量为1的容器,存放一个等待线程,直到有另外线程到来就会发生数据交换,然后清空容器,等到下一个到来的线程。
    • 从JDK6开始,Exchanger用了类似ConcurrentMap的分段思想,提供了多个slot,增加了并发执行时的吞吐量。

JDK1.6实现可以参考 这里在新窗口打开

Exchanger示例

来一个非常经典的并发问题:你有相同的数据buffer,一个或多个数据生产者,和一个或多个数据消费者。只是Exchange类只能同步2个线程,所以你只能在你的生产者和消费者问题中只有一个生产者和一个消费者时使用这个类。

public class Test {
    static class Producer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        Producer(String name, Exchanger<Integer> exchanger) {
            super("Producer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i=1; i<5; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = i;
                    System.out.println(getName()+" 交换前:" + data);
                    data = exchanger.exchange(data);
                    System.out.println(getName()+" 交换后:" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer extends Thread {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        Consumer(String name, Exchanger<Integer> exchanger) {
            super("Consumer-" + name);
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (true) {
                data = 0;
                System.out.println(getName()+" 交换前:" + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(getName()+" 交换后:" + data);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        new Producer("", exchanger).start();
        new Consumer("", exchanger).start();
        TimeUnit.SECONDS.sleep(7);
        System.exit(-1);
    }
}

可以看到,其结果可能如下:

Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Consumer- 交换前:0
Producer- 交换后:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0

参考博客:https://pdai.tech/md/java/thread/java-thread-x-juc-tool-exchanger.html

标签:slot,JUC,Exchange,交换,arena,线程,Exchanger,工具,null
From: https://www.cnblogs.com/luojw/p/18158440

相关文章

  • 【笔记】拓扑图工具调研
    一、在线拓扑图编辑工具零代码三维地图开发http://www.emapgis.com/数字孪生https://www.hightopo.com/demos/index.htmlvue-antvx6-demo推荐。https://gitee.com/yanggengzhen/vue-antvx6-demo/tree/masterhttps://qunee.com/数百个HTML5例子学习HT图形组件–WebGL3D......
  • 日期工具类,获取全年每月有几周,分别是多少,且每周对应的是几号到几号
    一、概述项目需要做日程组件,其中涉及到了日历这块的内容。需求:1.获取全年有多少个月2.获取每月有多少个周3.获取每月每一天对应的是星期几4.单独获取某一天对应的是星期几5.把以上四条组织成为一个集合二、代码示例/**......
  • shell脚本文本处理工具
    声明:以下内容为个人笔记,内容不完全正确,请谨慎参考。文本处理工具cut:cut工作是“剪”,具体来说就是在文件中负责剪切数据。cut命令从文件的每个行剪切字节、字符和字段输出。1、基本语法:cut[选项参数]filename说明:默认分隔符是副表符2、选项参数说明选项参数:1)-f2)-......
  • 使用Spring HttpExchange替代FeignClient进行http远程服务调用
    背景springboot3.0使用的springframework6.0里有一个全新的http服务调用注解@HttpExchange,该注解的用途是可以进行申明式http远程服务调用。与Feign作用相同,在springboot3.x里,由于本身spring内置,相比Feign可以大幅减少第三方包依赖,且比Feign进轻巧。依赖:@HttpExchange位......
  • WPF 使用 ManipulationDemo 工具辅助调试设备触摸失效问题
    本文将和大家介绍我所在的团队开源的ManipulationDemo工具。通过ManipulationDemo工具可以提升调试设备触摸失效的效率此工具在GitHub上完全开源,请看https://github.com/dotnet-campus/ManipulationDemo/软件界面效果大概如下可以显示接收到的Win32消息、当前的触摸......
  • 推荐一个使用 HardLink 硬链接减少重复文件占用磁盘空间的工具
    在NTFS文件系统里面,咱可以使用HardLink硬链接的方式,将多个重复的文件链接到磁盘的同一份记录里面,从而减少在磁盘里面对重复文件存储多份记录,减少磁盘空间的占用。本文将和大家推荐我所做的基于HardLink硬链接减少重复文件占用磁盘空间的工具此工具名为UsingHardLinkToZipN......
  • 开源渗透测试工具--关于数据库
        SQLmap是一个功能强大的开源渗透测试工具SQLmap是一个功能强大的开源渗透测试工具,它自动化了检测和利用数据库驱动的Web应用程序中的SQL注入漏洞的过程。它旨在识别Web应用程序中各种类型的SQL注入漏洞,并帮助安全专业人员或黑客评估Web应用程序后端数据......
  • 利用VS(Visual Studio)自带的工具查看DLL文件相关信息
    装完VS后,就可以使用其自带的dumpbin命令来查看DLL文件的信息,首先在开始菜单中打开VS的DeveloperCommandPrompt命令窗打开后,输入dumpbin后,按Enter,会显示dumpbin的使用参数  查看DLL文件的方法有两种:1.使用dumpbin命令:dumpbin/exportsC:\Users\Administrator\D......
  • golang工具函数,把一个金额整型,单位为分,转成"1,231,111.00"格式的字符串
    这个函数首先将整数除以100来获取代表元的浮点数,然后格式化此数值为两位小数的字符串。接下来,函数将字符串分成整数和小数部分,并且为整数部分添加千位分隔符。最后,如果存在小数部分,它会将这两部分重新组合并返回正确格式化的金额字符串。为了正确地处理负数,我们需要先检查金额是......
  • ETL工具-nifi干货系列 第十六讲 nifi Process Group实战教程,一文轻松搞定
    1、目前nifi系列已经更新了10多篇教程了,跟着教程走的同学应该已经对nifi有了初步的解,但是我相信同学们应该有一个疑问:nifi设计好的数据流列表在哪里?如何同时运行多个数据流?如启停单个数据流?带着这些疑问,今天的主角nifiProcessGroup正式登场,先给大家看个图。2、ProcessGroup(......