ConcurrentHashMap 介绍
引入
HashMap 是最常用的 Map 集合之一,JDK1.8 之后其底层的数组+链表/红黑树的结构使得读写效率都非常高,然而它并不是一个线程安全的集合,如果不加以同步控制,在并发的环境下,可能会导致 HashMap 存在线程安全问题。
HashMap 线程不安全
HashMap 的 put()、resize() 方法都没有任何同步措施,比如 synchronized 或 CAS 机制,当多个线程同时操作同一个 HashMap 时,这些操作不是原子性的,容易出现竞态条件。
如果两个线程 A、B 同时写入不同的数据,当发生不同数据 hash 冲突时,他们可能会同时读取对应索引的哈希桶头节点,各自创建新节点并返回,最终会导致节点数据覆盖的问题。并且 size 属性的更新也并非原子操作(写入数据后采用 ++size 的方式更新)
线程安全的替代方案
正因为 HashMap 是线程不安全的,所以需要寻找线程安全的替代方案
HashTable:所有方法都使用synchronized修饰,使用全局锁,性能不高Collections.synchronizedMap(new HashMap<>()):把 HashMap 包装为 SynchronizedMap,所有方法的逻辑都被synchronized代码块包裹,与 HashTable 类似,也是使用全局锁,性能不高ConcurrentHashMap:采用更精细化的锁控制,JDK 1.7 使用分段锁,JDK 1.8 使用 CAS +synchronized
JDK 1.8 ConcurrentHashMap
在 JDK 1.8 中,ConcurrentHashMap 在保证线程安全的情况下,通过无锁化、细颗粒度锁、多线程协作,最大化并发性能
底层结构
在 JDK 1.8 中,ConcurrentHashMap 的存储结构是由 Node数组 + 链表/红黑树 组成,当 hash 冲突链表达到一定长度后,链表会转换为红黑树,与 HashMap 类似
初始化
- ConcurrentHashMap 的初始化是一个懒加载的行为,构造时只记录容量,首次写操作才初始化
table,避免无用开销 sizeCtl被volatile修饰,使得对它的修改能让其他线程及时感知到- 用 CAS 的方式来修改
sizeCtl,其他线程感知到sizeCtl小于 0 就执行yield()让出 CPU 并进行自旋等待,保证只有一个线程能进行初始化 - 经过 CAS 后进行多一次的
table检查,双重检查避免重复初始化 - ConcurrentHashMap 使用 CAS 和
volatile实现了无锁初始化,避免了直接使用synchronized的性能损耗
关于 yield() 的介绍👉: yield学习
// 容器初始化和调整大小的空间
// 构造时,sizeCtl表示容量(如果有指定初始容量)
// 初始化时,sizeCtl被设置为-1作为正在初始化的标志位
// 初始化完成后,sizeCtl表示扩容阈值
// 扩容时,sizeCtl被设置为-N(高16位表示扩容的标识,低16位减1为正在扩容线程数)
private transient volatile int sizeCtl;
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl小于0,此时有其他线程正在执行initTable
if ((sc = sizeCtl) < 0)
// 让出CPU使用权,只有一个线程能初始化,其他线程自旋等待,直到初始化完成
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
// 线程 CAS 成功把sizeCtl改成-1,抢到initTable执行权
try {
// CAS 后进行双重检查,避免重复初始化
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 初始化 table
table = tab = nt;
// 计算扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 初始化完成,把sizeCtl设置为扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}扩容
和 HashMap 一样,当哈希桶过于拥挤时,查询效率会从 O(1) 退化成 O(n) (链表变长) 或者 O(log n) (红黑树),尽管链表超过一定阈值后也会转换成红黑树来提升查询效率,但是哈希桶扩容也是能大大提升查询效率的。
当元素数量 > sizeCtl(初始化完成后,它作为扩容阈值) 时,ConcurrentHashMap 会把底层数组的容量扩容为原来的 2 倍,让元素重新分布,降低冲突。
触发时机:
put()成功插入一个元素后,调用addCount()检查是否需要扩容,检查元素数量是否超过阈值sizeCtl- 在哈希桶完成迁移之后,哈希桶的头节点会转换为特殊节点
ForwardingNode(hash = -1)
扩容期间的读写操作,写操作加锁或CAS,读操作依靠
volatile保证可见性- 扩容期间读操作:读取数据的过程完全不加锁,靠
volatile和ForwardingNode保证线程安全 - 扩容期间写操作:写入数据时发现对应的哈希桶已迁移,则去新数组进行插入;如果未迁移,那么先协助迁移桶,再写入
- 写操作和迁移桶前都仅会对某一个哈希桶加锁,既保证迁移和写操作互斥,又降低锁的颗粒度,最大程度减少竞争
- 扩容期间读操作:读取数据的过程完全不加锁,靠
协作扩容:ConcurrentHashMap 的扩容不是一蹴而就,而是充分利用多核CPU,多个线程参与合作
- 第一个发现需要扩容的线程会初始化一个新的数组
nextTable,并开始迁移第一位元素 - 后续调用
put()的线程,如果发现正在扩容(检查到哈希桶第一个元素是ForwardingNode),也会帮忙迁移 - 每个参与扩容的线程通过 CAS 申请一段区间(如果[0~15])来迁移避免重复迁移
- 第一个发现需要扩容的线程会初始化一个新的数组
扩容结束:当所有哈希桶都迁移完成时,最后一个线程会:
- 把
table指向nextTable并清空nextTable - 重新计算新的
sizeCtl= 负载因子(默认是0.75) * 新容量
- 把
迁移逻辑
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程负责的步长(stride)
// ... 初始化 nextTab(如果第一个线程)
for (int i = 0, bound = 0;;) {
Node<K,V> f = tabAt(tab, i);
if (f == null)
advance = casTabAt(tab, i, null, fwd); // 桶为空,直接 CAS 设置为 ForwardingNode
else if ((fh = f.hash) == MOVED)
continue; // 已经是 ForwardingNode,跳过
else {
synchronized (f) { // 对哈希桶头加锁,保证迁移和写操作互斥
if (tabAt(tab, i) == f) { // 双重检查
Node<K,V> ln = null, hn = null;
// ... 遍历链表,按 hash & oldCap 分成 low/high 两组
setTabAt(nextTab, i, ln); // 低位链
setTabAt(nextTab, i + n, hn); // 高位链(偏移 n)
setTabAt(tab, i, fwd); // 迁移完成后,将旧桶设为 ForwardingNode
// ...
}
}
}
}
}其他线程协助迁移逻辑
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS)
break;
if (transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab); // 协助迁移
break;
}
}
// putVal()方法发现索引落在了已迁移的哈希桶,那么需要去新表插入
// 具体的行为是返回 nextTab,在下一轮的循环中操作 nextTab
return nextTab; // 返回新表
}
return table;
}检查是否需要扩容的逻辑
private final void addCount(long x, int check) {
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// ...
if (sc < 0) { // sizeCtl小于0,表示正在扩容
// 使用 CAS 的方式对 sizeCtl 增加1,表示当前线程需要加入扩容操作
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
// 主动参与扩容
transfer(tab, nt);
}
}
}
}put
put 操作流程:
spread()方法计算 key 的 hash 值- 如果发现
table是null,那么调用initTable()来完成数组初始化,通过 CAS 的方式确保只有一个线程能进入初始化流程 - 如果发现对应的哈希桶是一个空桶,那么使用 CAS 的方式插入数据
- 如果发现对应的哈希桶有数据,那么需要使用
synchronized锁住哈希桶第一个元素,来完成链表或红黑树的插入 - 完成插入数据的工作后,调用
addCount()方法来进行扩容检查,如果需要扩容,那么参与扩容工作
ConcurrentHashMap put方法采用分段锁思想:只有在必要时针对单个桶加锁,其他情况使用 CAS 或者无锁操作,最大化减少锁竞争的开销
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算哈希值
// 与 HashMap 类似,采用了高16位和低16位异或的方式来降低哈希冲突
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
// 底层 table 未初始化时,执行初始化
// 使用 CAS 确保只有一个线程可以初始化,其他 CAS 失败的线程执行 yield 进行自旋
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 通过 CAS 的方式进行空桶插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 发现元素落在已迁移的哈希桶
// 协助迁移过程,最后返回nextTab,在下一轮循环中完成数据插入
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
// 哈希桶已经有数据,需要锁住桶头避免其他线程的写入或扩容影响
synchronized (f) {
// 双重检查,防止在等待锁期间,该桶被扩容(变为 ForwardingNode)
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 链表的方式插入
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
// 红黑树的方式插入
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新元素数量,如果超出sizeCtl,就需要扩容
addCount(1L, binCount);
return null;
}get
- ConcurrentHashMap 中有一些特殊的 hash 值,-1 代表正在扩容,-2 代表红黑树的根节点,碰上这两种情况时需要特殊处理
get()方法读取数据不需要加锁- 底层的 table 用
volatile修饰,一旦扩容完成,就能立刻读取到新 table - 获取元素时,使用的
tabAt方法底层使用UnSafe的getReferenceVolatile实现,以volatile语义保证读取数组元素的可见性 - Node 的 val 和 next 都用
volatile修饰,保证数据的修改对其他线程可见
- 底层的 table 用
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key对应的索引
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 和哈希数组的第一个元素的hash值相同
if ((eh = e.hash) == h) {
// 直接命中哈希数组上的第一个元素
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
// hash值小于0,说明正在扩容或者是红黑树,如果是 ForwardingNode,会去nextTab中寻找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 无法命中第一个元素,并且hash值正常,并且发现还有后续节点,说明当前是链表结构,按照链表的方式查询
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}JDK 1.7 ConcurrentHashMap
对于 JDK 1.7 的 ConcurrentHashMap,我们重点关注和 JDK 1.8 的区别
核心机制
| 机制 | JDK 1.7 实现要点 |
|---|---|
| 底层结构 | Segment数组 (分段锁) 每个 Segment 是 HashEntry数组 + 链表的结构 |
| 初始化 | 构造时一次性创建所有 Segment数组,对于 HashEntry数组则是懒加载 (第一次put时初始化) |
| put | 1. 根据哈希确定用哪一个 Segemnt 2. Segment 内部使用 ReentrantLock 进行加锁 3. 在 Segment 内部像 HashMap 一样 put |
| get | 无锁操作,依赖 volatile 保证 HashEntry.val/next 的数据可见性 |
| 扩容 | 每个 Segment 独立扩容 只锁当前 Segment,不影响其他 Segment |
| 保证并发性能 | Segment 代表一个 HashMap,用 16 个(默认)ReentrantLock 保护 16 个独立的小 HashMap,实现并发 |
底层结构对比
| 对比维度 | JDK 1.7 | JDK 1.8 |
|---|---|---|
| 结构 | 底层结构 Segment[] + HashEntry[] | 底层结构是 Node[] |
| 内存开销 | 每个 Segment 有 ReentrantLock、table 字段,内存开销大 | 省去 Segment 结构,不单独维护 ReentrantLock,更轻量化 |
锁粒度对比
| 对比维度 | JDK 1.7 | JDK 1.8 |
|---|---|---|
| 锁单位 | 加锁时会锁住一整个 Segment | 加锁时锁住单个哈希桶 |
| 并发度 | 因为加锁会锁住 Segment,所以并发度 = Segment | 因为加锁只锁住一个哈希桶 所以理论并发度 = 哈希桶数量 |
| 锁竞争 | 因为 Segment 内存开销较大,Segment 数量不会太多 并且 Segment[] 也不会扩容,所以锁的竞争会更多 | ConcurrentHashMap 会扩容 因此哈希桶会变多,锁竞争更少 |
扩容机制
| 对比维度 | JDK 1.7 | JDK 1.8 |
|---|---|---|
| 扩容单位 | 每个 Segment 独立扩容 | 整个 table 扩容 |
| 扩容效率 | 扩容只能一个线程完成 当 Segment 较大时扩容速度会下降 | 充分利用多核 CPU 多线程协作扩容 大表扩容时,参与的线程越多扩容越快 |
关键面试问题
ConcurrentHashMap 如何保证线程安全
ConcurrentHashMap 保证线程安全在以下各个方面:
- 初始化:通过 CAS 修改
sizeCtl和双重检查table的方式保证只有一个线程能进入初始化工作,其他线程调用yield()自旋等待 - put 操作:
- 空哈希桶:CAS 无锁插入
- 有数据的哈希桶:使用
synchronized只锁住哈希桶第一个元素,锁粒度更精细 - 遇到扩容:跳转新表,下一轮循环在新表插入
- get 操作:读操作不加锁,依赖
volatile字段(val,next)和tabAt()(volatile 读)保证可见性 - 扩容操作:多线程协作,通过
ForwardingNode确保读写操作在新旧表中切换
为什么 ConcurrentHashMap 的读操作不需要加锁
Node的val和next字段都使用volatile修饰,所有修改能保证可见性- 获取元素时使用
tabAt()方法,底层通过UnSafe类的getReferenceVolatile实现,以 volatile 语义保证读取元素操作的可见性 - 遇上扩容并且哈希桶正在被迁移时,当前节点会变成
ForwardingNode,自动跳转新表查找元素
ConcurrentHashMap 扩容时,为什么get/put操作都不阻塞
- get 操作:查询元素时碰上
ForwardingNode(hash=-1),会调用find()自动去新表查询 - put 操作:
- 如果正在扩容但是当前哈希桶还没迁移,那么在旧表中完成插入数据,后续旧表会被迁移到新表
- 如果正在扩容但是当前哈希桶正在迁移,会因为获取哈希桶头锁而短暂等待,但是拿到锁后会发现当前节点已经变成
ForwardingNode, 会跳转到新表来继续完成插入数据 - 如果扩容已经完成了,由于
table被volatile修饰,那么直接在新表中插入数据
sizeCtl 的作用
sizeCtl 其实蕴含了一些状态机思维
sizeCtl > 0:在构造阶段,它代表了初始容量;在初始化完成后,它代表了扩容阈值sizeCtl = -1:表示正在初始化sizeCtl < -1:表示正在扩容,低16位是参与扩容的线程数
通过 volatile 修饰和 CAS 操作,实现了 sizeCtl 的无锁化转换
ConcurrentHashMap 在 JDK 1.8 和 JDK 1.7 中的对比
从以下几个维度分析他们的差异:
- 存储结构:JDK 1.7 中 ConcurrentHashMap 的结构是 Segment 数组 + HashEntry 数组 + 链表的结构,每一个 Segment 相当于一个 HashMap;JDK 1.8 中 ConcurrentHashMap 是 Node 数组 + 链表/红黑树的结构
- 锁颗粒度:JDK 1.7 中 ConcurrentHashMap 在插入数据时会锁住整个 Segment,相当于锁住单个 HashMap,对于同一个 Segment 的修改操作都要阻塞等待;JDK 1.8 中 ConcurrentHashMap 在插入/扩容时会锁住单个 Node(哈希桶第一个元素),其他哈希桶可以正常操作,锁颗粒度更小
- 扩容机制:JDK 1.7 中 ConcurrentHashMap 每一个 Segment 独立扩容;JDK 1.8 中 ConcurrentHashMap 全表扩容,但是可以多线程协作
ConcurrentHashMap 在 JDK 1.8 中放弃 Segment 的原因
主要有以下几个核心原因:
- 锁颗粒度太粗,JDK 1.7 中对整个 Segment 加锁,默认只有 16 个 Segment,并发度不高;JDK 1.8 对单个哈希桶加锁,颗粒度更小
- 内存开销太大:JDK 1.7 中 Segment 数组即使没有用到,都要初始化,并且 Segment 中各自维护 ReentrantLock 和 table 字段
- 大表扩容效率不高:Segment 内部扩容无法充分利用多线程协作的能力,当 Segment 变大,扩容、迁移的效率会下降