首页 > 编程语言 >ConcurrentHashMap源码逐行解读基于jdk1.8

ConcurrentHashMap源码逐行解读基于jdk1.8

时间:2024-01-20 22:02:06浏览次数:27  
标签:Node ConcurrentHashMap hash int 源码 tab sc null 逐行


前导知识

// node数组最大容量:2^30=1073741824  

  private  static  final  int  MAXIMUM_CAPACITY =  1  <<  30    ;  

  // 默认初始值,必须是2的幕数  

  private  static  final  int  DEFAULT_CAPACITY =  16    ;  

  //数组可能最大值,需要与toArray()相关方法关联  

  static  final  int  MAX_ARRAY_SIZE = Integer.MAX_VALUE -  8    ;  

  //并发级别,遗留下来的,为兼容以前的版本  

  private  static  final  int  DEFAULT_CONCURRENCY_LEVEL =  16    ;  

  // 负载因子  

  private  static  final  float  LOAD_FACTOR =  0    .75f;  

  // 链表转红黑树阀值,> 8 链表转换为红黑树  

  static  final  int  TREEIFY_THRESHOLD =  8    ;  

  //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))  

  static  final  int  UNTREEIFY_THRESHOLD =  6    ;  

  static  final  int  MIN_TREEIFY_CAPACITY =  64    ;  

  private  static  final  int  MIN_TRANSFER_STRIDE =  16    ;  

  private  static  int  RESIZE_STAMP_BITS =  16    ;  

  // 2^15-1,help resize的最大线程数  

  private  static  final  int  MAX_RESIZERS = (    1  << (    32  - RESIZE_STAMP_BITS)) -  1    ;  

  // 32-16=16,sizeCtl中记录size大小的偏移量  

  private  static  final  int  RESIZE_STAMP_SHIFT =  32  - RESIZE_STAMP_BITS;  

  // forwarding nodes的hash值  

  static  final  int  MOVED     = -    1    ;  

  // 树根节点的hash值  

  static  final  int  TREEBIN   = -    2    ;  

  // ReservationNode的hash值  

  static  final  int  RESERVED  = -    3    ;  

  // 可用处理器数量  

  static  final  int  NCPU

  //存放node的数组  

  transient  volatile  Node<K,V>[] table;  

  /*控制标识符
-1:代表hash表所有元素转移完成
-n:代表hash表此时正在进行扩容
*/

  private  transient  volatile  int  sizeCtl;

一:ConcurrentHashMap数据结构

比较ConcurrentHashMap中用了cas的地方,而相同的逻辑hashmap没有用的地方,就是ConcurrentHashMap的优点所在。
底层实现:数组+红黑树+链表
控制并发的核心:利用cas不停的自旋,进入不同的分支达到对应的目的,直至退出(以下的讲述我将会对应不同的场景来说明各个分支的作用)

一般的我们在使用线程池的时候确定核心线程的数量有2种办法
1:io密集型(自己预估比较耗io的线程有多少),2:cpu密集型( 也就是接下来的源码中的NCPU= Runtime.getRuntime().availableProcessors(); )

put()

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
//ConcurrentHashMap不支持插入null
        if (key == null || value == null) throw new NullPointerException();
//根据key算出一个hash值
        int hash = spread(key.hashCode());
//对hash表中的元素进行计数,hash表中的所有元素个数=bincount+所有cellcount中的value之和
        int binCount = 0;
//table(一开始hash表为null)
        for (Node<K,V>[] tab = table;;) {
      
            Node<K,V> f; int n, i, fh;
//把hash表的长度赋值给n
            if (tab == null || (n = tab.length) == 0)
 //如果hash表长度为0,或者是空的,进行初始化
                tab = initTable();
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素为null
//进行cas控制,保证只有一个线程能插入成功
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素不为null,但是这个位置的元素的hash值为MOVEN(默认值为-1),说明别的线程在对tab进行扩容,帮助其扩容(转移元素到新的hash表)。此时fh = hash值
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
//如果hash表已经初始化过了且hash表(n - 1) & hash这个位置的元素不为null,且这个位置的元素的hash值不为MOVEN(默认值为-1),分情况插入链表或者红黑树             
            else {
                V oldVal = null;
//锁住链表的头节点或者是红黑树的根节点,保证同个位置同一时间只有一个线程能进行操作
                synchronized (f) {
//再次判断这个位置的节点是否和之前得到的节点f一样,锁住了节点但是没有锁住位置!,防止此时tab正好扩容完成,但是这个位置的节点已经不是f这个情况
                    if (tabAt(tab, i) == f) {
//如果这个节点有hash值,说明这个节点的下面是个链表,进行链表操作(treebin中没有hash这个属性)
                        if (fh >= 0) {
          
                            binCount = 1;
//死循环
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
//判断链表中的节点是否string或者int类型的key与现在要put的string或者int类型的key是否相同,如果相同返回老的value,新的value替换老的value,退出死循环
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
//如果循环到链表尾了,尾节点前面的节点都没有相同的key与之相同,那么就把这个key,value加到链表尾部                             
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
//如果这个位置的节点是个treebin,加入红黑树中
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
               //
                if (binCount != 0) {
//如果链表长度超过8,这个位置的链表,转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
//返回被替换的节点的value
                        return oldVal;
                    break;
                }
            }
        }
//put操作成功,hash表中元素个数加1
        addCount(1L, binCount);
        return null;
    }

initTable()

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0说明其他线程正在初始化tab
        if ((sc = sizeCtl) < 0)
//让出其他线程初始化tab,执行的时间,接着自旋
            Thread.yield(); // lost initialization race; just spin
//cas确保只有一个线程能初始化tab
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
//只有tab为null或者长度为0才初始化
                if ((tab = table) == null || tab.length == 0) {
//sc = -1,n为默认的DEFAULT_CAPACITY
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//进行tab初始化赋值
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
//修改sc标记,让其他线程不再Thread.yield();
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
//退出死循环
            break;
        }
    }
    return tab;
}

