首页 > 编程语言 >集合类源码浅析のJDK1.8ConcurrentHashMap(上篇)

集合类源码浅析のJDK1.8ConcurrentHashMap(上篇)

时间:2024-11-17 19:19:11浏览次数:3  
标签:JDK1.8 int value 累加 源码 线程 数组 null 浅析

文章目录


前言

  本篇是JDK1.8的ConcurrentHashMap源码个人学习笔记,ConcurrentHashMap(笔记中简称CHM)是一种线程安全的HashMap,1.8中废弃了segment数组,而是对单个桶进行线程安全控制,并且引入了分段计数,多线程扩容的思想,限于篇幅,上篇重点记录增加元素以及分段计数的源码实现


一、概述

  HashMap是线程不安全的,尤其是在1.7中的插入元素使用的头插法,会导致多线程下扩容死链的问题,使CPU直接100%。如果要保证线程安全,可以使用HashTable和CHM。相比于HashTable简单粗暴地在每个方法上加悲观锁,CHM的效率会更高一些,一方面在于,CHM是悲观锁与CAS操作相结合,并且锁的粒度更细,精确到每个桶下标。因为假设A,B两个线程同时要将自己的元素插入CHM,元素的位置又不在同一个桶下标,如果锁整个Table,无疑是一种性能上的浪费。
  而计算Table中所有的桶中的元素,使用的是分段计数的思想,即每个线程创建自己的累加数组,累加数组是独立于Table的,也可以进行扩容,最终再进行合计(在JUC的LongAddr中使用的也是这样的思想)。扩容也是多线程参与的,每个线程参与一定步长的桶的元素迁移。

二、CHM的属性

1、属性

  因为CHM的属性和HashMap的一些是相同的,所以只介绍CHM特有的重要属性

	  //当某个节点被从一个桶迁移到另一个桶时,原位置的节点会被标记为 MOVED,以防止重复操作或访问已经“转发”的节点。
    static final int MOVED     = -1; // hash for forwarding nodes
	  //用于标识一个桶中的链表已经被转换成树形结构。
    static final int TREEBIN   = -2; // hash for roots of trees
    //用于表示一个桶位置正在被暂时保留。它通常出现在扩容或重哈希过程中,表示某个位置正在等待数据填充或进一步处理。
    static final int RESERVED  = -3; // hash for transient reservations
    //保留 int 类型哈希值的前 31 位,因为 int 类型的哈希值为 32 位,最高位通常用于标识负数(符号位),而 HASH_BITS 会屏蔽掉符号位,只保留 31 位有效的哈希值部分。
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

  桶的容量,只有一个线程可以对其进行修改。

	  private transient volatile long baseCount;

  用于指示哈希表的当前状态:

  • 在创建CHM时,用来表示初始的哈希表大小(或容量)。
  • sizeCtl 还用于控制何时触发扩容。
  • sizeCtl 在负数时表示哈希表正在扩容中。
  • sizeCtl 在正数值表示哈希表的初始容量或扩容的阈值。

在这里插入图片描述

	  private transient volatile int sizeCtl;

  transferIndex 由一个线程初始化,并在扩容过程中逐渐递增。每个线程通过读取和更新 transferIndex,知道自己需要迁移哪些桶。
  假设需要扩容,原来有 16 个桶,而扩容后会有 32 个桶。在迁移过程中,transferIndex 会从 16 开始,每个线程负责将数据从桶 16 到桶 31 迁移到新数组中的相应位置。迁移完成后,transferIndex 更新为 32,表示所有桶已完成迁移。

    private transient volatile int transferIndex;

  在扩容的过程中,cellsBusy 会作为一种标识,表示是否有线程正在修改当前的桶区域。值为 1 时,表示当前有线程正在执行某个桶的扩容任务;为 0 时,表示当前没有线程在处理这些任务。

    private transient volatile int cellsBusy;

  累加单元组成的数组

    private transient volatile CounterCell[] counterCells;

三、新增方法

