聊聊java中的哪些Map:(七)ConcurrentHashMap的size方法的一致性分析

时间:2020-8-30 作者:admin


文章目录

关于ConcurrentHashMap的size方法,有资料说size不能提供强的一致性,但是也有人说size是强一致性的。那么对于这个问题,我们从源码出发,来看看size的实现机制。最终看看能否得到正确的答案。

1.一致性定义

关于一致性的定义,大概如下:
一致性(Consistency)是指多副本(Replications)问题中的数据一致性。可以分为强一致性、顺序一致性与弱一致性。

1.1 强一致性(Strict Consistency)

强一致性也被可以被称做:
原子一致性(Atomic Consistency)
线性一致性(Linearizable Consistency)
要满足强一致性,必须符合以下两个要求:

  • 任何一次读都能读到某个数据的最近一次写的数据。
  • 系统中的所有进程,看到的操作顺序,都和全局时钟下的顺序一致。

上述定义用通俗的话来解释就是,假定对同一个数据集合,分别有两个线程A、B进行操作,假定A首先进行的修改操作,那么从时序上在A这个操作之后发生的所有B的操作都应该能看到A修改操作的结果。

1.2 弱一致性

数据更新之后,如果能容忍访问不到或者只能部分访问的情况,就是弱一致性。最终一致性是弱一致性的一个特例。
也就是说,对于数据集,分别有两个线程A、B进行操作,假定A首先进行了修改操作,那么可能从时许上滞后的B进行的读取操作在一段时间内还读取不到这个结果。读取的还是A操作之前的结果。这就是弱一致性。
最终一致性就是说,只要A、B的都不进行任何更新操作,一段时间之后,数据都能读取到最新的数据。

2.size方法源码

2.1 jdk1.8实现

2.1.1 size方法

我们来看看1.8版本中的ConcurrnetHashMap中size方法的源码:

/**
 * {@inheritDoc}
 */
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

2.1.2 sumCount

实际上底层调用的是sumCount方法:

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

可以看到,这个count,实际上是对CounterCell数组进行遍历,中间没有任何锁操作。

2.1.3 CounterCell

CounterCell源码如下:

/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

这实际上就是一个volatile修饰的计数器。除了Contended这个注解之外,没有什么特别之处,在put、remove的时候,对这个计数器进行增减。
Contended这个注解我们在后面再来详细解释。

counterCells这个数组,实际上size和table一致,这样Counter中的value就是这个数组中index对应到table中bucket的长度。
在table扩容的时候,这个计数器数组也会扩容:

 CounterCell[] rs = new CounterCell[n << 1];

2.1.4 addCount

那么在put和remove以及clear等方式对size数量有影响的方法中,都会调用addCount对size进行增减。
x为正数表示增加,负数表示减小。同时check如果大于0则需要对结果进行check,避免在并发过程中由于并发操作带来的计算不准确。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    //判断是否为空
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
        //a是计算出来的槽位
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
            //cas方式
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
              //增加
            fullAddCount(x, uncontended);
            return;
        }
        //如果不需要检查就直接返回
        if (check <= 1)
            return;
        s = sumCount();
    }
    
    //如果需要检查
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //遍历并重新计算
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

2.1.5 fullAddCount

这是执行增加的核心方法,其中大量使用了cas操作,另外还必须考虑到执行的并行性。

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    //死循环,cas方式修改
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

2.1.6 总结

通过对上述方法分析不难看出,size方法是弱一致性的,这是因为,如果有A线程正在进行put操作,之后触发了扩容或者红黑树转置,那么立即就会synchronized锁定root节点。之后开始进行对应的操作,这个操作是需要时间的。但是这个时候,如果线程B来调用size方法,那么size方法由于没有任何锁机制,肯定是能够返回的,此时返回的size就是put之前的值。那么这个结果就导致了弱一致性。即put在前的操作并不能马上让时许在其后面的操作得到结果,需要等一段时间。待synchronized执行完成。
ConcurrenthashMap的counter机制就是为了增加读取性能而设计的,如果为了强一致性,那么只能按HashTable的方式整个读取方法都加锁,那么这样肯定会影响性能的。
另外addCount,在增加操作的时候还会对数量进行检查。以避免并发操作带来的不一致性。

2.2 jdk1.7源码实现

由于1.7采用分段锁的机制,因此设计没有1.8复杂。

2.2.1 size方法源码

/**
 * Returns the number of key-value mappings in this map.  If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    //死循环
        for (;;) {
        //遍历 逐步锁定段
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            //假定初始的modCount
            sum = 0L;
            size = 0;
            overflow = false;
            //计算bucket
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    //将modCount相加
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            //如果modCount在这个计算过程中没有改变则说明size计算有效,否则会重置last之后重新计算
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
    //将所有的lock进行unlock操作
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

这个方法的逻辑是,在一开始,遍历segment的时候,先锁定一个段,计算size,然后判断在这个过程中,modCount是否发生了改变,如果发生改变则说明计算结果会产生误差,则重新计算。直到modCount在计算前后相等,则说计算可行,之后再移动到下要给bucket。
可以看到这实际上是个低效的操作,只有在所有的bucket都计算完成之后,才会统一在finally中进行unlock。这样会导致全部的段都被锁定。
也就是说,1.7中的size方法,最开始是个乐观锁,最终会转换为悲观锁,这样实际上是个强一致性的方法。

2.3 说明

通过上述对1.7和1.8源码中对size方法的对比,在1.7中,size能做到强一致性,但是这样是有代价的,对分段锁的lock导致了整体性能的降低。而在1.8中,为了增加性能,而增加了一大段复杂的代码将size变成了弱一致性。但是好处是在put的过程中不会对size造成阻塞。
由此可见源码作者为了提升ConcurrentHashMap所做的各种努力。
这也是我们在编码过程中值得借鉴的地方。
至于@sun.misc.Contended,这是通过缓存行对齐来避免伪共享问题,这个将在后续单独介绍。

声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。