helpTransfer()

/**
     * Helps transfer if a resize is in progress.
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
//tab != null,被转移tab不为null
//f instanceof ForwardingNode f是个ForwardingNode(f是传进来的tabAt(tab, i = (n - 1) & hash)))
//(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null这个nextTab就是新的hash表,不为null以上几点综合,说明元素还在转移
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            
            int rs = resizeStamp(tab.length);
//如果是帮助进行扩容,那么sizeCTL,在扩容的时候会被赋值一个超级小的负数
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
//帮助进行扩容,nextTab是正在扩容的新tab
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

transfer()

/**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride区间(划定每个线程从旧的tab的转移哪一段的元素)
        int n = tab.length, stride;
//如果是多核cpu,n>>>3<MIN_TRANSFER_STRIDE,默认区间为MIN_TRANSFER_STRIDE=16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果nextTab为null,创建新的tab容量为2倍旧的tab,开始扩容
//如果nextTab不为null,说明其他线程正在扩容,为帮助扩容,
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
//旧hash表最右下标的值
            transferIndex = n;
        }
//新hash表的长度
        int nextn = nextTab.length;
//注意这个ForwardingNode节点中包括新的hash表!!!!!!!!!!!
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//每一个区间中的元素,是否需要接着转移的标识符号,true标识还要接着转移
        boolean advance = true;
//扩容是否完成的标识符,扩容完成为true
        boolean finishing = false; // to ensure sweep before committing nextTab
//死循环
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
//自旋
            while (advance) {
                int nextIndex, nextBound;
//判断区间[bound,i]是否减到头了(区间元素是否还需要接着转移)
                if (--i >= bound || finishing)
                    advance = false;
//一开始nextIndex = transferIndex = n = tab.length > 0 不会走这
//如果旧的tab剩下的节点 <= stride = 16,那么其他的线程不会继续划分区间,transferIndex = TRANSFERINDEX = nextBound  = 0,且最后一个区间的元素全部转移完成,把i改成-1
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
//这个时候一个线程进来了,经过cas确保这个区间是唯一的且只属于这个线程
//假设旧的hash表的长度为100,经过这个判断transferIndex = TRANSFERINDEX = nextBound  = 84
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
//bound =  nextBound = 84
// i = nextIndex - 1 = 100 - 1 =99
//[bound,i]那么这个区间的下标就是[84,99],这个线程就负责转移这个区间的元素,然后其他的线程过来同理划分区间,因为transferIndex 加了volital关键字,确保了可见性所以第二个线程这个时候的transferIndex = 84,且上面第一个else if中nextIndex = transferIndex=84然后接着这个逻辑划分区间,一直划分到[0,15]
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
//包含了2种情况:当i=-1时说明最后一段区间的所有元素转移完成
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
//如果完成扩容,此时才替换老tab
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
//在帮助扩容时sc会加1,这里相当于计数,当前线程完成了工作,sc-1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//判断此时的sc与传进来的sc是否相同,如果在sc-1的情况下都不同,说明在扩容的过程中有别的线程帮助了扩容导致sc加1,直到循环sc-1,与传进来的sc相同为止,确保所有线程对扩容或者帮扩容完成
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
//接下来的俩个else if目的排除hash表上的null节点和fwd节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
//根据头节点,遍历转移下面的链表或者是红黑树
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

untreeify()替换树节点为普通节点

/**
 * Returns a list on non-TreeNodes replacing those in given list.
 */
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
    Node<K,V> hd = null, tl = null;
    for (Node<K,V> q = b; q != null; q = q.next) {
        Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
        if (tl == null)
            hd = p;
        else
            tl.next = p;
        tl = p;
    }
    return hd;
}


