LongAdder源码
概览
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
// transient不会序列化
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
优化机制
LongAdder 是 Java8 提供的类,跟 AtomicLong 有相同的效果,但对 CAS 机制进行了优化,尝试使用分段 CAS 以及自动分段迁移的方式来大幅度提升多线程高并发执行 CAS 操作的性能
CAS 底层实现是在一个循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈修改成功率很高,否则失败率很高,失败后这些重复的原子性操作会耗费性能(导致大量线程空循环,自旋转)
优化核心思想:数据分离,将 AtomicLong 的单点的更新压力分担到各个节点,空间换时间,在低并发的时候直接更新,可以保障和 AtomicLong 的性能基本一致,而在高并发的时候通过分散减少竞争,提高了性能
分段 CAS 机制:
- 在发生竞争时,创建 Cell 数组用于将不同线程的操作离散(通过 hash 等算法映射)到不同的节点上
- 设置多个累加单元(会根据需要扩容,最大为 CPU 核数),Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1] 等,最后将结果汇总
- 在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能
自动分段迁移机制:某个 Cell 的 value 执行 CAS 失败,就会自动寻找另一个 Cell 分段内的 value 值进行 CAS 操作
伪共享
Cell 为累加单元:数组访问索引是通过 Thread 里的 threadLocalRandomProbe 域取模实现的,这个域是 ThreadLocalRandom 更新的
// Striped64.Cell
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 用 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离cpu越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。
从 CPU 到 | 大约需要的时钟周期 |
---|---|
寄存器 | 1 cycle (4GHz 的 CPU 约为 0.25ns) |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
内存 | 120~240 cycle |
Cell 是数组形式,在内存中是连续存储的,64 位系统中,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),每一个 cache line 为 64 字节,因此缓存行可以存下 2 个的 Cell 对象,当 Core-0 要修改 Cell[0]、Core-1 要修改 Cell[1],无论谁修改成功都会导致当前缓存行失效,从而导致对方的数据失效,需要重新去主存获取,影响效率
@sun.misc.Contended
:防止缓存行伪共享,在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,使用 2 倍于大多数硬件缓存行让 CPU 将对象预读至缓存时占用不同的缓存行,这样就不会造成对方缓存行的失效
源码解析
Striped64 类成员属性:
// 表示当前计算机CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors()
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域,当 cells 扩容时,也会将数据写到 base 中
transient volatile long base;
// 在 cells 初始化或扩容时只能有一个线程执行, 通过 CAS 更新 cellsBusy 置为 1 来实现一个锁
transient volatile int cellsBusy;
工作流程:
- cells 占用内存是相对比较大的,是惰性加载的,在无竞争或者其他线程正在初始化 cells 数组的情况下,直接更新 base 域
- 在第一次发生竞争时(casBase 失败)会创建一个大小为 2 的 cells 数组,将当前累加的值包装为 Cell 对象,放入映射的槽位上
- 分段累加的过程中,如果当前线程对应的 cells 槽位为空,就会新建 Cell 填充,如果出现竞争,就会重新计算线程对应的槽位,继续自旋尝试修改
- 分段迁移后还出现竞争就会扩容 cells 数组长度为原来的两倍,然后 rehash,数组长度总是 2 的 n 次幂,默认最大为 CPU 核数,但是可以超过,如果核数是 6 核,数组最长是 8
流程图分析:
add 流程图
longAccumulate 流程图
每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
源码分析:
LongAdder#add:累加方法
javapublic void add(long x) { // as 为累加单元数组的引用,b 为基础值,v 表示期望值 // m 表示 cells 数组的长度 - 1,a 表示当前线程命中的 cell 单元格 Cell[] as; long b, v; int m; Cell a; // cells 不为空说明 cells 已经被初始化,线程发生了竞争,去更新对应的 cell 槽位 // 进入 || 后的逻辑去更新 base 域,更新失败表示发生竞争进入条件 if ((as = cells) != null || !casBase(b = base, b + x)) { // uncontended 为 true 表示 cell 没有竞争 boolean uncontended = true; // 条件一: true 说明 cells 未初始化,多线程写 base 发生竞争需要进行初始化 cells 数组 // fasle 说明 cells 已经初始化,进行下一个条件寻找自己的 cell 去累加 // 条件二: getProbe() 获取 hash 值,& m 的逻辑和 HashMap 的逻辑相同,保证散列的均匀性 // true 说明当前线程对应下标的 cell 为空,需要创建 cell // false 说明当前线程对应的 cell 不为空,进行下一个条件【将 x 值累加到对应的 cell 中】 // 条件三: 有取反符号,false 说明 cas 成功,直接返回,true 说明失败,当前线程对应的 cell 有竞争 if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); // 【uncontended 在对应的 cell 上累加失败的时候才为 false,其余情况均为 true】 } }
Striped64#longAccumulate:cell 数组创建
java// x null false | true final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 当前线程还没有对应的 cell, 需要随机生成一个 hash 值用来将当前线程绑定到 cell if ((h = getProbe()) == 0) { // 初始化 probe,获取 hash 值 ThreadLocalRandom.current(); h = getProbe(); // 默认情况下 当前线程肯定是写入到了 cells[0] 位置,不把它当做一次真正的竞争 wasUncontended = true; } // 表示【扩容意向】,false 一定不会扩容,true 可能会扩容 boolean collide = false; //自旋 for (;;) { // as 表示cells引用,a 表示当前线程命中的 cell,n 表示 cells 数组长度,v 表示 期望值 Cell[] as; Cell a; int n; long v; // 【CASE1】: 表示 cells 已经初始化了,当前线程应该将数据写入到对应的 cell 中 if ((as = cells) != null && (n = as.length) > 0) { // CASE1.1: true 表示当前线程对应的索引下标的 Cell 为 null,需要创建 new Cell if ((a = as[(n - 1) & h]) == null) { // 判断 cellsBusy 是否被锁 if (cellsBusy == 0) { // 创建 cell, 初始累加值为 x Cell r = new Cell(x); // 加锁 if (cellsBusy == 0 && casCellsBusy()) { // 创建成功标记,进入【创建 cell 逻辑】 boolean created = false; try { Cell[] rs; int m, j; // 把当前 cells 数组赋值给 rs,并且不为 null if ((rs = cells) != null && (m = rs.length) > 0 && // 再次判断防止其它线程初始化过该位置,当前线程再次初始化该位置会造成数据丢失 // 因为这里是线程安全的判断,进行的逻辑不会被其他线程影响 rs[j = (m - 1) & h] == null) { // 把新创建的 cell 填充至当前位置 rs[j] = r; created = true; // 表示创建完成 } } finally { cellsBusy = 0; // 解锁 } if (created) // true 表示创建完成,可以推出循环了 break; continue; } } collide = false; } // CASE1.2: 条件成立说明线程对应的 cell 有竞争, 改变线程对应的 cell 来重试 cas else if (!wasUncontended) wasUncontended = true; // CASE 1.3: 当前线程 rehash 过,如果新命中的 cell 不为空,就尝试累加,false 说明新命中也有竞争 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // CASE 1.4: cells 长度已经超过了最大长度 CPU 内核的数量或者已经扩容 else if (n >= NCPU || cells != as) collide = false; // 扩容意向改为false,【表示不能扩容了】 // CASE 1.5: 更改扩容意向,如果 n >= NCPU,这里就永远不会执行到,case1.4 永远先于 1.5 执行 else if (!collide) collide = true; // CASE 1.6: 【扩容逻辑】,进行加锁 else if (cellsBusy == 0 && casCellsBusy()) { try { // 线程安全的检查,防止期间被其他线程扩容了 if (cells == as) { // 扩容为以前的 2 倍 Cell[] rs = new Cell[n << 1]; // 遍历移动值 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 把扩容后的引用给 cells cells = rs; } } finally { cellsBusy = 0; // 解锁 } collide = false; // 扩容意向改为 false,表示不扩容了 continue; } // 重置当前线程 Hash 值,这就是【分段迁移机制】 h = advanceProbe(h); } // 【CASE2】: 运行到这说明 cells 还未初始化,as 为null // 判断是否没有加锁,没有加锁就用 CAS 加锁 // 条件二判断是否其它线程在当前线程给 as 赋值之后修改了 cells,这里不是线程安全的判断 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 初始化标志,开始 【初始化 cells 数组】 boolean init = false; try { // 再次判断 cells == as 防止其它线程已经提前初始化了,当前线程再次初始化导致丢失数据 // 因为这里是【线程安全的,重新检查,经典 DCL】 if (cells == as) { Cell[] rs = new Cell[2]; // 初始化数组大小为2 rs[h & 1] = new Cell(x); // 填充线程对应的cell cells = rs; init = true; // 初始化成功,标记置为 true } } finally { cellsBusy = 0; // 解锁啊 } if (init) break; // 初始化成功直接跳出自旋 } // 【CASE3】: 运行到这说明其他线程在初始化 cells,当前线程将值累加到 base,累加成功直接结束自旋 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
sum:获取最终结果通过 sum 整合,保证最终一致性,不保证强一致性
javapublic long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { // 遍历 累加 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }