首页 > 其他分享 >HashMap和ConcurrentHashMap原理

HashMap和ConcurrentHashMap原理

时间:2023-02-16 11:57:13浏览次数:60  
标签:Node key ConcurrentHashMap HashMap 链表 数组 原理 null 节点

参考文章:
https://zhuanlan.zhihu.com/p/70220699/
https://blog.csdn.net/androidstarjack/article/details/124507171
https://blog.csdn.net/a745233700/article/details/119709104
https://blog.csdn.net/liuwg1226/article/details/119548439

HashMap

原理

JDK8及其以后的版本中,HashMap使用了数组+链表+红黑树实现。数组是HashMap的主体,链表则是主要为了解决哈希冲突而存在的,红黑树解决了链表太长导致的查询速度变慢的问题。

HashMap通过key的HashCode经过扰动函数处理过后得到Hash值,然后通过位运算判断当前元素存放的位置,如果当前位置存在元素的话,就判断该元素与要存入的元素的hash值以及key是否相同,如果相同的话,直接覆盖,不相同就通过拉链法解决冲突。当Map中的元素总数超过Entry数组的0.75时,触发扩容操作,为了减少链表长度,元素分配更均匀。

Hash组成结构

HashMap结构是由“数组”和“链表”组成,其结构类似于下图的形式
image

为什么要使用数组和链表?

两个字“性能”,HashMap是一个集添加、删除、查询都具备高性能的优点于一身。

数组的优势
因为每个数组都是有下标的,我们可以通过算法做查找优化,所以数组查找数据的速度非常快
数组的劣势
但是数组也有它的劣势,那就是数组插入数据非常的慢,因为每次插入数据的时候需要把后面的数据都重新挪下位置

image


数组主要是查找数据快,但插入数据却需要挪动数据,当数组的长度越长,那么插入数据的时候,需要挪动的元素就越多,所以效率非常低。也正因为数组插入数据的效率低,所以这里就使用到了另外一个数据结构“链表”。

链表的优势
链表一个显著的优势就是插入数据快,链表不会像数组一样记录下标的值,但是链表的每个元素自己都会记录下一个元素的位置。就像我们排队一样,我们只需要记得站在我前面是谁就行, 我并不需要去记住我是队伍的第几个。

这样的好处就是,如果往链表中间插入一条数据时,比如在A->B->D>-E 链表中,如果在B节点和D节点之间插入C ,那么我只需要修改B节点保存的自己后面一个的引用,然后把C节点后面一个节点的引用指向D即可,其他的节点都不需要做任何的修改,不论这个链表长度如何。

image

链表的劣势
链表的劣势也很明显,就是链表非常不便于查找数据,因为它没有像数组一样记录元素的下标,所以每次查找一个元素就只能遍历整个链表去匹配。很显然这种查找方式很粗鲁。


总结
因为HashMap添加数据、修改数据的操作都比较频繁,所以必须要保证查找元素的效率的同时也要保证插入元素的效率,在查找数据的时候充分利用数组的优势,定位到数据在哪个下标的链表里。在插入数据的时候充分利用链表的优势,每次插入数据时,把插入的数据放在链表的第一个节点,其他的节点不需要做任何修改。最终综合两种数据结构的优势提升整体性能。

HashMap如何PUT一个数据的?

image

HashMap初始化容量机制和扩容机制

初始化容量
put数据的时候,首先得初始化一个数组才行,而数组创建是需要指定一个长度的,这里当没指定数组长度的时候,HashMap会默认初始化一个默认长度为16的数组。

这个初始化的操作可以查看源码中的resize()方法,在扩容的时候也会用到,在这个方法中,有一个常量DEFAULT_INITIAL_CAPACITY控制着HashMap的初始化容量。

扩容机制
当数据越来越多的时候,就需要对数组进行扩容了, 进行扩容的时候会有几个问题是需要我们思考的:

  • 当数据到达多少时进行扩容?
    通过源码可以知道,是否扩容的决定性因素是DEFAULT_LOAD_FACTOR负载因子这个常量上,它的值为0.75。