标签:Node,ConcurrentHashMap,hash,int,源码,tab,sc,null,逐行
From: https://blog.51cto.com/u_16414043/9346330

相关文章

  • MyBatis 系列:MyBatis 源码环境搭建
    目录一、环境准备二、下载MyBatis源码和MyBatis-Parent源码三、创建空项目、导入项目四、编译mybatis-parent五、编译mybatis六、测试总结一、环境准备jdk:17maven:3.9.5二、下载MyBatis源码和MyBatis-Parent源码Mybatis:https://github.com/mybatis/mybatis-3.gitMy......
  • Feign源码解析6:如何集成discoveryClient获取服务列表
    背景我们上一篇介绍了feign调用的整体流程,在@FeignClient没有写死url的情况下,就会生成一个支持客户端负载均衡的LoadBalancerClient。这个LoadBalancerClient可以根据服务名,去获取服务对应的实例列表,然后再用一些客户端负载均衡算法,从这堆实例列表中选择一个实例,再进行http调用即......
  • 将小部分源码设计精髓带入到开发中来(工厂模式、适配器模式、抽象类、监听器)
    前言咋说呢,大学期间阅读过很多源码(Aop、Mybatis、Ioc、SpringMvc…),刚开始看这些源码的时候觉得云里雾里,一个没什么代码量的人突然去接触这种商业帝国级别的成品源码的时候,根本无从下手,这种感觉很难受,但是也庆幸自己熬过了那段难忘且充实的日子,随着自己代码量的慢慢增多,也开始慢慢......
  • 直播app系统源码,通过延迟加载非关键资源实现首屏优化
    直播app系统源码,通过延迟加载非关键资源实现首屏优化将非关键资源(如广告、推荐内容等)的加载延迟到首屏渲染完成之后,以提高首屏展示速度。<!DOCTYPEhtml><html><head><title>延迟加载示例</title></head><body><h1>首屏内容</h1><!--非关键资源--><d......
  • 视频直播app源码,利用缓存实现连续登录失败后的时间等待
    实现步骤:1、用户在视频直播app源码中发起登录请求2、后台验证是否失败次数过多,账户没有锁定的话就进入下面的步骤;否则直接返回3、验证用户的账号+密码3.1验证成功:删除缓存3.2验证失败:统计最近10分钟时间窗口内的失败次数,如果达到5次则设置锁定缓存,返回图解实......
  • MetaGPT day02: MetaGPT Role源码分析
    MetaGPT源码分析思维导图MetaGPT版本为v0.4.0,如下是frommetagpt.rolesimportRole,Role类执行Role.run时的思维导图:概述其中最重要的部分是_react,里面包含了一个循环,在循环中交替执行_think和_act,也就是让llm先思考再行动。_think中决定了llm下一个执行的动作是什么,这个动作......
  • TCP三次握手源码分析(服务端接收ACK&TCP连接建立完成)
    内核版本:Linux3.10内核源码地址:https://elixir.bootlin.com/linux/v3.10/source(包含各个版本内核源码,且网页可全局搜索函数)《TCP三次握手源码分析(客户端发送SYN)》《TCP三次握手源码分析(服务端接收SYN以及发送SYN+ACK)》《TCP三次握手源码分析(客户端接收SYN+ACK以及发送ACK......
  • 《Java并发实现原理:JDK源码剖析》PDF
    《Java并发实现原理:JDK源码剖析》全面而系统地剖析了JavaConcurrent包中的每一个部分,对并发的实现原理进行了深刻的探讨。全书分为8章,第1章从最基础的多线程知识讲起,理清多线程中容易误解的知识点,探究背后的原理,包括内存重排序、happen-before、内存屏障等;第2~8章,从简单到复杂,逐......
  • 《Java并发实现原理:JDK源码剖析》PDF
    《Java并发实现原理:JDK源码剖析》全面而系统地剖析了JavaConcurrent包中的每一个部分,对并发的实现原理进行了深刻的探讨。全书分为8章,第1章从最基础的多线程知识讲起,理清多线程中容易误解的知识点,探究背后的原理,包括内存重排序、happen-before、内存屏障等;第2~8章,从简单到复杂,逐......
  • A021 《斗图大赛》编程 源码
    一、课程介绍本节课将学习新的while循环,并结合布尔值True实现无限循环,最终实现一个动态表情包的效果。二、重难点解析布尔值在编程中,True是真,False是假。“真,假”,也是“对,错”的意思,它们是由英国著名数学家和逻辑学家乔治布尔提出的。所以,True和False也叫做布尔值,用于表示......