【Java并发】深入解析ConcurrentHashMap
系列文章目录
文章目录
- 系列文章目录
- 1.1 ConcurrentHashMap简介
- 1.2 关键属性及类
- 1.3 CAS相关操作
- 1.4 深入分析核心方法
- 1. 实例构造器方法
- 2. initTable()方法
- 3 put()方法
- 4 .get()方法
- 5.transfer()方法
- 总结
1.1 ConcurrentHashMap简介
使用hashmap时,在多线程的情况下扩容会出现cpu接近100%的情况,因为HashMap并不是线程安全的,这时可以使用Java体系中古老的hashtable类,该类中的方法几乎都是采用synchronized进行线程安全的控制。可想而知,在高并发的情况下,每次只有一个线程能够获取对象监视器锁,这样的开发性能的确不令人满意。针对这种情况,Doug Lea大师不遗余力的为开发者创造了一些线程安全的并发容器,相对于HashMap,ConcurrentHashMap就是线程安全的map,它是利用了锁分段的思想提高了并发度。ConcurrentHashMap在JDK1.6和JDK1.8版本上有很多不同,对比这两个版本进行横向比较可以发现JDK1.6版本的核心观点有两点:
- 由于segment继承了ReentrantLock,因此每个segment都具备线程安全的特性;
- segment维护了哈希桶的若干个桶,每个桶均有HashEntry构成链表。
而JDK1.8的ConcurrentHashMap有了很大的变化,舍弃了segment,并且大量使用了synchronized以及CAS无锁操作以保证ConcurrentHashMap操作线程安全性。它为什么不用ReentrantLock而是使用synchronized呢?因为sunchronized做了很多的优化,包括偏向锁、轻量级锁以及重量级锁的锁升级机制,提升锁的高并发性能,以及加锁和释放锁的效率。因此,使用synchronized相较于ReentrantLock的性能会持平,甚至在某些情况下更优,另外底层数据结构改变为数组+链表+红黑树的数据形式。
1.2 关键属性及类
ConcurrentHashMap的关键属性如下:
table volatile Node<K,V>[] table;//装载Node的数组,作为ConcurrentHashMap的数组容器,
//采用懒加载的方式,知道第一次插入数据时才会进行初始化操作,数组的大小总为2的幂次方nextTable volatile Node<K,V>[] nextTable;//扩容时使用,平时为null,只有在扩容时才为非null
sizeCtl volatile int SizeCtl; //该属性用来控制table数组的大小
sun.misc.Unsafa U //在ConcurrentHashMap的实现中可以看到大量的U.compare-AndSwqpXXXX()方法去修改ConcurrentHashMap的一些属性,这些操作能够保障线程安全
sun.misc.Unsafe提供的这些能有保障线程安全的方法,其实是利用了CAS算法保证了线程安全性,这是一种乐观锁策略,假设每次操作都不会产生冲突,当且仅当冲突发生时再去尝试 在大量的同步组件和并发容器的实现中使用CAS是通过sun.misc.Unsafe类实现的,该类提供了一些可以直接操控内存和线程的底层操作。该成员变量的获取时在静态代码块中:
static{try{U = sun.misc.Unsafe.getUnsafe();....}catch(Exception e){throw new Error(e);}}
ConcurrentHashMap中的关键内部类如下。
(1)Node。Node类实现了Map.Entry接口,主要是用来存放key-value对,并且具有next域。
static class Node<K,V> implements Map.Entry<K,V>{final int hash;final K key;volatile V val;volatile Node<K,V> next;...}
另外我们可以看出很多属性都是用volatile修饰的,也就是为了保证内存的可见性。
(2)TreeNode。树节点,继承于承载数据的Node类。而红黑树的操作是针对TreeBin类的,从该类的注释中也可以看出,TreeBin会将TreeNode进行再次封装。
static final class TreeNode<K,V> extends Node<K,V>{TreeNode<K,V> parent;TreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev;...}
(3)TreeBin。这个类并不负责包装用户的key和value信息,而是包装了很多TreeNode节点。
static final class TreeBIn<K,V> extends Node<K,V>{TreeNode<K,V> root;volatile TreeNode<K,V> first;volatile Thread waiter;volatile int lockState;static final int WRITER = 1;static final int WAITER = 2;static final int READER = 4;...}
(4)ForwardingNode。在扩容时才会出现的特殊节点,其Key、value、hash全部为null,并用nextTable指针引用新的table数组。
static final class ForwardingNode<K,V> extends Node<K,V>{final Node<K,V>[] nextTable;ForwardingNode(Node<K,V>[] tab){super(MOVED,null,null,null);this.nextTable = tab;}}
1.3 CAS相关操作
上面提到在ConcurrentHashMap中会大量使用CAS修改它的属性和一些操作。因此,在理解ConcurrentHashMap的方法前,我们需要了解下面几个常用的利用CAS算法保障线程安全的操作。
(1)tabAt()。该方法可用来获取table数组中索引为i的Node元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab,int i){return (Node<K,V>)U.getObjectVolatile(tab,((long)i << ASHIFT) + ABASE);}
(2)、casTabAt(),该方法可利用CAS操作设置table数组中索引为i的元素
static final <k,V> boolean casTabAt(Node<K,V>[] tab,int i ,Node<K,V> c,Node<K,V> v){return U.compareAndSwapObject(tab,((long)i << ASHIFT) + ABASE,c,v);}
(3)、setTabAt()该方法可用来设置table数组中索引为i的元素。
static final <K,V> void setTabAt(Node<K,V>[] tab,int i,Node<K,V> V){U.putObjectVolatile(tab,((long)i << ASHIFT) + ABASE,v);
}
1.4 深入分析核心方法
1. 实例构造器方法
使用ConcurrentHashMap的第一件事自然是创建一个ConcurrentHashMap对象,他提供了以下构造器方法;
//1. 构造一个空的map,即table数组还未初始化,默认大小为16ConcurrentHashMap();//2.给定map的大小ConcurrentHashMap(int initialCapcity);//3.给定一个mapConcurrentHashMap(Map<? extends K,?extends V> m);//4.给定map的大小以及加载因子ConcurrentHashMap(int initialCapacity,float loadFactor);//5.给定map的大小、加载因子以及并发度(预计同时操作数据的线程)ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel);
ConcurrentHashMap一共提供了5中构造器方法,具体使用请看注释,我们来看看第二种构造器,即传入指定大小时的情况,该构造器源码如下:
public ConcurrentHashMap(int initialCapacity){//1.小于0直接抛出异常if(initialCapacity < 0){throw new IllegalArgumentException();}//2.判断是否超过了允许的最大值,若超过则取最大值,否则就对该值做进一步处理int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY: tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1 ));//3. 赋值给sizeCtlthis.sizeCtl = cap;}
具体流程可以看源码中的注释,当调用构造器方法之后,sizeCtl的大小就代表了ConcurrentHashMap的大小,即table数组长度。tableSizeFor()方法又做了什么事情呢?源码如下:
private static final int tableSizeFor(int c){int n = c- 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n<0) ? 1 :(n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n+1;}
该方法会将调用构造器方法时指定值的大小转换位一个2的幂次方数,也就是说ConcurrentHashMap的大小一定是2的幂次方。需要注意的是,调用构造器方法时并未构造出table数组(可以理解为ConcurrentHashMap的数据容器),只是计算出了table数组的长度,当第一次向currentHashMap插入数据时才真正完成初始化创建table数组的工作。
2. initTable()方法
initTable()方法的具体源码如下:
private final Node<K,V>[] initTable(){Node<K,V>[] tab;int sc;while((tab == table) == null || tab.length == 0){if((sc = sizeCtl) < 0){//1.保证只有一个线程正在进行初始化操作Thread.yield();}else if(U.compareAndSwapInt(this,SIZE,SC,-1)){try{if((tab = table) == null || tab.length == 0){//2.得出数组的大小int n = (sc>0) ? sc:DEFAULT_CAPACITY;@SuppressWarnings("unchecked")//3.这里才真正开始初始化数组Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n];table = tab = nt;//4.计算数组中可用的大小:数组实际大小 n * 加载因子 0.75sc = n - (n>>>2);}}finally {sizeCtl = sc;}break;}}return tab;}
代码的逻辑可以看注释,还可能存在一种情况,多个线程同时执行到这个方法时,为了保证能够正确的初始化,在第一步会通过if判断,若当前已经有一个线程正在初始化,即sizeCtl值变为-1,这时若其他线程通过if判断为true,就调用Thread.yield()方法让出CPU时间片,让其他线程先运行。正在进行初始化的线程会调用U.compareAndSwapInt()方法将sizeCtl改为-1,即正在初始化的状态。另外还需要注意的是,在第四步会进一步计算数组中可用的大小,即为数组实际大小n乘以加载因子0.75.
如果选择无参构造器,这里在new Node数组时会使用默认大小DEFAULT_CAPACITY
(16),然后乘以加载因子0.75,即为12,也就是说数组的可用大小为12.
3 put()方法
ConcurrentHashMap最常用的应该是put()和get()方法,我们先来看看put方法是怎么实现的。调用put方法时,具体是由putVal()方法实现的,源码如下:
final V putVal(K key, Vvalue, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException() i//1,计算key的哈希值int hash = spread(key.hashCode());int binCount = 0;for (Node<K, V>[] tab = table; ; ) {Node<K, V> f;int n, i, fh;//2.如果当前table没有被初始化、先调用initTable()方法对tab 进行初始化if (tab == nullll(n = tab.length) == 0) {tab = initTable();}//3.tab中索引为的位置的元素为nu11,则直接使用cAS将值插人即可else if ((f = tabAt(tab, i = (n - 1) & hash)) == nul1) {if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, nul1)))break;}//4.当前正在扩容else if ((fh = f.hash) == MOVED) {tab = helpTransfer(tab, f);} else {V oldVal = null;synchronized (f) {if (tabAt(tab,i) == f) {//5.当前为链表,在链表中插入新的键值对if (fh >= 0) {binCount = 1;for (Node<K, V> e = f; ; ++binCount) {K ek;if (e.hash == hash && ((ek = e.key) == key | l(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent) {e.val = value;}break;}Node<K, V> pred = e;if ((e = e.next) == null) {pred.next = new Node<K, V>(hash, keyvalue, null);break;}}}}}//6.当前为红黑树,将新的键值对插入红黑树else if (f instanceof TreeBin) {Node<K, V> P;binCount = 2;if ((p = ((TreeBin<K, V>) f),putTreeVal(hash, key, value)) !=null){oldVal = p, val;if (!onlyIfAbsent) {p.val = value;}}}//7.插入完键值对后,再根据实际大小看是否需要转换为红黑树if(binCount !=0) {if (binCount >= TREEIFY THRESHOLD)treeifyBin(tab, i);if (oldVal != null){return oldVal;}break;}}}//8.对当前容量大小进行检查,如果超过了临界值(实际大小x加载因子),就需要扩容addCount(1L,binCount);return null;}
PutO方法的代码有点长,按照上面的分解步骤一步步来看。从整体而言,为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied 和CAS的方式ConcurrentHashMaptable结构如下图所示:
ConcurrentHashMap是一个哈希桶数组,不出现哈希冲突时,每个元素都均匀地分布在哈希桶数组中。 当出现哈希冲突时,通过链地址的方式解决哈希冲突的问题,将哈希值相同的节点构成链表的形式,称为“拉链法”。 另外,在JK18版本中为了防止拉链过长,当链表的长度大于8时会把链表转换为红黑树。table数组中的每个元素实际上是单链表的头节点或红黑树的根节点。当插人键值对时首先应该定位到要插人的桶,即插入table数组的索引处。那么怎样计算得出索引i呢?当然是根据key的 hashcode值。整个 put() 方法的关键步骤如下:
(1)利用Spread()方法降低哈希冲突的概率,计算节点哈希值。
对于一个哈希表,哈希值分散得不够均匀会大大增加哈希冲突的概率,从而影响哈希表的性能,要通过spread()方法进行了一次重哈希,减小哈希冲突的可能性。spead()方法的源码如下。
static final int spread(int h){return(h^(h>>> 16))& HASH BITS;
}
该方法主要是将key的hashCode的低16位与高16位进行异或运算,这样不仅能够使哈希值分散均匀,减小哈希冲突的概率,而且只用到了异或运算,在性能开销上也能兼顾,做到平衡的·trade-off;
(2)初始化 table。
第2步会判断当前 table数组是否初始化了,没有就调用imnitTableO)方法进行初始化。
(3)是否可直接赋值到 table 数组。
从上图中可以看出还存在这样一种情况,插入值待插入的位置刚好在的table数组为nu,这时就可以直接将值插人。那么怎样根据哈希值确定在table 中待插入的索引i呢?很显然,可以通过哈希值与数组长度的取模操作,确定新值插入到数组的哪个位置。而之前提过ConcurrentHashMap的大小总是2的幂次方,(n-1)&hash 运算等价于对长度n取模,也就是hash%n。但是,位运算比取模运算的效率要高很多。
确定好数组的索引i后,就可以调用tabAt()方法获取该位置上的元素了,如果当前节点f为null,还可以直接用casTabAt()方法将新值插人。
(4)当前 ConcurrentHashMap 是否正在扩容。
如果当前节点不为null,且该节点为特殊节点(forwardingNode),就说明当前ConcunrentHashMap正在进行扩容操作。关于扩容操作,我们在后面会作为一个具体的方法进行讲解。那么怎样确定当前节点是不是特殊的节点呢?可通过该节点的哈希值是否等于-1(MOVED)来判断,代码为(fh = f.hash) == MOVED,对于MOVED的解释源码中也写得很清楚了.
(5)当 table[i] 为链表的头节点,在链表中插入新值。
当table[i]不为null、也不为forwardingNode,且当前节点f的哈希值大于0(fh>=0)时,说明当前节点f为当前桶的所有节点组成的链表的头节点。那么接下来,要想向ConcurrentHashMap中插人新值,也就是向这个链表中插入新值,可以通过synchronized (f)的方式进行加锁以实现线程安全性。向链表中插入节点的部分代码如下:
if(fh>=0){binCount =1;for(Node<K,V>e=f;;++binCount){K ek;//找到哈希值相同的key,覆盖旧值即可if(e.hash==hash &&((ek=e.key)== key ||(ek !=null && key.equals(ek)))){oldVal = e.val;if (!onlyIfAbsent){e.val = value;}break;}Node<K,V> pred = e;if((e=e.next)== null) {//如果到链表末尾仍未找到,则直接将新值插人链表末尾即可pred,next = new Node<K,V>(hash, key, value, null);break;}}}
这部分代码很好理解,就是分成两种情况:
- 在链表中如果找到了与待插人的键值对key相同的节点,直接覆盖即可;
- 如果直到链表末尾都没有找到,直接将待插人的键值加到链表的末尾即可。
(6)当table[]为红黑树的根节点,在红黑树中插入新值。
按照之前的数组+链表的设计方案,这里存在一个问题,即使加载因子和哈希算法计得再合理,也免不了会出现链表过长的情况,一旦出现链表过长,甚至在极端情况下,找一个节点会出现时间复杂度为(n)的情况,就会严重影响ConcurentHashMap的性能于是,在JDK1.8版本中,对数据结构做了进一步的优化,引人了红黑树。而当链表长摩太长(默认超过8)时,链表就转换为红黑树,利用红黑树的数据结构特点能够快速提高ConcurrentHashMap的性能,其中会用到红黑树的插人、删除以及査找等算法。当table[]为红黑树的根节点时的操作如下。
if(f instanceof TreeBin){Node<K, V> p;binCount = 2;if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {oldVal = p.val;if (!onlyIfAbsent) {p.val = value;}}}
首先在if中通过f instance of TreeBin判断当前table[i]是否为TreeBin,这也正好验证了上面介绍时说的 TreeBin 会对 TreeNode做进一步封装,对红黑树进行操作时针对的是 TreeBin而不是 TreeNode。这段代码很简单,是调用puTreeVal0)方法向红黑树插入新节点的。同样的逻辑,如果在红黑树中存在与待插入键值对的key相同(哈希值相等并且equals()方法判断为true)的节点,就覆盖旧值,否则向红黑树追加新节点。
(7)根据当前节点个数进行调整。
当完成数据新节点的插人之后,会进一步对当前链表大小进行调整,这部分代码如下
if(binCount !=0){if(binCount >= TREEIFY_THRESHOLD){treeifyBin(tab,i);}if(oldval != null){return oldVal;}break;}
如果当前链表节点个数大于或等于8,就会调用teeifyBim0方法将tablel[i]拉链转换为红黑树。至此,关于put()方法的逻辑就介绍完了。
现在来做一些总结:
(1)对于每个放入的值,首先利用spread0)方法对key的HashCode 进行一次哈希计算,通过spread()方法能够使哈希值分布得更加均匀,从而降低哈希冲突的概率。
(2)如果当前table数组还未初始化,就先将 table 数组进行初始化操作。
(3)如果这个位置是null,就使用CAS操作直接插人.
(4)如果这个位置存在节点,说明发生了哈希碰撞,首先判断这个节点的类型。如果该节点fh = MOVED,说明数组正在进行扩容。
(5)如果是链表节点(fh>0),则得到的节点就是哈希值相同的节点组成的链表的头节点需要依次向后遍历确定这个新加入的值的所在位置。如果遇到key 相同的节点,则只需要覆盖该节点的 value 值即可;否则就依次向后遍历,直到遍历到达链表尾插入的这个节点。
(6)如果这个节点的类型是 TreeBin,可直接调用红黑树的插人方法插人新的节点。
(7)插人完节点之后再次检查链表长度,如果长度大于8,就把这个链表转换为红黑树。
(8)对当前容量大小进行检查,如果超过了临界值(实际大小x加载因子),就需要扩容.
4 .get()方法
了解了 put()方法,再来看get()方法就很容易了,通过逆向思维,将put 的操作“反过来”可以达到取值的目的。getO方法的源码如下:
public V get(object key) {Node<K, V>[] tab;Node<K, V> e, p;int n, eh;K ek;//1.重哈希int h = spread(key.hashCode());if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n1) & h))!=null){//2.table[i]桶节点的key与查找的key相同,则直接返回if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek))) {return e.val;}}//3.当前节点哈希小于0说明为树节点,在红黑树中查找即可else if (eh < 0) {return (p = e.find(h, key)) != null ? p.val : null;}while ((e = e.next) != null) {//4.从链表中查找,查找到就返回该节点的value,否则返同nullif (e.hash == h && (ek = e.key) == key || (ek != null && key.equals(ek))) {return e.val;}}return null;}}
代码的逻辑可以看注释,主要流程如下:
(1).首先看当前的哈希桶数组节点(即table[i])是否为查找的节点,若是,就直接返回
(2).若不是,则继续看它是不是树节点。节点的哈希值是否小于零?如果小于0,就说明他是树节点,如果是树节点,就在红黑树中查找节点;如果不是树节点,那就只剩下为链表形式的这种可能性了,向后遍历查找节点,若查找到,返回节点的value即可,若没有查就返回 null。
5.transfer()方法
当concumentHashMap容量不足时,需要对table进行扩容。这个方法的基本思想与HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂得多。原因是它支持多进行扩容操作而并没有加锁。这样做不仅仅是为了满足ConcurentHashMap的要求,还希望用并发处理减少扩容带来的时间影响。transfer()方法的源码如下:
private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab){int n=tab.length, stride;if((stride=(NCPU>1)?(n>>>3)/NCPU :n)<MIN_TRANSEER_STRIDE) {stride = MIN_TRANSFER_STRIDE;}//1.新建Node数组,容量为之前的两倍if(nextTab ==null) {try {@SuppressWarnings("unchecked")Node<K,V>[] nt =(Node<K,V>[])new Node<?,?>[n << 1];nextTab =nt;}catch(Throwable ex){sizeCtl =Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = ni}int nextn= nextTab.length;//2,新建forwardingNode引用ForwardingNode<K,V> fwd = new ForwardingNode<K,V> (nextTab);boolean advance =true;boolean finishing = false;for(int i=0,bound =0;;){Node<K,V> f;int fh;//3、确定遍历中的索引iwhile(advance){int nextIndex,nextBound;if(--i>= bound || finishing){advance =false;}else if((nextIndex=transferIndex)<=0){i = -1;advance =false;}else if(U.compareAndSwapInt (thiS,TRANSFERINDEX,nextIndex,nextBound =(nextIndex>stride ?nextIndex-stride:0))){bound = nextBound;i = nextIndex;advance = false;}}//4.将原数组中的元素复制到新数组中//4.5 for循环退出,扩容结束修改sizeCtl属性if(i<0|| i>=n || i+n >= nextn){int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT){return;}finishing = advance = true;i = n;}}//4.1 当前数组中第1个元素为null,用CAS设置为特殊节点forwardingNode(可以理解为占位符)else if((f=tabAt(tab,i))== null)advance =casTabAt(tab,i,null,fwd);//4.2如果遍历到forwardingNode节点,说明这个点已被处理过了,直接跳过就可以else if((fh=f.hash)== MOVED)advance =true;else {synchronized(f){if(tabAt(tab, i )==f){Node<K,V> in,hn;if(fh >= 0){///4.3处理当前节点为链表的头节点的情况int runBit =fh & n;Node<K,V> lastRun = f;for (Node<K,V> p= f.next; p != null;p = p.next){int b = p.hash & n;if((b !=runBit){runBit = b;lastRun =p;}}if(runBit==0){ln = lastRun;hn =null;}else{hn = lastRun;ln = null;}for (Node<K,V> p=f;p!= lastRun; p= p.next) {int ph = p.hash;V pv = p.val;if ((ph & n) == 0)ln = new Node<K, V>(ph, pk, pv, ln);elsehn = new Node<K, V>(ph, pk, pv, hn);}//在nextTable的i位置上插人一个链表setTabAt(nextTab,i,ln);//在nextTable的i+n的位置上插入另一个链表setTabAt(nextTab,i+n,hn);//在 table的i位置上插入 forwardNode 节点,表示已处理过该节点setTabAt(tab,i,fwd);//设置 advance为true,返回到上面的 while 循环中,就可以执行i--操作advance = true;}//4.4处理当前节点是reeBin 的情况,操作和上面的类似else if(f instanceof TreeBin) {TreeBin<K, V> t = (TreeBin<K, V>) f;TreeNode<K, V> lo = null, loTail = null;TreeNode<K, V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K, V> e = t, first; e != null; e = e.next) {int h = e.hash;TreeNode<K, V> p = new TreeNode<K, V>(h, e.key, e.val, null, nul1);if ((h & n) == 0) {if ((p.prev = loTail) == nul1) {lo = p;} else {loTail.next = p;}loTail = p;++lc;} else {if ((p.prev = hiTail) == null)hi = p;else {hiTail.next = p;}hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? newTreeBin < K, V > (lo) :t;hn = (hc <= UNTREEIFY THRESHOLD) ?untreeify(hi) :(lc != 0) ? new TreeBin<K, V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
整个扩容分两个部分:
第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。新建 table 数组的代码为 Node<K,V>[]nt = (Node<K,V>[])new Node<?,?>[n<<1],在原容量大小的基础上右移一位。
第二部分就是将原来table中的元素复制到nextTabe中,主要是遍历复制的过程。根据运算得到当前遍历的数组的位置i,然后利用tabAtO方法获得i位置的元素后再进行判断:
(1)如果这个位置为空,就在原table 中的i位置放入forwardNode节点,这是触发并发扩容的关键点;
(2)如果这个位置是Node节点(fh>=0),通过fh&n对原链表节点进行标记然后构造反序链表,把它们分别放在 nextTable 的i和i+n的位置上;
(3)如果当前节点是TreeBin类型,说明当前节点已经转换成了红黑树结构。同样的需要通过遍历节点进行节点转移操作,最后通过CAS把In设置到新数组的i位置以及将hn设置到i+n位置、需要注意的是、扩容后需要对容器长度进行判断。如果lo和hi的元素个数小于等下UNTREEIFY THRESHOLD(默认为6),就需要通过untreeify方法将之转换成节点类型的链表结构。
遍历过所有的节点后就完成了复制工作,这时让nexttable作为新的表,并且更新sizeCt为扩容后总容量乘以0.75系数后的值,整个过程就是扩容全过程。设置为新容量的0.75倍的代码为sizeCtl=(n<<1)-(n>>>1)。仔细体会下,是不是很巧妙?n<<1相当于n 右移一位,表示n的2倍,即2n(扩容后的预估总容量);n>>>1相当于右移一位,表示除以2,即0.5n,然后两者相减为2n-0.5n=1.5n,最后的结果就刚好等于新容量的3/4,即2n*0.75=1.5n。操作示意图如下图所示:
总结
JDK1.8之前的 ConcurentHashmap主要通过将数据容器拆解成多个segment,来降低锁粒度,这样可以进一步提升并发性能。在put值时为了保障线程安全需要将segmem锁住再来获取值,但是在get值时并不需要加锁。当要统计全局时(如size),首先会尝试多次计算modcount来确定在多次的计算过程中是否有其他线程进行了修改操作,如果没有,就直接同size;如果有,则需要依次锁住所有的segment 来计算。
JDK1.8之前 put定位节点时要先定位到具体的segment,然后再在segment 中定位到具体的桶。而JDK1.8摒弃了 segment臃肿的设计,直接针对 table 数组中的每个桶,进一步减小了锁粒度,并且防止拉链过长导致性能下降,当链表长度大于8时则采用红黑树的数据结构进步提升数据访问的性能。
主要设计上的变化有以下几点:
(1)不采用segment,而是采用更加细粒度的Node节点,降低锁粒度,提升容器整体的并发特性和性能;
(2)设计了MOVED状态,在resize的过程中,可以由多线程并发协助完成容器的扩容;
(3)进一步通过无锁算法CAS操作完成对节点的数据操作,提升性能和并发度;
(4)采用synchronized 解决线程安全的问题,而不是ReentrantLock。
以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!