为什么负载因子要设置成0.75呢,这个是经过了科学测试的,这个值设置太小会导致HashMap空间利用率不高,扩容的频率也会更高,扩容的时候需要把数据重新计算哈希排列,这样会影响性能。
设置成0.75为了减少哈希冲突,当我们通过科学的测试后,发现当数据量超过数组容量的0.75时,产生hash冲突的几率会很高,因为hash冲突的数据会放到同一个链表,这样会加长链表的长度,同样也会影响HashMap的性能。

  • 每次扩容的大小如何决定?
    需要进行扩容的时候都是对原来的容量增加一倍的长度,这里主要是保证扩容后的长度是2的N次幂。至于这里为什么要保证这种机制,后面我们在key是如何被分配到数组的某一个位置时候结合说明。

如何定位一个key会存储到数组的哪个位置?

现在我们已经得到了一个长度为16的数组,那么我们有一个硬性要求就是通过一种方式把Key的存放位置定位到0-15下标之间。我们如何定位一个Key存储到数组的哪个位置?

  • 第一步
    首先我们要把我们的Key通过hashCode()方法得到一个数字,比如说我们Key 为“name”得到的hashCode为3373707。
String key="name";
int hashCode=key.hashCode();
  • 第二步
    然后我们需要对这个哈希数和数组的长度进行一个运算,得到一个1-15的数字。这我们可以使用hashCode 对我们的数组总长度16 进行取余来得到一个1-15的数字(不过这里HashMap中是使用的是 (n - 1) & hash,可以达到同样的效果)
    我们使用取余方式将3373707 % 16 =11 ,最后得到Key为”name”数据会存储到数组下标为11的位置,最后我们可以把数据存储到下标为11的链表里面去,如果链表里面有相同的Key则替换,没有重复的则追加到链表的尾部。

总结
因为在HashMap的容量为 2 的N次幂时,使用 HashCode % HashMap容量 与 HashCode &(HashMap容量 -1) 计算方式得到的运算结果是一样的,而我们把容量改成其他数字就达不到这样的效果了,所以HashMap的扩容机制必须是2的N次幂,因为位与运算的速度要远高于取余的方式,所以HashMap最终采用了这种算法模式来决定一个key 会分配到哪个数组下标的位置去。

在put数据的时候如何减少哈希冲突?

这里的减少哈希冲突其实就是让我们的Key能合理均匀的分布到我们数组的各个下标里面,避免我们的元素都被集中分配到一个数组下标下面,这样会使我们的数组不能合理的利用起来,也会降低我们的查询效率

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

这段代码意思是把HashCode 与HashCode无符号右移16位的值进行异或处理,这个值才是我们真正用于数组定位的HashCode值,那么为什么要进行这样一个处理呢?
举一个例子来说明一下这个算法的意图,比如:如果有两个人,他们分别具备身高、年龄、性别、发型指标;如果想最大化的区分两个人的差异性,那么我们肯定把不同维度的指标都进行匹配,因为多一个维度的指标就有可能出现差异性,身高年龄相同的很多,但是身高年龄发型相同的几率就会少很多,而这里其实就是在HashCode上加一个维度的比较那么产生哈希冲突的几率就越少。


如果用HashCode &(HashMap容量 -1)定位数组位置
image

通过上面的图可以发现,我们在计算出HashCode与数组长度进行运算时,其实我们只用到了HashCode的一小部分数据参与了运算,那么根据我们经验得知,肯定是参与匹配的维度越多,那么就越难出现哈希冲突,这HashMap中把HashCode 与HashCode无符号右移16位的值进行异或处理,正是出于想把其他没有使用到的数据也合理的利用起来参与到运算中,从而达到减少哈希冲突的结果。


总结
多一个维度的比较就能减少重复,HashMap为了把HashCode充分的利用到数组位置计算中,从而达到减少哈希冲突,所以使用了上面的算法,目的就是为了让HashCode更多的数字参与到Key的位置计算中来。

链表过长怎么办?

