ConcurrentHashMap详解

# ConcurrentHashMap详解

ConcurrentHashMap和CopyOnWriteArrayList都是线程安全的集合。 会涉及到多线程相关知识点,本篇文章对多线程相关知识点不做展开讨论,后续会针对多线程相关知识点再写一些文章。

博主水平有限,写完这篇文章之后发现如果不对多线程相关知识点有充分的了解,看这篇文章可能会被源码中各种状态、标识、CAS操作绕晕,这种情况下可以先看看方法总结、方法执行过程、数据存储结构和使用场景。 等到多线程相关知识点深入学习总结后再返回来看方法的细节也许会有新的理解。

# 1、ConcurrentHashMap简介

ConcurrentHashMap是线程安全的键值对集合。ConcurrentHashMap的设计宗旨是减少锁的竞争,来提高并发性能。

适用场景: ConcurrentHashMap 非常适用于需要高并发读写的场景,下面是几个常见的使用场景:

  • 缓存系统:在一个多线程环境中使用缓存时,ConcurrentHashMap 能有效地处理大量的并发读写请求,保证数据的一致性和访问速度。

  • 计数器:在高并发的应用中,例如统计网站访问量、计数器等,ConcurrentHashMap 可以高效地更新计数器值。

  • 会话管理:用于管理用户会话信息,当多个线程需要并发访问和修改会话数据时,使用 ConcurrentHashMap 是一种安全且高效的选择。

  • 配置管理:在分布式系统中,各个节点需要共享一些配置信息,并且这些配置信息可能会动态变化。ConcurrentHashMap 可以保证在高并发环境下的快速访问和更新。

  • 任务调度:在任务调度系统中,可以使用 ConcurrentHashMap 来存储和管理任务状态,当多个线程需要并发地读取和更新任务状态时,能保证系统的高效和稳定。

即使列举出了一些使用场景,但是如果没有真正的在生产环境中使用过的话,其实对于ConcurrentHashMap的整体把握还是比较空泛的。
纸上得来终觉浅,绝知此事要躬行。
下面我尽量列出一些生产环境使用 ConcurrentHashMap 的真实案例来帮助理解。

# 2、ConcurrentHashMap底层数据结构解析

写在前面:
分析ConcurrentHashMap为什么要拿JDK8之前和JDK8的源码对比,因为JDK8对ConcurrentHashMap做了很大的改造,提升了其性能和灵活性。对比学习能体会到从JDK的集合框架源码升级过程中的一些优化设计思想,对以后代码设计能有所帮助。(也许看下来体会不到啥思想还把自己绕晕了...)

# JDK8之前:

# JDK8之前的ConcurrentHashMap类的继承体系

mixureSecure

基于JDK7的源码 可以看到 JDK7的ConcurrentHashMap 底层也是采用数组来存储元素,且数组存的元素类型是Segment<K,V>

final Segment<K,V>[] segments;

Segment继承了ReentrantLock,所以 Segment具有可重入锁的一些特性

static final class Segment<K,V> extends ReentrantLock implements Serializable

ConcurrentHashMap 的哈希表(数组)默认容量大小,加载因子都和HashMap一致。
但是也有一些ConcurrentHashMap 特有的属性: 比如:

/**
 * 默认的并发级别,即分段的数量。ConcurrentHashMap 将哈希表分成多个段 (Segment),
 * 每个段是一个小的哈希表。这样可以使多个线程在不同的段上进行并发操作,从而提高并发性能。
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * 段的最小容量。当新建一个段时,默认的初始容量至少为 2。
 * 即使用户指定的初始容量很小,也会将其提升到 2。
 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
 * 段的最大数量。ConcurrentHashMap 最多可以有 65536 (1 << 16) 个段。
 * 这个值是一个常量,用于限制段的数量,避免内存消耗过大。
 */
static final int MAX_SEGMENTS = 1 << 16;

/**
 * 在尝试获取锁之前,允许线程重试的次数。如果超过这个次数,线程将使用锁机制来确保线程安全。
 * 这个值用于减少锁争用,提高并发性能。
 */
static final int RETRIES_BEFORE_LOCK = 2;

/**
 * 段掩码,用于根据哈希值计算段索引。segmentMask 的值通常是 segments 数组长度减 1。
 * 通过哈希值与段掩码进行按位与操作,可以快速定位到对应的段。
 */
final int segmentMask;

/**
 * 段移位量,用于根据哈希值计算段索引。segmentShift 的值通常是一个用来将哈希值右移的位数,
 * 以便通过右移操作来获得段索引。
 */
final int segmentShift;

/**
 * 段数组,包含了 ConcurrentHashMap 的所有段。每个段都是一个小的哈希表,
 * 具有独立的锁和容量设置。通过分段锁机制 (Segment Locking),多个线程可以在不同的段上并发操作,
 * 从而提高整体的并发性能。
 */
final Segment<K,V>[] segments;

看到这 就知道了 Segment<K,V>[] 数组 实际上是ConcurrentHashMap 中第一层的数据存储类型,该数组中每个 Segment<K,V> 都是一个独立的锁,先到这。

我们继续看Segment类到底是什么: 从源码可以看出 Segment 是ConcurrentHashMap 的一个静态内部类。

static final class Segment<K,V> extends ReentrantLock implements Serializable 
mixureSecure

我们看下 Segment有哪些属性:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    /**
     * 最大扫描重试次数。如果可用处理器数量大于 1,则最大扫描重试次数为 64,否则为 1
     * 这个值用于在尝试获取锁之前允许进行的最大重试次数
     */
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * 哈希表数组。存储每个段中的哈希桶 (HashEntry)
     * 它会在并发操作中被频繁修改
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * 哈希表中当前的键值对数量
     */
    transient int count;

    /**
     * 修改计数器 记录了哈希表被修改的次数,用于实现快速失败 (fail-fast) 机制 
     */
    transient int modCount;

    /**
     * 阈值  表示哈希表扩容的临界点,当哈希表中的键值对数量超过这个值时,需要进行扩容。
     */
    transient int threshold;

    /**
     * 负载因子。表示哈希表的装填因子,用于控制哈希表的扩容行为。
     */
    final float loadFactor;
}

Segment的属性是不是很熟悉,特别像HashMap的属性,有用于存储数据的 table 数组 只不过类型是 HashEntry<K,V>[],还有扩容阈值,扩容因子等。
所以 ConcurrentHashMap 的 Segment<K,V>[] 数组中,每个 Segment 实例实际上都是一个小的哈希表(对应 HashEntry<K,V>[]),并且具有独立的锁。

这个时候整个ConcurrentHashMap 的数据结构就清晰了:

Segment<K,V>[] 数组保存若干个(默认是16个)Segment元素,每个Segment都是一个单独的哈希表(对应 HashEntry<K,V>[],对应的HashEntry<K,V>[]才是用来存储实际的键值对数据的数组)。

这种设计方式使得多个线程在不同Segment上执行插入、删除、查找等操作时能够做到真正的并发,因为操作不同Segment时不会相互阻塞,只有在同一Segment内进行操作的线程才需要竞争该Segment的锁。

# JDK8之前的ConcurrentHashMap 数据结构图示:

mixureSecure

其中Segment<K,V>[] 数组的容量一旦初始化就不能再改变了。HashEntry<K,V>[]就类似于HashMap中的 Node<K,V>[]可以动态扩容。

# 为什么Segment<K,V>[] 数组的容量一旦初始化就不能再改变了

Segment<K,V>[] 数组的容量一旦初始化就不能再改变,主要是为了保证并发安全、简化设计、确保性能以及维持数据一致性这几个方面。

  • ①、线程安全考虑:
    在并发环境下,扩容操作比单线程环境复杂得多,需要维护数据一致性,避免数据丢失或损坏。直接改变Segment数组的大小涉及到大量元素的迁移和重新分配,这在多线程环境下很难保证操作的原子性和一致性,会大大增加实现的复杂度和风险。

  • ②、分段锁的设计原则:
    分段锁机制要求每个Segment独立管理自己的锁,以减少锁的粒度,提高并发效率。如果允许动态改变Segment数组的大小,那么就需要能够动态地添加或移除Segment以及其相关的锁,这将破坏原有的设计平衡,使得锁管理变得复杂且低效。

  • ③、数据迁移的复杂性:
    扩容或缩容操作需要将原有数据迁移到新的数组结构中,这在单个哈希表中就已经很复杂了,在分段锁机制下更是如此,因为每个Segment都需要独立完成这一过程,同时还需要协调各个Segment之间的操作,以保持整体数据的一致性。

  • ④、性能考量:
    动态调整数组容量是一个非常耗时的操作,尤其是在并发环境下,它可能导致长时间的阻塞或者复杂的同步逻辑,严重影响容器的响应时间和吞吐量。固定容量可以在设计阶段就根据预期负载来选择一个合适的大小,从而在运行时避免这种昂贵的操作。

# JDK8之前的ConcurrentHashMap构造函数

  • 空参构造
// 默认的初始容量为16
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 定义默认的负载因子为0.75f
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 定义默认的并发级别为16,这个值决定了Segment的数量,进而影响锁的粒度和并发性能。
// 较高的并发级别可以支持更多的并发更新,但也会消耗更多的内存。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 空参构造函数,它会调用带有三个参数的重载构造函数,并传入默认值。
public ConcurrentHashMap() {
    // 调用带有初始容量、负载因子、并发级别的构造函数,使用上面定义的默认值。
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

空参构造调用下面的带参构造:

/**
 * 创建具有指定初始容量、加载因子和并发级别的 ConcurrentHashMap。
 * 
 * @param initialCapacity 初始容量
 * @param loadFactor      加载因子
 * @param concurrencyLevel 并发级别(分段数量)
 * @throws IllegalArgumentException 如果加载因子小于等于0,初始容量小于0,或并发级别小于等于0
 */
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 检查参数的有效性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并发级别不能超过最大段数    static final int MAX_SEGMENTS = 1 << 16;   =  65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;

    // 找到最接近并发级别的 2 的幂次方值
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 段移位量(16的移位量是4) 用于将哈希值右移,段掩码用于通过按位与操作计算段索引。
    this.segmentShift = 32 - sshift; // 计算段移位量
    this.segmentMask = ssize - 1;    // 计算段掩码

    // 如果初始容量大于最大容量,则使用最大容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // 计算每个段的容量   =  传入的数据存储容量/segment段容量
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;

    // 找到最接近 c 的 2 的幂次方值,作为每个段的实际容量  MIN_SEGMENT_TABLE_CAPACITY = 2
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;

    // 创建段数组,并初始化第一个段
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // 有序写入 segments[0]
    this.segments = ss;
}

/**
 * Segment 构造函数,创建一个具有指定加载因子和初始容量的段。
 *
 * @param loadFactor  加载因子
 * @param threshold   扩容阈值
 * @param table       哈希表数组
 */
Segment(float loadFactor, int threshold, HashEntry<K,V>[] table) {
    this.loadFactor = loadFactor;
    this.threshold = threshold;
    this.table = table;
}

