首页 > 编程语言 >集合类源码浅析のJDK1.8ConcurrentHashMap(下篇)

集合类源码浅析のJDK1.8ConcurrentHashMap(下篇)

时间:2024-11-17 19:18:28浏览次数:3  
标签:Node JDK1.8 int 源码 线程 tab sc null 浅析

文章目录


前言

  主要记录ConcurrentHashMap(笔记中简称CHM)的查询,删除,以及扩容方法的关键源码分析。


一、分段扩容

1、addCount

  扩容的逻辑主要在addCount方法的后半段:

    private final void addCount(long x, int check) {
    		//....前半段是进行桶数组中所有元素的累加计数
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //满足条件就会一直循环:桶数组中所有元素的累加计数>=扩容阈值 并且 桶数组不为空 并且 桶数组的长度<最大长度
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                //通过对数组长度n进行计算得到的标记值
                int rs = resizeStamp(n);
                //sc为负数证明在哈希表正在扩容中(存在并发扩容的情况)
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //尝试将SIZECTL的值 + 1 代表有1个线程在协助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //否则通过CAS的操作尝试将SIZECTL设置成一个负数标记
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //真正扩容的逻辑                         
                    transfer(tab, null);
                //重新计算旧桶数组中所有元素的数量    
                s = sumCount();
            }
        }
    }

  其中resizeStamp(n),假设目前桶数组的length是16,那么计算出的值是:32795

        int rs = Integer.numberOfLeadingZeros(16) | (1 << (16 - 1));
        System.out.println("rs = " + rs);

  rs << RESIZE_STAMP_SHIFT) + 2计算出的是-2145714174。将SIZECTL设置为负数说明已经有一个线程去执行了扩容。

2、transfer

  transfer是真正执行扩容的逻辑:

	  //tab:旧表 nextTab:新表
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    		 //n 旧表的长度 stride 每个线程迁移的步长
        int n = tab.length, stride;
        //计算步长,最小为16 即每个线程最低负责处理16个桶数据的迁移
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //新表还没有创建    
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //创建新表,长度为旧表的2倍
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                //将新表赋值给nextTab 变量(注意,此时还没有真正把新表指向CHM的nextTab 属性)
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //这一步才是将新表指向CHM的nextTable 属性
            nextTable = nextTab;
            //旧表的长度作为迁移的索引
            transferIndex = n;
        }
        //nextn 新表的长度
        int nextn = nextTab.length;
        //将新表包装成ForwardingNode对象。
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //前进标记
        boolean advance = true;
        //结束标记
        boolean finishing = false; // to ensure sweep before committing nextTab

  到上面为止,只是初始化一些变量,以及在新表为空的情况下,创建出了新表,并赋值给了CHM的nextTable属性, 将旧表的长度赋值给CHM的transferIndex 属性,以及将新表包装成ForwardingNode对象,ForwardingNode的hash值是-1。

        //无限循环
        for (int i = 0, bound = 0;;) {
        		//f 是当前桶中的节点,fh 是节点的哈希值
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                //先将属性transferIndex赋值局部变量nextIndex。给如果 transferIndex 小于等于 0,表示没有任务可做,退出迁移过程。
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //通过CAS保证不会两个线程竞争到同一个区域
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

  对于上面这一段的逻辑,可以画图表示,假设旧表的长度为4,每次迁移的步长为2:
在这里插入图片描述   在else if ((nextIndex = transferIndex) <= 0) 这一步,将nextIndex 赋值为了4:
在这里插入图片描述  在nextBound = (nextIndex > stride ? nextIndex - stride : 0))这一步,计算出nextBound = 4 - 2 = 2,并且将TRANSFERINDEX通过CAS改成了2
在这里插入图片描述
  最后经过bound = nextBound;i = nextIndex - 1; 计算出:
在这里插入图片描述  然后将advance = false,跳出while循环。
  上面的过程,假设又有另一个线程进入了while循环,在else if ((nextIndex = transferIndex) <= 0) 这一步,将nextIndex 赋值为了2(第一个线程通过CAS修改的):
