作者:不清不慎,Java大数据开发工程师一枚,热爱研究开源技术! 架构师社区合伙人!
Java的并发包下存在着许多并发工具,CAS也是其中的一员,对于CAS(CompareAndSwap)我们并不陌生,它基于乐观锁的机制,当前值等于期望值时设置成功,但是它也会导致一系列问题,当大量线程同时去访问时,就会导致空旋转,CPU资源消耗过多,而且执行效率也不高。同时它也会产生ABA问题,不过在java中可以使用基于时间版本号的AtomicStampedReference来解决。
(本文默认读者对CAS已经掌握,不懂的读者推荐阅读 方腾飞的《java并发编程的艺术》一书,讲解的比较详细)。
针对大量线程访问性能和效率问题,在java中引入了新类LongAdder
,它对CAS进行了优化,采用了CAS分段机制与自动分段迁移机制。下面我们来看看这两种机制的实现原理。
CAS分段机制与自动分段迁移机制
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//CAS失败hash到下一个cell
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
上面的代码比较复杂,最重要的是我们体会CAS分段的机制与自动迁移的机制,这是出自Doug Lea大师之手,在我们开发自己的系统中,这个思想非常值得借鉴。
最后,当我们需要获取多线程更新后的值的时候,只需要将base和cell数组中的值加起来返回即可。源码如下:
最后,该篇文章介绍完毕,如有问题,欢迎不吝赐教,留言讨论!
长按订阅更多精彩▼
如有收获,点个在看,诚挚感谢