# JDK8之前的ConcurrentHashMap,put方法

put方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException(); // 如果值为空,则抛出空指针异常
    int hash = hash(key); // 计算键的哈希值
    int j = (hash >>> segmentShift) & segmentMask; // 计算该键所属的段索引
    if ((s = (Segment<K,V>)UNSAFE.getObject  // 获取段对象(非volatile;重新检查)
         (segments, (j << SSHIFT) + SBASE)) == null) // 如果段为空,则调用 ensureSegment
        s = ensureSegment(j); // 确保该段已初始化
    return s.put(key, hash, value, false); // 在该段中放置键值对
}

总结下put方法:

  • 该方法首先检查输入的 value 是否为空,如果为空则抛出 NullPointerException。
  • 计算键的哈希值 hash。
  • 使用位移操作和掩码计算出该键所属的段索引 j。
  • 使用非volatile的方式尝试获取该段对象 s。
  • 如果该段对象为空,则调用 ensureSegment(j) 确保该段已初始化。
  • 调用段的 put 方法将键值对插入到该段中。

ensureSegment方法

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments; // 获取所有段的数组
    long u = (k << SSHIFT) + SBASE; // 计算段的偏移量
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用 volatile 方式获取段对象
        Segment<K,V> proto = ss[0]; // 使用第0段作为原型
        int cap = proto.table.length; // 获取段的容量
        float lf = proto.loadFactor; // 获取负载因子
        int threshold = (int)(cap * lf); // 计算阈值
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; // 创建哈希表数组
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 重新检查段是否为空
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 创建新的段对象
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次检查段是否为空
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // 使用CAS操作将段设置为新创建的段
                    break;
            }
        }
    }
    return seg; // 返回段对象
}

总结下ensureSegment方法:

  • 获取所有段的数组 ss。
  • 计算段的偏移量 u。
  • 使用volatile的方式获取该段对象 seg。
  • 如果段对象为空,则进行段的初始化:
    • 使用第0段作为原型,获取其容量 cap 和负载因子 lf。
    • 计算段的阈值 threshold。
    • 创建一个新的哈希表数组 tab。
    • 重新检查段是否为空。
    • 创建一个新的段对象 s。
    • 使用CAS操作将段设置为新创建的段对象 s。
  • 返回段对象 seg。

Segment的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    // 尝试获取锁,如果获取锁失败,则调用 scanAndLockForPut 方法   
    // 在获取锁前扫描链表,并返回一个新的 HashEntry 节点或 null

    V oldValue; // 保存旧值
    try {
        HashEntry<K,V>[] tab = table; // 获取当前段的哈希表
        int index = (tab.length - 1) & hash; // 计算键值对应的数组索引
        HashEntry<K,V> first = entryAt(tab, index); // 获取索引位置的第一个节点
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value; // 如果找到相同的键,则保存旧值
                    if (!onlyIfAbsent) {
                        e.value = value; // 如果 onlyIfAbsent 为 false,则更新节点值
                        ++modCount; // 修改计数加一
                    }
                    break;
                }
                e = e.next; // 继续遍历链表
            } else {
                if (node != null)
                    node.setNext(first); // 如果 node 不为空,将其 next 设置为 first
                else
                    node = new HashEntry<K,V>(hash, key, value, first); // 创建新的 HashEntry 节点

                int c = count + 1; // 更新节点计数
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 如果节点计数超过阈值并且数组长度小于最大容量,则重新哈希
                else
                    setEntryAt(tab, index, node); // 将新节点放置在索引位置
                ++modCount; // 修改计数加一
                count = c; // 更新节点计数
                oldValue = null; // 设置旧值为 null
                break;
            }
        }
    } finally {
        unlock(); // 释放锁
    }
    return oldValue; // 返回

总结下Segment的put方法:

  • ①、尝试获取锁:

HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
尝试获取锁,如果获取锁成功,node 为 null。
如果获取锁失败,则调用 scanAndLockForPut 方法,在获取锁前扫描链表,并返回一个新的 HashEntry 节点或 null。

  • ②、获取当前段的哈希表和计算索引:

HashEntry<K,V>[] tab = table;获取当前段的哈希表。
int index = (tab.length - 1) & hash;计算键对应的数组索引。

  • ③、遍历链表:

HashEntry<K,V> first = entryAt(tab, index);获取索引位置的第一个节点。
for (HashEntry<K,V> e = first;;)遍历链表。
如果找到相同的键,保存旧值并根据 onlyIfAbsent 决定是否更新节点值。
如果没有找到相同的键,准备插入新的节点。

  • ④、插入新节点:

if (node != null) node.setNext(first); 如果 node 不为空,将其 next 设置为 first。
else node = new HashEntry<K,V>(hash, key, value, first);否则,创建新的 HashEntry 节点。

  • ⑤、更新节点计数:

int c = count + 1;更新节点计数。
如果节点计数超过阈值并且数组长度小于最大容量,则重新哈希。
否则,将新节点放置在索引位置。
更新 modCount 和 count。

  • ⑥、释放锁并返回旧值:

finally { unlock(); }确保在操作完成后释放锁。
return oldValue;返回旧值。

scanAndLockForPut方法
scanAndLockForPut 方法用于在无法立即获得锁的情况下扫描链表,并在尝试多次获取锁后强制获取锁,同时创建一个新的 HashEntry 节点(如果需要)。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash); // 获取对应哈希值的第一个节点
    HashEntry<K,V> e = first; // 用于遍历链表的节点
    HashEntry<K,V> node = null; // 可能的新节点
    int retries = -1; // 用于记录重试次数,在定位节点时为负数

    while (!tryLock()) { // 尝试获取锁,如果获取失败则继续循环
        HashEntry<K,V> f; // 用于重新检查第一个节点
        if (retries < 0) {
            if (e == null) {
                if (node == null) // 如果链表结束且未创建新节点,则创建新节点
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0; // 重置重试次数
            } else if (key.equals(e.key)) {
                retries = 0; // 如果找到相同的键,重置重试次数
            } else {
                e = e.next; // 继续遍历链表
            }
        } else if (++retries > MAX_SCAN_RETRIES) { // 如果重试次数超过最大限制
            lock(); // 强制获取锁
            break; // 跳出循环
        } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
            e = first = f; // 如果重试次数为偶数且链表的第一个节点发生变化,重新遍历链表
            retries = -1; // 重置重试次数
        }
    }

    return node; // 返回可能的新节点
}

总结下scanAndLockForPut方法:

  • ①、获取哈希值对应的第一个节点

HashEntry<K,V> first = entryForHash(this, hash);根据哈希值计算该段中链表的第一个节点。
HashEntry<K,V> e = first;初始化遍历链表的节点 e。
HashEntry<K,V> node = null;初始化可能的新节点 node。
int retries = -1;初始化重试次数 retries 为 -1,表示在定位节点。

  • ②、尝试获取锁

while (!tryLock())循环尝试获取锁,如果获取失败则继续循环。
HashEntry<K,V> f;用于重新检查第一个节点。

  • ③、定位节点或创建新节点

  • if (retries < 0):如果在定位节点阶段:

    • if (e == null):如果链表结束且未找到相同的键:
      • if (node == null):如果尚未创建新节点,则创建新节点。
        • node = new HashEntry<K,V>(hash, key, value, null);
      • retries = 0;:重置重试次数为 0。
    • else if (key.equals(e.key)):如果找到相同的键:
      • retries = 0;:重置重试次数为 0。
    • else:继续遍历链表:
      • e = e.next;
  • else if (++retries > MAX_SCAN_RETRIES):如果重试次数超过最大限制:

    • lock();:强制获取锁。
    • break;:跳出循环。
  • else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first):如果重试次数为偶数且链表的第一个节点发生变化:

    • e = first = f;:重新遍历链表。
    • retries = -1;:重置重试次数为 -1。
  • ④、返回可能的新节点

return node;返回可能的新节点 node。

重新遍历链表是个耗时的操作,所以重试次数为偶数且链表的第一个节点发生变化时,再重新遍历链表。

通过上面的步骤,scanAndLockForPut 方法在无法立即获得锁的情况下,尽量确保在链表变化时重新扫描,同时限制重试次数,最终在必要时强制获取锁。这样做可以提高并发情况下的插入操作效率和可靠性。

rehash方法
rehash方法主要作用是当哈希表需要扩容时,对其进行重新哈希,将所有的元素重新分配到新的哈希表中,并将新节点插入到相应的位置。
相当于HashMap的resize方法。

private void rehash(HashEntry<K,V> node) {
    // 保存当前的哈希表
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length; // 当前哈希表的容量
    int newCapacity = oldCapacity << 1; // 新的哈希表容量是旧容量的两倍
    threshold = (int)(newCapacity * loadFactor); // 重新计算阈值
    // 创建一个新的哈希表,容量为新的容量
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1; // 掩码,用于计算新索引
    // 遍历旧哈希表中的每个桶
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask; // 计算元素在新哈希表中的索引
            if (next == null) { // 如果这个桶中只有一个元素
                newTable[idx] = e; // 直接放入新哈希表中
            } else { // 如果这个桶中有多个元素
                // 找到从该元素开始的一串元素中,最后一个索引相同的元素
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) { // 如果遇到一个索引不同的元素
                        lastIdx = k; // 更新最后一个索引相同的元素的索引
                        lastRun = last; // 更新最后一个索引相同的元素
                    }
                }
                newTable[lastIdx] = lastRun; // 将最后一个索引相同的元素放入新哈希表
                // 克隆剩下的元素并重新插入新哈希表
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 计算新节点在新哈希表中的索引并添加新节点(使用头插法)
    int nodeIndex = node.hash & sizeMask; 
    node.setNext(newTable[nodeIndex]); // 设置新节点的下一个节点为当前新哈希表中该索引处的节点
    newTable[nodeIndex] = node; // 将新节点放入新哈希表
    table = newTable; // 更新哈希表
}

# JDK8之前的ConcurrentHashMap, get方法

public V get(Object key) {
    Segment<K,V> s; // 定义段变量
    HashEntry<K,V>[] tab; // 定义哈希表变量
    int h = hash(key); // 计算 key 的哈希值
    // 计算段偏移量,UNSAFE 是用于直接内存操作的类
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    
    // 获取段 s
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 获取哈希表的桶,计算桶的索引
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 如果找到匹配的 key,则返回对应的值
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    // 如果没有找到,返回 null
    return null;
}

总结下get方法:

  • ①、哈希值计算:int h = hash(key);
    计算键的哈希值 h。

  • ②、计算段偏移量:long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

  • h >>> segmentShift:通过右移哈希值来确定该键属于哪个段。

  • & segmentMask:通过与段掩码进行与操作,得到段索引。

  • << SSHIFT 和 + SBASE:计算该段在内存中的实际地址偏移量。

  • ③、获取段:if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null)