在这里插入图片描述
  然后在nextBound = (nextIndex > stride ? nextIndex - stride : 0))这一步,计算出nextBound = 0,并且将TRANSFERINDEX通过CAS改成了0:
在这里插入图片描述  最后经过bound = nextBound;i = nextIndex - 1; 计算出:
在这里插入图片描述  也就是线程一负责2,3桶的迁移,线程二负责0,1桶的迁移。当然在不存在多线程并发的情况下,某个线程在迁移完了2,3桶后,依旧会判断是否要前进进行0,1桶的迁移。

					//i相比较于数组发生越界,当前线程已经将自己的区域转移完毕
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //结束标记为true时,这里不能仅仅根据当前线程判断结束标记,需要所有参与迁移的线程全部结束
                if (finishing) {
                		 //将CHM的临时新表指向null
                    nextTable = null;
                    //将CHM的table属性指向新表
                    table = nextTab;
                    //将CHM的sizeCtl 属性设置为 扩容阈值(计算是按照0.75的扩容因子进行的)
                    //n:32时 经过计算是 24
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //尝试CAS将SIZECTL - 1 标记当前线程已经完成了迁移。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                		 //仍然有其他线程未释放标记
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //所有线程都迁移完成
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

  当某个线程在CAS成功,第一次进入transfer时,SIZECTL会被更改成一个负数,其他的线程需要帮助扩容,则会对SIZECTL + 1:
在这里插入图片描述  SIZECTL(sc)的初始值是 (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2:
在这里插入图片描述  而上面的代码片段中,if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 可以转换为 sc != resizeStamp(n) << RESIZE_STAMP_SHIFT) +2,代表依旧存在其他线程没有完成迁移,否则sc会复位成最初的 (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2

					//某个桶的头节点为空
            else if ((f = tabAt(tab, i)) == null)
                //就尝试通过CAS将null值替换为fwd节点
                advance = casTabAt(tab, i, null, fwd);
            //某个桶的头节点的hash值为-1 证明已经转换成了fwd节点
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
            		//对桶的头节点加锁,真正根据不同情况迁移的逻辑
                synchronized (f) {
                		  //再次判断某个桶的头节点有没有发生改变,链表的迁移
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //红黑树的迁移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

  上面的代码片段,主要是迁移元素的逻辑,也是分为链表和红黑树,并且某个桶在迁移完成后,会标记为为fwd节点。其他线程在put时,发现当前桶的下标为fwd节点,则会走帮助扩容的逻辑:
在这里插入图片描述  链表的迁移,主要做了以下四步,和HashMap根据高低位进行分组是同样的思路:

  1. 遍历链表中的节点,找出链表中哈希值相同的分组。
  2. 根据分组将节点分别插入到两个新的链表中。
  3. 将这两个新链表插入到新的哈希表的相应位置。
  4. 将原来的链表标记为已处理。

  int runBit = fh & n; 是用当前节点的hash值去 & 旧表的长度,结果只有可能是0或者旧表的长度,例如:

        System.out.println(2365651 & 4);
        System.out.println(1926817 & 4);
        System.out.println(1848602 & 4);
        System.out.println(4731302 & 4);

在这里插入图片描述

3、helpTransfer

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //会再次判断传入的tab 是否为空 以及桶下标位置的头节点是否为fwd 以及 fwd对象中的nextTable 是否为空
        //防止在帮助扩容的过程中,新表已经迁移完成并且赋值给了CHM的table属性
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            //根据旧表的长度计算得到一个标记
            int rs = resizeStamp(tab.length);
            //sizeCtl < 0 代表正在有其他线程在扩容
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                 //扩容已经结束  
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                 //尝试将SIZECTL + 1 标记当前线程正在参与扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            //返回临时的新表
            return nextTab;
        }
        //返回新表
        return table;
    }

二、查询

  这一段查询的逻辑,和HashMap类似,区别在于加入了对于fwd节点的判断,如果当前的节点是fwd节点,就去临时的新表中查询。

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        //再次判断桶数组是否不为空,以及根据hash算出的桶的头节点是否不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //查找的节点是根据hash算出的桶的头节
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                		//直接返回值
                    return e.val;
            }
            //hash值小于0 是fwd节点 或者红黑树
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //处理链表的查找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        //没有找到就返回空值
        return null;
    }