数组的长度终归是有限的,这样就必然造成一部分数据会分配到同一个数组下面(链表会很长)。必须得想个法子优化,要不然这么长的链表,匹配数据的时候要一个个去比较,肯定会贼慢。所以HashMap把这个链表结构转换成红黑树,这样通过树结构来来优化查询的算法,提高查询的性能。
image

那么什么时候HashMap 会触发链表转换成红黑树的操作呢,还有当红黑树的数据少于多少的时候又会转回链表呢,从HashMap源码中看到
当进行put数据的时候,链表长度大于static final int TREEIFY_THRESHOLD = 8,就会进行链表转红黑树的操作;

image

因为我们的HashMap会进行扩容,扩容后我们的元素又可能会分配到别的数组小标里面去,所以我们红黑树的节点也会减少,而当红黑树的节点数量减少到一定程度的时候,红黑树又会转换成链表,而这个值是由static final int NTREEIFY_THRESHOLD = 6;来设定的,当红黑树的节点小于等于6的时候会完成转换链表的操作,这个我们在resize()方法中有调用split()方法,从源码中可以看到进行红黑树转链表的操作。

image

总结
因为数组长度有限的,所以无法避免哈希冲突造成链表数据过多,当我们链表数据过长的时候会进行一些优化,把链表转换为红黑树优化查询效率,当链表长度大于等于7的时候,进行链表转红黑树,当链表长度小于等于6的时候又会把红黑树还原成链表。

HashMap get数据时是如何查找对应数据的?

get数据的流程:

  • 1、先计算出key哈希值。
  • 2、通过哈希值定位到Key存在哪个数组下标。
  • 3、找到后看数组下标里面有没有节点。
  • 4、有节点的话区分节点数据是红黑树还是链表,然后分别使用对应数据结构的查找方法。
  • 5、根据查找的key和节点里面存的Key 值判断两个key是不是equas() ,equas则返回对应的节点,否则继续匹配下一个节点,直到匹配成功返回节点,或者没有节点配后返回null;

ConcurrentHashMap

介绍

因为HashMap 是线程不安全的(表现为在多线程环境下添加元素,可能会出现数据丢失的情况),在一些多线程的情况下,如果想使用线程安全的 Map 容器,可以使用 ConcurrentHashMap 集合。

ConcurrentHashMap 的底层数据结构依然采用“数组+链表+红黑树”,与HashMap不同的是,ConcurrentHashMap 并不是直接转换为红黑树,而是把这些节点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。。采用了 synchronized + CAS 算法来保证线程安全。在ConcurrentHashMap中,大量使用 Unsafe.compareAndSwapXXX 的方法,这类方法是利用一个CAS算法实现无锁化的修改值操作,可以大大减少使用加锁造成的性能消耗。这个算法的基本思想就是不断比较当前内存中的变量值和你预期变量值是否相等,如果相等,则接受修改的值,否则拒绝你的而操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。

原理

重要属性

    private static final int MAXIMUM_CAPACITY = 1 << 30;
    private static final int DEFAULT_CAPACITY = 16;
    static final int TREEIFY_THRESHOLD = 8;
    static final int UNTREEIFY_THRESHOLD = 6;
    static final int MIN_TREEIFY_CAPACITY = 64;
    static final int MOVED     = -1; // 表示正在转移
    static final int TREEBIN   = -2; // 表示已经转换成树
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    transient volatile Node<K,V>[] table; // 默认没初始化的数组,用来保存元素
    private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组
    /**
         * 用来控制表初始化和扩容的,默认值为0,当在初始化的时候指定了大小,
         * 这会将这个大小保存在 sizeCtl 中,大小为数组的 0.75
         * 当为负的时候,说明表正在初始化或扩张,
         *     -1表示初始化
         *     -(1+n) n:表示活动的扩张线程
         */
        private transient volatile int sizeCtl;

核心数据结构Node

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
 //...       
}

跟HashMap差不多的,但是不同点是,他使用volatile去修饰了他的数据Value还有下一个节点next。

volatile的特性是啥?

  • 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。(实现可见性)
  • 禁止进行指令重排序。(实现有序性)volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。

核心数据结构TreeBin