UNSAFE.getObjectVolatile(segments, u):通过内存地址偏移量获取段 s,并检查段是否存在以及段中的哈希表 tab 是否存在。

  • ④、查找桶中的键值对:通过 for 循环遍历哈希桶:
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next)
  • 计算键在哈希表中的索引:((tab.length - 1) & h) 通过与操作计算索引,然后通过位移和基地址计算出实际内存偏移量。

  • UNSAFE.getObjectVolatile 获取桶中的链表头结点 e。

  • 遍历链表中的所有节点,检查是否存在匹配的键。

  • 如果找到匹配的键(通过引用比较 k == key 或 equals 比较 e.hash == h && key.equals(k)),则返回该键对应的值 e.value。

  • ⑤、返回结果: 如果找到匹配的键,则返回对应的值。 如果没有找到匹配的键,则返回 null。

# JDK8之前的ConcurrentHashMap, get方法有没有加锁?

可以先思考一下,上面就是get方法的源码。
是不是 除了UNSAFE.getObjectVolatile这个看不太懂的操作,就没看到其他可能加锁的地方了。

提到这就不得不说一下多线程相关的知识点了。(稍微提一点有助于理解这个问题)

保证线程安全的关键点主要包括以下三个方面:

  • ①、原子性(Atomicity):
    确保一系列操作作为一个不可分割的整体执行,中间不会被其他线程打断。如果一个操作不是原子的,那么在多线程环境下就可能看到不一致的状态。
    可以通过以下方式实现原子性:
    使用java.util.concurrent.atomic包下的原子类,如AtomicInteger、AtomicBoolean等。
    利用锁机制,如synchronized关键字或java.util.concurrent.locks.Lock接口的实现类(如ReentrantLock)。

  • ②、可见性(Visibility):
    确保一个线程对共享变量的修改能立即被其他线程看到。在多线程环境中,每个线程都有自己的工作内存,如果不采取措施,线程间的工作内存可能不会及时同步到主内存中,导致其他线程看到的是过时的值。
    保证可见性的方法有:
    使用volatile关键字修饰变量,确保每次读取都直接从主内存中读取,每次写入都直接写入主内存。
    同步块(synchronized)和Locks也能在进入和退出时确保变量的可见性。

  • ③、有序性(Ordering):
    在多线程环境中,由于编译器优化(如指令重排序)和处理器的执行优化,程序执行的顺序可能与代码的编写顺序不一致。为了保证程序执行的逻辑正确性,需要维护一定的执行顺序。
    保证有序性的方法包括:
    synchronized关键字不仅提供了互斥访问,还能确保同一锁内的操作不会被重排序。
    volatile关键字除了保证变量的可见性外,还禁止了某些类型的指令重排序。
    java.util.concurrent包中的高级并发工具,如CountDownLatch、Semaphore、CyclicBarrier等,也可以在一定程度上帮助控制线程间的执行顺序。

我们结合保证线程安全的关键点原子性、可见性、有序性来分析JDK8之前的ConcurrentHashMap, get方法到底有没有加锁。我们一般见到锁最容易想起的就是synchronized或者Lock,这两种显式锁。但是实际上Java提供了很多其他的机制来保证原子性、可见性、有序性,就像JDK8之前的ConcurrentHashMap的get方法虽然没有看到显式的加锁,但是其使用了UNSAFE.getObjectVolatile操作来保证线程安全。

我们可以从保证线程安全的本质原子性、可见性、有序性来分析下JDK8之前的ConcurrentHashMap的get方法: 原子性
非阻塞读取Segment:
通过UNSAFE.getObjectVolatile(segments, u)来获取Segment对象,这里利用了volatile变量的特性,确保读取操作本身的原子性,并且每次读取都直接从主内存中获取最新值,防止了数据的不一致问题。
尽管这不涉及修改操作,但保证了读取操作的完整性,避免了部分读取问题。 遍历链表: 遍历哈希桶中的链表时,由于读操作不会修改链表结构,因此不需要额外的锁来保证原子性。
遍历过程中,每个读取操作(如获取下一个节点e.next)也是原子的,不会被其他线程中断。

可见性
使用volatile: segments数组是volatile修饰的,保证了Segment的引用变化对所有线程都是立即可见的。因此,当一个线程创建或修改了Segment,其他线程能够立刻看到这个变化,确保了数据的可见性。 直接内存访问: 使用UNSAFE.getObjectVolatile方法访问segments和tab,确保了读取操作能看到最新数据,增强了可见性。

有序性
volatile的内存屏障效应: volatile变量的读写会插入内存屏障,这有助于维护操作之间的顺序性。
虽然在get方法中主要关注的是读操作,但通过volatile读确保了之前对Segment的写操作(如Segment的初始化)在读操作前已完成,维持了必要的顺序性。 潜在的有序性保障: 尽管get方法内部没有直接使用锁来控制操作顺序,但整个ConcurrentHashMap的设计通过分段锁(Segment锁)和volatile变量的使用,以及代码执行的自然顺序,间接保证了相关操作的有序执行。

所以这个问题可以这么回答,没有显式的加锁,但是通过UNSAFE.getObjectVolatile操作,利用volatile变量的特性,和每个读取操作(如获取下一个节点e.next)本身就是原子的这种方式来保证了原子性、可见性、有序性从而达到线程安全的读取目的。

# JDK8之后:

下面把JDK切换回JDK8。

mixureSecure

# JDK8的ConcurrentHashMap类的继承体系

和JDK8之前的一致。

mixureSecure

上面说了JDK8对ConcurrentHashMap做了很大的改造,现在就来看看到底有哪些不同。

JDK8的ConcurrentHashMap属性注释:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    // 最大容量,2^30,即1073741824
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    // 默认初始容量
    private static final int DEFAULT_CAPACITY = 16;

    // 数组的最大大小
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    // 默认并发级别,影响初始分段数量
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    // 负载因子,决定扩容的阈值
    private static final float LOAD_FACTOR = 0.75f;

    // 链表转为红黑树的阈值
    static final int TREEIFY_THRESHOLD = 8;

    // 红黑树转为链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;

    // 最小树化容量,数组长度小于64时不会树化
    static final int MIN_TREEIFY_CAPACITY = 64;

    // 最小转移步幅,扩容时每次迁移的最小桶数
    private static final int MIN_TRANSFER_STRIDE = 16;

    // 扩容戳位数,用于扩容标识
    private static int RESIZE_STAMP_BITS = 16;

    // 最大帮助扩容的线程数
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    // 扩容戳偏移量
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    // 转发节点的hash值,用于扩容时指示节点已迁移
    static final int MOVED     = -1;

    // 树节点的hash值,用于表示红黑树的根节点
    static final int TREEBIN   = -2;

    // 保留节点的hash值,用于临时预留节点
    static final int RESERVED  = -3;

    // 常规节点hash值的可用位,去掉最高位
    static final int HASH_BITS = 0x7fffffff;

    // 获取可用的处理器数量
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    // 当前哈希表,使用volatile修饰以保证线程可见性
    transient volatile Node<K,V>[] table;

    // 扩容时的下一张表
    private transient volatile Node<K,V>[] nextTable;

    // 基础计数,表示大小的基础部分
    private transient volatile long baseCount;

    // 控制大小和扩容的标志,负数表示正在初始化或扩容
    private transient volatile int sizeCtl;

    // 扩容时的转移索引
    private transient volatile int transferIndex;

    // 标志是否有线程正在调整计数
    private transient volatile int cellsBusy;

    // 计数单元数组,针对高并发的计数
    private transient volatile CounterCell[] counterCells;
}

根据 transient volatile Node<K,V>[] table; 可以看出 JDK8的ConcurrentHashMap 底层是使用 Node<K,V>[] 数组来存储元素的。
不再使用JDK8之前的Segment分段策略了。
同时JDK8的ConcurrentHashMap也和HashMap保持了一致,引入了红黑树以及新增链表节点时采用尾插法。

# JDK8的ConcurrentHashMap 数据结构图示:

Node<K,V>[] 数组 + 链表/红黑树。 和JDK8的HashMap数据结构类似。 这里的 Node<K,V>是指ConcurrentHashMap 的静态内部类。

mixureSecure

JDK8的ConcurrentHashMap 做了大量的机制来实现线程安全,和提高并发性能。下面会介绍到。

# JDK8的ConcurrentHashMap构造函数

  • 空参构造
public ConcurrentHashMap() {
    // 无参数构造函数,不做任何初始化操作
}

  • 带参构造1
public ConcurrentHashMap(int initialCapacity) {
    // 检查初始容量是否为负数
    if (initialCapacity < 0)
        throw new IllegalArgumentException();

    // 计算实际容量 cap。如果 initialCapacity 大于等于最大容量的一半,直接使用最大容量,否则使用 tableSizeFor 方法计算合适的容量
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    
    // 将计算得到的容量赋值给 sizeCtl,用于控制哈希表的初始化和扩容
    this.sizeCtl = cap;
}

  • 带参构造2
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    // 调用带三个参数的构造函数,默认并发级别为 1
    this(initialCapacity, loadFactor, 1);
}

  • 带参构造3
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 检查负载因子、初始容量和并发级别的有效性,非法参数抛出异常
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();

    // 确保初始容量不小于并发级别
    if (initialCapacity < concurrencyLevel)
        initialCapacity = concurrencyLevel;
    
    // 计算调整后的容量 size,确保不会超过最大容量
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
    
    // 将计算得到的容量赋值给 sizeCtl,用于控制哈希表的初始化和扩容
    this.sizeCtl = cap;
}

  • 带参构造4 调用的就是带参构造3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    // 调用带三个参数的构造函数,默认并发级别为 1
    this(initialCapacity, loadFactor, 1);
}

  • 带参构造5
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    // 初始 sizeCtl 为默认容量
    this.sizeCtl = DEFAULT_CAPACITY;
    
    // 将传入的 Map 中的所有键值对放入 ConcurrentHashMap 中
    putAll(m);
}

# JDK8的ConcurrentHashMap,put方法

// 使用空参构造函数,初始化ConcurrentHashMap
ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();
// 添加元素
map.put("1","1");

put方法

public V put(K key, V value) {
    // 调用 putVal 方法,onlyIfAbsent 参数为 false,表示如果键已存在,则更新其值
    return putVal(key, value, false);
}

putVal方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 如果键或值为空,抛出空指针异常
    if (key == null || value == null) throw new NullPointerException();

    // 计算键的哈希值并进行扰动处理
    int hash = spread(key.hashCode());
    int binCount = 0;

    // 死循环,直到成功插入或更新键值对
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果索引处的节点为空,尝试通过 CAS 操作插入新节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break; // 插入成功,无需加锁
        }
        // 如果节点正在迁移,帮助进行迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果找到键相同的节点,更新值并退出循环
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 否则插入新节点到链表的末尾
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    } 
                    // 如果是树节点,调用红黑树的插入方法
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果链表长度达到阈值,转换为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 更新计数和检查是否需要扩容
    addCount(1L, binCount);
    return null;
}

spread方法

static final int spread(int h) {
    // 通过哈希值扰动,减少冲突
    return (h ^ (h >>> 16)) & HASH_BITS;
}