二、删除

    public V remove(Object key) {
        return replaceNode(key, null, null);
    }

  删除的逻辑,也加入了判断,如果当前要删除的桶的头节点是MOVED,则和新增元素一样,去走帮助扩容的逻辑,并且删除也是对单个桶去加锁的,其他的逻辑和HashMap大致相似,也是分为链表和红黑树,先遍历找出需要删除的元素,再执行删除逻辑:

    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }

总结

  在CHM的新增/删除时,如果发现目标节点的状态是MOVED,就说明旧表数据正在被迁移,于是当前线程会走帮助扩容的逻辑。CHM的扩容迁移,也是分段进行的,每个线程会计算步长(最小为16),也就是至少负责对16个桶元素的迁移。
  如果是单线程的情况下,当前线程在处理完16个桶后,会继续处理后续的16个桶。如果是多线程的场景下,通过CAS保证不会有多个线程同时处理相同步长范围内的桶。并且链表/红黑树迁移的过程,也是和HashMap的类似,通过节点的hash值 & 旧表长度计算出高位低位进行分组迁移,当某个桶下标迁移完成后,会将当前位置标记为fwd节点。 最后需要等到所有的线程都迁移完成后,才会真正将新表的引用指向CHM的table属性。

下一篇:集合类源码浅析のJDK1.8 HashMap中的红黑树

标签:Node,JDK1.8,int,源码,线程,tab,sc,null,浅析
From: https://blog.csdn.net/2301_77599076/article/details/143750443

相关文章

  • 从源码角度分析JDK动态代理
    文章目录前言一、JDK动态代理二、动态代理的生成三、invoke的运行时调用总结前言  本篇从源码的角度,对JDK动态代理的实现,工作原理做简要分析。一、JDK动态代理  JDK动态代理是运行时动态代理的一种实现,相比较于CGLIB,目标对象必须实现接口,下面是一个简单案例......
  • 浅析注意力(Attention)机制
    Attention顾名思义,说明这项机制是模仿人脑的注意力机制建立的,我们不妨从这个角度展开理解2.1人脑的注意力机制人脑的注意力机制,就是将有限的注意力资源分配到当前关注的任务,或关注的目标之上,暂时忽略其他不重要的因素,这是人类利用有限的注意力资源从大量信息中快速筛选出高价值......
  • SpringBoot在线投票数据分析平台研究与设计8kxf0(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、项目背景与意义随着互联网技术的普及,在线投票活动逐渐成为各类组织、企业和个人进行决策和意见收集的重要手段。然而,如何高效地收集、整理和分......
  • SpringBoot在线教育系统a1q7y(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、项目背景与意义随着互联网技术的快速发展,教育领域正经历着前所未有的变革。在线教育系统作为数字化教育的重要载体,以其跨越时空限制、灵活便捷......
  • 基于Springboot+Vue的停车管理系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......
  • 基于Springboot+Vue的中国蛇类识别系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......
  • ssm131保险业务管理系统设计与实现+jsp(论文+源码)_kaic
     毕业设计(论文)题目:保险业务管理系统设计与实现      摘 要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本保险业务管理系统就是在这样的大环境下诞生,其可以帮助管理者在短时......
  • 解读 DelayQueue 源码:探究其精妙的设计架构与实现细节
    一、简介DelayQueue是JUC包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单15分钟未支付直接取消。它是BlockingQueue的一种,底层是一个基于PriorityQueue实现的一个无界队列,是线程安全的BlockingQueue的实现类DelayQueue 中存放的元素......
  • flask火车购票系统(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于火车购票系统的研究,现有研究多侧重于传统购票方式的改进以及购票系统的基本功能实现。在国内外,随着信息技术的发展,购票系统已逐渐......
  • flask基于SpringBoot的私人物品管理平台(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景随着人们生活水平的提高,私人物品的数量和种类不断增加,如何有效地管理私人物品成为一个重要问题。关于私人物品管理平台的研究,现有研究......