TreeBin也继承自Node,封装TreeNode,完成对红黑树的包装。与TreeNode不同的是TreeBin在头部结点被使用(个人理解就是(n-1)&h)。其并没有持有key、value而是指向了TreeNodes和root结点。

static final class TreeBin<K,V> extends Node<K,V> {
   TreeNode<K,V> root;
   volatile TreeNode<K,V> first;
   volatile Thread waiter;
   volatile int lockState;
   // values for lockState
   static final int WRITER = 1; // set while holding write lock
   static final int WAITER = 2; // set when waiting for write lock
   static final int READER = 4; // increment value for setting read lock
   //...
}   

这个类并不负责包装用户的key、value值,而是包装很多TreeNode节点,他代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap数组中,存放的是 TreeBin对象,而不是TreeNode对象。 TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以 TreeBin就是封装TreeNode的容器,他提供转换红黑树的一些条件和锁的控制。

ForwardingNode

 static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
           super(MOVED, null, null, null);
           this.nextTable = tab;
        }
}

ForwardingNode 在转移的时候放在头部的节点,是一个空节点

重要方法

在 ConcurrentHashMap 中使用了 unSafe 方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制(Unsafe是java操作内存的一个非安全类,操作对象和对应的变量来完成CAS操作)


    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    /*
         * 用来返回节点数组的指定位置的节点的原子操作
         */
        @SuppressWarnings("unchecked")
        static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
        }
        /*
         * cas原子操作,在指定位置设定值
         */
        static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                            Node<K,V> c, Node<K,V> v) {
            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
        }
        /*
         * 原子操作,在指定位置设定值
         */
        static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
            U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
        }

image

数组初始化方法 initTable

    /**
         * 初始化数组table,
         * 如果sizeCtl小于0,说明别的数组正在进行初始化,则让出执行权
         * 如果sizeCtl大于0的话,则初始化一个大小为sizeCtl的数组
         * 否则的话初始化一个默认大小(16)的数组
         * 然后设置sizeCtl的值为数组长度的3/4
         */
         private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {    
        // 第一次 put 的时候,table 还没被初始化,进入 while
            if ((sc = sizeCtl) < 0)
            // sizeCtl 初始值为0,当小于 0 的时候表示在别的线程在初始化表或扩展表
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
            // SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,
            // 设定为-1表示要初始化表了,这一步会将sizeCtl设置成-1
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;        
                        // 指定了大小的时候就创建指定大小的Node数组,
                        // 否则创建指定大小(16)的 Node 数组
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;  // 初始化后,sizeCtl 长度为数组长度的 3/4
                }
                break;
            }
        }
        return tab;
    }

