1.一致性定义
关于一致性的定义,大概如下:
一致性(Consistency)是指多副本(Replications)问题中的数据一致性。可以分为强一致性、顺序一致性与弱一致性。
1.1 强一致性(Strict Consistency)
强一致性也被可以被称做:
原子一致性(Atomic Consistency)
线性一致性(Linearizable Consistency)
要满足强一致性,必须符合以下两个要求:
任何一次读都能读到某个数据的最近一次写的数据。
系统中的所有进程,看到的操作顺序,都和全局时钟下的顺序一致。
上述定义用通俗的话来解释就是,假定对同一个数据集合,分别有两个线程A、B进行操作,假定A首先进行的修改操作,那么从时序上在A这个操作之后发生的所有B的操作都应该能看到A修改操作的结果。
1.2 弱一致性
数据更新之后,如果能容忍访问不到或者只能部分访问的情况,就是弱一致性。最终一致性是弱一致性的一个特例。
也就是说,对于数据集,分别有两个线程A、B进行操作,假定A首先进行了修改操作,那么可能从时许上滞后的B进行的读取操作在一段时间内还读取不到这个结果。读取的还是A操作之前的结果。这就是弱一致性。
最终一致性就是说,只要A、B的都不进行任何更新操作,一段时间之后,数据都能读取到最新的数据。
2.size方法源码
2.1 jdk1.8实现
2.1.1 size方法
我们来看看1.8版本中的ConcurrnetHashMap中size方法的源码:
/**
* {@inheritDoc}
*/
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
2.1.2 sumCount
实际上底层调用的是sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
可以看到,这个count,实际上是对CounterCell数组进行遍历,中间没有任何锁操作。
2.1.3 CounterCell
CounterCell源码如下:
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
这实际上就是一个volatile修饰的计数器。除了Contended这个注解之外,没有什么特别之处,在put、remove的时候,对这个计数器进行增减。
Contended这个注解我们在后面再来详细解释。
counterCells这个数组,实际上size和table一致,这样Counter中的value就是这个数组中index对应到table中bucket的长度。
在table扩容的时候,这个计数器数组也会扩容:
CounterCell[] rs = new CounterCell[n << 1];
2.1.4 addCount
那么在put和remove以及clear等方式对size数量有影响的方法中,都会调用addCount对size进行增减。
x为正数表示增加,负数表示减小。同时check如果大于0则需要对结果进行check,避免在并发过程中由于并发操作带来的计算不准确。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//判断是否为空
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
//a是计算出来的槽位
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
//cas方式
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//增加
fullAddCount(x, uncontended);
return;
}
//如果不需要检查就直接返回
if (check <= 1)
return;
s = sumCount();
}
//如果需要检查
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//遍历并重新计算
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
2.1.5 fullAddCount
这是执行增加的核心方法,其中大量使用了cas操作,另外还必须考虑到执行的并行性。
ini 代码解读复制代码// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//死循环,cas方式修改
for (;