1、put

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

  putVal是新增的主要逻辑,和HashMap的思路大致相同,也是分为了table为空,插入位置为链表的头节点,链表插入和红黑树插入的逻辑。区别在于:

  • 如果发现数组正在扩容,该线程会帮助数组进行元素的迁移。
  • 要插入位置的桶中头元素为空时,是通过CAS的操作保证插入元素的线程安全。
  • 其他情况都是通过悲观锁锁定单个下标以保证线程安全。
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    		 //和HashMap不同的是,CHM的键值强制不能为空,否则抛出异常
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        //binCount 记录桶中的元素数量
        int binCount = 0;
        //先将属性table赋值给tab 数组,然后无限循环直到满足条件退出
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //CHM的table为空,说明是第一次插入元素,就走初始化桶数组的逻辑
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //通过hash计算,将要插入位置的桶中头元素为空,说明该桶还没有元素
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            		//通过CAS的方式将k,v包装成node对象放入桶的头部位置
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    //结束
                    break;                   // no lock when adding to empty bin
            }
            //通过hash计算,将要插入位置的桶的hash值为-1,证明该数组正在扩容,就走帮助扩容的逻辑
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
            	  //某个桶下标已经存在元素
                V oldVal = null;
                //加悲观锁
                synchronized (f) {
                		//f是某个桶的头节点。
                    if (tabAt(tab, i) == f) {
                    		//该数组不是正在扩容
                        if (fh >= 0) {
                        		//初始化该桶下标的元素为1
                            binCount = 1;
                            //遍历链表,循环一次binCount +1 记录该桶下标中元素的数量
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果是TreeBin的类型就走红黑树的逻辑
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //某个桶下标中元素的数量不为0
                if (binCount != 0) {
                		 //超过阈值就走转换为红黑树的逻辑
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    //有key相同的就返回value    
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //多线程并发对table进行操作时,统计各自桶下标中元素的数量
        addCount(1L, binCount);
        return null;
    }

  并且CHM中的红黑树,被包装成了TreeBin,是对HashMap的TreeNode的二次包装:
在这里插入图片描述  具体为什么要这样设计,是因为对红黑树进行元素新增的过程是上锁的,并且锁的是桶下标的头节点,红黑树在插入的过程中,是有可能进行旋转的,也就是root节点会发生改变,例如A线程,进入了插入方法,获取到的锁是20的节点,插入了一个元素,导致了红黑树进行旋转(20不再是根节点),另一个B线程,也进入了插入方法,拿到的锁就不是同一个对象了。
  使用了TreeBin后,是对TreeBin整个对象加锁,并不用关心内部的红黑树有没有进行翻转,根节点变不变化,多个线程拿到的都是同一个锁对象。
在这里插入图片描述

2、initTable

  再来看下initTable()方法:

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        //进入后会再次判断表是否为空,双重检查
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //线程A尝试将SIZECTL属性从0改成-1,成功了另一个线程在循环时会Thread.yield(); 让出cpu执行权
            //保证表只被初始化一次
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                		 //多次判断表是否为空
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

四、分段计数

1、addCount

    private final void addCount(long x, int check) {
    		 //as 累加数组
        CounterCell[] as; long b, s;
        //进入这个分支的条件:
        //1、累加数组不为空(首先会将counterCells属性赋值给as)
        if ((as = counterCells) != null ||
        		//或2、尝试CAS改变BASECOUNT失败,证明存在并发访问桶数组的情况,BASECOUNT只能被一个线程修改。
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            //a 累加单元
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //进入这个分支的条件:
            //累加数组为空 或数组长度<0 或线程唯一的随机数和数组长度与运算得到的累加数组下标对应的累加单元为空 
            //或CAS尝试将线程唯一的随机数和数组长度与运算得到的累加数组下标对应的累加单元的 value属性+1失败
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            //统计数组中所有有元素桶中元素的总计
            s = sumCount();
        }
     	//....下半段是扩容的逻辑
    }

  CounterCell是CHM的一个静态内部类,只有一个value属性:

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

  ThreadLocalRandom.getProbe是偏底层的方法,用于获取当前线程专属的随机数。
在这里插入图片描述

2、fullAddCount

  核心思想在于多重检查+CAS操作:

    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
        		//as 累加数组 a 累加单元
            CounterCell[] as; CounterCell a; int n; long v;
            //将counterCells属性赋值给as as的长度赋值给n 然后进行判断(无论if是否符合条件,都会进行赋值操作,源码里很多地方只是把赋值和比较放在了一个语句中)
            //1、累加数组不为空就会进入该分支
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //1.1、累加数组长度-1 和 线程的唯一标识 做与运算 得到的累加数组索引位置的元素为空
                if ((a = as[(n - 1) & h]) == null) {
                		//判断是否有其他线程在同时操作
                    if (cellsBusy == 0) {
                    		//创建一个累加单元,它的value属性为1            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //双重检查是否有其他线程在同时操作,并且通过CAS尝试标记当前线程正在进行操作
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                //再次检查累加数组是否不为空,并且即将插入的位置为空
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    //将创建的累加单元放入累加数组
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                            		 //释放cellsBusy 标记
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //1.2、wasUncontended为false(某个线程第一次进该方法的wasUncontended默认就是false)
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //1.3、尝试CAS修改某个累加单元中value的值 + 1    
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                //1.4、累加数组发生了改变,或者长度大于cpu的最大核心数
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                //1.5、    collide为false时(collide默认就是false的)
                else if (!collide)
                    collide = true;
                //1.6、 cellsBusy 为0 并且 CAS将cellsBusy 设置为1 成功   
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                    		//累加数组为空的情况
                        if (counterCells == as) {// Expand table unless stale
                            //对累加数组扩容,长度为原有的2倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            //元素迁移
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            //重新将新累加数组赋值给counterCells 属性
                            counterCells = rs;
                        }
                    } finally {
                    		//释放cellsBusy 标记
                        cellsBusy = 0;
                    }
                    collide = false;
                    //继续循环
                    continue;                   // Retry with expanded table
                }
                //重新计算进入该方法线程的唯一标识,避免每次都对同一个下标进行操作
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //2、累加数组为空,但是CAS设置CELLSBUSY标记为1成功(保证累加数组属性只会被一个线程初始化)
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {
                		 //再次判断                           // Initialize table
                    if (counterCells == as) {
                    		//就会初始化一个累加数组 长度为2
                        CounterCell[] rs = new CounterCell[2];
                        //并且给其中某个索引创建累加单元 value为1
                        rs[h & 1] = new CounterCell(x);
                        //将该累加数组赋值给属性counterCells 
                        counterCells = rs;
                        //标记已经初始化
                        init = true;
                    }
                } finally {
                		 //最终将CELLSBUSY标记设置为0
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //3、累加数组为空,并且CELLSBUSY标记已经为1
            //说明累加数组也无法操作了,就再次尝试修改BASECOUNT的值。
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

  如果新进来一个线程,累加数组为空,并且CAS设置CELLSBUSY标记为1成功,自然会创建一个新的累加数组,并且创建一个累加单元放入其中。
  假设累加数组中某个累加单元已经有了元素,另一个线程也要尝试操作同样的位置,第一次循环会将wasUncontended 标记设置为true,然后h = ThreadLocalRandom.advanceProbe(h);重新计算当前线程的唯一标识,目的就是为了避免第二次循环重复竞争相同的位置。
  假设第二次循环计算出位置的累加单元也有了元素,则会尝试将该累加单元的value属性+1。如果+1操作依旧失败,就会将collide 设置为true,然后h = ThreadLocalRandom.advanceProbe(h);再次重新计算当前线程的唯一标识。
  如果多次通过h = ThreadLocalRandom.advanceProbe(h);重新计算当前线程的唯一标识,依旧无法创建新的累加单元,或者对某个累加单元的value属性+1,说明多线程竞争激烈,就会尝试扩容。
  个人理解wasUncontended 标记和collide 标记是起到一个缓冲的作用,目的是多次h = ThreadLocalRandom.advanceProbe(h);重新计算当前线程的唯一标识。

3、sumCount

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        //只有一个线程能操作CHM的baseCount属性
        long sum = baseCount;
        //累加数组不为空
        if (as != null) {
        		//循环累加数组
            for (int i = 0; i < as.length; ++i) {
                //累加数组的某个累加单元也不为空
                if ((a = as[i]) != null)
                		 //进行累加
                    sum += a.value;
            }
        }
        //最终的总数 = baseCount + 累加数组中所有不为空的累加单元中value的总计
        //sum的作用是判断是否需要扩容
        return sum;
    }

总结

  在CHM中,锁的粒度是桶数组的单个下标。并且在插入元素,累加计数,初始化table中,多次利用了CAS+双重检查的模式,保证线程安全。

下一篇:集合类源码浅析のJDK1.8ConcurrentHashMap(下篇)

标签:JDK1.8,int,value,累加,源码,线程,数组,null,浅析
From: https://blog.csdn.net/2301_77599076/article/details/143695499

相关文章

  • 集合类源码浅析のJDK1.8ConcurrentHashMap(下篇)
    文章目录前言一、分段扩容1、addCount2、transfer3、helpTransfer二、查询二、删除总结前言  主要记录ConcurrentHashMap(笔记中简称CHM)的查询,删除,以及扩容方法的关键源码分析。一、分段扩容1、addCount  扩容的逻辑主要在addCount方法的后半段:private......
  • 从源码角度分析JDK动态代理
    文章目录前言一、JDK动态代理二、动态代理的生成三、invoke的运行时调用总结前言  本篇从源码的角度,对JDK动态代理的实现,工作原理做简要分析。一、JDK动态代理  JDK动态代理是运行时动态代理的一种实现,相比较于CGLIB,目标对象必须实现接口,下面是一个简单案例......
  • 浅析注意力(Attention)机制
    Attention顾名思义,说明这项机制是模仿人脑的注意力机制建立的,我们不妨从这个角度展开理解2.1人脑的注意力机制人脑的注意力机制,就是将有限的注意力资源分配到当前关注的任务,或关注的目标之上,暂时忽略其他不重要的因素,这是人类利用有限的注意力资源从大量信息中快速筛选出高价值......
  • SpringBoot在线投票数据分析平台研究与设计8kxf0(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、项目背景与意义随着互联网技术的普及,在线投票活动逐渐成为各类组织、企业和个人进行决策和意见收集的重要手段。然而,如何高效地收集、整理和分......
  • SpringBoot在线教育系统a1q7y(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、项目背景与意义随着互联网技术的快速发展,教育领域正经历着前所未有的变革。在线教育系统作为数字化教育的重要载体,以其跨越时空限制、灵活便捷......
  • 基于Springboot+Vue的停车管理系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......
  • 基于Springboot+Vue的中国蛇类识别系统 (含源码数据库)
    1.开发环境开发系统:Windows10/11架构模式:MVC/前后端分离JDK版本:JavaJDK1.8开发工具:IDEA数据库版本:mysql5.7或8.0数据库可视化工具:navicat服务器:SpringBoot自带apachetomcat主要技术:Java,Springboot,mybatis,mysql,vue2.视频演示地址3.功能这个系......
  • ssm131保险业务管理系统设计与实现+jsp(论文+源码)_kaic
     毕业设计(论文)题目:保险业务管理系统设计与实现      摘 要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本保险业务管理系统就是在这样的大环境下诞生,其可以帮助管理者在短时......
  • 解读 DelayQueue 源码:探究其精妙的设计架构与实现细节
    一、简介DelayQueue是JUC包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单15分钟未支付直接取消。它是BlockingQueue的一种,底层是一个基于PriorityQueue实现的一个无界队列,是线程安全的BlockingQueue的实现类DelayQueue 中存放的元素......
  • flask火车购票系统(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于火车购票系统的研究,现有研究多侧重于传统购票方式的改进以及购票系统的基本功能实现。在国内外,随着信息技术的发展,购票系统已逐渐......