put 操作

    /*
         * 当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,
         * 如果没有的话就初始化数组
         *  然后通过计算hash值来确定放在数组的哪个位置
         * 如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来
         * 如果取出来的节点的hash值是MOVED(-1)的话,则表示当前正在对这个数组进行扩容,
         * 复制到新的数组,则当前线程也去帮助复制
         * 最后一种情况就是,如果这个节点,不为空,也不在扩容,
         * 则通过synchronized来加锁,进行添加操作
         *    判断当前取出的节点位置存放的是链表还是树
         *    如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,
         * 如果key相等,并且key的hash值也相等的话,则说明是同一个key,则覆盖掉value,
         * 否则的话则添加到链表的末尾
         *    如果是树的话,则调用putTreeVal方法把这个元素添加到树中去
         *  最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),
         * 如果达到8个以上了的话,则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组
         */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            // K,V 都不能为空,否则的话跑出异常
            int hash = spread(key.hashCode()); // 取得 key 的 hash 值
            // 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
            int binCount = 0;    
            for (Node<K,V>[] tab = table;;) {// table 变量会在 initTable() 里面进行赋值
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)   
                // 第一次 put 的时候 table 没有初始化,则初始化 table,
                // 在 initTable() 里面对 table 变量赋值 
                    tab = initTable();    
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    
                //通过哈希计算出一个表中的位置因为n是数组的长度,所以(n-1)&hash肯定不会
                // 出现数组越界,在这一步算出具体的索引位置i变量的值
                    if (casTabAt(tab, i, null,        
                                 new Node<K,V>(hash, key, value, null)))        
              //cas的方式尝试添加Node节点,注意这个时候是没有加锁的,这里的第一个null表示只有
              // 当前的i位置的变量是null的时候,才会插入Node节点,第二个null表示Node节点的
              // 是下一个节点为空
                        break;                   
                }
                /*
                 * 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
                 * 则当前线程也会参与去复制,通过允许多线程复制的功能,
                 *  一次来减少数组的复制所带来的性能损失
                 */
                else if ((fh = f.hash) == MOVED)    
                    tab = helpTransfer(tab, f);
                else {
                    // 注意执行到这里的时候,f局部变量、n和i和fh局部变量都已经有值了,
                    // 因为执行到这里说明上面的几个判断都已经执行过了
                    /*
                     * 如果在这个位置有元素的话,就采用synchronized的方式加锁,
                     *     如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历,
                     *         如果找到了key和key的hash值都一样的节点,则把它的值覆盖掉
                     *         如果没找到的话,则添加在链表的最后面
                     *  否则,是树的话,则调用putTreeVal方法添加到树中去
                     *  
                     *  在添加完之后,会对该节点上关联的的数目进行判断,
                     *  如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容
                     */
                    V oldVal = null;
                    synchronized (f) {
                    // 再次取出要存储的位置的元素,跟前面取出来的比较
                        if (tabAt(tab, i) == f) { 
                        // 取出来的元素的 hash 值大于0,当转换为树之后,hash 值为-2,
                        // 就是变量 TREEBIN 的值      
                            if (fh >= 0) {                
                                binCount = 1;            
                                for (Node<K,V> e = f;; ++binCount) { // 遍历这个链表
                                    K ek;
                                    // 要存的元素的hash,key跟要存储的位置的节点的相同的时候,
                                    // 替换掉该节点的value即可
                                    if (e.hash == hash &&        
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        // 当使用putIfAbsent的时候,只有在这个key没有设置
                                        // 值得时候才设置
                                        if (!onlyIfAbsent)        
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    // 如果不是同样的hash,同样的key的时候,则判断该节点的
                                    // 下一个节点是否为空,
                                    if ((e = e.next) == null) { 
                                    // 为空的话把这个要加入的节点设置为当前节点的下一个节点   
                                        pred.next = new Node<K,V>(hash, key,        
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) { // 表示已经转化成红黑树类型了
                                Node<K,V> p;
                                binCount = 2;
                                // 调用 putTreeVal 方法,将该元素添加到树中去
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,    
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
           // 当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为 tree
                        if (binCount >= TREEIFY_THRESHOLD)    
                            treeifyBin(tab, i);    
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);    //计数
            return null;
        }

ConcurrentHashMap 的扩容

在 put 方法的详解中,我们可以看到,在同一个节点的个数超过 8 个的时候,会调用 treeifyBin 方法。来看看是扩容还是转化为一棵树

同时在每次添加完元素的 addCount 方法中,也会判断当前数组中的元素是否达到了 sizeCtl 的量,如果达到了的话,则会进入 transfer 方法去扩容:

    /**
         * Replaces all linked nodes in bin at given index unless table is
         * too small, in which case resizes instead.
         * 当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树
         */
        private final void treeifyBin(Node<K,V>[] tab, int index) {
            Node<K,V> b; int n, sc;
            if (tab != null) {
                if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY 64
                    tryPresize(n << 1);        // 数组扩容
                else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {    //使用synchronized同步器,将该节点出的链表转为树
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;    //hd:树的头(head)
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
            // 把 Node 组成的链表,转化为 TreeNode 的链表,头结点任然放在相同的位置
                                if ((p.prev = tl) == null)        
                                    hd = p;    // 设置 head
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            // 把 TreeNode 的链表放入容器 TreeBin 中
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
            }
        }

可以看到当需要扩容的时候,调用的时候 tryPresize 方法,看看 trePresize 的源码

        /**
         * 扩容表为指可以容纳指定个数的大小(总是2的N次方)
         * 假设原来的数组长度为16,则在调用tryPresize的时候,
         * size参数的值为16<<1(32),此时sizeCtl的值为12
         */
        private final void tryPresize(int size) {
                /*
                 * MAXIMUM_CAPACITY = 1 << 30 = 1073741824
                 * 如果给定的大小大于等于数组容量的一半,则直接使用最大容量,
                 * 否则使用tableSizeFor算出来,tableSizeFor()返回值是入参的二倍
                 */
            int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1);
            int sc;
            while ((sc = sizeCtl) >= 0) {   // while循环来进行扩容
                Node<K,V>[] tab = table; int n;
                /*
                 * 如果数组table还没有被初始化,则初始化一个大小为sizeCtrl和刚刚
                 * 算出来的c中较大的一个大小的数组
                 * 初始化的时候,设置sizeCtrl为-1,初始化完成之后把sizeCtrl设置为数组长度的3/4
                 * 为什么要在扩张的地方来初始化数组呢?这是因为调用putAll方法
                 * 直接put一个map的话,在putALl方法中没有调用initTable方法去初始化table,
                 * 而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断
                 */
                if (tab == null || (n = tab.length) == 0) {
                    n = (sc > c) ? sc : c;   
                    // 初始化 tab 的时候,把 sizeCtl 设为 -1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                        try {
                            if (table == tab) {
                                @SuppressWarnings("unchecked")
                                // 扩容一个长度是 n 的新数组
                                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];  
                                table = nt;  // 把新数组赋值给 table 变量
                                sc = n - (n >>> 2);
                            }
                        } finally {
                            sizeCtl = sc;
                        }
                    }
                }
                else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                        break;    
                }
                else if (tab == table) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        Node<K,V>[] nt;
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            // 开始转移数据
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2)) {
                            // 开始转移数据
                            transfer(tab, null);
                    }
                }
            }
        }

