所以研究其子类,只需要看其对state的不同运用,和获取锁成功tryAcquire和释放锁tryRelease。
AtomicXXX:其实现逻辑就是通过compareAndSwap方法,底层通过cmpxchg汇编指令自动加Lock前缀来保证原子操作。
Lock前缀:Intel CPU中,lock前缀会锁定缓存行或者总线,并且会刷新storeBuffer。保证原子性、可见性。抽象内存模型
Striped64:分槽位,将竞争拆散到不同的Cell上。在sum时循环遍历所有Cell,将其值value相加即可。
获得锁的次数,非0表示没有线程持有锁。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //锁重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //线程安全,只有当前线程在操作 setState(nextc); return true; } return false;}xxxxxxxxxxprotected final boolean tryRelease(int releases) { int c = getState() - releases; //检测当前线程是否持有锁 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //释放,否则只是锁重入减少 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free;}高16位表示读锁及重入数量(线程重入数量使用Threadlocal),低16位表示写锁及锁重入数量。
同一线程持有读锁时,必须先释放读锁,才能再获取写锁
xxxxxxxxxxprotected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); //有锁(读/写) if (c != 0) { //读持有锁/不等于写锁线程,去阻塞(可以发现同一线程持有读锁时,必须先释放读锁,才能再获取写锁) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } //非公平:false if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true;}xxxxxxxxxxprotected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; //写锁为0,唤醒下一个 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free;}同一线程持有写锁时,可以直接获取读锁。非公平下,如果当前是读锁,同时一大批读锁过来会导致写操作很难执行(写饥饿)。
x返回值小于0,会进入阻塞队列 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //有写锁,且不是当前线程。直接去阻塞(可以发现同一线程持有写锁时,可以直接获取读锁) if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //读锁数量 int r = sharedCount(c); //非公平:head后的第一个节点是非share时,读阻塞 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //之前没有读 if (r == 0) { //局部变量缓存增加性能 firstReader = current; //缓存重入数量 firstReaderHoldCount = 1; } else if (firstReader == current) { //重入 firstReaderHoldCount++; } else { //获取缓存的其他读线程重入信息 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //进入完整流程 return fullTryAcquireShared(current);}
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) {//读阻塞 if (firstReader == current) { //下面会处理读重入 } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } //当前线程没有读重入,则去阻塞 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } }}xxxxxxxxxx//返回true,唤醒下面的节点protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) //最后一个负责唤醒下面的 return nextc == 0; }}使用共享锁实现。主要作用是,其他线程控制await线程从阻塞变为运行。
表示允许多少个线程获取锁。
使用ReentrantLock+Condition实现。Condition原理
将等待的线程全部主要作用是,线程等待一起执行。count记录当前阻塞数量,parties记录需要的个数。
xxxxxxxxxxprivate int dowait(boolean timed, long nanos) { final ReentrantLock lock = this.lock; lock.lock(); try { //是否被破坏 final Generation g = generation; if (g.broken) throw new BrokenBarrierException();
//被中断,唤醒所有。设置broken=true; if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //全部到达了 if (index == 0) { boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //唤醒所有的等待队列的线程,重置数量。下一个parties nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //循环等待 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { ... } if (g.broken) throw new BrokenBarrierException(); //如果当前派对不是自己要参加的,返回还未到达人数 //当前线程超时醒来之后正准备判断,另外线程加入刚好满足数量要求,调用nextGeneration,创建了generation。所有g!=generation if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}64位,分为2个32位
高32位:最高位标记是否结束 其他位表示phase阶段数
低32位:分为2个16位,高16位表示当前参与数,低16位表示未到达数
结束:最高位为1,也即state<0,
异常(空参与):低32位==1,即参与数为0.但是等待数却为1,当phase大于最大值时,会设置此状态
底层逻辑就是cas设置state。new Phaser(fatherphaser)场景比喻:公司准备了多个部门party,各个部门party都有人数要求。当所有部门都准备好开始时公司需要发表讲话之后各部门才能开始。此时就需要父类来统筹管理所有的子类。
添加参与者,如果在添加时发现party已经开始了,就需要加入新的队列中,所以需要两个队列来保存等待的线程。
xxxxxxxxxxprivate int doRegister(int registrations) { //设置低32位,参与数和需要等待数,添加时和原来相加 long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { //如果有父类,将父类的高32位阶段设置为自己 long s = (parent == null) ? state : reconcileState(); //获取低32位 int counts = (int)s; //需要人数 int parties = counts >>> PARTIES_SHIFT; //未到达人数 int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); //获取阶段数 phase = (int)(s >>> PHASE_SHIFT); if (phase < 0)//结束 break; //EMPTY:参与者位0,未到达为1 if (counts != EMPTY) { //没有父类,或父类阶段和当前阶段一致 if (parent == null || reconcileState() == s) { //所有都到达 if (unarrived == 0) //多线程进入阻塞,协助唤醒上一阶段线程 root.internalAwaitAdvance(phase, null); //添加参与数 else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } //EMPTY,父类为空 else if (parent == null) { //添加到下一阶段为什么用|。因为EMPTY:参与者位0,未到达为1。|刚好重新设置 long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } //EMPTY,父类不为空 else { synchronized (this) { if (state == s) { phase = parent.doRegister(1); if (phase < 0) break; while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); } break; } } } } return phase;}
协助唤醒前一个阶段的节点。生成node节点,加入队列中。奇偶两个队列。
xxxxxxxxxx//node不为空:接收中断、超时。 node为空,一直阻塞private int internalAwaitAdvance(int phase, QNode node) { //线程协作,帮助唤醒上一个阶段的等待节点 releaseWaiters(phase-1); //是否已经添加到队列 boolean queued = false; int lastUnarrived = 0; int spins = SPINS_PER_ARRIVAL; long s;int p; //阶段相同 while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) if (node == null) { //自旋优化,创建节点 int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } //完成或中止 else if (node.isReleasable()) break; else if (!queued) { AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) queued = head.compareAndSet(q, node); } else { try { //阻塞 ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } }
if (node != null) { if (node.thread != null) node.thread = null; if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); } //线程协作,唤醒 releaseWaiters(phase); return p;}通知已到达。如果当前未到达数大于0,则去阻塞等待;否则回调onAdvance方法,看是否继续执行。设置下一阶段值并设置state,唤醒等待的线程。
xxxxxxxxxxpublic int arriveAndAwaitAdvance() { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); //异常 if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); //设置state,减少未到达数 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { //还有未到达数 if (unarrived > 1) return root.internalAwaitAdvance(phase, null); //通知父类一个到达 if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; //参与数 int nextUnarrived = (int)n >>> PARTIES_SHIFT; //回调方法,返回true,则设置结束 if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; //没有参与者,设置异常 else if (nextUnarrived == 0) n |= EMPTY; //设置下一阶段的参与数 else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; //设置下一阶段的阶段数 n |= (long)nextPhase << PHASE_SHIFT; //设置下一阶段state,如果失败说明别的线程调用了forceTermination,关闭了。 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); //唤醒等待的线程 releaseWaiters(phase); return nextPhase; } }}将当前线程移除,并且通知已到达。设置参与数-1,等待数-1,如果当前等待数为1,设置循环数+1,和其他位。唤醒其他等待线程
xxxxxxxxxx//adjust 1 | 1 << 16private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); //减少参与数、未到达数 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { if (unarrived == 1) {//全部到达 long n = s & PARTIES_MASK; int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); releaseWaiters(phase); } else if (nextUnarrived == 0) {//参与数为0,当前值设为异常。交由父类执行 phase = parent.doArrive(ONE_DEREGISTER); UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else//参与数不为0 phase = parent.doArrive(ONE_ARRIVAL); } return phase; } }}低7位:读锁个数;第8:写锁
其他位:版本号,每使用一次写锁,就会+1
优化ReentrantReadWriteLock,使得读读、读写可并行。
获取乐观读锁,写标志位如果未设置,直接返回版本号;
xxxxxxxxxxpublic long tryOptimisticRead() { long s; return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;}返回true表示,在获取版本号stamp期间,没有写锁。
loadFence:读屏障。
Java层面:禁止读重排序;
C层面:禁止编译器优化;
操作系统层面:刷新invalidQueue队列
可以看到Intel x86平台下,jvm实现:使用了发布订阅API中的订阅方法acquire,只是调用了编译器屏障,因为此CPU不存在invalidQueue,使用嗅探技术,不存在读读、写读、写写乱序的问题。内存屏障
xxxxxxxxxxinline void OrderAccess::acquire() { compiler_barrier(); }xxxxxxxxxxpublic boolean validate(long stamp) { //inline void OrderAccess::acquire() { compiler_barrier(); } //为什么这里需要使用读屏障?不太理解 U.loadFence(); return (stamp & SBITS) == (state & SBITS);}xxxxxxxxxxpublic long readLock() { long s = state, next; //队列中没有等待节点,且没有写锁,读数量小于126,直接抢锁。 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L));}第一层for循环:
如果是第一个加入队列的节点,则创建head、tail节点,并且将自己添加到尾节点,并更新tail节点
如果队列中已经有节点且tail为写模式,则加入队尾,更新tail节点
如果队列中已经有节点且tail为读模式,则头插法加入尾节点的cowait链表
xxxxxxxxxxprivate long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { WNode h; //队列中没有等待节点 if ((h = whead) == (p = wtail)) { for (long m, s, ns;;) { //没有写锁,直接抢锁 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : //去别的变量中保存读数量 (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; //有写锁,自旋等待 else if (m >= WBIT) { if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; if ((nh == h && np == p) || (h = nh) != (p = np)) break; } spins = SPINS; } } } } //尾节点为null,创建head、tail节点 if (p == null) { WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //创建node else if (node == null) node = new WNode(RMODE, p); //队列没有节点、尾节点不是读模式(就将自己设置为尾节点) else if (h == p || p.mode != RMODE) { //更新前驱节点 if (node.prev != p) node.prev = p; //更新尾节点为当前节点 else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } } //队列中有数据,且尾节点为读模式,更新cowait,头插法 else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) node.cowait = null; //兄弟节点添加成功之后,就需要去阻塞了 else { for (;;) { WNode pp, c; Thread w; //帮助唤醒(如果head为读,且有兄弟节点 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); //前驱节点已经被唤醒(表示自己关联的读已经可以获取了,不需要阻塞) if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; //循环尝试抢锁 do { if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT); } if (whead == h && p.prev == pp) { long time; //前驱节点被唤醒了或取消,从头开始重新去抢锁吧 if (pp == null || h == p || p.status > 0) { node = null; break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } //只有兄弟读的主会进来 for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; //抢锁成功,唤醒兄弟节点 while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } //未获取锁,退出 else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } //帮助head唤醒兄弟 else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { //当前节点的前驱节点不等于p,更新前驱节点的next指向自己 if ((np = node.prev) != p) { if (np != null) (p = np).next = node; } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; //状态为阻塞,前驱节点不是head或存在读锁 if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); //被中断唤醒则取消节点 if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } }}xxxxxxxxxxpublic void unlockRead(long stamp) { long s, m; WNode h; for (;;) { if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); break; } } else if (tryDecReaderOverflow(s) != 0L) break; }}只有没有写锁或者读锁时,才允许抢写锁。可以发现其实和ReentrantReadWriteLock.WriteLock差不多,都是排他锁。
xxxxxxxxxxpublic long writeLock() { long s, next; //低8位为0,直接抢写锁将第8位设置为1,成功返回。 return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L));}xxxxxxxxxxprivate long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; //自旋优化,就是不想去阻塞。创建node加入链接 for (int spins = -1;;) { long m, s, ns; //低7位等于0,尝试抢锁,设置第8位为1 if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } //初始化尾节点 else if ((p = wtail) == null) { WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //创建node节点 else if (node == null) node = new WNode(WMODE, p); //设置前驱节点 else if (node.prev != p) node.prev = p; //更新尾节点 else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break; } }
for (int spins = -1;;) { WNode h, np, pp; int ps; //当前节点的前一个节点等于head节点,自旋尝试抢锁 if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { long s, ns; //抢锁 if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { whead = node; node.prev = null; return ns; } }else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } //当前节点前驱节点不是head,并且head不为空。帮助唤醒head节点的兄弟节点 else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } //头节点还未变,当前节点前驱节点可能是head也可能不是 if (whead == h) { //当前节点的前驱节点不等于p,更新前驱节点的next指向自己 if ((np = node.prev) != p) { if (np != null) (p = np).next = node; } //将前驱节点状态更新为等待 else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); //前驱节点取消,跳过。 else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } //前驱节点为等待状态 else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; //状态为阻塞,前驱节点不是head或低8为不为0存在读/写锁, if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); //被中断唤醒则取消节点 if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } }}可以发现写锁不允许重入。
xxxxxxxxxx//stamp: 00...1 1000 0000,低8位一定是1000 0000public void unlockWrite(long stamp) { WNode h; //状态不一致,或者写锁位为0 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //stamp=-128,11...1 1000 0000,时版本号已经被全部用完,重新开始(版本号+1,清空写锁位) state = (stamp += WBIT) == 0L ? ORIGIN : stamp; //头节点不为空,且为等待 if ((h = whead) != null && h.status != 0) release(h);}xxxxxxxxxxprivate void release(WNode h) { if (h != null) { WNode q; Thread w; U.compareAndSwapInt(h, WSTATUS, WAITING, 0); //next为null,从头往前或者下一个节点为取消,从尾往前找到最靠前的等待状态 if ((q = h.next) == null || q.status == CANCELLED) { for (WNode t = wtail; t != null && t != h; t = t.prev) if (t.status <= 0) q = t; } //唤醒下一个等待的节点 if (q != null && (w = q.thread) != null) U.unpark(w); }}