【JUC】两图理解Condition的等待、唤醒逻辑

使用样例

ThreadA ThreadB ThreadC ThreadD - (插队专业户)Lock lock = new ReentrantLock(); lock.lock(); // ThreadA阻塞在这里 lock.newCondition().await(); // do business.. lock.unlock(); // ———— 无忧无虑的分割线 ————lock.lock(); // ThreadB进入这里,触发signal lock.newCondition().signal(); lock.unlock();

1、图解await
【JUC】两图理解Condition的等待、唤醒逻辑
文章图片

条件:
await()在lock()和unlock()之间执行——执行await的方法必定持有锁(owner记录的线程)
步骤A:
await执行时会做三件事:
1.清除state和owner
2.唤醒工作队列的head.next
3.加入并阻塞在等待队列中
步骤B:
head.next节点绑定的线程(图中为ThreadB)被唤醒,试图抢占锁。抢占成功从工作队列移除(当然ThreadB执行await时,也会重复步骤A)
2、图解signal
【JUC】两图理解Condition的等待、唤醒逻辑
文章图片

signal只做一件事,将等待队列的头节点释放迁移至工作队列的尾部(尾插,图中绿色线)
ThreadA真正的释放要等unlock()方法
以下用代码详细解释上面的两幅图
一、结构说明 Condition结构
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;

Node结构
class Node {/** 独占 */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL= -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** 持有线程 */ volatile Thread thread; /** 等待队列相关 */ volatile int waitStatus; Node nextWaiter; /** 工作队列相关 */ volatile Node prev; volatile Node next;

二、await
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // == 1、创建condition waiter队列(一个新的等待队列),node是尾节点 Node node = addConditionWaiter(); // == 2、state状态清零,解除阻塞状态->此时其它线程可以抢占锁 int savedState = fullyRelease(node); int interruptMode = 0; // == 3.1、不在工作队列中 // 这里的限制:只有在工作队列的node,才能跳出循环,进入3.2逻辑 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }// == 3.2、在工作队列中,尝试排队获取锁(未获取到锁则阻塞在工作队列中) if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

1、创建等待队列
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter private Node addConditionWaiter() { Node t = lastWaiter; // 移除“取消”状态的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; }// 创建node节点,waitStatus标记为condition->`-2` Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

2.释放lock
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 还原state,exclusiveOwnerThread清空;工作队列中原本被阻塞的第一个线程(ThreadB)释放 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }

3.1是否在工作队列中
final boolean isOnSyncQueue(Node node) { // -- 等待队列节点,或无前置节点的,一定不在工作队列,返回false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // -- 在队列中,返回true if (node.next != null) // If has successor, it must be on queue return true; // -- 从工作队列的尾部查找 return findNodeFromTail(node); }

3.2排队获取
【【JUC】两图理解Condition的等待、唤醒逻辑】(在ReentrantLock已分析过,直接粘贴过来)
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 循环中 for (; ; ) { final Node p = node.predecessor(); // ### 前置节点是头节点,有机会尝试获取 //(结合下一个if判断,会自旋两次,也就是说有两次尝试获取机会) if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // ### 第1次将waitstatus设置成signal返回false // ###第2次判断waitstatus==signal返回true if (shouldParkAfterFailedAcquire(p, node) // === 线程阻塞(未来唤醒时,从此处继续执行) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }### private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // -- 第二次调用 if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } // -- 第一次调用 else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }=== private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 当前线程是否被中断 return Thread.interrupted(); }

三、signal
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal public final void signal() { // exclusiveOwnerThread持有线程判定 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; // 唤醒的是等待队列头节点 if (first != null) // == 队列节点转移(等待队列->工作队列) doSignal(first); }

java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#doSignal private void doSignal(Node first) { do { // -- 释放等待队列头节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while ( // -- 节点转移 !transferForSignal(first) && (first = firstWaiter) != null); }final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 尾插工作队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

    推荐阅读