在 tryPresize 方法中,并没有加锁,允许多个线程进入,如果数组正在扩张,则当前线程也去帮助扩容。

数组扩容的主要方法就是 transfer 方法

    /**
         * Moves and/or copies the nodes in each bin to new table. See
         * above for explanation.
         * 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置
         * 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用,
         * 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,
         * 那只有一个线程会对其进行扩容的复制移动操作
         * 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在
         * 链表的头部设置一个fwd节点,这样其他线程就会跳过他,
         * 复制后在新数组中的链表不是绝对的反序的
         */
        private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
            int n = tab.length, stride;
            if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    
            // MIN_TRANSFER_STRIDE 用来控制不要占用太多 CPU
                stride = MIN_TRANSFER_STRIDE;   / /MIN_TRANSFER_STRIDE=16
            /*
             * 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab
             * 此时nextTable被设置值了(在初始情况下是为null的)
             * 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张,
             * 而只是第一个开始扩张的线程需要初始化下目标数组
             */
            if (nextTab == null) {            // initiating
                try {
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                    nextTab = nt;
                } catch (Throwable ex) {      // try to cope with OOME
                    sizeCtl = Integer.MAX_VALUE;
                    return;
                }
                nextTable = nextTab;
                transferIndex = n;
            }
            int nextn = nextTab.length;
            /*
             * 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,
             * 就设置为fwd节点
             * 这是一个空的标志节点
             */
            ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
            boolean advance = true;    //是否继续向前查找的标志位
             // to ensure sweep(清扫) before committing nextTab,
             // 在完成之前重新在扫描一遍数组,看看有没完成的没
            boolean finishing = false;
            for (int i = 0, bound = 0;;) {
                Node<K,V> f; int fh;
                while (advance) {
                    int nextIndex, nextBound;
                    if (--i >= bound || finishing) {
                        advance = false;
                    }
                    else if ((nextIndex = transferIndex) <= 0) {
                        i = -1;
                        advance = false;
                    }
                    else if (U.compareAndSwapInt
                             (this, TRANSFERINDEX, nextIndex,
                              nextBound = (nextIndex > stride ?
                                           nextIndex - stride : 0))) {
                        bound = nextBound;
                        i = nextIndex - 1;
                        advance = false;
                    }
                }
                if (i < 0 || i >= n || i + n >= nextn) {
                    int sc;
                    if (finishing) {        //已经完成转移
                        nextTable = null;
                        table = nextTab;
                        sizeCtl = (n << 1) - (n >>> 1);    //设置sizeCtl为扩容后的0.75
                        return;
                    }
                    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                                return;
                        }
                        finishing = advance = true;
                        i = n; // recheck before commit
                    }
                }
                // 数组中把 null 的元素设置为 ForwardingNode 节点(hash 值为 MOVED[-1])
                else if ((f = tabAt(tab, i)) == null)            
                    advance = casTabAt(tab, i, null, fwd);
                else if ((fh = f.hash) == MOVED)
                    advance = true; // already processed
                else {
                    synchronized (f) {     // 加锁操作
                        if (tabAt(tab, i) == f) {
                            Node<K,V> ln, hn;
                            if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点
                                    /*
                                     * 因为n的值为数组的长度,且是power(2,x)的,所以,
                                     * 在&操作的结果只可能是0或者n
                                     * 根据这个规则
                                     *         0-->  放在新表的相同位置
                                     *         n-->  放在新表的(n+原来位置)
                                     */
                                int runBit = fh & n; 
                                Node<K,V> lastRun = f;
                                /*
          * lastRun 表示的是需要复制的最后一个节点
          * 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b
          * 这样for循环之后,runBit的值就是最后不变的hash&n的值
          * 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点)
          * 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的,
          * 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置
          * 在复制完p节点之后,p节点的next节点还是指向它原来的节点,
          * 就不需要进行复制了,自己就被带过去了
          * 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序
         */
                                for (Node<K,V> p = f.next; p != null; p = p.next) {
                                    int b = p.hash & n;    //n的值为扩张前的数组的长度
                                    if (b != runBit) {
                                        runBit = b;
                                        lastRun = p;
                                    }
                                }
                                if (runBit == 0) {
                                    ln = lastRun;
                                    hn = null;
                                }
                                else {
                                    hn = lastRun;
                                    ln = null;
                                }
                                /*
                                 * 构造两个链表,顺序大部分和原来是反的
                                 * 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
                                 */
                                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                    int ph = p.hash; K pk = p.key; V pv = p.val;
                                    if ((ph & n) == 0)
                                            /*
                * 假设runBit的值为0,
                * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点
                * (该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash
                * 计算后为0的节点下一个节点
                * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                                             */
                                        ln = new Node<K,V>(ph, pk, pv, ln);
                                    else
                 /*
                  * 假设runBit的值不为0,
                  * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash
                  * 变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的
                  * table的第一个hash计算后不为0的节点下一个节点
                  * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                  */
                                        hn = new Node<K,V>(ph, pk, pv, hn);    
                                }
                                setTabAt(nextTab, i, ln);    
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                            else if (f instanceof TreeBin) {    //否则的话是一个树节点
                                TreeBin<K,V> t = (TreeBin<K,V>)f;
                                TreeNode<K,V> lo = null, loTail = null;
                                TreeNode<K,V> hi = null, hiTail = null;
                                int lc = 0, hc = 0;
                                for (Node<K,V> e = t.first; e != null; e = e.next) {
                                    int h = e.hash;
                                    TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                    if ((h & n) == 0) {
                                        if ((p.prev = loTail) == null)
                                            lo = p;
                                        else
                                            loTail.next = p;
                                        loTail = p;
                                        ++lc;
                                    }
                                    else {
                                        if ((p.prev = hiTail) == null)
                                            hi = p;
                                        else
                                            hiTail.next = p;
                                        hiTail = p;
                                        ++hc;
                                    }
                                }
                                /*
                                 * 在复制完树节点之后,判断该节点处构成的树还有几个节点,
                                 * 如果≤6个的话,就转回为一个链表
                                 */
                                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                                setTabAt(nextTab, i, ln);
                                setTabAt(nextTab, i + n, hn);
                                setTabAt(tab, i, fwd);
                                advance = true;
                            }
                        }
                    }
                }
            }
        }