initTable方法 这个方法很重要,说明ConcurrentHashMap是在新增第一个元素的时候才进行Node<K,V>[]数组的初始化操作。

// 控制大小和扩容的标志,负数表示正在初始化或扩容
private transient volatile int sizeCtl;

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 循环初始化哈希表
    while ((tab = table) == null || tab.length == 0) {
        // 如果 sizeCtl 为负数,表示正在初始化,让出 CPU
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // 丢失初始化竞争,等待
        // 通过 CAS 操作设置 sizeCtl 为 -1,表示当前线程在进行初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 确定哈希表的初始容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 更新 sizeCtl 为扩容阈值
                    sc = n - (n >>> 2);
                }
            } finally {
                // 释放初始化锁
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

总结下initTable方法

  • ①、检查哈希表数组是否为空:
    使用 while 循环,检查 table 是否为 null 或长度为 0。如果条件成立,表明哈希表数组尚未初始化,需要进行初始化操作。

  • ②、检查 sizeCtl 的状态:

如果 sizeCtl 小于 0,表示其他线程正在进行初始化操作,当前线程需要让出 CPU(使用 Thread.yield()),等待其他线程完成初始化。

  • ③、CAS 操作确保初始化的唯一性:

如果 sizeCtl 大于等于 0,尝试使用 CAS(Compare-And-Swap)操作将 sizeCtl 设置为 -1,表示当前线程将负责初始化。
CAS 操作能够确保只有一个线程成功设置 sizeCtl 为 -1,从而避免多个线程同时进行初始化操作。

  • ④、初始化哈希表数组:

  • 在 CAS 成功后,再次检查 table 是否为空,以防止其他线程已经完成初始化。
    如果 table 仍为空,确定哈希表的初始容量:

    • 如果 sizeCtl 大于 0,则使用 sizeCtl 的值作为初始容量。
    • 否则,使用默认容量 DEFAULT_CAPACITY(通常为 16)。
  • 创建一个新的哈希表数组 Node<K,V>[],并将其赋值给 table。

  • ⑤、更新 sizeCtl:
    初始化成功后,将 sizeCtl 更新为扩容阈值,即容量的 3/4。这个阈值用于控制何时进行扩容操作。

  • ⑥、释放初始化锁:
    使用 finally 块确保无论初始化是否成功,都会将 sizeCtl 设置为新的值,释放初始化锁,允许其他线程继续操作。

  • ⑦、返回哈希表数组:
    初始化完成后,返回哈希表数组 tab。

addCount方法
addCount 方法通过 CAS 操作和条件检查,实现了更新计数和在必要时触发哈希表扩容的功能。
它是 ConcurrentHashMap 在并发环境下保持高效和线程安全的重要方法之一。

private final void addCount(long x, int check) {
    CounterCell[] as; // 计数单元数组
    long b, s;        // baseCount 和 sumCount 的临时变量

    // 使用 CAS 操作更新 baseCount
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;

        // 处理计数单元数组或竞争情况
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended); // 竞争失败,调用 fullAddCount 方法处理
            return;
        }

        // 如果 check 小于等于 1,直接返回
        if (check <= 1)
            return;

        // 更新 s 为当前总计数
        s = sumCount();
    }

    // 如果 check 大于等于 0,检查是否需要扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;

        // 循环检查是否需要扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);

            // 如果 sizeCtl 小于 0,表示正在进行扩容操作
            if (sc < 0) {
                // 检查扩容状态,根据不同状态执行不同操作
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break; // 中断扩容操作

                // 尝试增加扩容状态计数
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt); // 执行数据迁移
            }
            // 如果 sizeCtl 大于等于 0,表示当前不在扩容状态
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null); // 准备扩容操作

            // 更新 s 为当前总计数
            s = sumCount();
        }
    }
}

总结下addCount方法

  • ①、更新计数和竞争处理:

首先尝试使用 CAS 操作更新 baseCount。如果存在竞争(counterCells 不为空或 CAS 失败),则调用 fullAddCount 方法处理竞争情况。

  • ②、处理竞争失败:

如果竞争失败或 check 小于等于 1,则直接返回。竞争失败后,可能需要进行更复杂的处理,例如使用计数单元数组来减小竞争的影响。

  • ③、检查是否需要扩容:

如果 check 大于等于 0,表示需要检查是否需要进行扩容操作。

  • ④、循环检查并执行扩容:

  • 使用 while 循环检查当前的总计数 s 是否超过了 sizeCtl,并且当前哈希表 table 不为空且未达到 MAXIMUM_CAPACITY。

  • 根据 sizeCtl 的值进行不同的扩容操作:

    • 如果 sizeCtl 小于 0,表示正在进行扩容操作,执行相应的状态检查和数据迁移操作。
    • 如果 sizeCtl 大于等于 0,表示当前没有进行扩容操作,准备开始扩容,并更新 sizeCtl 的值以标记扩容的状态。
  • ⑤、数据迁移:

在确认需要扩容并且符合条件时,执行数据迁移操作。这涉及将元素从旧的哈希表 table 迁移到新的哈希表 nextTable 中。

fullAddCount方法
fullAddCount 方法是 ConcurrentHashMap 类中用于处理计数竞争的方法,当 addCount 方法中的 CAS 操作失败时调用。
该方法通过循环和 CAS 操作来处理并发下的计数竞争情况,保证了计数的准确性和并发安全性。

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;

    // 获取当前线程的 probe 值
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // 强制初始化
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;              // 表示当前为无竞争状态
    }

    boolean collide = false;                // 上一个槽位是否非空

    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;

        // 如果计数单元数组不为空且长度大于 0
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // 尝试在指定槽位上增加计数单元
            if ((a = as[(n - 1) & h]) == null) {
                // 如果当前计数单元为空,并且没有其他线程在操作计数单元数组
                if (cellsBusy == 0) {
                    // 乐观地创建新的计数单元
                    CounterCell r = new CounterCell(x); // 乐观创建

                    // 尝试获取 cellsBusy 锁
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // 加锁后重新检查
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;  // 在指定槽位设置新的计数单元
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;  // 释放锁
                        }
                        if (created)
                            break;           // 创建成功,退出循环
                        continue;           // 槽位现在非空,继续循环
                    }
                }
                collide = false;                // 无冲突标志置为 false
            }
            else if (!wasUncontended)           // CAS 操作已知会失败
                wasUncontended = true;          // 重哈希后继续
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;                          // CAS 操作成功,退出循环
            else if (counterCells != as || n >= NCPU)
                collide = false;                // 达到最大大小或者过时
            else if (!collide)
                collide = true;                 // 无冲突标志置为 true
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {   // 除非过时,扩展表格
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;      // 扩展计数单元数组
                    }
                } finally {
                    cellsBusy = 0;              // 释放锁
                }
                collide = false;                // 无冲突标志置为 false
                continue;                       // 使用扩展后的表格重试
            }
            h = ThreadLocalRandom.advanceProbe(h); // 更新 probe 值
        }
        // 如果计数单元数组为空且 cellsBusy 为 0,尝试初始化表格
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                               // 初始化表格
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x); // 在指定槽位创建计数单元
                    counterCells = rs;          // 更新计数单元数组
                    init = true;
                }
            } finally {
                cellsBusy = 0;                  // 释放锁
            }
            if (init)
                break;                          // 初始化成功,退出循环
        }
        // 如果以上所有 CAS 操作都失败,回退到使用 baseCount
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                              // 使用 baseCount 增加计数
    }
}

总结下fullAddCount方法

  • ①、获取当前线程的 probe 值:

如果当前线程的 probe 值为 0,强制初始化,并重新获取 probe 值,标记为无竞争状态。

  • ②、循环处理竞争:

使用无限循环处理计数单元的竞争情况。
根据计数单元数组的状态和竞争情况,尝试执行 CAS 操作来增加计数。

  • ③、处理计数单元数组:

如果计数单元数组不为空且长度大于 0,根据当前 probe 值在对应槽位上操作计数单元。
如果当前槽位为空,并且没有其他线程在操作计数单元数组,则乐观地创建新的计数单元。

  • ④、CAS 操作处理:

如果 CAS 操作成功,则退出循环;否则根据具体情况进行冲突标志的设置和重试操作。

  • ⑤、扩展计数单元数组:

如果需要扩展计数单元数组,使用 CAS 操作获取锁,并进行扩展操作。

  • ⑥、初始化表格:

如果计数单元数组为空且 cellsBusy 为 0,尝试初始化表格,创建初始的计数单元数组。

  • ⑦、回退到 baseCount:

如果以上所有的 CAS 操作都失败,则回退到使用 baseCount 进行计数增加操作。

上面两个方法addCountfullAddCount比较抽象,需要结合部分ConcurrentHashMap类的属性一起来分析。

# sizeCtl属性详解

上面对于sizeCtl只给了简单的注释

// 控制大小和扩容的标志,负数表示正在初始化或扩容
private transient volatile int sizeCtl;

实际上sizeCtl属性在JDK8的ConcurrentHashMap中扮演非常重要的角色。

在initTable方法中我们发现sizeCtl被用到了扩容和CAS操作上。

sizeCtl 搞了这么多值和操作主要是为了什么目的呢? 其实万变不离其宗,那就是在保证线程安全的前提下尽量提高ConcurrentHashMap的性能。

sizeCtl 属性可以总结为以下两点用途:

  • ①、初始化控制: 防止多个线程同时初始化哈希表。
  • ②、扩容控制: 控制何时进行扩容以及管理扩容操作的进度和状态。

在初始化控制方面,有如下状态:

  • 初始状态:
    在 ConcurrentHashMap 初始化时,sizeCtl 通常被设置为 0,表示哈希表尚未初始化。
    当调用 initTable 方法时,如果 sizeCtl 为 0,则会使用默认容量 DEFAULT_CAPACITY (16) 来初始化哈希表。

  • 初始化过程中:
    在哈希表的初始化过程中,sizeCtl 被设置为 -1。这个值表示某个线程正在进行初始化操作,其他线程应当让出 CPU,不要参与初始化。
    具体实现中使用 CAS (Compare-And-Swap) 操作设置 sizeCtl 为 -1,以确保只有一个线程能够成功进行初始化。

具体代码体现:
在initTable 方法中
sizeCtl 为负数(-1)表示正在进行初始化,其他线程应让出 CPU。
CAS 操作成功后,当前线程负责初始化,将 sizeCtl 设置为 -1。
初始化完成后,将 sizeCtl 设置为扩容阈值(容量的 3/4)。

在扩容控制方面,有如下状态:

  • 正常工作状态:
    当哈希表初始化完成后,sizeCtl 的值被设置为扩容阈值,通常是 当前容量的 3/4,用于控制哈希表何时进行扩容。
    例如,如果当前容量为 16,那么扩容阈值为 16 - (16 >>> 2) = 12。

  • 扩容过程中:
    在哈希表扩容过程中,sizeCtl 的值会被设置为 -(1 + 扩容戳记 << 16),以标识扩容操作。扩容戳记 (resize stamp) 是一个独特的值,用于标识扩容操作的状态和进度。

具体代码体现: 在addCount 方法中:

当哈希表中的元素数量超过 sizeCtl 时,触发扩容。
通过 resizeStamp 方法生成扩容戳标记,并将 sizeCtl 设置为 -(1 + 扩容戳标记 << 16),标识扩容状态。

transfer方法
transfer 方法用于扩容 ConcurrentHashMap 时,将旧数组中的节点迁移到新数组中。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 计算迁移步长。对于多CPU系统,步长根据CPU数量和数组长度决定。步长的最小值为 MIN_TRANSFER_STRIDE。
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // 设置最小步长

    // 如果 nextTab 为空,表示这是一次初始化迁移操作
    if (nextTab == null) {
        try {
            // 创建新的哈希表,大小为旧表的两倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt; // 设置 nextTab
        } catch (Throwable ex) { // 捕获可能的 OOME 异常
            sizeCtl = Integer.MAX_VALUE; // 设置 sizeCtl 为最大值,表示扩容失败
            return;
        }
        nextTable = nextTab; // 设置 nextTable
        transferIndex = n; // 初始化 transferIndex,表示需要迁移的元素数量
    }

    int nextn = nextTab.length; // 新表的长度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建前向节点
    boolean advance = true; // 是否继续处理标志
    boolean finishing = false; // 是否完成迁移标志

    // 开始循环迁移操作
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;

        // 控制迁移进度
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing) // 如果当前索引 i 已经处理完当前范围或者已经完成迁移
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) { // 如果没有剩余的需要处理的索引
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                                         nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                bound = nextBound; // 设置新的边界
                i = nextIndex - 1; // 设置当前处理的索引
                advance = false; // 暂时停止 advance
            }
        }

        // 如果索引超出范围,结束迁移
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { // 如果已经完成迁移
                nextTable = null;
                table = nextTab; // 更新当前表为新表
                sizeCtl = (n << 1) - (n >>> 1); // 更新 sizeCtl
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 如果没有其他线程在进行迁移
                    return;
                finishing = advance = true; // 设置 finishing 和 advance 标志
                i = n; // 重置索引
            }
        }
        else if ((f = tabAt(tab, i)) == null) // 如果当前槽位为空
            advance = casTabAt(tab, i, null, fwd); // 将前向节点设置到当前槽位
        else if ((fh = f.hash) == MOVED) // 如果已经处理过
            advance = true; // 继续处理下一个槽位
        else {
            synchronized (f) { // 锁住当前节点
                if (tabAt(tab, i) == f) { // 确保当前槽位未被其他线程修改
                    Node<K,V> ln, hn;
                    if (fh >= 0) { // 如果当前节点是普通节点
                        int runBit = fh & n; // 根据 hash 值判断是低位还是高位节点
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln); // 设置低位节点
                        setTabAt(nextTab, i + n, hn); // 设置高位节点
                        setTabAt(tab, i, fwd); // 将当前槽位设置为前向节点
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 如果当前节点是树节点
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln); // 设置低位节点
                        setTabAt(nextTab, i + n, hn); // 设置高位节点
                        setTabAt(tab, i, fwd); // 将当前槽位设置为前向节点
                        advance = true;
                    }
                }
            }
        }
    }
}

总结下transfer方法

  • ①、计算迁移步长:

根据当前 CPU 核数计算迁移步长,步长最小值为 MIN_TRANSFER_STRIDE。

  • ②、初始化新数组:

如果 nextTab 为空,初始化新的哈希表为旧表的两倍大小,并设置 nextTable 和 transferIndex。

  • ③、设置前向节点和标志:

创建 ForwardingNode 前向节点,用于标识已迁移的槽位。
设置 advance 和 finishing 标志控制迁移流程。

  • ④、迁移节点:

进入循环,逐个迁移旧数组中的节点。
通过 CAS 操作确定当前线程负责的迁移范围,更新 transferIndex。
对于每个槽位,如果为空,设置前向节点;如果已处理(MOVED),继续处理下一个槽位。

  • ⑤、迁移非树节点:

对于普通节点,计算低位和高位链表,根据节点的 hash 值分为两个链表,并将其设置到新数组中对应位置。

  • ⑥、迁移树节点:

对于树节点,遍历树节点,将其分为两个树节点链表,根据节点的 hash 值分为高低位,并进行设置。

  • ⑦、完成迁移:

如果迁移完成,更新 sizeCtl,释放 nextTable,设置 table 指向新数组。

  • ⑧、并发控制:

通过 CAS 操作和锁(cellsBusy)确保多个线程同时执行 transfer 方法时的线程安全性。

当既要线程安全,又要性能的时候,代码复杂性真的成倍增加。建议等多线程相关的知识点多总结总结之后,再回来多看几遍源码。

# JDK8的ConcurrentHashMap,get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // 计算 key 的 hash 值,并进行扰动
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { // 检查 table 是否初始化且不为空,获取对应桶中的第一个节点
        if ((eh = e.hash) == h) { // 检查第一个节点的 hash 是否等于 key 的 hash
            if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 检查第一个节点的 key 是否等于目标 key
                return e.val; // 找到目标节点,返回其 value
        }
        else if (eh < 0) // 如果节点的 hash 小于 0,表示该节点是一个特殊节点(如 ForwardingNode 或 TreeBin)
            return (p = e.find(h, key)) != null ? p.val : null; // 调用 find 方法查找节点
        while ((e = e.next) != null) { // 遍历链表中的其他节点
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek)))) // 检查节点的 hash 和 key 是否等于目标 key
                return e.val; // 找到目标节点,返回其 value
        }
    }
    return null; // 没有找到目标节点,返回 null
}

总结下get方法:

  • ①、 计算 hash 值:

调用 spread 方法对 key 的 hash 值进行扰动,以减少 hash 冲突。

  • ②、检查哈希表是否为空:

如果 table 未初始化或长度为零,直接返回 null。
否则,根据 hash 值找到对应桶的第一个节点。

  • ③、检查第一个节点:

如果第一个节点的 hash 值等于目标 key 的 hash 值,进一步检查其 key 是否等于目标 key,如果相等,返回其 value。
如果第一个节点的 hash 值小于 0,表示该节点是特殊节点,调用 find 方法查找目标节点并返回其 value。

  • ④、遍历链表节点:

遍历桶内的其他节点,检查每个节点的 hash 值和 key 是否等于目标 key,如果找到,返回其 value。

  • ⑤、返回 null:

如果遍历完所有节点仍未找到目标节点,返回 null。

注意点:

if (eh < 0) // 如果节点的 hash 小于 0,表示该节点是一个特殊节点(如 ForwardingNode 或 TreeBin)
            return (p = e.find(h, key)) != null ? p.val : null; // 调用 find 方法查找节点

find方法在 Node 类中定义,用于在 ConcurrentHashMap 的链表或树结构中查找特定的键。
默认实现比较简单就是遍历链表找到与key和给定hash值匹配的Node节点。

Node<K,V> find(int h, Object k) {
    // 将当前节点赋值给局部变量 e
    Node<K,V> e = this;
    // 只有在键 k 不为 null 时才进行查找
    if (k != null) {
        // 遍历链表
        do {
            K ek;
            // 检查当前节点的哈希值和键是否与目标匹配
            if (e.hash == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                // 如果匹配则返回当前节点
                return e;
        // 将当前节点指向下一个节点,继续循环直到链表结束
        } while ((e = e.next) != null);
    }
    // 如果未找到匹配的节点,则返回 null
    return null;
}

ConcurrentHashMap中Node节点的子类有自己的find方法实现,以支持特定数据结构下的查找工作。

mixureSecure

# JDK8的ConcurrentHashMap中特殊节点的含义

上面我们注释ConcurrentHashMap 的类属性时其实已经给出了注释。 这里再简单介绍下。

  // 转发节点的hash值,用于扩容时指示节点已迁移
    static final int MOVED     = -1;

    // 树节点的hash值,用于表示红黑树的根节点
    static final int TREEBIN   = -2;

    // 保留节点的hash值,用于临时预留节点
    static final int RESERVED  = -3;

ForwardingNode: 在扩容过程中使用的节点。
扩容时,ConcurrentHashMap 会将当前表中的节点迁移到一个新的、更大的表中。在迁移过程中,原表中的某些节点会被替换为 ForwardingNode,这些节点指向新表,以便继续处理新的插入和查询操作,其 hash 值为 -1。

TreeBin: 用于替代链表的红黑树根节点。当桶内的节点数超过一定阈值(默认是 8),会将链表转换为红黑树,以提高查找性能。TreeBin 是红黑树的根节点,其 hash 值为 -2。

**ReservationNode:**用于标记保留状态的特殊节点类。在哈希表的初始化和扩容过程中,使用 ReservationNode 可以确保并发操作的正确性,防止多个线程在同一槽位上发生冲突,其 hash 值为 -3。

# JDK8的ConcurrentHashMap,get方法有没有加锁?

上面就有源码,可以仔细看下。
好像也没有显式加锁,因为没看到synchronized或者Lock相关的实现。不过还有个e.find(h, key)比较可疑。 上面对这个方法也有解释。
可以看到普通的Node<K,V>节点 实现的 find方法也没有加锁等其他保证线程安全的机制。
但是别忘了Node<K,V>节点的几个子类,比如TreeBin<K,V> extends Node<K,V>find(int h, Object k)方法

看下 TreeBin<K,V> extends Node<K,V>find(int h, Object k)方法源码:

// TreeBin的第一个节点
volatile TreeNode<K,V> first;

// 红黑树的根节点
TreeNode<K,V> root;

// 等待获取写锁的线程
volatile Thread waiter;

// 锁状态,使用volatile确保多线程可见性
volatile int lockState;

// 锁状态的值
static final int WRITER = 1; // 写锁标志
static final int WAITER = 2; // 等待锁标志
static final int READER = 4; // 读锁计数

// LOCKSTATE的偏移量,用于原子操作
private static final long LOCKSTATE;

final Node<K,V> find(int h, Object k) {
    // 检查键是否不为 null
    if (k != null) {
        // 遍历链表节点
        for (Node<K,V> e = first; e != null; ) {
            int s; K ek;
            // 如果当前节点的锁状态有写锁或者等待锁
            if (((s = lockState) & (WAITER|WRITER)) != 0) {
                // 检查当前节点是否匹配目标键
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;  // 找到匹配节点,返回
                e = e.next;  // 移动到下一个节点
            }
            // 否则尝试增加读锁计数
            else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) {
                TreeNode<K,V> r, p;
                try {
                    // 如果根节点不为 null,在红黑树中查找目标键的节点
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {
                    Thread w;
                    // 释放读锁,减少读锁计数
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null)
                        LockSupport.unpark(w);  // 唤醒等待的线程
                }
                return p;  // 返回找到的节点
            }
        }
    }
    return null;  // 键为 null 或未找到匹配节点,返回 null
}

