1 ConcurrentHashMap 简介
Map 一种存储键值对 (key-value) 的数据结构, 可以通过 key 快速地定位到需要的 value, 在 Java 中是一个使用频率很高的一个数据结构。一般情况下, 我们都是可以直接使用它的实现类 HashMap 就能满足需求了。
但是 HashMap 在多线程情况, 并不是一个线程安全的类。
解决的方式有很多, 例如:
- 使用在 Java 体系中古老的 Hashtable 作为替代, 该类基本上所有的方法都采用 synchronized 进行线程安全的控制, 虽然保证了线程安全, 但是牺牲了很大的性能。 在高并发的情况下, 每次只有一个线程能够获取对象监视器锁, 这样的并发性能的确不令人满意。
- 通过 Collections 的 synchronizedMap(Map<K,V> m) 方法返回一个线程安全的 Map。但是这个的效果实际上和 Hashtable 的一样, 依然是采用 synchronized 独占式锁进行线程安全的并发控制的, 所以这种方案的性能也是令人不太满意的。
那么有没有一种既保证了线程安全性, 性能也不错的 Map 呢? ConcurrentHashMap 就是我们的选择了, 内部利用了锁分段的思想提高了并发度。当然的, 随着 JDK 的升级, ConcurrentHashMap 的实现也有了不同的方式。
大体了分为 2 个版本: 1.6 版本的 和 1.8 版本。
ConcurrentHashMap 在 JDK 1.6 的版本网上资料很多, 有兴趣的可以去看看。 JDK 1.6 版本关键要素:
- segment 继承了 ReentrantLock 充当锁的角色, 为每一个 segment 提供了线程安全的保障
- segment 维护了哈希散列表的若干个桶, 每个桶由 HashEntry 构成的链表
而到了 JDK 1.8 的 ConcurrentHashMap 就有了很大的变化, 光是代码量就足足增加了很多。1.8 版本舍弃了 segment, 并且大量使用了 synchronized, 以及 CAS 无锁操作以保证 ConcurrentHashMap 操作的线程安全性。
至于为什么不用 ReentrantLock 而是 Synchronzied 呢? 实际上, synchronzied 在 JDK 1.6 做了很多的优化, 包括偏向锁, 轻量级锁, 重量级锁, 可以依次向上升级锁状态。
因此, 使用 synchronized 相较于 ReentrantLock 的性能会持平甚至在某些情况更优, 具体的性能测试可以去网上查阅一些资料。另外, 底层数据结构改变为采用数组 + 链表 + 红黑树的数据形式。
2 ConcurrentHashMap 的关键属性, 类和 CAS 方法
在了解 ConcurrentHashMap 的具体方法实现前, 我们需要系统的来看一下几个关键的地方, 从这几个地方, 从大体上了解 ConcurrentHashMap 的一些特性。
2.1 常量设置 (这些设置基本和 HashMap 的类似)
- ConcurrentHashMap 内部是使用一个数组存放数据的, 这个数组的长度必须是 2 的 n 次方, 默认的容量是 16, 最大是 1 << 30, 既 2 的 30 次方
- 默认负载因子为 0.75, 当数组的的存数据的项 >= 数组的长度 * 负载因子, 就会进行数组长度扩容
- 数组的每一项, 在 JDK 1.8 中, 可以是链表, 也可以是红黑树。
- 当数组的长度大于等于 64, 数组的某一项的长度大于等于 8, 会转为红黑树, 小于等于 6, 会重新转为链表
- key 和 value 都不允许为 null (HashMap 允许一个 key 为 null 和 不限制的 value 为 null)
- 在 ConcurrentHash 中数组中的每个节点的 hash 都有特殊作用
ConcurrentHash 中数组中的节点的 hash 值不同的含义
- >= 0, 表明这个节点为 Node, 既链表节点
- -1, 表明这个节点为 ForwardingNode, 说明这个位置正在数据迁移
- -2, 表明这个节点为 TreeBin, 树节点 TreeNode 的进一步封装
- -3, 表明这个节点为 ReservationNode, 通过 JDK 1.8 新增的几个方法给 ConcurrentHashMap 设值时, 会先对应位置的节点变为 ReservationNode, 再做处理
比如 JDK 1.8 中 Map 新提供了 computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) 方法, 这个方法的作用, 就是向 Map 获取 key 对应的值不存在时, 将后面的 lambda 表达式的返回值设置到 Map 中, 然后返回。
这样就能保证从 Map 不会获取到 null 值。
Map<Integer, String> map = new ConcurrentHashMap<>();
map.put(1, "1");
// 输出: 2123
System.out.println(map.computeIfAbsent(2, k -> k + "123"));
// 输出: null
System.out.println(map.get(10));
computeIfAbsent 定义在 Map 接口中声明的方法, 同时基于 JDK 1.8 的特性, 用 default 关键字进行了修饰, 提供了默认的实现。
Map 提供的这些 default 方法, 可以发现都是线程不安全的, 所以 ConcurrentHashMap 对他们进行了重写, 在这些方法中, 需要对向已有的数组里面的添加新值,
那么就会向把对应位置的那一项, 设置为 ReservationNode 节点。用于通知其他的线程, 这个位置的数据在修改中。
2.2 关键属性
public class ConcurrentHashMap<K,V> {
// 装载 Node 的数组, 作为 ConcurrentHashMap 的数据容器, 采用懒加载的方式, 直到第一次插入数据的时候才会进行初始化操作, 数组的大小总是为 2 的幂次方
transient volatile Node<K,V>[] table;
// 扩容时使用, 平时为 null, 只有在扩容的时候才为非 null, 扩容时先把数据放在这个对象, 扩容完成后, 才把 table 指向这个对象
transient volatile Node<K,V>[] nextTable;
// 在没有发生争用时的元素个数的统计, 即当前的元素个数
transient volatile long baseCount;
// 数组大小控制
// 这个属性非常重要, 取值不同有不同的情况
// 1. 当 sizeCtl > 0 时
// 在 ConcurrentHashMap 声明, 但是内部存储数据的数组是延迟加载的, 这个数组的大小临时存放到 sizeCtl 中
// 初始化后表示扩容的阈值, 默认为当前数组的长度 * 0.75
// 2. 当 sizeCtl = 0 时
// 在声明 ConcurrentHashMap, 没有指定容量大小, 这时 sizeCtl 为 0, 也就是表明 数组 table 的长度去默认值: 16
// 3. 当 sizeCtl = -1 时
// 表明当前 table 正在初始中
// 4. 当 sizeCtl < -1 时
// sizeCtl 的数据类型为 int, 占 32 位。
// 那么 sizeCtl 的高 16 位存放的是扩容时标识符(具体的解释可以看后面, 现在知道有这个设定就行了), 低 16 位存放的是参与扩容的线程数目 (CocurrentHashMap 在扩容的时候, 有别的线程发现正在扩容中, 会一起参与进来, 一起扩容)
transient volatile int sizeCtl;
// 扩容索引值, 表示已经分配给扩容线程的 table 数组索引位置, 主要用来协调多个线程间迁移任务的并发安全
transient volatile int transferIndex;
}
2.3 关键内部类
public class ConcurrentHashMap<K,V> {
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// 很多属性都是用 volatile 进行修饰的, 也就是为了保证内存可见性
volatile V val;
volatile Node<K,V> next;
......
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
......
}
static final class TreeBin<K,V> extends Node<K,V> {
// 这个类并不负责包装用户的 key、value 信息, 而是包装的很多 TreeNode 节点。
// 实际的 ConcurrentHashMap "数组" 中, 存放的是 TreeBin 对象, 而不是 TreeNode 对象
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // 持有写锁时的标志
static final int WAITER = 2; // 等待写锁的标志
static final int READER = 4; // 读锁的增加值
......
}
static final class ForwardingNode<K,V> extends Node<K,V> {
// 在扩容时才会出现的特殊节点, 作为一个标记节点放在桶的首位,
// 其 key, value, next 全部为 null, hash 为 MOVED (-1), 并拥有 nextTable 指针指向新的 table 数组
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
// hash, key, value, next
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
static final class ReservationNode<K,V> extends Node<K,V> {
// 调用 Map default 方法时, 会把节点修改为这个特殊节点
// 其 key, value, next 全部为 null, hash 为 RESERVED (-3)
ReservationNode() {
super(RESERVED, null, null);
}
Node<K,V> find(int h, Object k) {
return null;
}
}
}
结合上面几个内部类, 基本可以推理出 ConcurrentHashMap 的存储数据的方式和 HashMap 一样, 都是数组, 数组的数据类型可以是链表, 也可以是红黑树。
大体的结构如下:
2.4 一些高频的 CAS 方法
public class ConcurrentHashMap<K,V> {
/**
* 该方法用来获取 tab 数组中索引为 i 的 Node 元素
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* 利用 cas 操作将 tab 数组中索引为 i 的元素从 c 替换为 v
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/**
* 将 tab 数组中索引为 i 的元素设置为 v
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
}
3 ConcurrentHashMap 的源码实现
3.1 实例构造器方法
public class ConcurrentHashMap<K,V> {
// 1. 构造一个空的 map, 即 table 数组还未初始化, 初始化放在第一次插入数据时, 默认大小为 16
public ConcurrentHashMap(){}
// 2. 给定 map 的大小
public ConcurrentHashMap(int initialCapacity){}
// 3. 给定一个 map
public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
// 4. 给定 map 的大小以及负载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor){}
// 5. 给定 map 大小, 加载因子以及最少多少个桶(数组的最小长度)
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel){}
}
ConcurrentHashMap 一共给我们提供了 5 种构造器方法, 具体使用请看注释, 我们来看看第 2 种构造器, 传入指定大小时的情况 (其他的构造方式都是类似的), 该构造器源码为:
public class ConcurrentHashMap<K,V> {
// 存储数据的数组的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
public ConcurrentHashMap(int initialCapacity) {
//1. 小于 0 直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断指定的容量是否超过了允许的最大值, 超过了话则取最大值, 否则再对该值进一步处理, 使其为 2 的 n 次方
// 在 initialCapacity 的值大于等于 MAXIMUM_CAPACITY / 2 的情况下, 直接取最大值 MAXIMUM_CAPACITY, 否则重试计算出第一个大于 initialCapacity 的 2 的 n 次方的数
// initialCapacity >>> 1 相当于除以2 取整, 经过这样处理可以得到第一个大于 initialCapacity 的 2 的 n 次方的数
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给 sizeCtl, 这时候 sizeCtl 作用是存放初始时数组的容量
this.sizeCtl = cap;
}
// 经过这个方法的处理后, 可以得到第一个大于等于 c 的 2 的 n 次方的数
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// MAXIMUM_CAPACITY == 1 << 30
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
}
这段代码的逻辑请看注释, 很容易理解。
如果小于 0 就直接抛出异常, 如果指定值大于了所允许的最大值的话就取最大值。
否则, 在对指定值做进一步处理。最后将计算出来的容量 cap 赋值给 sizeCtl, 关于 sizeCtl 的说明请看上面的说明, 当调用构造器方法之后, sizeCtl 的大小就代表了 ConcurrentHashMap 的大小, 即 table 数组长度。
在指定容量的处理方法 tableSizeFor 中, 根据入参的值计算出第一个大于当前入参的 2 的 n 次方数, 这个值就是 ConcurrentHashMap 中数组进行声明时的容量了。
比如, 当指定大小为 18 时, 经过这个方法处理, 会得到一个 32 的值。
需要注意的是, 调用构造器方法的时候并未构造出 table 数组 (可以理解为 ConcurrentHashMap 的数据容器) , 只是算出 table 数组的长度, 当第一次向 ConcurrentHashMap 插入数据的时候才真正的完成初始化创建 table 数组的工作。
3.2 ConcurrentHashMap 中数组的初始方法 initTable()
在上面的指定容量的构造函数中, 可以看到, 只是对入参的容量参数进行了处理, 然后赋值给 sizeCtl, 就结束了, 而真正存储数据的 table 数组还是为空的。
这是一种懒加载的方式, 而只要第一次向里面放数据时, 就会进行数组的初始化, initTable 就是初始化的方法。
public class ConcurrentHashMap<K,V> {
private final Node<K,V>[] initTable() {
Node<K,V>[] tab;
int sc;
// table 为空 或者长度为 0, 进入循环, 否则进入后面的逻辑
while ((tab = table) == null || tab.length == 0) {
// 当前的 sizeCtl 小于 0, 表示 ConcurrentHashMap 正在扩容中
if ((sc = sizeCtl) < 0)
// 让当前线程让出行时间段, 使正在运行中的线程重新变成就绪状态, 后面重新竞争 CPU 的调度权
// 确保当前只有一个线程在初始化
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 通过 CAS 将 sizeCtl 设置为 -1, 成功了, 进行初始化
// 在初始时, 会先将 sizeCtl CAS 为 -1, 也就是其他的线程进入到 while 里面, 也会在上面的判断时, 判断为 true 然后让出执行权
try {
// 再次判断 table 为 null 或者长度为 0
if ((tab = table) == null || tab.length == 0) {
// 如果指定的容量小于 0, 使用默认值 16, 进行声明, 否则就用指定的大小进行声明
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算数组中可用的大小: 实际大小 n * 0.75 (加载因子)
// n >>> 2 相等于 n /2 /2 = n/4 = 0.25
// sc = n - 0.25 * n = 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 这时候 sizeCtl 存放的是阈值, 不是数组的长度, 也不是 -1 了
// 如果上面异常了, 可以把 sc 重新赋值过来, 也就是还是维护的是数组的长度
sizeCtl = sc;
}
}
}
}
}
代码的逻辑请见注释, 有可能存在一个情况是多个线程同时走到这个方法中, 为了保证能够正确初始化, 会在数组初始时做了一个判断 sizeCtl < 0 的判断,
若当前已经有一个线程正在初始化即 sizeCtl 值变为 -1, 这个时候其他线程在 if 判断为 true 从而调用 Thread.yield() 让出 CPU 时间片。
正在进行初始化的线程会调用 U.compareAndSwapInt 方法将 sizeCtl 改为 -1 即正在初始化的状态。
构造方法 + initTable 基本就构成了 ConcurrentHashMap 的初始的全部逻辑了。
3.3 ConcurrentHashMap 新增数据 - put() 方法
public class ConcurrentHashMap<K,V> {
static final int spread(int h) {
// ConcurrentHashMap 中 hash 值的计算方法
// h 异或 (h 无符号右移 16 位) 后, 再与上 HASH_BITS
// HASH_BITS = 0x7fffffff = 2147483647, 二进制表示为 01111111 11111111 11111111 11111111
return (h ^ (h >>> 16)) & HASH_BITS;
}
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 注意: 这里和 HashMap 的区别, HashMap 允许一个 key 为 0 和 多个 value 为 null
// ConcurrentHashMap 则 key 和 value 都不允许, 直接抛出异常
if (key == null || value == null)
throw new NullPointerException();
// 第一步, 计算 key 对应的 hash 值
int hash = spread(key.hashCode());
// 用来计算对应位置的链表的长度
int binCount = 0;
// 注意: 这里做了一个死循环, 自旋操作
for (Node<K,V>[] tab = table;;) {
Node<K,V> f;
int n, i, fh;
// 第二步, 当前的数据结构为 null 或者 长度为 0, 进行初始化
if (tab == null || (n = tab.length) == 0)
// 继续数组的声明, 数组创建成功后, 回到循环的头部, 重新循环
tab = initTable();
// 第三步, 将当前 key 对应的 hash 值 & 上 数组的长度 - 1 (这一步的操作, 在数组的长度为 2 的 n 次方的情况下《等同于 hash % 数组的长度)
// 得到当前 value 存放在数组的哪个位置, 并赋值给 i
// 通过 CAS 得到 i 位置的值, 如果为空, 进入判断里面的操作
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 把当前的数据封装为节点, 通过 cas 把这个节点放到 i 位置
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
// cas 操作成功, 跳出循环, 但是还是会执行到下面的 addCount 方法, 这时候 binCount = 0
break;
// 第四步, MOVED = -1; 对应位置的节点的 hash 为 -1, 既当前节点为 ForwardingNode 节点, 表示当前正在扩容, 当前线程进行协助扩容
} else if ((fh = f.hash) == MOVED) {
// 协助扩容
// 协助扩容后, 又会回到循环的头部, 重新处理
tab = helpTransfer(tab, f);
} else {
// 当前 value 应该存放到位置已经有节点了
V oldVal = null;
// 如果指定的位置不为 null, 同时不是 ForwardingNode, 对这个对象上锁
// 因为这里对这个节点上锁了, 所以扩容和添加新数据, 只有一个线程进入
synchronized (f) {
// 做多一次判断, 当前位置的节点还是指定的节点
if (tabAt(tab, i) == f) {
// 第五步, 当前为链表, 在链表中插入新的键值对, 或者存在 key 相同的, 进行值替换
// fh = 定位到的位置的 hash, > 0 为 链表, -2 表示的是树节点
if (fh >= 0) {
// 链表当前有节点个数
binCount = 1;
// 循环遍历 链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到 hash 和 key 完成相同的节点, 覆盖旧值即可
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
// 旧值
oldVal = e.val;
// 是否替换旧值, 这里是 !false
if (!onlyIfAbsent)
e.val = value;
// 这里旧值不为 null 并且 binCount >= 1, 走到下面的判断后, 不会走到 addCount 方法
break;
}
Node<K,V> pred = e;
// 如果当前节点的下一个节点为 null, 把当前数据封装为新节点放到链表的尾部, 跳出循环
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
// 这时候旧值为 null, binCount = 链表的长度, 会走到 addCount 方法
break;
}
}
}
// 第六步, 当前为红黑树节点的话, 将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
// 直接把 binCount 设置为 2, 用于后面判断, 跳出循环
binCount = 2;
// 将节点转为 TreeBin 红黑树, 调用 TreeBin 的 putTreeVal 方法, 尝试将当前节点放到树内
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 第七步, 插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
// 链表的长度大于等于 8 了, 红黑树化
if (binCount >= TREEIFY_THRESHOLD)
// 在 treeifyBin 中会判断当前的数组长度, 如果长度小于 MIN_TREEIFY_CAPACITY = 64
// 不会红黑树化, 而是直接扩容
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 第八步, 新增元素节点 1 个, 当前新增元素所在位置的节点个数 binCount 元素计数,
// 在 addCount 中检查是否需要扩容
addCount(1L, binCount);
return null;
}
}
从整体而言, 为了解决线程安全的问题, ConcurrentHashMap 使用了 synchronized 和 CAS 的方式。
ConcurrentHashMap 的数据存储方式和 HashMap 没有多大的区别。ConcurrentHashMap 是一个哈希桶数组, 如果不出现哈希冲突的时候, 每个元素均匀的分布在哈希桶数组中。
当出现哈希冲突的时候, 是标准的链地址的解决方式, 将 hash 值相同的节点构成链表的形式, 称为 “拉链法”, 另外, 在 1.8 版本中为了防止拉链过长, 当链表的长度大于
8 (同时还需要满足数组的长度达到了 64, 否则只会进行数组的扩容, 进行重新调整) 的时候会将链表转换成红黑树。table 数组中的每个元素实际上是单链表的头结点或者红黑树的根节点。
上面的新增数据的流程大体如下:
第一步: 计算 key 的 hash 值
当插入键值对时, 首先应该定位到要插入的桶, 即插入 table 数组的索引 i 处。那么, 怎样计算得出索引 i 呢? 在 ConcurrentHash 通过 spread 方法就能得到 key 对应的 hash 值。
spread() 方法 主要是将 key 的 hashCode 和 key 的高 16 位进行异或运算, 这样不仅能够使得 hash 值能够分散, 均匀减小 hash 冲突的概率, 另外只用到了异或运算,
在性能开销上也能兼顾, 做到权衡。
第二步: table 的初始化
判断当前的存放数据的 table 是否为空或者长度为 0, 如果是的话, 调用 initTable() 方法进行初始化, 方法的介绍可以看上面的。
第三步: 能否直接将新值插入到 table 数组中
通过公式 通过当前 key 计算出来的 hash 值 & (当前存储数据的数组的长度 - 1), 得到当前的键值对应该存储在数组的哪个位置, 这个计算公式在数组长度确保为 2 的 n 次方下,
相当于 hash 对数组的长度取模, 既 hash % n
。
得到当前键值对待插入的位置, 再通过 tabAt() 方法获取该位置的节点, 如果节点为 null 的话, 就可以直接用 casTabAt() 方法将新值插入即可。
第四步: 当前是否正在扩容, 协助扩容
如果键值对待插入的位置的节点不为 null, 且该节点为特殊节点 (ForwardingNode) 的话, 就说明当前 ConcurrentHashMap 正在进行扩容操作。
怎样确定当前的这个 Node 是不是特殊的节点了? 是通过判断该节点的 hash 值是不是等于 -1(MOVED)。如果是说明当前正在扩容中, 则会协助扩容。
helpTransfer
协助扩容这里涉及到扩容的一些知识, 所以将其放到后面的扩容一起讲解。
第五步: 当前为链表, 在链表中插入新的键值对, 或者存在 key 相同的, 进行值替换
在 table[i] 不为 null 并且不为 ForwardingNode 时, 并且当前 Node f 的 hash 值大于 0 (fh >= 0) 的话说明当前节点 f 为当前桶的所有的节点组成的链表的头结点。
那么接下来, 要想向 ConcurrentHashMap 插入新值的话就是向这个链表插入新值就行了。通过 synchronized(f) 的方式进行加锁以实现线程安全性。
只对数组中这个位置的链表进行加锁, 而不是整个数组加锁, 提高了并发性。
第六步: 当 table[i] 为红黑树的根节点, 在红黑树中插入新值
按照之前的数组 + 链表的设计方案, 这里存在一个问题, 即使负载因子和 Hash 算法设计的再合理, 也免不了会出现拉链过长的情况, 一旦出现拉链过长, 甚至在极端情况下,
查找一个节点会出现时间复杂度为 O(n) 的情况, 则会严重影响 ConcurrentHashMap 的性能, 于是, 在 JDK 1.8 版本中, 对数据结构做了进一步的优化, 引入了红黑树。
而当链表长度太长 (默认超过 8) 时, 链表就转换为红黑树, 利用红黑树快速增删改查的特点提高 ConcurrentHashMap 的性能。
通过 f instanceof TreeBin 判断当前 table[i] 是否是树节点, 这下也正好验证了我们在最上面介绍时说的 TreeBin 会对 TreeNode 做进一步封装,
对红黑树进行操作的时候针对的是 TreeBin 而不是 TreeNode。
如果在红黑树中存在于待插入键值对的 Key 相同 (hash 值相等并且 equals 方法判断为 true) 的节点的话, 就覆盖旧值, 否则就向红黑树追加新节点
第七步: 插入完键值对后再根据实际大小看是否需要转换成红黑树
如果当前链表节点个数大于等于 8 (TREEIFY_THRESHOLD) 的时候, 就会调用 treeifyBin 方法将 tabel[i] (第 i 个散列桶) 拉链转换成红黑树。
第八步 添加元素计数, 并在 binCount 大于 0 时检查是否需要扩容
每次新增的时候都会对当前的数组的长度做一个检查, 超过了临界值, 就会进行扩容, 相关的方法 addCount
。
同样的, addCount
涉及到了扩容方面的知识, 将其也放到扩容的时候一起讲。
整体的流程是这样的:
- 首先对于每一个放入的值, 首先利用 spread 方法对 key 的 hashcode 进行一次 hash 计算, 由此来确定这个 key 应该存放在 table 中的位置
- 如果当前 table 数组还未初始化, 先将 table 数组进行初始化操作
- 如果要存放的位置是 null 的, 那么使用 cas 操作直接放入
- 如果这个位置存在结点, 说明发生了 hash 碰撞, 首先判断这个节点的类型。如果该节点 fh == MOVED (代表 forwardingNode, 数组正在进行扩容) 的话, 说明正在进行扩容, 协助扩容, 扩容结束后, 重新判断
- 如果是链表节点 (fh > 0), 则得到的结点就是 hash 值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到 key 相同的节点, 则只需要覆盖该结点的 value 值即可。否则依次向后遍历, 直到链表尾插入这个结点。
- 如果这个节点的类型是 TreeBin 的话, 直接调用红黑树的插入方法进行插入新的节点
- 插入完节点之后再次检查链表长度, 如果长度大于 8, 就把这个链表转换成红黑树
- 对当前容量大小进行检查, 判断是否需要扩容
3.4 ConcurrentHashMap 获取数据 - get() 方法
看完了 put 方法再来看 get 方法就很容易了, 用逆向思维去看就好, 怎么存, 反过来这么取就好了, get 方法的源码:
public class ConcurrentHashMap<K,V> {
public V get(Object key) {
Node<K,V>[] tab;
Node<K,V> e, p;
int n, eh; K ek;
// 得到 key 的 hash 值
int h = spread(key.hashCode());
// table 数组不为空同时长度大于 0, 计算得到的位置不为 null
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
// 指定位置的 hash key 都相同, 直接在头节点找到了, 直接返回这个节点值
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 节点的 hash 为 0 说明为树节点, 在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 依次遍历链表的每一个节点
while ((e = e.next) != null) {
// hash 和 key 都相同, 返回这个节点 value
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
// 找到了直接返回
return e.val;
}
}
return null;
}
}
整理后的流程大体如下:
- 计算出 key 的 hash
- 通过 hash 得到对应位置的节点, 如果为 null, 直接返回
- 判断对应位置的节点的 hash 和 key 是否一致, 如果是的话, 返回这个节点的值
- 判断这个节点的 hash 小于 0, 说明这个为树节点, 在树中进行查找
- 到了这一步, 说明这个节点为链表, 依次遍历这个链表的节点, 进行查找
- 如果都没有的的, 直接返回 null
3.5 ConcurrentHashMap 的扩容
当 ConcurrentHashMap 容量不足的时候, 需要对 table 进行扩容。这个方法的基本思想跟 HashMap 是很像的, 但是由于它是支持并发扩容的, 所以要复杂的多。
原因是它支持多线程进行扩容操作, 而并没有加锁。这样做的目的不仅仅是为了满足 concurrent 的要求, 而是希望利用并发处理去减少扩容带来的时间影响。
3.5.1 扩容标识的获取 - resizeStamp() 方法
public class ConcurrentHashMap<K,V> {
/**
* 返回一个标志位用于调整 table 的大小
* 这个数左移 RESIZE_STAMP_SHIFT, 也就是 16 位, 必须后能得到一个负数
*/
static final int resizeStamp(int n) {
// Interger.numberOfLeadingZero 的作用: 返回整型 n 非零最高位前面 0 的个数
// 比如: 13 的二进制为: 00000000 00000000 00000000 00001101 , 从右往左, 最后一个非零的前面有 28 个 0, 所以这里等于 28
// RESIZE_STAMP_BITS = 16 , 所以 1 << (RESIZE_STAMP_BITS - 1) 固定为 32768, 二进制表达为 00000000 00000000 10000000 00000000
// 这里的 |, 或操作, 2 位只要有一个是 1 就是 1 了, 在这里可以简单的看出是 2 个数相加
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
}
这个方法看起来就一行代码, 但是拆成三部分来看。
Integer.numberOfLeadingZeros(n)
这个方法的作用: 返回一个整形, 从左往右到第一个非 0 位的位数, 比如 13 的二进制为 00000000 00000000 00000000 00001101, 从左往右算到第一个 1 时, 总共有 28 位, 所以这个返回 28。
因为 int 为 32 位, 那么经过这个方法计算最终得到的结果最大为 32 (输入了 0), 最小当然是 0 (输入了 -1), 所以这个方法返回值在 0 - 32 之间。返回值的二进制为 00000000 00000000 00000000 00XXXXXX
1 << (RESIZE_STAMP_BITS - 1)
RESIZE_STAMP_BITS 是一个常量, 等于 16, 1 << (16 - 1) = 32768, 二进制为: 00000000 00000000 10000000 00000000, 刚好得到了一个第 16 位为 1 的数
| (或操作)
或运算, 这里可以看做是 2 个数相加, 将上面 2 步的结果进行或运算后, 可以得到一个二进制: 00000000 00000000 10000000 00XXXXXX
这个方法的作用是返回一个扩容标志符, 这个标志符经过左移 16 位后, 必须是一个负数。这个标志位主要用于后面的扩容。
通过这个方法得到的数字, 左移 16 位后, 为 10000000 00XXXXXX 00000000 00000000, 将这个数分成 2 部分。
高 16 位表示当前在扩容中, 而且保存了当前 table 的长度。
调用 resizeStamp() 方法, 传过来的 n = 当前 table 的长度, 而 table 的长度是 2^n 次方, 2^n 在二进制的形式为从 01 {n 个 0} 的形式, 结合 Interge.numberOfLeadingZeros 就可以知道当前 table 的长度了。
而低 16 位用在后面的记录当前参与扩容的线程数。每有一个线程参加了扩容就加 1, 所以最多参与扩容的线程个数就是 00000000 00000000 11111111 11111111 既 2^17 - 1 个线程。
先记住他的含义, 后面扩容的时候, 就能了解他的神奇之处。
3.5.2 扩容的判断 - addCount() 方法
在每次 put 新值的时候, 存放成功后, 都会执行一次是否需要扩容的检测, 如果需要就会调用扩容, 扩容的检测的方法就是 addCount() 方法
public class ConcurrentHashMap<K,V> {
/**
* 这个方法的作用: 更新当前的键值对的个数, 同时根据个数判断是否需要扩容
* 不存在并发的情况下, 键值对的个数存放在 baseCount 下
* 但是当出现了并发的时候, 键值对的个数的统计是通过数组 counterCells 每一项的 value 的累加
*
* @param x 增加的个数
* @param check 检查模式, check < 0 不进行扩容的的检测, check >= 0 进行扩容检测, 但是出现了并发时, 0<= check <= 1, 不进行扩容检测
*/
private final void addCount(long x, int check) {
// CounterCell 是 ConcurrentHashMap 中的一个内部类
// 整个类只有一个 long value 的属性
// 主要用于并发时, ConcurrentHashMap 的数据个数的计算
CounterCell[] as;
long b, s;
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 1. 用于统计数组个数的 counterCells 数组已经不为 null 了
// 2. 通过 cas 将当前直接存储 ConcurrentHashMap 元素个数的 bseCount + x, 失败 (baseCount 存放的是当前 ConcurrentHashMap 中存放的数据个数)
// 这 2 个情景都可以确定当前的增加元素个数进入了并发情景了
CounterCell a;
long v;
int m;
// 线程争用的状态标记, 默认为无竞争
boolean uncontended = true;
// ThreadLocalRandom.getProbe() 随机产生一个 int 的数字,
// 下面的 m = ConcurrentHashMap 的 CounterCell[] counterCells 数组的长度, 默认声明是为 2, 后面扩容也是在原来的基础上 * 2, 所以 counterCell 的长度也是 2 的 n 次方
// 所以通过 ThreadLocalRandom.getProbe() & m, 就是从 CounterCell[] counterCells 中随机去一个位置
if (as == null || (m = as.length - 1) < 0 || (a = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))) {
// 1. ConcurrentHashMap 的 CounterCell[] counterCells 为空或者长度小于 0
// 2. 从 CounterCell[] counterCells 随机取出一个位置为 null
// 3. 从 CounterCell[] counterCells 随机取出一个位置不为 null, 通过 cas 将这个位置的 CounterCell 的 value + x 失败了
// 这里只有是给当前的 ConcurrentHashMap 的元素个数加上 x, 通过 fullAddCount() 方法实现的
// 这个方法的复杂度很高, 就不展开了, 说一下大体的功能
// 先通过 ThreadLocalRandom.getProbe() 当前线程的用于产生随机数的随机种子
// 同一个线程一直调用这个方法会返回通过一个值, 调用 advanceProbe() 方法进行刷新, 再次调用 getProbe() 可以得到另一个新的随机种子
// 下面的整个操作的过程都是自旋的
// 1. 当前的 counterCells 有数据
// 1.1 通过获取到随机种子, 算出在 counterCells 中的一个位置
// 1.2 获取到的位置不为 null, 通过 cas 给这个位置的 CounterCell 的 value + x
// 1.2.1 cas 设置成功了, 结束整个方法
// 1.2.2 cas 设置失败了, 给 counterCells 进行扩容, 变为原来的 2 倍, 调用 ThreadLocalRandom.advanceProbe() 获取新的随机种子, 回到自旋循环的开头
// 1.3 获取到的位置为 null, 创建一个新的 CounterCell, value 为 x
// 1.3.1 在判断这个位置为 null, 将新的 CounterCell 放到这个位置, 结束
// 1.3.3 在判断这个位置不为 null, 回到自旋循环的开头
// 2. counterCells 没有数据, 通过 cas 将 baseCount 加 x
// 在 fullAddCount 中实际的实现比上面的复杂, 会通过 cas cellsBusy 从 0 设置为 1, 设置成功, 才能进行上面的操作, 设置失败都只能自旋
fullAddCount(x, uncontended);
// 注意这里的 return 有个问题, 通过 fullAddCount() 给当前的 ConcurrentHashMap 的元素个数 + 1, 为什么不检测是否需要扩容呢?
// 这里其实需要从多线程的角度考虑了, 能够进到这里, 有哪些情况? 结合上面 2 个 if
// 1. counterCells 不为 null, 并且通过 cas 更新 counterCells 的计算出来的位置的值失败, 那么就是有另一个线程更新了这个值, 扩容的检测交给那个线程就行了
// 2. counterCells 为 null, 通过 cas 更新 baseCount 失败了, 进到这里很大概率 as 为空, 同样的是交给另一个线程进行扩容的检测
return;
}
// 增加元素个数进入了并发情景了
// 不需要进行扩容的检测
if (check <= 1)
return;
// 统计总个数
// baseCount + CounterCell[] counterCells 中每个不为 null 的元素的 value
s = sumCount();
}
// 进行扩容的检测
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 当前的元素个数 > 阈值 并且 table 不为空, 同时 table 的长度小于最大容量 2^30, 还有扩容空间
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
//根据数组长度获取一个扩容标志, 格式 16 个 0 + 1 + 15 位 (表示当前 table 容量的二进制表示从左往右 0 的个数)
int rs = resizeStamp(n);
// 在上面我们说过了 sizeCtl 的不同取值有不同的含义,
// 小于 0, 初始中或者有线程在扩容中, 但是在 addCount 方法可以断定为扩容中
if (sc < 0) {
// 1. sc 无符号右移 16 位和扩容标识 rs 不相同, 将不做处理,
// 出现不同的原因就是 table 的长度改变了, 导致计算出来的 rs 变了 也就是上次的扩容完成了, 不进行处理
// 当第一个线程进行扩容的时候, 会把 sizeCtl 设置为计算出来的 rs 右移 16 位后 + 2,
// 所以当改变后的 sizeCtl 左移 16 位和当前的 rs 不一致了, 说明 table 的长度已经变了
// 如果 rs 从 + 1, 是无法区分 sizeCtl 低 16 位都是 0 时, 是扩容为开始还是扩容已经结束
// 如果直接 + 2, 后续减少时 -1, 那么低 16 位都是 0 表示扩容未开始, 低 16 位为 1, 扩容结束
// 2. sc == rs + 1 这里是一个 bug, 应该改为 sc == (rs <<< RESIZE_STAMP_SHIFT) + 1
// sc 在经过条件 1 后, 值是不会变的, 那么 sc 依旧是小于 0, 而 rs 在我们分析后必定是一个大于 0 的数, 所以 sc 一定不会等于 rs + 1, 应该是 sc == (rs <<< RESIZE_STAMP_SHIFT) + 1
// 这个 bug 可以查看 这里 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427, 预计在 JDK 12 修复, 在 14 的时候已经看到修复了
// 如果修改为上面的, 我们知道 sc 的低位存放的是当前线程参与扩容的线程数, 默认是从 2 开始累积的, 在扩容 transfer 中,
// 每有一个线程扩容完成时会 - 1, 当 sc 的低位减到了 1 时, 说明这时候扩容已经完成了, 而 rs 无符号左移 16 位后, 低 16 位都是 0,
// 如果 sc 的低位 = 右移后的 rs 的低位 + 1, 说明扩容完成了, 正在进行末尾的处理了, 不进行处理
// 3. 同上面的条件 2 一样, 应该修改为 sc == (rs <<< RESIZE_STAMP_SHIFT) + MAX_RESIZERS, 这里的 MAX_RESIZERS = 1 << 16 - 1;
// 也就是 00000000 00000000 11111111 11111111, 也就是我们扩容能支持的最大线程数, 这里也就是我们当前扩容的线程数达到了上限, 不进行处理
// 4. 在扩容的时候, 我们会先声明 nextTable, 逐渐将数据放到这个对象, 如果这个对象为 null, 说明没有线程在进行扩容, 不进行处理, 在上面的 size < 0, 我们可以得知线程进来是有线程在扩容, 现在 nextTable 为空, 说明扩容完成了
// 5. 转移索引标识 transferIndex <= 0, 不进行处理, 这个变量用于扩容时控制, 可以查看下面的扩容方法, <= 0 说明我们的扩容已经完成了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
break;
// 在 sizeCtl 添加多一个线程, 开始协助扩容, 这时候扩容的新数组就是 nextTable
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 尝试将 sizeCtl 设置为 扩容标志 rs 左移 16 位 + 2, 2 表示线程数默认是从 2 开始累积的
else if (U.compareAndSetInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
// cas 将 sizeCtl 设置成功, 开始扩容
// 进行扩容, 存放数据的新数组为 null,
transfer(tab, null);
// 重新计算元素个数
s = sumCount();
}
}
}
}
总的来说: addCount 就做了 2 件事
- 更新当前的元素个数
- 检查是否需要扩容或者协助扩容
3.5.3 扩容 - transfer() 方法
public class ConcurrentHashMap<K,V> {
/**
* 扩容
* 大体的步骤:
* 通过当前机器的 cpu 和 当前数组的容量算出一个迁移跨度 stride
* 每次一个线程进来扩容的话,就会分配当前数组的 stride 个位置给它, 另一线程进来或者当前线程 stride 个位置已经迁移完了,
* 就再分配另外 stride 个位置给他, 直到当前数组所有的位置都分配完成
* ConcurrentHashMap 的 transferIndex 扩容时默认为当前数组的容量, 类似于一个游标, 每次分配是从这个游标开始往前 stride 个位置就是分配个
* 这个线程的区间了。
* 里面声明的 2 个变量 i 和 boud, 就是当前线程可以处理的区间的位置 [bound, i], [transferIndex - 1 - stride, transferIndex - 1]
* 同时把 transferIndex 更新为 transferIndex - stride
*
* 遍历中的操作
* 1. 获取到的位置为空, 填充为 ForwardingNode 节点, 处理下一个节点
* 2. 获取到的位置的节点为 ForwardingNode, 跳过处理下一个节点
* 3. 判断当前位置的节点是否为链表或者树
* 3.1 链表, 新找到尾节点, 尾节点的 hash & 当前数组的容量 == 0 ? 放到低纬链表的头部, 放到高纬链表的头部, 使用同 HashMap 的高低位链表的方式进行迁移
* 3.2 遍历当前位置的链表直到倒数第二个, 处理的节点的 hash & 数组的长度 = 0? 应该放到低纬, 应该放到高纬度
* 3.3 把当前的节点封装为新的节点, 这个节点的下一个节点为当前的高纬/低纬节点, 然后把当前节点放到低纬/高纬的位置
* 3.4 最后把临时数组 nextTab 的 i 位置设置为当前的低纬链表, i + 当前数组的容量的位置设置为当期的高纬链表
* 3.5 把当前数组的 i 位置设置为 ForwardingNode 节点, 下一个遍历
*
* 迁移完成
* 把临时数据 nextTable 赋值给真正的存储数组 table
* 把 nextTable 设置为 null
* 把 sizeCtl 设置为 旧数组容量的 * 1.5
*
* 没法迁移了, 就给当前的 sizeCtl 减 1
*
* @param tab 扩容前的数组
* @param nextTab 新的数组, 扩容的话, 这里为 null, 协助扩容的为, 为新的数组
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 这里的 tab 就是成员变量 table
int n = tab.length, stride;
// MIN_TRANSFER_STRIDE = 16 NCPU = 当前服务器的 CPU 核数
// n 为数组的长度, 默认为 2 的 n 次方, 所以这里的 >>> 3, 相当于除以 8
// 当前为多核服务器的话 stride = 当前的容量 / 8 / CPU 的数量
// 如果为单核的话的话, stride = 当前数组容量
// 最终计算出来的 stride 如果小于 16, 将其变为 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 如果用于存数据的下一个数组为 null, 进行初始, 然后把这个数组赋值给变量 nextTable,
if (nextTab == null) {
try {
// 扩容为原来的 2 倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// 扩容失败, 直接将阈值变为 int 的最大值, 然后返回
sizeCtl = Integer.MAX_VALUE;
return;
}
// 将创建出来的扩容数组赋给给 nextTable, 可以让其他线程看见
nextTable = nextTab;
// 扩容索引变为扩容前数组的长度
transferIndex = n;
}
// 记录过渡数组的长度
int nextn = nextTab.length;
// 创建一个 ForwardingNode 节点, 用于占位。当别的线程发现这个槽位中是 ForwardingNode 类型的节点, 则跳过这个节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 该变量控制迁移的进行 true 说明可以再次迁移一个下标 (i--) ,
// 反之, 如果是 false, 那么就不能推进下标, 需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// 完成状态, 如果是 true, 说明迁移完成, 可以结束此方法
boolean finishing = false;
// 下面的循环主要是
// 1. 为了计算出当前线程需要处理的区间 [transferIndex - stride, transferIndex], 即 [bound (transferIndex - stride), i(transferIndex - 1)],
// 同时更新 transferIndex = transferIndex - stride, 方便后续的线程或者线程重新获取的迁移的区间
// 2. 遍历中, 负责进行 i - 1 操作, 然后决定是继续遍历还是进行下一次分配
// i 表示处理的当前桶区间最大下标, bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f;
int fh;
// 当前线程是否可以向后推进, 这个循环就是控制 i 递减, 同时, 每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
int nextIndex, nextBound;
// 对 i 减一, 判断是否大于等于 bound (正常情况下, 如果大于等于 bound 不成立, 说明该线程上次领取的任务已经完成了那么, 需要在下面继续领取任务)
// 如果对 i 减一, 大于等于 bound (还需要继续做任务) , 或者迁移完成了, 修改推进状态为 false, 不能推进了。 每迁移完成一个位置, 推进状态会被修改为 true
// 通常, 第一次进入循环, i-- 这个判断会无法通过, 从而走下面的 nextIndex 赋值操作 (获取最新的转移下标)
//其余情况都是: 如果可以推进, 将 i 减一, 然后修改成不可推进。如果 i 对应的位置处理成功了, 又把推进状态修改成可以推进。
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// 如果 transferIndex 小于等于 0, 说明没有区间了, i 改成 -1, 推进状态变成 false, 不再推进, 表示, 扩容结束了, 当前线程可以退出了
i = -1;
advance = false;
} else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
// 通过 cas 减少 transferIndex (transferIndex - stride 跨度) 的值成功了, 为当前线程重新分配新的扩容区间
// 这里的目的是:
// 1. 当一个线程进入时, 使其可以获取最新的转移下标
// 2. 当一个线程处理完自己的区间时, 如果还有剩余区间的没有别的线程处理, 再次获取区间
// 这个值就是当前线程可以处理的最小当前区间最小下标
bound = nextBound;
// 初次对 i 赋值, 这个就是当前线程可以处理的当前区间的最大下标
i = nextIndex - 1;
advance = false;
}
}
// i < 0 (当 transferIndex <= 0 时, 数组已经分配完成了, 线程进入到上面的 while, 使得 i = -1, 然后走到这个方法, 让线程结束扩容 )
// i >= 旧数组的长度
// i + n >= 新数组的长度
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果完成了扩容
if (finishing) {
// 成员变量设置为 null
nextTable = null;
// 将迁移完成的数组赋值给 table
table = nextTab;
// sizeCtl 变为 1.5 倍 (2n - 0.5n)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 通过 cas 设置 sizeCtl 的扩容线程数减 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 一开始进入扩容时 sc + 2, 这里 - 2 等于扩容标识, 说明当前扩容完成了, 只剩一个线程了
// 让这个线程进行扩容完成后的处理
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 更新结束标志为 true, 推进标志为 true
finishing = advance = true;
// 将 i 重新设置为 n, 让上面的 --i >= bound 为 true, 可以进入后面 finish 的判断
i = n;
}
}
// 扩容时发现负责的区域有空的桶直接使用 ForwardingNode 填充
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 对应位置的节点为 -1 (MOVED) 已经处理了, 继续下一个位置的处理
else if ((fh = f.hash) == MOVED)
advance = true;
else {
// 对 table 的 i 位置的节点上锁, 防止 putVal 的时候向链表插入新的数据
synchronized (f) {
// 做一次判断, 确保对应位置的对象为需要处理的对象
if (tabAt(tab, i) == f) {
// 此处扩容和 HashMap 有点像, 在原数组的 pos 位置的节点, 在扩容后, 会在新数组的 pos 位置 或者 pos + 原数组长度的位置
// 前者为低纬 lowNode, 后者为高纬 highNode
Node<K,V> ln, hn;
// 如果 f 的 hash 值大于 0, 表明这个节点为链表
if (fh >= 0) {
// 节点的 hash & 旧数组的长度
int runBit = fh & n;
// 存储的是链表的尾结点
Node<K,V> lastRun = f;
// 遍历链表, 找到最后链表中最后的一个
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
// hash 和 runBit 不一样了, 进行替换
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 尾结点的 hash 处理后为 0, 将尾节点放在低纬链表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 尾结点的 hash 处理后不为 0, 将尾结点放在高纬链表
else {
hn = lastRun;
ln = null;
}
// 没有上面的操作, 直接遍历链表也完成功能, 只是上面的都是做了一些优化
// 比如 原来数组的链表为 1(低纬) - 2(高纬) - 3(高纬) - 4(低纬) - 5(低纬) - 6(低纬) - 7(低纬)
// 那么经过上面的处理, lastRun = 4(低纬)
// ln = 4(低纬) - 5(低纬) - 6(低纬) - 7(低纬)
// 因为 lastRun 后面的节点必定和 lastRun 在同一个位置, 那么我们在迁移时, 只需要遍历到 lastRun 这个位置就行了
// 后面的都是位置和 lastRun 一样, 直接把 lastRun 移过来就行了, 后面的节点都在 lastRun 后面, 不用处理了
// 注意了, 这里是头插法, 也就是我们直接遍历链表, 把节点直接放到lastRun 的前面就行了
// 重新遍历链表, 以 lastRun 为结束标志
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 可以看出这里是头插法
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在 nextTable 的 i 位置上插入一个链表
setTabAt(nextTab, i, ln);
// 在 nextTable 的 i + 旧数组长度的位置上插入一个链表
setTabAt(nextTab, i + n, hn);
// 在 table 的 i 位置上插入 forwardNode 节点
setTabAt(tab, i, fwd);
// 设置推进标志为 true
advance = true;
} else if (f instanceof TreeBin) {
// 树节点, 处理
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历树节点
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
// 和链表相同的判断, 与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} else {
// 不是 0 的放在高位
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6, 那么转成链表, 反之, 创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
}
}
}
}
第一部分 构建一个 nextTable, 它的容量是原来的两倍, 这个操作是单线程完成的。新建 table 数组的代码为: Node<K,V>[] nt = (Node<K,V>[])new Node,?>[n << 1], 在原容量大小的基础上右移一位
第二部分就是将原来 table 中的元素复制到 nextTable 中, 主要是遍历复制的过程。 根据运算得到当前遍历的数组的位置 i, 然后利用 tabAt 方法获得i位置的元素再进行判断
- 如果这个位置为空, 就在原 table 中的 i 位置放入 forwardNode 节点, 这个也是触发并发扩容的关键点
- 如果这个位置是 Node 节点 (fh >= 0) , 如果它是一个链表的头节点, 就构造一个反序链表, 把他们分别放在 nextTable 的 i 和 i + n 的位置上
- 如果这个位置是 TreeBin 节点 (fh < 0) , 也做一个反序处理, 并且判断是否需要 untreefi, 把处理的结果分别放在 nextTable 的 i 和 i + n 的位置上
- 遍历过所有的节点以后就完成了复制工作, 这时让 nextTable 作为新的 table, 并且更新 sizeCtl 为新容量的 0.75 倍, 完成扩容
3.5.4 协助扩容 - helpTransfer() 方法
当 ConcurrentHashMap 正在扩容中, 有其他的线程计划向里面添加数据时, 因为正在扩容中, 无法添加。
但是 ConcurrentHashMap 的设计是不会让这个线程阻塞, 而是让这个线程帮忙扩容, 既执行 helpTransfer() 方法。
public class ConcurrentHashMap<K,V> {
/**
* @param tab 扩容的数组, 这里看成存放数据的 table 就对了
* @param f 当前处于扩容中的节点
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 1. 当前的 tab(也就是 table) 不为 null,
// 2. 当前的节点为 ForwardingNode
// 3. 当前 ForwardingNode 中用于临时存放扩容时, 数据的 nextTable 不为空
// 三个条件都满足了, 会进入协助扩容
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
// 这里的逻辑和 addCount 里面的差不多, 就不做解释了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 通过 CAS 操作, 设置当前的扩容线程 + 1,
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 设置成功了, 进入协助扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
}
4 ConcurrentHashMap 与 size 相关的一些方法
对于 ConcurrentHashMap 来说, 这个 table 里到底装了多少东西其实是个不确定的数量, 因为不可能在调用 size() 方法的时候像 GC 的 “stop the world” 一样让其他线程都停下来让你去统计,
因此只能说这个数量是个估计值。对于这个估计值, ConcurrentHashMap 也是大费周章才计算出来的。
为了统计元素个数, ConcurrentHashMap 定义了一些变量和一个内部类
public class ConcurrentHashMap<K,V> {
/**
* Contended 注解可以用来解决伪共享
*/
@sun.misc.Contended
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/** 当出现了并发的时候, 整个 ConcurrentHashMap 的元素个数, 是分散存在这个数组内*/
private transient volatile CounterCell[] counterCells;
/** 创建 CounterCells 是充当自旋锁使用 */
private transient volatile int cellsBusy;
/** 实际保存的是 Map 中的元素个数 利用 CAS 锁进行更新, 但是当出现了并发情况, 元素的个数就不从这个进行获取了*/
private transient volatile long baseCount;
}
size() 方法
public class ConcurrentHashMap<K,V> {
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
/** counterCells 不为 null, 就是出现了并发情况, 这个对象被声明了 */
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;//所有counter的值求和
}
}
return sum;
}
/**
* mappingCount 与 size 方法的类似
* 官方的给出的注释来看, 应该使用 mappingCount 代替 size 方法
*/
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n;
}
}
5 总结
JDK 1.6, 1.7 中的 ConcurrentHashmap 主要使用 Segment 来实现减小锁粒度。 分割成若干个 Segment, 在 put 的时候需要锁住 Segment, get 时候不加锁, 使用 volatile 来保证可见性。
当要统计全局时 (比如 size) , 首先会尝试多次计算 modcount 来确定, 这几次尝试中, 是否有其他线程进行了修改操作, 如果没有, 则直接返回 size。如果有, 则需要依次锁住所有的 Segment 来计算。
1.8 之前 put 定位节点时要先定位到具体的 segment, 然后再在 segment 中定位到具体的桶。而在 1.8 的时候摒弃了 segment 臃肿的设计, 直接针对的是 Node[] table 数组中的每一个桶, 进一步减小了锁粒度。
并且防止拉链过长导致性能下降, 当链表长度大于 8 的时候采用红黑树的设计。
主要设计上的变化有以下几点:
- 放弃采用 segment 而采用 node, 锁住 node 来实现减小锁粒度
- 设计了 MOVED 状态, 当 resize 的过程中, 线程 2 在 put 数据, 线程 2 会帮助 resize
- 使用 3 个 CAS 操作来确保 node 的一些操作的原子性, 这种方式代替了锁。
- sizeCtl 的不同值来代表不同含义, 起到了控制的作用
- 采用 synchronized 而不是 ReentrantLock
更多关于 1.7 版本与 1.8 版本的 ConcurrentHashMap 的实现对比, 可以参考这篇文章。
6 参考
ConcurrentHashmap简介
ConcurrentHashMap源码阅读
How does ConcurrentHashMap resizeStamp method work