到这里,ConcurrentHashMap 的 put 操作和扩容都介绍的差不多了,

下面的两点一定要注意:

  • 复制之后的新链表不是旧链表的绝对倒序
  • 在扩容的时候每个线程都有处理的步长,最少为 16,在这个步长范围内的数组节点只有自己一个线程来处理

get 操作

    /*
         * 相比put方法,get就很单纯了,支持并发操作,
         * 当key为null的时候回抛出NullPointerException的异常
         * get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置
         * 然后遍历该位置的所有节点
         * 如果不存在的话返回null
         */
        public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            int h = spread(key.hashCode());
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }

ConcurrentHashMap 的同步机制

ConcurrentHashMap是如果来做到并发安全,又是如何做到高效的并发的呢?

首先是读操作,从源码中可以看出来,在get操作中,根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。

那么写操作呢?

分析这个之前,先看看什么情况下会引起数组的扩容,扩容是通过transfer方法来进行的。而调用transfer方法的只有trePresize、helpTransfer和addCount三个方法。

这三个方法又是分别在什么情况下进行调用的呢?

  • tryPresize是在treeIfybin和putAll方法中调用,treeIfybin主要是在put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。

  • helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法,在helpTransfer中再调用transfer方法来帮助完成数组的扩容

  • addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。

