modified已修改、exclusive独占、shared共享、invalid失效
规则: 写变量时,如果变量不在缓存行,那么需要从内存中读取到缓存行之后,才能写入
lock:锁缓存行或者锁总线。lock前缀保证刷store buffer fence:会等待流水线之前的所有相关(store、load)指令执行完成并且从store buffer中刷入内存。
fence的性能相比lock非常差,所以在上层应用框架中使用lock前缀来保证刷store buffer,而在内核层面由于要考虑io等对指令的强一致性要求,所以一刀切使用mfence全屏障。
TSO:允许store-load 乱序
PSO:允许 store-load, store-store 乱序
RMO:允许 store-load, store-store,load-load,load-store 乱序
C:volatile保证编译器不优化,volatile修饰的变量不会保存到寄存器中,而是每次都从内存中读取最新值。
Java:volatile保证可见性、指令乱序问题 volatile写变量A时:StoreStore A StoreLoad volatile读变量A时:A LoadLoad LoadStore
JMM:StoreStore、StoreLoad、LoadLoad、LoadStore四种屏障来保证指令重排序问题。
JVM:Intel X86平台为例,因为cpu只允许storeload重排序,所以只需要在storeload中加入lock前缀即可,其他屏障只需要增加编译器屏障即可。内核屏障自带编译器屏障功能。
xLinux X86平台为例
static inline void compiler_barrier() { //编译器屏障
__asm__ volatile ("" : : : "memory");
}
inline void OrderAccess::loadload() { compiler_barrier(); }
inline void OrderAccess::storestore() { compiler_barrier(); }
inline void OrderAccess::loadstore() { compiler_barrier(); }
inline void OrderAccess::storeload() { fence(); } //由于x86存在store_buffer,所以存在storeload乱序问题,因此使用lock前缀,保证乱序问题
inline void OrderAccess::acquire() { compiler_barrier(); }
inline void OrderAccess::release() { compiler_barrier(); }
inline void OrderAccess::fence() { //lock前缀,全屏障功能
__asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
}
Linux操作系统:操作系统提供了三种屏障,在对称多处理时使用smp_mb系列屏障。因为操作系统需要考虑的情况很多,所以读屏障也会加lock前缀。
xxxxxxxxxx
//全屏障
//读屏障
//写屏障
Java对象头:
编译器层面:synchronize在方法上时增加一个ACC_SYNCHRONIZED标志(底层还是monitorenter、monitorexit),当synchronize(this)一个对象时,会在代码块开始、结束处增加monitorenter、monitorexit。
monitorenter加锁:
如果对象无锁,则设置对象头的threadId偏向自己,修改对象头为偏向锁
如果对象是偏向锁,但是非偏向线程访问,首先撤销偏向锁,然后升级为轻量级锁,在栈中找到一个BasicObjectLock,将旧的对象头保存在其中,并且修改对象头指向BasicObjectLock,修改对象头为轻量级锁
如果多线程访问时,则会升级为重量级锁,首先在inflate方法中创建一个ObjectMonitor对象,在enter方法中,首先CAS将当前线程加入阻塞队列cxq中,然后park。被唤醒之后再次循环获取锁(ObjectMonitor对象的_owner属性设置为自己,则表示获取锁,也可能owner指向线程栈的BasicObjectLock对象)
monitorexit释放锁:
如果是偏向锁,什么都不做
如果是轻量级锁,则设置对象头为无锁
如果是重量级锁,在exit方法中,如果EntryList中有数据则唤醒一个即可,否则CAS将cxq中的数据快照到EntryList中,此后就是单线程unpark唤醒等待线程。
ObjectMonitor实现重量级锁原理:
cxq:同aqs阻塞竞争队列
EntryList:快照读cxq,单线程唤醒
waitset:条件等待队列同reentrantLock的ConditionObject队列,用于处理wait、notify
object.wait:将当前线程加入waitset等待队列,释放锁。同ReentrantLock的await
object.notify:将等待线程从waitset中移除,并加入entryList或者cxq的头部或者尾部,唤醒等待线程。同ReentrantLock的signal,会将等待线程加入aqs阻塞队列,并且将其前一个node的状态改为signal状态。
推理AQS:
怎么设计:多线程访问共享数据时,只需要让多个线程CAS操作一个单原子变量state并且加volatile,即cmpxchg(state,0,1) 将state从0设置为1,即为获取锁成功,因为cmpxchg在操作系统加了lock前缀,所以本身就是原子性、线程安全的。其他失败的线程该干什么呢?当然是去一个队列中等待。
锁失败的线程如何处理:那么问题又来了,多个线程操作一个共享队列也有线程安全问题,并且共享队列的数据结构该如何选择:我们选择双向链表,因为会有频繁的插入和移除操作。我们假设操作链表是尾插头拿,即新节点插入尾部,从头节点唤醒。
如何删除节点:既然选择了双向链表,那么如何保证线程安全的删除节点呢?因为唤醒时是线程安全的(只有头节点的下一个节点才可以执行,顺序唤醒),所以只需要设置当前节点为头节点即可。而在取消节点时,将节点的状态设置为取消,cas设置prev.next指向当前节点的next,即从队列中移除。
如何加入节点:插入一个新节点,对哪个数据进行cas操作?tail指针、新节点的prev指针、尾节点的next指针
tail指针:当改变tail指针指向新节点时,当head指针(也是尾节点)释放锁时,head.next指针为null,唤醒不了新节点。死了!所以在cas之前先设置当前节点的prev节点为tail,之后cas设置tail几点,但是在唤醒时如果head.next节点为null,就得从tail节点反向遍历。
新节点的prev指针:当新节点的prev指针指向尾节点时,此时head指针(也是尾节点)释放锁,head.next指针为null,同样唤醒不了新节点。死了!
尾节点的next指针:在cas设置完尾节点tail.next之后,再修改tail节点为自己(延迟更新tail节点)只需要在使用时检测tail.next==null即可。ConcurrentLinkedQueue中使用
如何唤醒:在while循环中,如果前面节点是head则尝试获取锁(当head刚好释放锁时就可以直接抢锁),如果失败,则设置前一个节点状态为signal,之后再尝试获取锁失败之后阻塞park。被唤醒之后不一定能拿到锁(非公平锁),所以循环继续处理等待。获取锁的线程退出时,释放锁,之后从head.next节点唤醒等待的线程。如果为null,则从tail节点往前遍历找到距离head最近的可执行节点唤醒即可。
为什么非公平锁比公平锁性能高?
公平锁:就会使线程排队,并且进入内核态。有阻塞和和唤醒操作等浪费性能。
非公平锁:有可能抢锁直接成功,减少了阻塞和唤醒的操作。
如果获取锁的线程发现不满足执行条件则阻塞,等满足条件后继续执行该怎么办?此场景类似于生产者消费者模型。因此需要一个对象,包含头尾节点,用于保存条件队列。条件不满足的线程需要加入条件队列,并且释放锁。之后while循环检测是否已经满足条件。而其他线程获取锁之后使条件满足,唤醒条件队列的线程。
如何实现线程之间通信呢?即A线程使得条件满足如何通知等待在此条件队列的线程呢?当然是共享队列啊。AQS现成的阻塞队列啊。await线程只需要将自己加入条件队列,然后释放锁,之后循环判断自己是否已经被添加到阻塞队列,如果没有就park;否则就尝试抢锁失败时阻塞park。
为什么不直接使用阻塞队列,而使用条件队列最后又加入阻塞队列呢?想一下如果直接加入阻塞队列,其他线程还未将条件满足,此时unlock之后,会唤醒之后的节点,如果后面节点一大批都是条件队列的那么都是伪唤醒浪费大量时间,并且还可能导致lock的线程由于长时间获取不到锁而超时。所以必须要有一个过度的队列来保存还未满足条件时等待的线程,当满足条件时加入阻塞队列让其抢锁执行。
tryAcquire,尝试抢锁成功返回true
addWaiter,加入等待队列
acquireQueued,循环:抢锁(当前节点的前一个是head节点就尝试抢锁)、找到一个队列可以被唤醒的节点插入队列,之后睡眠。
xxxxxxxxxx
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
创建node节点,如果tail!=null,则尝试cas设置tail节点为node。否则走全流程enq。
xxxxxxxxxx
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
当前只有一个线程执行此方法,此时head和tail都是null。
cas设置head节点为new Node(),并且赋值tail,接下来cas设置tail=node,并且将node节点链接到head之后。
当前有多个线程执行此方法,此时head和tail都是null。
此时持有锁的线程释放,由于head和tail都是null,谁来唤醒当前线程?
在加入队列之后,阻塞之前尝试获取锁即可。
为什么是cas设置head而不是tail?
假设此处cas设置tail地址为1,那么设置成功之后,还未执行head=tail,此时有大量线程也要加入队列,执行addWaiter方法,由于tail不为null,则tail指针就会一直被更新,假设某一刻要执行head=tail,且tail的地址为10,那么执行head=tail时,由于tail加了volatile,都最新值。所以此时head=tail=10,那么1-9之间的线程怎么办,死了没法唤醒了。
如果cas设置head,那么设置成功之后,还未执行tail=head,此时有大量线程也要加入队列,执行addWaiter方法,由于tail=null,所以进入enq方法,等待tail=head设置完成,才会进行队列的尾加入,而不是像前面一样越过head=tail的设置,直接在addWaiter方法中进行尾加入。
xxxxxxxxxx
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
/**
if (compareAndSetTail(new Node()))
head = tail;
**/
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
抢锁(当前节点的前一个是head节点就尝试抢锁)解决了,多线程同时加入队列,并且head=null时,持有锁线程释放,导致队列中线程没人唤醒的问题。
找到一个可以被唤醒的节点插入队列,之后设置前面的节点状态为SIGNAL,如果前面节点一开始不是SIGNAL,那么返回false。此时当前线程继续执行1,因为当前节点的perv节点可能发生了变化,所以继续尝试获取锁。
阻塞park。
唤醒之后,检测当前线程是否被中断,parkAndCheckInterrupt(),被中断会返回true。但是中断标志已经被清除。所以在抢到锁返回true时,acquire方法会调用selfInterrupt()重新设置线程中断标记。由于此方法忽略中断,所以不要考虑中断。中断看lockInterruptibly()
可能被唤醒的线程没有抢到锁,继续执行2-3。
xxxxxxxxxx
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
线程释放锁成功时,只有当h != null && h.waitStatus != 0时,才会唤醒队列中的等待线程, 还有一种情况,即head节点不是一开始初始化的new Node()节点,此时waitStatus!=0,但是head.next=null。
xxxxxxxxxx
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
0:表示节点处于初始状态,即节点未被取消、未被等待、未被通知。
CANCELLED (1):表示节点已被取消。
SIGNAL (-1):表示后继节点已经被阻塞,当前节点在释放锁时需要唤醒后继节点
CONDITION (-2):表示节点在条件队列中等待。
PROPAGATE (-3):表示当前节点的线程应当传播唤醒信号。
如果head节点状态小于0,表示需要唤醒,则cas设置为0
如果head.next节点为null(处理next指针还未来得及设置的问题),或者是已取消,此时直接从tail往前遍历,唤醒最先进入队列并且可以被唤醒的等待线程。
注意此处索然从tail开始,但是也是一直往前找,找到最后一个满足的,而不是找到一个直接返回。满足公平性
xxxxxxxxxx
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
会有大量线程同时释放锁。获取head节点,当队列中还有阻塞线程时判断当前head节点状态,如果为-1,则将其设置为0,唤醒下一个。如果此时有多个线程释放,只能有一个将其设置为0。而其他线程则继续竞争将其设置为-3。其余线程则直接退出。
xxxxxxxxxx
private void doReleaseShared() {
for (;;) {
Node h = head;
//队列中还有等待节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒下一个
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
创建share节点加入到队列中,如果前一个节点是head节点,则尝试获取锁。抢锁成功返回当前可用数量,为0时表示信号量已经被全部使用。更新头节点,检测旧头节点、最新头节点是否被标记为-1或-3,当为-1时,可能伪唤醒(如果为-1,则表示此线程被唤醒调用tryAcquireShared成功返回之后,if (r > 0 ...)检测状态之前,有新线程加入队列并可能已经阻塞,而此时如果信号量为0,还是会被唤醒,但是是无效唤醒即伪唤醒)。如果是则唤醒下一个节点。
xxxxxxxxxx
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && (int r = tryAcquireShared(arg)) >= 0) {
Node h = head;
setHead(node);
//检测旧头节点、新头节点的状态(r=0,status=-1伪唤醒)
if (r > 0 || h.waitStatus < 0 || (h = head).waitStatus < 0) {
Node s = node.next;
//s==null情况。node为tail节点。新节点先cas设置tail,还未更新旧tail.next节点。发生这种情况时r>0或status=-3。而不可能是status=-1,因为设置前节点为-1是在完全插入队列之后。为此情况是在插入队列中。
if (s == null || s.isShared())
doReleaseShared();
}
p.next = null;
if (interrupted)
selfInterrupt();
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
}
}
要想说明此状态位的用处,以信号量为例Semaphore(2),A、B线程持有锁,等待队列中有1、2、3。并且3正在执行shouldParkAfterFailedAcquire,还未设置2的状态为-1。
此时A线程释放锁将head状态设置为0,调用unpark函数唤醒2,A直接退出。2正在准备执行setHeadAndPropagate函数且propagate=0,此时线程B释放锁,进入doReleaseShared,此时state=1。
此时2还未设置head指针
线程B发现head状态为0,则设置head=-3,B退出。此时2执行,发现h.waitStatus < 0,唤醒3,不管3现在有没有park。有可能3此时正好设置了2的状态为-1(从-3变为-1),park了(所以prop状态作用主要在此)。也有可能3此时还没有park。
此时2已经设置head指针
线程B设置head状态在2检测状态之前发生
线程B发现head状态为0,则设置head=-3,但是此时head已经被2设置,所以继续for循环,拿到最新的head,由于此时head状态=0,继续设置head=-3,B退出。此时2检测到h.waitStatus < 0,唤醒3。
线程B设置head状态在2检测状态之后发生
3设置2的状态为-1在2检测(h = head) == null || h.waitStatus < 0之前。
3设置2的状态为-1在线程B检测ws == Node.SIGNAL之前
伪唤醒
:除以上之外说明2的状态为0,线程B设置2状态为-3直接退出。那么谁来唤醒3呢?除了上面两种情况说明3还未阻塞,并且还未设置2的状态,那么在其设置完2状态-3变为-1。之后尝试抢锁成功,修改state=0。而又会去执行唤醒操作。因为head.waitStatus=-1,尽管state=0,无可用信号。如果此时还有其他节点4,那么就会造成伪唤醒问题。
首先添加到条件队列中,之后释放所有的锁(重入),循环判断是否被添加到阻塞队列中,如果没有则阻塞park;否则尝试抢锁或阻塞。
x
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
关联:object.wait的逻辑类似。加入waitSet队列中,释放synchronized重量级锁。循环检测是否被添加到cxq中。
xxxxxxxxxx
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
//创建节点
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
//自选加入_WaitSet中
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
//释放锁
exit (true, Self) ;
//阻塞
Self->_ParkEvent->park () ;
//循环抢锁阻塞
ReenterI (Self, &node) ;
}
void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
JavaThread * jt = (JavaThread *) Self ;
...
int nWakeups = 0 ;
for (;;) {
ObjectWaiter::TStates v = SelfNode->TState ;
//当前一定是在cxq或者entryList队列中
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
//抢锁
if (TryLock (Self) > 0) break ;
//自旋等待
if (TrySpin (Self) > 0) break ;
//阻塞
Self->_ParkEvent->park () ;
//抢锁
if (TryLock(Self) > 0) break ;
}
...
}
将所有条件队列中的任务全部添加到阻塞队列中。
x
public final void signalAll() {
//判断是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
//加入阻塞队列
transferForSignal(first);
first = next;
} while (first != null);
}
关联:object.notifyAll的逻辑类似。将waitSet队列中任务添加到cxq队列中
x
void ObjectMonitor::notifyAll(TRAPS) {
for (;;) {
//获取_WaitSet一个节点
ObjectWaiter* iterator = DequeueWaiter () ;
//直到所有节点全部移动完
if (iterator == NULL) break ;
//设置状态
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
}
}