可以看到这个方法用了许多状态变量来标识不同锁,并利用了CAS操作 。

详细看下该方法是如何保证线程安全?

锁状态管理:
lockState 使用 volatile 修饰,确保在多线程环境下的可见性。
通过原子操作(如 compareAndSwapInt 和 getAndAddInt)对锁状态进行修改,确保操作的原子性和线程安全性。

锁状态检测和切换:
在进行链表遍历和红黑树查找之前,先检查锁状态,避免读写冲突。
通过 compareAndSwapInt 尝试增加读锁计数,确保在没有写锁或等待锁的情况下进行红黑树查找。

读锁的增加与释放:
在查找过程中增加读锁计数,确保同时进行的读操作不会互相干扰。
查找完成后在 finally 块中减少读锁计数,确保即使在异常情况下也能正确释放锁。

线程唤醒:
在减少读锁计数时,检查是否需要唤醒等待的线程,确保写操作能够及时进行。

所以JDK8的ConcurrentHashMap,get方法有没有加锁,可以这样回答: 加了,但没完全加。 当get到的节点hash < 0 ,并且是TreeBin节点时,调用TreeBin的find方法就利用了CAS和volatile来保证线程安全,虽然也没有用显式锁,但是可以把CAS理解为乐观锁的一种实现方式。

   else if (eh < 0) // 如果节点的 hash 小于 0,表示该节点是一个特殊节点(如 ForwardingNode 或 TreeBin)
            return (p = e.find(h, key)) != null ? p.val : null; // 调用 find 方法查找节点

# 3、ConcurrentHashMap的真实使用场景

为什么要写使用场景呢?因为我看过很多篇博客,大家对于原理和源码都分析的很深入,但是去说实际的应用场景,或者生产环境应用示例的却很少。 我的学习习惯一般是先了解如何使用,再分析下某项技术适用场景和局限性,尽可能找到生产的最佳实践,最后如果感兴趣或者出于优化解决问题的目的,再去研究源码或者底层的实现细节。 当然我相信还有个比较重要的目的,为了面试对吧,这里可以会心一笑O(∩_∩)O哈哈~。 好吧,不论出于何种目的,学了一种技术不能灵活的运用去解决实际的生产问题的话那不等于白学了吗? 所以多实践实践总归是好的。

# ①、并发任务收集结果集

下面截取部分生产代码 相关命名已作处理(这样应该扫描不出来了吧)

Map<String, Price> returnMap = new ConcurrentHashMap<>(map.size());
productKeys.parallelStream().forEach(productKey -> {
     Price price = StringUtil.getPriceByKey(productKey, 6, map);
     if (price != null) {
         returnMap.put(productKey, price);
     }
 });

使用并行流处理数据,使用ConcurrentHashMap对结果进行收集。

# ②、内存缓存

下面是一个简单的国际化数据缓存。

import java.util.concurrent.ConcurrentHashMap;

public class TestA {
    // 定义一个静态的ConcurrentHashMap,用于存放国际化字符串数据
    private static final ConcurrentHashMap<String, String> i18nMap = new ConcurrentHashMap<>();

    // 静态初始化块,在类加载时自动执行,仅执行一次
    static {
        // 在类初始化阶段加载国际化数据到内存中
        loadI18nData();
    }

    public static void main(String[] args) {
        // 从内存中的i18nMap尝试获取键为"2"的国际化字符串
        String s = i18nMap.get("2");
        // 如果获取不到(即为null),则调用getAndSetI18nData方法从数据库或其他持久化存储中获取
        if (s == null) {
            s = getAndSetI18nData("2");
        }
        // 打印获取到的国际化字符串
        System.out.println(s);
    }

    // 方法:加载国际化数据到ConcurrentHashMap中
    public static void loadI18nData() {
        // 示例中仅放置了一条测试数据,实际应用中这里会从资源文件、数据库等地方加载所有国际化字符串
        i18nMap.put("1", "1"); // 键为"1",值也为"1",仅作示例
    }

    // 方法:如果给定的key在内存缓存中不存在,则从外部数据源(如数据库、Redis)查询对应值,并添加到缓存中
    public static String getAndSetI18nData(String key) {
        // 这里假设从数据库查询,实际操作应替换为真实的数据库访问逻辑
        String result = "查询的结果"; // 假设这是从数据库查询得到的结果
        // 如果查询结果不为空,则将其放入缓存,putIfAbsent方法保证了在key不存在时才插入,避免覆盖已有值
        if (result != null) {
            i18nMap.putIfAbsent(key, result);
        }
        // 返回查询到的结果
        return result;
    }
}

# ③、做对象池

本质上还是并发安全的缓存 下面这个是hutool提供的工具类,用来实例化并存储单例对象

package cn.hutool.core.lang;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;

/**
 * 单例类<br>
 * 提供单例对象的统一管理,当调用get方法时,如果对象池中存在此对象,返回此对象,否则创建新对象返回<br>
 * 注意:单例针对的是类和对象,因此get方法第一次调用时创建的对象始终唯一,也就是说就算参数变更,返回的依旧是第一次创建的对象
 * 
 * @author loolly
 *
 */
public final class Singleton {
	private static Map<Class<?>, Object> pool = new ConcurrentHashMap<>();

	private Singleton() {
	}

	/**
	 * 获得指定类的单例对象<br>
	 * 对象存在于池中返回,否则创建,每次调用此方法获得的对象为同一个对象<br>
	 * 注意:单例针对的是类和对象,因此get方法第一次调用时创建的对象始终唯一,也就是说就算参数变更,返回的依旧是第一次创建的对象
	 * 
	 * @param <T> 单例对象类型
	 * @param clazz 类
	 * @param params 构造方法参数
	 * @return 单例对象
	 */
	@SuppressWarnings("unchecked")
	public static <T> T get(Class<T> clazz, Object... params) {
		T obj = (T) pool.get(clazz);

		if (null == obj) {
			synchronized (Singleton.class) {
				obj = (T) pool.get(clazz);
				if (null == obj) {
					obj = ReflectUtil.newInstance(clazz, params);
					pool.put(clazz, obj);
				}
			}
		}

		return obj;
	}

	/**
	 * 获得指定类的单例对象<br>
	 * 对象存在于池中返回,否则创建,每次调用此方法获得的对象为同一个对象<br>
	 * 注意:单例针对的是类和对象,因此get方法第一次调用时创建的对象始终唯一,也就是说就算参数变更,返回的依旧是第一次创建的对象
	 * 
	 * @param <T> 单例对象类型
	 * @param className 类名
	 * @param params 构造参数
	 * @return 单例对象
	 */
	public static <T> T get(String className, Object... params) {
		final Class<T> clazz = ClassUtil.loadClass(className);
		return get(clazz, params);
	}

	/**
	 * 将已有对象放入单例中,其Class做为键
	 * 
	 * @param obj 对象
	 * @since 4.0.7
	 */
	public static void put(Object obj) {
		pool.put(obj.getClass(), obj);
	}

	/**
	 * 移除指定Singleton对象
	 * 
	 * @param clazz 类
	 */
	public static void remove(Class<?> clazz) {
		pool.remove(clazz);
	}

	/**
	 * 清除所有Singleton对象
	 */
	public static void destroy() {
		pool.clear();
	}
}

上面只是列了几个简单的使用场景,如果我们随便找个Spring项目,在JDK源码的ConcurrentHashMap 类上Ctrl+鼠标左键,就能看到ConcurrentHashMap 用的地方非常多,如果感兴趣可以去研究研究各种各样的使用场景。

mixureSecure

# 4、JDK8 ConcurrentHashMap的其他方法

# putIfAbsent 和

putIfAbsent 方法用于在键不存在时插入值

 public V putIfAbsent(K key, V value) {
 		// 上面有putVal方法的详解  使用Ctrl+F 搜索 putVal
        return putVal(key, value, true);
    }

# computeIfAbsent 和 computeIfPresent

computeIfAbsent 和 computeIfPresent类似,这里只简单介绍下computeIfAbsent 方法。

/**
 * 如果指定的键在映射中不存在,则使用提供的映射函数计算其值,
 * 并将计算得到的值与键关联。
 * 如果键已经存在,不论其对应的值是否为null,都直接返回当前关联的值而不执行映射函数。
 * <p>
 * 此方法适用于需要首次访问时懒惰计算值的场景,确保了计算操作的高效性和线程安全性。
 * 映射函数只在键确实不存在映射时被执行一次,之后对该键的访问将直接返回已计算的值。
 * 
 * @param key 要检查并可能为其计算值的键。
 * @param mappingFunction 一个接受键为参数的函数,用于计算首次需要时该键应关联的值。
 *                       
 * @return 与键关联的现有值,或是由映射函数计算并关联的新值(如果键原先不存在)。
 * 
 */
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    // 检查 key 和 mappingFunction 是否为 null,如果是则抛出 NullPointerException
    if (key == null || mappingFunction == null)
        throw new NullPointerException();
    
    // 计算 key 的哈希值,并进行扰动处理,以减少哈希冲突
    int h = spread(key.hashCode());
    V val = null;
    int binCount = 0;  // 用于记录遍历链表或树的节点数量
    for (Node<K,V>[] tab = table;;) {  // 无限循环,直到成功插入或找到对应节点
        Node<K,V> f; int n, i, fh;
        // 如果哈希表未初始化或长度为 0,则进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果计算出的索引位置为空,则尝试插入 ReservationNode 占位节点
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            Node<K,V> r = new ReservationNode<K,V>();
            synchronized (r) {  // 锁住占位节点
                if (casTabAt(tab, i, null, r)) {
                    binCount = 1;
                    Node<K,V> node = null;
                    try {
                        // 调用 mappingFunction 计算新值,如果不为 null,则创建新节点
                        if ((val = mappingFunction.apply(key)) != null)
                            node = new Node<K,V>(h, key, val, null);
                    } finally {
                        // 将计算出的新节点设置到哈希表中
                        setTabAt(tab, i, node);
                    }
                }
            }
            if (binCount != 0)
                break;
        }
        // 如果当前节点正在进行迁移,则帮助迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            boolean added = false;
            synchronized (f) {  // 锁住当前桶的头节点
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {  // 链表结构
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek; V ev;
                            if (e.hash == h &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                val = e.val;  // 找到对应的节点,取值
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                if ((val = mappingFunction.apply(key)) != null) {
                                    added = true;
                                    pred.next = new Node<K,V>(h, key, val, null);
                                }
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {  // 红黑树结构
                        binCount = 2;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(h, key, null)) != null)
                            val = p.val;  // 找到对应的节点,取值
                        else if ((val = mappingFunction.apply(key)) != null) {
                            added = true;
                            t.putTreeVal(h, key, val);  // 插入新节点到红黑树中
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);  // 链表转化为红黑树
                if (!added)
                    return val;  // 返回找到的值
                break;
            }
        }
    }
    if (val != null)
        addCount(1L, binCount);  // 更新计数
    return val;
}

总结:
初始检查:
检查 key 和 mappingFunction 是否为 null。如果是 null,抛出 NullPointerException。
计算 key 的哈希值,并进行扰动处理以减少哈希冲突。

哈希表初始化与插入逻辑:
如果哈希表未初始化或长度为 0,则调用 initTable() 初始化哈希表。
计算出 key 对应的索引 i,并尝试插入 ReservationNode 占位节点。如果插入成功,调用 mappingFunction 计算值,并插入到哈希表中。

节点迁移:
如果当前节点正在进行迁移(哈希值为 MOVED),则调用 helpTransfer 方法帮助迁移。

链表结构处理:
如果索引处的节点是链表结构,通过遍历链表查找是否已有对应的 key,如果找到则返回其值。如果未找到则调用 mappingFunction 计算新 值,并插入链表尾部。

红黑树结构处理:
如果索引处的节点是红黑树结构,通过 findTreeNode 方法查找是否已有对应的 key,如果找到则返回其值。如果未找到则调用 mappingFunction 计算新值,并插入红黑树中。

计数更新:
如果成功插入新节点,调用 addCount 方法更新计数。

computeIfAbsent 方法使用示例:

 ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        String s = map.computeIfAbsent("1", (v) -> {
            System.out.println("添加"+v);
            return v;
        });

# compute

/**
 * 原子性地为给定键的映射计算一个新的值,
 * 这基于当前的映射情况(存在与否)及提供的重映射函数。
 * 此方法适用于 当新值依赖于当前值,或需要动态决定是否更新或移除映射时。
 * <p>
 * 所提供的重映射函数应接受两个参数:键和当前值(如果尚无映射关系,则为null)。
 * 它应当返回要与键关联的新值,或者返回null以指示应移除该映射项。
 * <p>
 * 计算过程及后续的任何插入或移除操作均保证原子性,
 * 确保在并发环境下操作的安全性,防止数据不一致。
 * <p>
 * 如果函数抛出异常,映射不会被修改,异常将直接传递给调用者。
 * <p>
 * @param key 要计算新值的键。
 * @param remappingFunction 一个函数,用于基于键和当前值(如果存在)计算新值。
 *                          函数应返回新值或null以表示映射项应被移除。
 * @return 计算后与键关联的值,可能是新计算的值,也可能是已存在的值(如果函数未改变它)。
 */
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
    // 如果 key 或 remappingFunction 为 null,则抛出 NullPointerException
    if (key == null || remappingFunction == null)
        throw new NullPointerException();
    // 计算 key 的哈希值
    int h = spread(key.hashCode());
    V val = null; // 用于存储计算后的值
    int delta = 0; // 用于记录插入或删除操作的增量
    int binCount = 0; // 用于记录桶中节点的数量
    // 无限循环,直到成功插入或更新
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果目标位置为空
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            // 创建一个预留节点
            Node<K,V> r = new ReservationNode<K,V>();
            synchronized (r) {
                // 如果成功设置预留节点
                if (casTabAt(tab, i, null, r)) {
                    binCount = 1; // 更新节点数量
                    Node<K,V> node = null; // 用于存储新节点
                    try {
                        // 应用 remappingFunction 计算新值
                        if ((val = remappingFunction.apply(key, null)) != null) {
                            delta = 1; // 增量设置为 1 表示插入新节点
                            node = new Node<K,V>(h, key, val, null); // 创建新节点
                        }
                    } finally {
                        setTabAt(tab, i, node); // 设置节点到表中
                    }
                }
            }
            // 如果节点数量不为 0,跳出循环
            if (binCount != 0)
                break;
        }
        // 如果目标位置为迁移节点,帮助迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 对桶头节点加锁
            synchronized (f) {
                // 再次检查桶头节点
                if (tabAt(tab, i) == f) {
                    // 如果是链表节点
                    if (fh >= 0) {
                        binCount = 1; // 更新节点数量
                        // 遍历链表进行插入或更新操作
                        for (Node<K,V> e = f, pred = null;; ++binCount) {
                            K ek;
                            // 如果找到匹配的节点
                            if (e.hash == h &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 应用 remappingFunction 计算新值
                                val = remappingFunction.apply(key, e.val);
                                // 如果新值不为空,更新节点值
                                if (val != null)
                                    e.val = val;
                                else {
                                    // 如果新值为空,删除节点
                                    delta = -1; // 增量设置为 -1 表示删除节点
                                    Node<K,V> en = e.next;
                                    // 更新链表链接
                                    if (pred != null)
                                        pred.next = en;
                                    else
                                        setTabAt(tab, i, en);
                                }
                                break;
                            }
                            pred = e;
                            // 如果到达链表末尾,插入新节点
                            if ((e = e.next) == null) {
                                val = remappingFunction.apply(key, null);
                                if (val != null) {
                                    delta = 1; // 增量设置为 1 表示插入新节点
                                    pred.next = new Node<K,V>(h, key, val, null);
                                }
                                break;
                            }
                        }
                    }
                    // 如果是树节点
                    else if (f instanceof TreeBin) {
                        binCount = 1; // 更新节点数量
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        // 查找树节点
                        if ((r = t.root) != null)
                            p = r.findTreeNode(h, key, null);
                        else
                            p = null;
                        V pv = (p == null) ? null : p.val;
                        // 应用 remappingFunction 计算新值
                        val = remappingFunction.apply(key, pv);
                        // 如果新值不为空,插入或更新树节点
                        if (val != null) {
                            if (p != null)
                                p.val = val;
                            else {
                                delta = 1; // 增量设置为 1 表示插入新节点
                                t.putTreeVal(h, key, val);
                            }
                        }
                        // 如果新值为空,删除树节点
                        else if (p != null) {
                            delta = -1; // 增量设置为 -1 表示删除节点
                            if (t.removeTreeNode(p))
                                setTabAt(tab, i, untreeify(t.first));
                        }
                    }
                }
            }
            // 如果节点数量不为 0,检查是否需要树化并跳出循环
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                break;
            }
        }
    }
    // 更新元素计数
    if (delta != 0)
        addCount((long)delta, binCount);
    return val;
}


总结下:

  • 参数检查: 方法首先检查 key 和 remappingFunction 是否为 null,如果是则抛出 NullPointerException。
  • 哈希计算: 使用 spread 方法计算键的哈希值,以减少哈希冲突。
  • 循环处理: 通过无限循环不断尝试在表中找到合适的位置插入或更新值。
    • 如果表为空或大小为0,初始化表。
    • 如果目标位置为空,则创建一个预留节点,并尝试插入新节点。
    • 如果目标位置为迁移节点,则帮助迁移。
    • 如果目标位置为链表节点,则遍历链表进行插入或更新操作。
    • 如果目标位置为树节点,则遍历树进行插入或更新操作。
  • 同步控制: 在插入或更新操作中,通过对节点进行加锁来确保线程安全。
  • 计数更新: 插入或删除操作完成后,通过 addCount 方法更新元素计数。

compute方法代码示例:
compute方法计算并合并库存

import java.util.concurrent.ConcurrentHashMap;

public class TestA {
    public static void main(String[] args) throws Exception {
        ConcurrentHashMap<String, Integer> inventoryMap = new ConcurrentHashMap<>();
        // 初始库存
        inventoryMap.put("Independent Truck", 10);
        inventoryMap.put("Destructo Truck", 20);

        // 新到货
        String newProduct = "Independent Truck";
        int newStock = 15;

        // 使用 compute 方法进行合并库存操作
        inventoryMap.compute(newProduct, (key, stock) -> (stock == null) ? newStock : stock + newStock);

        System.out.println(inventoryMap); // 输出: {Destructo Truck=20, Independent Truck=25}
    }
}

# merge

/**
 * 尝试将指定的键与给定的值进行合并。
 * 如果此映射中已经存在该键的映射,则使用提供的合并函数来将现有的值与新值结合,生成的新值将替换原有的映射值。
 * 如果键不存在,此方法等同于常规的`put`操作,直接将键值对插入映射中。
 * <p>
 * 合并函数接受两个参数:当前映射中的值和要尝试合并的新值,
 * 应返回合并后的值,该值将成为与键相关联的新值。如果合并函数返回null,
 * 则此映射项将被移除(如果键已存在)或不会被插入(如果键不存在)。
 * <p>
 * 此操作是原子的,确保了对于指定键的合并过程不会被其他并发操作干扰,
 * 保障了在多线程环境下的数据一致性。
 * <p>
 * @param key 要合并值的键。
 * @param value 尝试合并进映射的新值。
 * @param remappingFunction 一个合并函数,接受当前值和新值作为输入,
 *                          返回合并后的值。可以返回null以指示应移除映射项。
 * @return 与键关联的合并后的值,或者是null(如果映射项被移除)。
 */
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
    // 检查 key, value 和 remappingFunction 是否为 null,若是则抛出 NullPointerException
    if (key == null || value == null || remappingFunction == null)
        throw new NullPointerException();
    // 计算 key 的哈希值
    int h = spread(key.hashCode());
    V val = null;
    int delta = 0;
    int binCount = 0;
    // 无限循环,直到成功插入或合并
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果表为空或长度为零,初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果对应位置为空,尝试使用 CAS 操作插入新节点
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(h, key, value, null))) {
                delta = 1; // 增加计数
                val = value;
                break;
            }
        }
        // 如果节点在移动,帮助完成迁移操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 如果节点不为空且未移动,锁住该节点
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表节点,查找对应的 key
                        for (Node<K,V> e = f, pred = null;; ++binCount) {
                            K ek;
                            if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                // 找到匹配的 key,应用 remappingFunction
                                val = remappingFunction.apply(e.val, value);
                                if (val != null)
                                    e.val = val; // 更新值
                                else {
                                    delta = -1; // 减少计数
                                    Node<K,V> en = e.next;
                                    if (pred != null)
                                        pred.next = en;
                                    else
                                        setTabAt(tab, i, en);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null) {
                                delta = 1; // 增加计数
                                val = value;
                                pred.next = new Node<K,V>(h, key, val, null);
                                break;
                            }
                        }
                    }
                    // 如果节点是 TreeBin,处理树结构
                    else if (f instanceof TreeBin) {
                        binCount = 2;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r = t.root;
                        TreeNode<K,V> p = (r == null) ? null : r.findTreeNode(h, key, null);
                        val = (p == null) ? value : remappingFunction.apply(p.val, value);
                        if (val != null) {
                            if (p != null)
                                p.val = val;
                            else {
                                delta = 1; // 增加计数
                                t.putTreeVal(h, key, val);
                            }
                        }
                        else if (p != null) {
                            delta = -1; // 减少计数
                            if (t.removeTreeNode(p))
                                setTabAt(tab, i, untreeify(t.first));
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                break;
            }
        }
    }
    if (delta != 0)
        addCount((long)delta, binCount);
    return val;
}

# size

size 方法用于返回 ConcurrentHashMap 的大小,即当前映射中键-值对的数量。