标签:Node,key,ConcurrentHashMap,HashMap,链表,数组,原理,null,节点
From: https://www.cnblogs.com/d111991/p/17116610.html

相关文章

  • 微机原理与系统设计笔记4 | 汇编语言程序设计与其他指令
    打算整理汇编语言与接口微机这方面的学习记录。本部分介绍汇编语言程序设计以及一些跟程序设计密切相关的指令类。参考资料西电《微机原理与系统设计》周佳社西交......
  • RocketMQ - 生产者原理
    https://rocketmq.apache.org/ApacheRocketMQ是一款开源的、分布式的消息投递与流数据平台。出生自阿里巴巴,在阿里巴巴内部经历了3个版本后,作为Apache顶级开源项目之一......
  • mysql事务隔离级别及实现原理,深度理解
    1、事务是什么?事务是数据库一个不可分的工作单元,可以将多个操作步骤表示为一个步骤。2、事务的四大特性Atomicity原子性,Consistency一致性,Isolation隔离性,Durability持......
  • 《分布式技术原理与算法解析》学习笔记Day13
    分布式计算模式:MapReduce什么是分治法?分治法是将一个复杂、难以直接解决的大问题,分割成一些规模小、可以比较简单或者直接求解的子问题,这些子问题之间相互独立且与原问题......
  • lvs+keepalive 工作原理及搭建
    目录lvs+keepalive工作原理及搭建nginx应用场景keepalive介绍keepalived工作原理VRRP工作原理简述keepalive软件结构开始搭建环境准备工作:安装Nginx什么是高可用?解决的问......
  • 《分布式技术原理与算法解析》学习笔记Day12
    调度框架:共享状态调度什么是共享状态调度?共享状态调度是为了解决单体调度和两层调度遇到的问题而创建出来的新的调度框架。它通过将单体调度器分解为多个调度器,每个调度......
  • springboot starter 原理解析及实践
    什么是springbootstarterstarter是springBoot的一个重要部分。通过starter,我们能够快速的引入一个功能,而无需额外的配置。同时starter一般还会给我提供预留的自定配置选......
  • tcp通信原理
    (40条消息)【转】C#Socket编程详解(一)TCP与UDP简介_sinolover的博客-CSDN博客......
  • 双麦定向拾音束回音消除及远场拾音降噪模块 A-68(原理篇)
        最近手上有个双麦定向拾音束降噪和消回音模块,型号是A-68,一般的降噪处理设备不同的是这个模块可以实现定向拾音。只要找好两个麦克风的夹角就能够很好的屏蔽掉......
  • DNS服务原理
    DNS服务一、是什么DNS(DomainNameSystem域名系统)Ip:所有网络设备在互联网上的地址标记作用:解析域名背一个dns服务器:推荐202.106.0.208.8.8.8     3. h......