一、前言
在Java并发编程中,线程安全的Map实现一直是一个重要话题。虽然我们可以使用Collections.synchronizedMap()
或者HashTable
来获得线程安全的Map,但它们的性能在高并发场景下往往不尽人意。ConcurrentHashMap作为Java并发包中的重要组件,以其优雅的设计和出色的性能成为了并发编程的首选。
二、为什么需要ConcurrentHashMap
1.1 传统方案的问题
在ConcurrentHashMap出现之前,我们主要有以下几种线程安全的Map实现:
HashTable
// HashTable的put方法
public synchronized V put(K key, V value) {
// 整个方法都被synchronized修饰
// 所有操作都是串行的
}
Collections.synchronizedMap()
// SynchronizedMap的实现
public V put(K key, V value) {
synchronized (mutex) { // mutex通常是this
return m.put(key, value);
}
}
这两种方案的共同问题是:
- 粗粒度锁:整个Map只有一把锁,所有操作都需要竞争这把锁
- 读写冲突:读操作也需要获取锁,无法并发进行
- 性能瓶颈:在高并发场景下,锁竞争激烈,性能急剧下降
1.2 性能对比示例
public class MapPerformanceTest {
private static final int THREAD_COUNT = 16;
private static final int OPERATION_COUNT = 100_000;
public static void main(String[] args) throws InterruptedException {
// 测试HashTable
Map<String, Integer> hashTable = new Hashtable<>();
testConcurrentAccess("HashTable", hashTable);
// 测试ConcurrentHashMap
Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
testConcurrentAccess("ConcurrentHashMap", concurrentMap);
}
private static void testConcurrentAccess(String mapType, Map<String, Integer> map)
throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
long startTime = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await();
for (int j = 0; j < OPERATION_COUNT; j++) {
map.put("key" + j, j);
map.get("key" + j);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
startLatch.countDown();
endLatch.await();
long endTime = System.currentTimeMillis();
System.out.println(mapType + " 耗时: " + (endTime - startTime) + "ms");
}
}
典型测试结果:
- HashTable: 2500ms+
- ConcurrentHashMap: 800ms-
三、ConcurrentHashMap的演进历史
2.1 JDK 1.5-1.7:分段锁时代
在JDK 1.7及之前,ConcurrentHashMap采用**分段锁(Segment)**的设计:
// JDK 1.7 ConcurrentHashMap结构
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table; // 该段的哈希表
transient int count; // 该段的元素数量
transient int modCount; // 修改次数
transient int threshold; // 扩容阈值
final float loadFactor; // 负载因子
}
// HashEntry结构
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next; // 链表指针
}
分段锁原理:
- 将整个Map分成多个Segment(默认16个)
- 每个Segment相当于一个小的HashMap,拥有独立的锁
- 不同Segment之间的操作可以并发进行
- 只有操作同一个Segment时才需要竞争锁
// JDK 1.7 put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 根据hash值确定segment
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
2.2 JDK 1.8+:CAS + synchronized的革新
JDK 1.8对ConcurrentHashMap进行了重大改革:
主要变化:
- 取消分段锁:改为对每个桶(数组元素)进行锁定
- 引入红黑树:链表长度超过8时转换为红黑树
- CAS操作:大量使用CAS操作减少锁的使用
- 锁粒度更细:锁的粒度从Segment级别降低到桶级别
// JDK 1.8+ 主要数据结构
transient volatile Node<K,V>[] table; // 主数组
private transient volatile Node<K,V>[] nextTable; // 扩容时的新数组
private transient volatile int sizeCtl; // 控制标识符
// Node结构
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
// 红黑树节点
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
boolean red;
}
四、扩容的区别
4.1、JDK1.7中的扩容
- 基于Segment:ConcurrentHashMap 是由多个 Segment 组成的,每个 Segment 中包含一个 HashMap。当某个 HashMap 达到扩容阈值时,单独为该 Segment 进行扩容,而不会影响其他的 Segment。
- 扩容过程:每个 Segment 维护自己的负载因子,当 Segment 中的元素超过阈值时,该Segment 的 HashMap 会扩容,整体的 ConcurrentHashMap 并不是一次性全部扩容。
4.2、JDK1.8中的扩容
- 全局扩容:ConcurrentHashMap 取消了 Segment,变成了一个全局数组(类似于HashMap),因此当 ConcurrentHashMap 中的任意一个元素超出阈值时,整个 ConcurrentHashMap 都会触发扩容。
- 基于CAS扩容:在扩容时,ConcurrentHashMap 采用了类似 HashMap 的方式,但通过 CAS操作 确保线程的安全,避免了锁住整个数组。在扩容时多个线程可以协同完成扩容。
- 渐进式扩容:JDK1.8的 ConcurrentHashMap 引入了渐进式扩容机制,扩容不是一次性将所有数据重新分配,而是多个线程共同参与,逐步迁移旧数据到新数组,降低了扩容时性能的开销
4.3、深入JDK 1.8:多线程协作扩容
// JDK 1.8 扩容核心流程
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程的工作量
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 首个线程初始化新表
if (nextTab == null) {
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
// 多线程协作迁移
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 获取工作范围
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CAS获取工作范围
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 处理边界和收尾
if (i < 0 || i >= n || i + n >= nextn) {
// 扩容完成处理...
}
// 迁移具体的桶
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true;
else {
synchronized (f) { // 锁定单个桶
if (tabAt(tab, i) == f) {
// 链表/红黑树迁移逻辑
// 分为高位和低位两个链表/树
}
}
}
}
}
// 多线程协作时序图
Thread-1 Thread-2 Thread-3
| | |
| 发起扩容 | |
| 初始化nextTable | |
| transfer(0-15) | helpTransfer |
| | transfer(16-31) | helpTransfer
| 迁移bucket-0 | | transfer(32-47)
| 迁移bucket-1 | 迁移bucket-16 |
| ... | ... | 迁移bucket-32
| 完成分配范围 | 完成分配范围 | ...
| 检查是否全部完成 | 退出扩容 | 完成分配范围
| 替换table引用 | | 退出扩容
| 扩容完成 | |
优点:
- 多线程并发迁移,效率高
- 细粒度锁,减少阻塞时间
- 渐进式迁移,内存使用平滑
- 支持扩容期间的读操作
缺点:
- 实现复杂度高
- 需要额外的协调机制
- 内存开销较大(ForwardingNode等)
五、核心源码分析
3.1 初始化过程
// 构造函数
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 计算最接近size的2的幂次方
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap; // 设置初始容量
}
// 懒初始化
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 其他线程正在初始化,让出CPU
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// CAS成功,获得初始化权限
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
3.2 put操作详解
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 计算hash值
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 情况1:表未初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 情况2:目标桶为空,直接CAS插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // CAS成功,结束循环
}
// 情况3:正在扩容,协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 情况4:桶不为空,需要同步操作
else {
V oldVal = null;
synchronized (f) { // 锁定桶的头节点
if (tabAt(tab, i) == f) { // 双重检查
// 4.1 链表结构
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同key,更新value
if (e.hash == hash &&
((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 到达链表尾部,插入新节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 4.2 红黑树结构
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 检查是否需要树化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 转换为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 更新计数
return null;
}
3.3 get操作详解
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 直接命中头节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash值为负数,特殊节点(红黑树或转发节点)
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get操作的特点:
- 无锁读取:get操作不需要加锁,依赖volatile保证可见性
- 快速路径:优先检查头节点,大多数情况下可快速返回
- 支持并发扩容:即使在扩容过程中也能正确读取数据