// baseCount 用于跟踪计数的基本值,volatile 保证可见性
private transient volatile long baseCount;
// counterCells 是用于存储计数器的数组,volatile 保证可见性
private transient volatile CounterCell[] counterCells;

/**
 * 返回当前映射中键-值对的数量。
 *
 * @return 映射的大小。如果大小超过 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。
 */
public int size() {
    // 计算总计数
    long n = sumCount();
    // 根据计数值返回适当的大小
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

/**
 * 计算所有计数器的总和,包括 baseCount 和 counterCells 中的值。
 *
 * @return 所有计数器的总和。
 */
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

size 方法的设计目标是尽量在高并发情况下提供近似准确的大小,同时保持较高的性能。
为了实现这一目标,ConcurrentHashMap 采用了分段计数机制和无锁化设计,这使得 size 方法在极少数情况下可能返回一个不完全准确的值,但这种设计在大多数并发场景下能提供足够的精度和性能。

// todo 后续再详细讲解 size 方法的设计细节~

# 5、总结

JDK8之前的ConcurrentHashMap 基于Segment分段锁,相比HashTable或者SynchronizedMap提供更细的锁粒度,提高了并发环境下的性能。

JDK8的ConcurrentHashMap 基于CAS机制 和 synchronized 关键字来进一步提升并发环境下的性能。

CAS机制 和 synchronized 关键字在后面多线程知识点总结的时候再详细讨论。

# 补充点

# ConcurrentHashMap为什么设计成key和value不允许存null值?

我们知道HashMap是允许存null key 和 null value的 。
这么一想好像ConcurrentHashMap如果设计成支持存null key 和 null value好像也行得通。

但是再想一下ConcurrentHashMap和HashMap的本质区别,ConcurrentHashMap是线程安全的。

关键点在于 ConcurrentHashMap 在并发修改的情况下 无法保证明确的 map.get(key) 返回null值的语义。
如果允许了null key和 null value 那么 map.get(key) 将无法明确到底是 key不存在于map中 ,还是key存在但是key对应的value是null。

对于HashMap来说,HashMap本身的定义就是非线性安全的集合,所以在单线程操作的情况下,当map.get(key) 返回null值时,我们可以利用map.contains(key)来判断 key到底是否存在于map中,如果存在说明value是null,如果不存在说明map.get(key) 返回null值的含义是 key不存在。

而对于 ConcurrentHashMap来说,ConcurrentHashMap的定义是线程安全的集合允许安全的并发修改操作 。
这里假设一个场景 当map.get(key) 返回null值 时:
我们利用map.contains(key)来判断 key到底是否存在于map中

import java.util.concurrent.ConcurrentHashMap;

public class TestA {

    public static void main(String[] args) throws Exception {

        //  ====== 假如ConcurrentHashMap 允许存储 null key  和 null value
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("1", "1");
        map.put("2", "2");

        String key = null;
        // 假如 我们此时不知道map内部的数据情况
        Thread t1 = new Thread(() -> {
            if (map.containsKey(key)) {
                System.out.println("情况①: key不存在");
            } else {
                System.out.println("情况②: value为null");
            }
        });

        Thread t2 = new Thread(() -> {
            if (!map.containsKey(key)) {
                map.put(key, null);
            }
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

}

代码是无法运行的,因为会抛NPE。 (注意前提 // ====== 假如ConcurrentHashMap 允许存储 null key 和 null value)

我们假设线程t1 判断 map.containsKey(key)返回了false,表示map中不含null这个key,t1还没来得及打印System.out.println("情况①: key不存在");, t2线程也判断了 map.containsKey(key)返回了false,然后t2随后插入了map.put(key, "t2");,最后t1打印了情况①: key不存在。那么对于t1来说这次判断就是错的,因为实际上t2线程在t1线程打印之前已经存入了null key。

这只是举个例子,如果ConcurrentHashMap允许了 null key 和 null value之后,由于ConcurrentHashMap允许并发修改并且声称并发修改是线程安全的, 但是实际上却无法通过 map.containsKey(key)方法来确定 map.get(key)返回null值到底是 key不存在的情况,还是value为null的情况。
这种问题本质上还是并发相关的问题:

  • 瞬时状态: t1 和 t2 都在接近同时检查 containsKey(key) 的状态。由于这两个操作之间没有同步机制,导致两者看到的 map 状态是相同的,但实际执行顺序是不确定的。

  • 缺乏原子性: 多个操作组合在一起没有原子性。即 containsKey 和 put 不是一个原子操作,中间可能会插入其他线程的操作。

  • 并发修改: 尽管 ConcurrentHashMap 允许并发修改并提供线程安全的单个操作,但在组合这些操作时,依然需要额外的同步来保证逻辑的一致性。

Doug Lea(ConcurrentHashMap的作者) 认为那么这种模糊的语义,在并发环境下是不能容忍的。
我查了一些资料,比较可信的一个说法是,网上有人给Doug Lea写了邮件问了ConcurrentHashMap为什么设计成key和value不允许存null值 这个问题, Doug Lea回复的邮件内容如下:

The main reason that nulls aren't allowed in ConcurrentMaps (ConcurrentHashMaps, ConcurrentSkipListMaps) is that ambiguities that may be just barely tolerable in non-concurrent maps can't be accommodated. The main one is that if map.get(key) returns null, you can't detect whether the key explicitly maps to null vs the key isn't mapped. In a non-concurrent map, you can check this via map.contains(key),but in a concurrent one, the map might have changed between calls.

Further digressing: I personally think that allowing nulls in Maps (also Sets) is an open invitation for programs to contain errors that remain undetected until they break at just the wrong time. (Whether to allow nulls even in non-concurrent Maps/Sets is one of the few design issues surrounding Collections that Josh Bloch and I have long disagreed about.)

It is very difficult to check for null keys and values in my entire application .

Would it be easier to declare somewhere static final Object NULL = new Object(); and replace all use of nulls in uses of maps with NULL?

-Doug

大致意思如下:

在并发Map(如ConcurrentHashMap和ConcurrentSkipListMap)中不允许使用null值的主要原因是,在非并发Map中可能勉强容忍的模糊性在并发映射中是无法容忍的。
主要问题在于,如果map.get(key)返回null,你无法判断这个key是明确映射到null,还是这个key没有被映射。
在非并发映射中,你可以通过map.contains(key)来检查,但在并发映射中,映射可能在不同的调用之间发生了变化。

进一步偏题讲一下:我(Doug Lea)个人认为在映射(以及集合)中允许null值会导致程序包含一些错误,这些错误会在某些关键时刻才被发现并导致问题。(关于是否在非并发映射/集合中允许null值,这是Josh Bloch和我之间少数几个设计问题之一,我们一直存在分歧。)

在我的整个应用程序中检查null键和值是非常困难的。

那么是否更容易在某处声明一个

static final Object NULL = new Object();

然后用NULL替代所有映射中的null值?

-Doug

上面就是Doug Lea回复这个问题的邮件大致内容
我觉得大致上分为了两点:

第一就是我们上面提到的 歧义问题 当map.get(key)返回null值时的歧义问题。

Doug Lea说非并发Map能够勉强容忍这种歧义,而并发Map中难以容忍这种歧义。
我的理解是非并发Map使用map.contains(key)检查 也可能被其他线程打扰,但是都说了是非并发Map,即使被其他线程打扰导致无法明确map.get(key)返回null值时的歧义问题,这个歧义问题也不是设计者的锅! 就拿HashMap举例,我们使用上面的多线程例子完全也可以说明map.get(key)返回null值时的歧义问题,但是 Josh Bloch设计HashMap的时候前提已经说明了HashMap是非线程安全的,你非要用多线程去搞个歧义的问题,那么这个锅就在使用者。是使用者自己使用不当导致的歧义。

而并发Map难以容忍这种歧义,就拿ConcurrentHashMap举例,Doug Lea如果设计成 能存 null value和null key ,那么使用者发现了map.get(key)返回null值时的歧义问题,并且前提是 Doug Lea设计的ConcurrentHashMap宣称是并发安全的map,却有这种歧义问题,并且无法通过map.contains(key)来明确map.get(key)返回null值时语义,那么你们觉得这个锅应该谁来背,显然就是ConcurrentHashMap的作者Doug Lea需要为此负责。 所以Doug Lea就把ConcurrentHashMap设计成 不允许存 null value和null key 也就自然没有这种歧义的问题存在了。

第二点 Doug Lea提到了, 允许null值会导致程序包含一些错误,这些错误会在某些关键时刻才被发现并导致问题。 这点我想是很容易理解的,大部分肯定都遇到过 调用HashMap 的get方法得到了一个null,由于忘记了判空导致程序抛NPE报错。 所以不允许null key和null value还有一点好处就是能减少NPE风险。 同时Doug Lea还提到了在他的整个应用程序中检查null键和值是非常困难的,这是由于他设计的是并发安全的集合,允许null意味着在设计集合时需要花费大量精力来对参数进行判断,这在并发情况下是非常复杂的。

Doug Lea说(关于是否在非并发映射/集合中允许null值,这是Josh Bloch和我之间少数几个设计问题之一,我们一直存在分歧。) 这个就很有意思了,我觉得Doug Lea可能想的是:Josh Bloch你小子设计的是非并发集合,存不存null都可以,存null你小子说增加了集合的灵活性,出现歧义那就是使用者的问题。你小子怎么都是赢。 再看看我设计的是并发安全的集合,要是支持存null,参数的判断都够我喝一壶了,出现歧义问题那就是我设计上的锅。那就直接不允许存null得了。再说了Josh Bloch你小子设计的HashTable支持null key,null value了吗! (这段扯多了,当笑话看就行了,哈哈)

最后Doug Lea还提出一个意见,如果真的需要存null key 或者 null value,可以使用一个空的常量对象来表示 null,存入并发安全的集合中。
例如:

static final Object NULL = new Object();

这个问题我也是在网上查了许多资料总结出上面的结论。
至于Doug Lea邮件的真实性并没有去核实,但是这个并不影响我们对问题的分析。
如果你对这个分析有什么别的看法,欢迎评论。

# 详细比较JDK8之前和JDK8的 ConcurrentHashMap不同点 :

特性 JDK8之前 JDK8
底层数据结构 Segment数组 + HashEntry数组 + 链表 Node数组 + 链表 + 红黑树
Segment 使用Segment数组实现分段锁 不使用Segment,完全基于CAS和synchronized实现
锁机制 每个Segment是一个独立的ReentrantLock 使用CAS操作和synchronized控制并发
并发级别 默认并发级别为16,可以通过构造函数设置 无固定并发级别,扩展性更高
性能 锁竞争较多,在高并发下性能有限 更高效的锁策略和改进的数据结构,性能更好
容量与扩容 基于Segment进行扩容,每个Segment独立扩容 全局扩容,采用更高效的扩容机制
红黑树 不支持 链表长度>=8且数组长度>=64时转换为红黑树
原子操作(如computeIfAbsent) 不支持 支持,基于新的CAS和synchronized实现
size()方法 锁住所有Segment然后求和 并发进行计数操作,更高效