所以研究其子类,只需要看其对state的不同运用,和获取锁成功tryAcquire和释放锁tryRelease。
AtomicXXX:其实现逻辑就是通过compareAndSwap方法,底层通过cmpxchg汇编指令自动加Lock前缀来保证原子操作。
Lock前缀:Intel CPU中,lock前缀会锁定缓存行或者总线,并且会刷新storeBuffer。保证原子性、可见性。抽象内存模型
Striped64:分槽位,将竞争拆散到不同的Cell上。在sum时循环遍历所有Cell,将其值value相加即可。
获得锁的次数,非0表示没有线程持有锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//锁重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//线程安全,只有当前线程在操作
setState(nextc);
return true;
}
return false;
}
xxxxxxxxxx
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//检测当前线程是否持有锁
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//释放,否则只是锁重入减少
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
高16位表示读锁及重入数量(线程重入数量使用Threadlocal),低16位表示写锁及锁重入数量。
同一线程持有读锁时,必须先释放读锁,才能再获取写锁
xxxxxxxxxx
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//有锁(读/写)
if (c != 0) {
//读持有锁/不等于写锁线程,去阻塞(可以发现同一线程持有读锁时,必须先释放读锁,才能再获取写锁)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
//非公平:false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
xxxxxxxxxx
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//写锁为0,唤醒下一个
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
同一线程持有写锁时,可以直接获取读锁。非公平下,如果当前是读锁,同时一大批读锁过来会导致写操作很难执行(写饥饿)。
x返回值小于0,会进入阻塞队列
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//有写锁,且不是当前线程。直接去阻塞(可以发现同一线程持有写锁时,可以直接获取读锁)
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
//读锁数量
int r = sharedCount(c);
//非公平:head后的第一个节点是非share时,读阻塞
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//之前没有读
if (r == 0) {
//局部变量缓存增加性能
firstReader = current;
//缓存重入数量
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//重入
firstReaderHoldCount++;
} else {
//获取缓存的其他读线程重入信息
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//进入完整流程
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {//读阻塞
if (firstReader == current) {
//下面会处理读重入
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
//当前线程没有读重入,则去阻塞
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
xxxxxxxxxx
//返回true,唤醒下面的节点
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
//最后一个负责唤醒下面的
return nextc == 0;
}
}
使用共享锁实现。主要作用是,其他线程控制await线程从阻塞变为运行。
表示允许多少个线程获取锁。
使用ReentrantLock+Condition实现。Condition原理
将等待的线程全部主要作用是,线程等待一起执行。count记录当前阻塞数量,parties记录需要的个数。
xxxxxxxxxx
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//是否被破坏
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
//被中断,唤醒所有。设置broken=true;
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
//全部到达了
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//唤醒所有的等待队列的线程,重置数量。下一个parties
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//循环等待
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
...
}
if (g.broken)
throw new BrokenBarrierException();
//如果当前派对不是自己要参加的,返回还未到达人数
//当前线程超时醒来之后正准备判断,另外线程加入刚好满足数量要求,调用nextGeneration,创建了generation。所有g!=generation
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
64位,分为2个32位
高32位:最高位标记是否结束 其他位表示phase阶段数
低32位:分为2个16位,高16位表示当前参与数,低16位表示未到达数
结束:最高位为1,也即state<0,
异常(空参与):低32位==1,即参与数为0.但是等待数却为1,当phase大于最大值时,会设置此状态
底层逻辑就是cas设置state。new Phaser(fatherphaser)场景比喻:公司准备了多个部门party,各个部门party都有人数要求。当所有部门都准备好开始时公司需要发表讲话之后各部门才能开始。此时就需要父类来统筹管理所有的子类。
添加参与者,如果在添加时发现party已经开始了,就需要加入新的队列中,所以需要两个队列来保存等待的线程。
xxxxxxxxxx
private int doRegister(int registrations) {
//设置低32位,参与数和需要等待数,添加时和原来相加
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
//如果有父类,将父类的高32位阶段设置为自己
long s = (parent == null) ? state : reconcileState();
//获取低32位
int counts = (int)s;
//需要人数
int parties = counts >>> PARTIES_SHIFT;
//未到达人数
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
//获取阶段数
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)//结束
break;
//EMPTY:参与者位0,未到达为1
if (counts != EMPTY) {
//没有父类,或父类阶段和当前阶段一致
if (parent == null || reconcileState() == s) {
//所有都到达
if (unarrived == 0)
//多线程进入阻塞,协助唤醒上一阶段线程
root.internalAwaitAdvance(phase, null);
//添加参与数
else if (UNSAFE.compareAndSwapLong(this,
stateOffset, s, s + adjust))
break;
}
}
//EMPTY,父类为空
else if (parent == null) {
//添加到下一阶段为什么用|。因为EMPTY:参与者位0,未到达为1。|刚好重新设置
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
//EMPTY,父类不为空
else {
synchronized (this) {
if (state == s) {
phase = parent.doRegister(1);
if (phase < 0)
break;
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
}
break;
}
}
}
}
return phase;
}
协助唤醒前一个阶段的节点。生成node节点,加入队列中。奇偶两个队列。
xxxxxxxxxx
//node不为空:接收中断、超时。 node为空,一直阻塞
private int internalAwaitAdvance(int phase, QNode node) {
//线程协作,帮助唤醒上一个阶段的等待节点
releaseWaiters(phase-1);
//是否已经添加到队列
boolean queued = false;
int lastUnarrived = 0;
int spins = SPINS_PER_ARRIVAL;
long s;int p;
//阶段相同
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase)
if (node == null) {
//自旋优化,创建节点
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) {
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
//完成或中止
else if (node.isReleasable())
break;
else if (!queued) {
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase)
queued = head.compareAndSet(q, node);
}
else {
try {
//阻塞
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
if (node != null) {
if (node.thread != null)
node.thread = null;
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase);
}
//线程协作,唤醒
releaseWaiters(phase);
return p;
}
通知已到达。如果当前未到达数大于0,则去阻塞等待;否则回调onAdvance方法,看是否继续执行。设置下一阶段值并设置state,唤醒等待的线程。
xxxxxxxxxx
public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
//异常
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
//设置state,减少未到达数
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) {
//还有未到达数
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
//通知父类一个到达
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK;
//参与数
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
//回调方法,返回true,则设置结束
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
//没有参与者,设置异常
else if (nextUnarrived == 0)
n |= EMPTY;
//设置下一阶段的参与数
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
//设置下一阶段的阶段数
n |= (long)nextPhase << PHASE_SHIFT;
//设置下一阶段state,如果失败说明别的线程调用了forceTermination,关闭了。
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT);
//唤醒等待的线程
releaseWaiters(phase);
return nextPhase;
}
}
}
将当前线程移除,并且通知已到达。设置参与数-1,等待数-1,如果当前等待数为1,设置循环数+1,和其他位。唤醒其他等待线程
xxxxxxxxxx
//adjust 1 | 1 << 16
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
//减少参与数、未到达数
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {//全部到达
long n = s & PARTIES_MASK;
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
} else if (nextUnarrived == 0) {//参与数为0,当前值设为异常。交由父类执行
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY);
} else//参与数不为0
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
低7位:读锁个数;第8:写锁
其他位:版本号,每使用一次写锁,就会+1
优化ReentrantReadWriteLock,使得读读、读写可并行。
获取乐观读锁,写标志位如果未设置,直接返回版本号;
xxxxxxxxxx
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
返回true表示,在获取版本号stamp期间,没有写锁。
loadFence:读屏障。
Java层面:禁止读重排序;
C层面:禁止编译器优化;
操作系统层面:刷新invalidQueue队列
可以看到Intel x86平台下,jvm实现:使用了发布订阅API中的订阅方法acquire,只是调用了编译器屏障,因为此CPU不存在invalidQueue,使用嗅探技术,不存在读读、写读、写写乱序的问题。内存屏障
xxxxxxxxxx
inline void OrderAccess::acquire() { compiler_barrier(); }
xxxxxxxxxx
public boolean validate(long stamp) {
//inline void OrderAccess::acquire() { compiler_barrier(); }
//为什么这里需要使用读屏障?不太理解
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
xxxxxxxxxx
public long readLock() {
long s = state, next;
//队列中没有等待节点,且没有写锁,读数量小于126,直接抢锁。
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
第一层for循环:
如果是第一个加入队列的节点,则创建head、tail节点,并且将自己添加到尾节点,并更新tail节点
如果队列中已经有节点且tail为写模式,则加入队尾,更新tail节点
如果队列中已经有节点且tail为读模式,则头插法加入尾节点的cowait链表
xxxxxxxxxx
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
//队列中没有等待节点
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
//没有写锁,直接抢锁
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
//去别的变量中保存读数量
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
//有写锁,自旋等待
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
//尾节点为null,创建head、tail节点
if (p == null) {
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
//创建node
else if (node == null)
node = new WNode(RMODE, p);
//队列没有节点、尾节点不是读模式(就将自己设置为尾节点)
else if (h == p || p.mode != RMODE) {
//更新前驱节点
if (node.prev != p)
node.prev = p;
//更新尾节点为当前节点
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
//队列中有数据,且尾节点为读模式,更新cowait,头插法
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
node.cowait = null;
//兄弟节点添加成功之后,就需要去阻塞了
else {
for (;;) {
WNode pp, c; Thread w;
//帮助唤醒(如果head为读,且有兄弟节点
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
//前驱节点已经被唤醒(表示自己关联的读已经可以获取了,不需要阻塞)
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
//循环尝试抢锁
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT);
}
if (whead == h && p.prev == pp) {
long time;
//前驱节点被唤醒了或取消,从头开始重新去抢锁吧
if (pp == null || h == p || p.status > 0) {
node = null;
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
//只有兄弟读的主会进来
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) {
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
//抢锁成功,唤醒兄弟节点
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
//未获取锁,退出
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
//帮助head唤醒兄弟
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
//当前节点的前驱节点不等于p,更新前驱节点的next指向自己
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node;
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
//状态为阻塞,前驱节点不是head或存在读锁
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
//被中断唤醒则取消节点
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
xxxxxxxxxx
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
if (m < RFULL) {
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
break;
}
}
只有没有写锁或者读锁时,才允许抢写锁。可以发现其实和ReentrantReadWriteLock.WriteLock差不多,都是排他锁。
xxxxxxxxxx
public long writeLock() {
long s, next;
//低8位为0,直接抢写锁将第8位设置为1,成功返回。
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
xxxxxxxxxx
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
//自旋优化,就是不想去阻塞。创建node加入链接
for (int spins = -1;;) {
long m, s, ns;
//低7位等于0,尝试抢锁,设置第8位为1
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
//初始化尾节点
else if ((p = wtail) == null) {
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
//创建node节点
else if (node == null)
node = new WNode(WMODE, p);
//设置前驱节点
else if (node.prev != p)
node.prev = p;
//更新尾节点
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
for (int spins = -1;;) {
WNode h, np, pp; int ps;
//当前节点的前一个节点等于head节点,自旋尝试抢锁
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) {
long s, ns;
//抢锁
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) {
whead = node;
node.prev = null;
return ns;
}
}else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
}
//当前节点前驱节点不是head,并且head不为空。帮助唤醒head节点的兄弟节点
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
//头节点还未变,当前节点前驱节点可能是head也可能不是
if (whead == h) {
//当前节点的前驱节点不等于p,更新前驱节点的next指向自己
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node;
}
//将前驱节点状态更新为等待
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
//前驱节点取消,跳过。
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
//前驱节点为等待状态
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
//状态为阻塞,前驱节点不是head或低8为不为0存在读/写锁,
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
//被中断唤醒则取消节点
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
可以发现写锁不允许重入。
xxxxxxxxxx
//stamp: 00...1 1000 0000,低8位一定是1000 0000
public void unlockWrite(long stamp) {
WNode h;
//状态不一致,或者写锁位为0
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
//stamp=-128,11...1 1000 0000,时版本号已经被全部用完,重新开始(版本号+1,清空写锁位)
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
//头节点不为空,且为等待
if ((h = whead) != null && h.status != 0)
release(h);
}
xxxxxxxxxx
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
//next为null,从头往前或者下一个节点为取消,从尾往前找到最靠前的等待状态
if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
//唤醒下一个等待的节点
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}