JUC|十一、AbstractQueuedSynchronizer源码分析

一、AbstractQueuedSynchronizer简介 1、概述

1、AbstractQueuedSynchronizer(抽象队列同步器),来自于JDK1.5,位于JUC包下,简称AQS;AQS作为一个抽象类,是构建JUC包中的锁或者其它同步组件的底层基础框架及JUC体系的基石。 2、在每一个同步组件(如:ReentrantLock、CountDownLatch、Semaphore、CyclicBarrier、ReentrantReadWriteLock等)的实现类中,都具有AQS的实现类作为内部类。 【JUC|十一、AbstractQueuedSynchronizer源码分析】JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

3、锁和同步组件关系:
  • 锁(如ReentrantLock)是面向锁的使用者:定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。
  • 同步组件是面向锁的实现者:如AQS简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、线程的等待与唤醒等更加底层的操作。我们如果想要自己写一个“锁”,那么就可以直接利用AQS框架实现,而不需要去关心那些更加底层的东西。
2、AQS的核心思想
1、AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。 2、CLH(Craig,Landin,and Hagersten)队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

3、它的head引用指向的头结点作为哨兵结点,不存储任何与等待线程相关的信息,或者可以看成已经获得锁的结点。第二个结点开始才是真正的等待线程构建的结点,后续的结点会加入到链表尾部。 4、将新结点添加到链表尾部的方法是compareAndSetTail(Node expect,Node update)方法,该方法是一个CAS方法,能够保证线程安全。 5、同步队列遵循先进先出(FIFO),头结点的next结点是将要获取到锁的结点,线程在释放锁的时候将会唤醒后继结点,然后后继结点会尝试获取锁。
3、AQS类的设计
1、AbstractQueuedSynchronizer被设计为一个抽象类,它使用了一个volatile int类型的成员变量state来表示同步状态(或者说资源),通过一个内置的FIFO双向同步队列来完成资源获取线程的排队等待工作,通过一个或者多个ConditionObject条件队列来实现线程之间的通信(等待和唤醒)。 2、Lock中“锁”的状态使用state变量来表示,一般来说0表示锁没被占用,大于0表示所已经被占用了
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {/** * 共享变量,使用volatile修饰保证线程可见性 */ private volatile int state; /** * 返回同步状态的当前值 */ protected final int getState() { return state; }/** * 设置同步状态的值,此操作具有volatile写的内存语义,因此每次写数据都是写回主存并导致其它缓存实效 */ protected final void setState(int newState) { state = newState; }/** * 如果当前同步状态的值等于expect(期望值),则以原子方式将同步状态设置为给定的update(更新值) * stateOffset为state内存偏移地址,底层通过 * unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"))方法 * 获取state内存偏移地址 */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } }

4、AQS对资源的共享方式
1、独占(Exclusive):只有一个线程能执行,如ReentrantLock,又可分为公平锁和非公平锁
  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
2、共享(Share):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock等 3、注:ReentrantReadWriteLock可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
5、AQS底层使用了模板方法模式
1、同步器的设计是基于模板方法模式的,如果需要自定义同步器需要继承AbstractQueuedSynchronizer并重写指定的方法(这些重写方法很简单,无非是对于共享资源state的获取和释放),需要重写方法如下:
  • isHeldExclusively():该线程是否正在独占资源,只有用到condition才需要去实现它。
  • tryAcquire(int arg):尝试以独占模式获取资源,成功则返回true,失败则返回false。
  • tryRelease(int arg):尝试以独占模式释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int arg):尝试以共享模式获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int arg):尝试以共享模式释放资源,成功则返回true,失败则返回false。
2、以上的方法在AQS中并没有提供实现,如果子类不重写就直接调用还会抛出异常。
6、父类AbstractOwnableSynchronizer
1、AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {/** Use serial ID even though all fields transient. */ private static final long serialVersionUID = 3737899427754241961L; /** * 空参构造 */ protected AbstractOwnableSynchronizer() { }/** * 独占模式下的线程 */ private transient Thread exclusiveOwnerThread; /** * 设置独占线程 */ protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }/** * 获取独占线程 */ protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }

7、AQS的属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {private static final long serialVersionUID = 7373984972572414691L; // 头结点 private transient volatile Node head; // 尾结点 private transient volatile Node tail; // 状态 private volatile int state; // 自旋时间 static final long spinForTimeoutThreshold = 1000L; // Unsafe类实例 private static final Unsafe unsafe = Unsafe.getUnsafe(); // state(状态)内存偏移地址 private static final long stateOffset; // head(头结点)内存偏移地址 private static final long headOffset; // tail(尾结点)内存偏移地址 private static final long tailOffset; // waitStatus(等待状态在Node类中)内存偏移地址 private static final long waitStatusOffset; // next(后继结点在Node类中)内存偏移地址 private static final long nextOffset; // 静态初始化块 static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } }

9、类的内部类 - Node类
1、每个被阻塞的线程都会被封装成一个Node结点,放入队列。每个结点包含了一个Thread类型的引用,并且每个结点都存在一个状态(waitStatus),具体状态如下:
  • CANCELLED:值为1,指示当前结点(线程)需要取消等待,由于在同步队列中等待的线程发生等待超时、中断、异常,即放弃获取锁,需要从同步队列中取消等待,就会变成这个状态,如果结点进入该状态,那么不会再变成其他状态。
  • SIGNAL:值为-1,指示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒);如果一个结点状态被设置为SIGNAL,那么后继结点的线程处于挂起或者即将挂起的状态,当前结点的线程如果释放了锁或者放弃获取锁并且结点状态为SIGNAL,那么将会尝试唤醒后继结点的线程以运行,这个状态通常是由后继结点给前驱结点设置的。一个结点的线程将被挂起时,会尝试设置前驱结点的状态为SIGNAL。
  • CONDITION:值为-2,线程在等待队列里面等待,waitStatus值表示线程正在等待条件,原本结点在等待队列中,结点线程等待在Condition上,当其他线程对Condition调用了signal()方法之后,该结点会从从等待队列中转移到同步队列中,进行同步状态的获取。
  • PROPAGATE:值为-3,释放共享资源时需要通知其他结点,waitStatus值表示下一个共享式同步状态的获取应该无条件传播下去。
  • 值为0:表示当前结点在同步队列中,等待着获取资源
static final class Node { /** * AQS支持共享模式和独占模式两种类型 * 共享模式下构造的结点,用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的 */ static final Node SHARED = new Node(); /** * 独占模式下构造的结点,用来标记该线程是获取独占资源时被阻塞挂起后放入AQS队列的 */ static final Node EXCLUSIVE = null; // 线程结点的等待状态,用来表示该线程所处的等待锁的状态/** * 指示当前结点(线程)需要取消等待 */ static final int CANCELLED = 1; /** * 指示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒) */ static final int SIGNAL = -1; /** * 线程在等待队列里面等待Condition唤醒 */ static final int CONDITION = -2; /** * 共享式同步状态的获取应该无条件传播下去 */ static final int PROPAGATE = -3; /** * 记录当前线程等待状态值,包括上面4种的状态,还有0(表示初始化状态) */ volatile int waitStatus; /** * 前驱结点,当结点加入同步队列将会被设置前驱结点信息 */ volatile Node prev; /** * 后继结点 */ volatile Node next; /** * 当前结点所对应的线程 */ volatile Thread thread; /** * 等待队列中的后继结点,如果当前结点是共享模式的,那么这个字段是一个SHARED常量 * 在独占锁模式下永远为null,仅仅起到一个标记作用 */ Node nextWaiter; /** * 如果结点在共享模式下等待,则返回true,因为nextWaiter字段在共享模式下是一个SHARED常量 */ final boolean isShared() { return nextWaiter == SHARED; }/** * 获取前驱结点,若前驱结点为空,抛出异常 */ final Node predecessor() throws NullPointerException { // 保存前驱结点 Node p = prev; // 前驱结点为空,抛出异常,否则直接返回 if (p == null) throw new NullPointerException(); else return p; }/** * 无参构造方法,用于建立初始头结点或SHARED标记 */ Node() { }Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; }Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }

二、通过ReentrantLock分析AQS源码 1、ReentrantLock概述
1、ReentrantLock来自于JDK1.5,位于JUC包的Locks子包,独占(互斥)式可重入锁。Synchronized的功能他都有,并且具有更加强大的功能。 2、实现了Lock接口,具有通用的操作锁的方法。由于来自JUC包,内部是使用AQS队列同步器来辅助实现的,重写了AQS的独占式获取锁的方法,并实现了可重入性。 3、ReentrantLock还具有公平与非公平两个获取锁模式,这个并非AQS提供的,而是ReentrantLock基于AQS自己实现的。 4、ReentrantLock有三个内部类,Sync、NonfairSync、FairSync。NonfairSync(非公平)与FairSync(公平)类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类,类图如下: JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

2、公平与非公平
1、公平锁:是指多个线程按照申请锁的顺序来获取锁,先来后到,先来先服务就是公平的,也就是队列。 2、非公平锁:是指在多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取到锁,在高并发的情况下,有可能造成优先级反转或者饥饿现象(也就是某个线程一直得不到锁即为饥饿)。对于synchronized而言,也是一种非公平锁。 3、ReentrantLock既可以是公平锁又可以是非公平锁。当此类的构造方法ReentrantLock(boolean fair) 接收true作为参数时,ReentrantLock就是公平锁,线程依次排队获取公平锁,即锁将被等待最长时间的线程占有。默认是非公平锁。 4、与使用非公平锁相比,使用公平锁的程序在多线程环境下效率比较低。而且即使是公平锁也不一定能保证线程调度的公平性,后来的线程调用tryLock方法同样可以不经过排队而获得该锁。
3、构造方法
1、public ReentrantLock():创建一个ReentrantLock实例。等同于使用ReentrantLock(false)。即默认无参构造器,是非公平模式。 2、public ReentrantLock(boolean fair):创建一个具有给定公平策略的ReentrantLock。false表示不公平,true表示公平。
public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; // 同步队列 private final Sync sync; /** * 创建一个非公平的ReentrantLock实例,默认非公平模式 */ public ReentrantLock() { //内部初始化了一个NonfairSync对象 sync = new NonfairSync(); }/** * 创建一个具有给定公平策略的 ReentrantLock * @param fair false表示不公平,true表示公平。 */ public ReentrantLock(boolean fair) { // 根据参数选择初始化不同的AQS对象 sync = fair ? new FairSync() : new NonfairSync(); } }

4、内部类 - Sync类
1、ReentrantLock类的sync非常重要,对ReentrantLock类的操作大部分都直接转化为对Sync和AbstractQueuedSynchronizer类的操作。 2、类中方法及作用:
方法名 作用
lock 锁定,并没有具体的实现,留给子类实现
nonfairTryAcquire(int acquires) 非公平方式获取锁,acquires获取锁参数,锁状态每次加1
tryRelease(int releases) 尝试释放锁,releases释放参数,锁状态每次减1
isHeldExclusively() 判断资源是否被当前线程占有
newCondition() 新生一个条件
getOwner() 返回资源的占用线程
getHoldCount() 如果当前线程占有资源,就返回当前状态,否则返回0
isLocked() 资源是否被占用(是否已经获取锁,true表示被占用)
readObject 自定义序列化逻辑
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * 获取锁 */ abstract void lock(); /** * 非公平方式获取锁 */ final boolean nonfairTryAcquire(int acquires) { // 当前线程 final Thread current = Thread.currentThread(); // 当前线程的同步(锁)状态 int c = getState(); // 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了 if (c == 0) { // 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值) if (compareAndSetState(0, acquires)) { // 设置当前线程独占 setExclusiveOwnerThread(current); // 成功 return true; } } else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁 // 增加重入次数(可重入锁) int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置同步状态 setState(nextc); // 成功 return true; } // 失败 return false; }/** * 尝试释放锁 * 如果当前线程是这个锁的持有者,那么持有计数就会递减。如果保持计数现在为零,则释放锁。 * 如果当前线程不是该锁的持有者,则抛出IllegalMonitorStateException * @param releases:释放参数,每次减1 */ protected final boolean tryRelease(int releases) { // 获取当前线程的同步状态 int c = getState() - releases; // 如果当前线程不为独占线程,则抛出错误 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 释放标识false boolean free = false; // 如果同步状态为0(即锁没有被占用,也就是释放) if (c == 0) { // 释放标识为true free = true; // 已经释放,清空独占 setExclusiveOwnerThread(null); } // 设置同步状态 setState(c); // 返回释放标识 return free; }/** * 判断资源是否被当前线程占有 */ protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); }// 新生一个条件 final ConditionObject newCondition() { return new ConditionObject(); }// Methods relayed from outer class // 返回资源的占用线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); }// 如果当前线程占有资源,就返回当前状态,否则返回0 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; }// 资源是否被占用(是否已经获取锁,true表示被占用) final boolean isLocked() { return getState() != 0; }/** * 自定义序列化逻辑 */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }

5、内部类 - NonfairSync类
1、NonfairSync类继承了Sync类,表示采用非公平策略获取锁,其实现了Sync类中抽象的lock方法 2、从lock方法的源码可知,每一次都尝试获取锁,而并不会按照公平等待的原则进行等待,让等待时间最久的线程获得锁。
static final class NonfairSync extends Sync { // 版本号 private static final long serialVersionUID = 7316153563782823691L; /** * 获得锁 */ final void lock() { // 比较并设置状态成功,状态0表示锁没有被占用 if (compareAndSetState(0, 1)) // 将当前线程设置独占了锁 setExclusiveOwnerThread(Thread.currentThread()); else // 抢占锁失败,前面线程还没有释放锁,后面线程进入等待队列 // 以独占模式获取对象,忽略中断 acquire(1); }/** * 尝试以独占模式非公平获取锁 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

5、内部类 - FairSync类
1、FairSync类继承了Sync类,表示采用公平策略获取锁,其实现了Sync类中的抽象lock方法 2、从lock方法的源码可知,不会先尝试获取资源。这也是和NonfairSync最大的区别,NonfairSync每一次都会尝试去获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部。
static final class FairSync extends Sync { // 版本号 private static final long serialVersionUID = -3000897897090466540L; // 获得锁 final void lock() { acquire(1); }/** * 尝试公平获取锁 */ protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 当前线程的同步(锁)状态 int c = getState(); // 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了 if (c == 0) { // 查询是否有任何线程等待获取的时间超过当前线程 if (!hasQueuedPredecessors() && // 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值) compareAndSetState(0, acquires)) { // 设置当前线程独占 setExclusiveOwnerThread(current); // 成功 return true; } } // 状态不为0,即资源已经被线程占据,并判断当前线程是否独占锁 else if (current == getExclusiveOwnerThread()) { // (可重入锁) int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 设置同步状态 setState(nextc); // 成功 return true; } // 失败 return false; } }

三、ReentrantLock非公平模式加锁原理及源码分析 1、lock方法加锁
JUC中只要是实现Lock的锁,那么加锁的方法,一般都统一调用开放给外部的lock()方法。
/** * @Date: 2022/9/4 * ReentrantLock加锁流程 */ public class ReentrantLockTest { public static void main(String[] args) { // 默认非公平锁 ReentrantLock lock = new ReentrantLock(); new Thread(() -> { lock.lock(); try { Thread.sleep( 20000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("A获取到锁"); lock.unlock(); }, "A").start(); new Thread(() -> { lock.lock(); System.out.println("B获取到锁"); lock.unlock(); }, "B").start(); new Thread(() -> { lock.lock(); System.out.println("C获取到锁"); lock.unlock(); }, "C").start(); } }

当有多个线程调用lock时候,跟踪lock()方法的源码,NonfairSync类的lock的方法调用主流程如下:
  • lock.lock()ReentrantLock类的lock()Sync类的lock()NonfairSync类的lock()AQS类的acquire()AQS类的tryAcquire()NonfairSync类的tryAcquire()Sync类的nonfairTryAcquire
  • 其中多个线程的时候,如果某个线程占锁时间较长,那么Sync类的nonfairTryAcquire返回false。
  • 此时AQS类的tryAcquire()返回值也是false,AQS类的acquire()方法中!tryAcquire(arg)就为true,也就执行后面一段逻辑acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • 调用addWaiter(Node.EXCLUSIVE), arg)为当前线程创建排队结点,进入队列中等待唤醒。
2、NonfairSync类的lock方法
1、线程A调用lock.lock()方法,由于默认是非公平模式,所以进入到NonfairSync类的lock()方法。 2、lock方法流程:
  • 首先尝试获取锁,即CAS将state的值从0更新为1,表示此时没有线程获取到锁,然后线程A尝试获取锁,如果CAS成功,那么说明该锁没有被任何线程获取,此时获取锁成功,将当前线程标记为持有锁的线程(即线程A),加锁结束。
  • 若获取锁失败,表示该锁此前已经被某条线程获取到了,或者被别的线程抢先CAS成功,那么执行acquire方法继续尝试获取锁。B、C线程再次进入的时候,获取尝试获取锁失败,那么就执行acquire方法。
JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

3、AQS类的acquire方法
1、B、C线程尝试获取锁失败,进入到acquire方法。 2、acquire方法就是AQS提供的模版方法,用于独占式获取锁,该方法不会响应中断,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。 3、在acquire方法中会继续尝试获取锁,直到成功才能返回。最终调用的是NonfairSync类的tryAcquire方法,而它又是调用Sync类的nonfairTryAcquire方法。 4、nonfairTryAcquire方法流程:
  • 再次判断state,如果state=0,表示当前锁未被任何线程持有,那么尝试CAS获取锁,若CAS获取成功则设置当前线程,返回true,方法结束。
  • 否则,表示此前已经有线程获取锁了,那么判断是否就是当前线程获取到了锁,如果是,执行重入逻辑,将state加上1再写回,表示重入次数+1,返回true,方法结束。
  • 若CAS失败(有其他线程已经获取了锁),或者若当前锁已经被其他线程持有,则直接返回false,方法结束。
5、B、C线程进入此方法后,发现锁已经被其他线程获取,tryAcquire方法最终执行结果为false,!tryAcquire(arg)的值为true,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
/** * AQS类中的acquire * 独占式的尝试获取锁,获取不成功就进入同步队列等待 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }/** * AQS类中的模板方法:tryAcquire,在AQS中的实现是抛出异常,需要子类自己重写 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }/** * NonfairSync类重写tryAcquire方法,底层调用Sync类的nonfairTryAcquire方法实现 */ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }/** * Sync类的nonfairTryAcquire方法实现,非公平方式获取锁 */ final boolean nonfairTryAcquire(int acquires) { // 当前线程 final Thread current = Thread.currentThread(); // 当前线程的同步(锁)状态 int c = getState(); // 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了 if (c == 0) { // 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值) if (compareAndSetState(0, acquires)) { // 设置当前线程独占 setExclusiveOwnerThread(current); // 成功 return true; } } else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁 // 增加重入次数(可重入锁) int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 设置同步状态 setState(nextc); // 成功 return true; } // 失败 return false; }

4、AQS类的addWaiter方法
1、addWaiter方法是AQS提供的,不需要我们重写,可以说是锁的通用方法。通过addWaiter方法将线程按照独占模式Node.EXCLUSIVE构造同步结点,并添加到同步队列的尾部。 2、addWaiter方法流程:
  • 按照给定模式,构建新结点。
  • 如果同步队列不为null,则尝试将新结点添加到队列尾部(只尝试一次),如果添加成功则返回新结点,方法返回。
  • 如果队列为null或者添加失败,则调用enq方法循环尝试添加,直到成功,返回新结点,方法结束。
/** * 将获取锁失败的线程按给定模式构建成结点加入到同步队列的尾部 * @param mode 模式,独占模式传入的是一个Node.EXCLUSIVE,即null; *共享模式传入的是一个Node.SHARED,即一个静态结点对象(共享的、同一个) */ private Node addWaiter(Node mode) { // 按给定模型创建结点 Node node = new Node(Thread.currentThread(), mode); // 获取同步器的尾结点,使用pred来保存 Node pred = tail; // 如果pred不为null,也就是队列不为null,那么使用CAS方式将当前结点设为尾结点 if (pred != null) { // 将当前结点的前驱结点指向结点pred node.prev = pred; // 通过CAS方式将当前结点设为尾结点 if (compareAndSetTail(pred, node)) { // 将结点pred的后继结点指向当前结点 pred.next = node; // 返回结点 return node; } } /** * 如果pred为null,也就是队列为null,也可能是compareAndSetTail(pred, node)失败 * 调用enq方法,采用自旋方式保证构造的新结点成功添加到同步队列中 */ enq(node); return node; }/** * addWaiter方法中使用到的Node构造器 * @param thread 当前线程 * @param mode模式 */ Node(Thread thread, AbstractQueuedSynchronizer.Node mode) { //等待队列中的后继结点 就等于该结点的模式 //共享模式该值为Node.SHARED结点常量,独占模式该值为null this.nextWaiter = mode; //当前线程 this.thread = thread; }

5、AQS类的enq方法
1、enq方法用在同步队列为null或者一次CAS操作添加失败的时候,enq方法要保证结点最终必定添加成功。 2、enq方法流程:
  • 开启一个死循环,保证结点一定添加进去。
  • 如果队列为空,那么初始化队列,添加一个哨兵结点,结束本次循环,继续下一次循环。
  • 如果队列不为空,则尝试将新结点添加到队列尾部,如果添加成功则返回新结点的前驱,循环结束;如果不成功,结束本次循环,继续下一次循环,直到添加成功。
3、此时,B、C线程的结点就保存到队列中,等待唤醒,队列结构如下图: JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

/** * 将节点插入队列,必要时进行初始化 * @param node 要插入的结点 * @return 节点的前驱结点 */ private Node enq(final Node node) { // 死循环直到添加成功 for (; ; ) { // 直接获取尾结点 Node t = tail; // 如果结点为null,则初始化同步队列 if (t == null) { // Must initialize /* * 调用compareAndSetHead方法,初始化同步队列 * 注意:这里是新建了一个空白结点,也就是所谓的哨兵结点 * CAS成功之后,head将指向该哨兵结点,返回true */ if (compareAndSetHead(new Node())) // 尾结点指向头结点(哨兵结点),方法并没有结束,继续循环,再次执行时t不为null tail = head; } else {// 队列不为null,调用compareAndSetTail方法,新建的新结点插入到同步队列尾部 // 将当前结点的前驱结点指向t node.prev = t; // 通过CAS方式将当前结点设为尾结点 if (compareAndSetTail(t, node)) { // 将结点t的后继结点指向当前结点 t.next = node; // 返回前驱结点 return t; } } } }/** * CAS添加头结点. 仅仅在enq方法中用到 * @param update 头结点 * @return true 成功;false 失败 */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); }/** * CAS添加尾结点. 仅仅在enq方法中用到 * @param expect 预期原尾结点 * @param update 新尾结点 * @return true 成功;false 失败 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }

6、AQS类的acquireQueued方法
1、执行到acquireQueued方法,说明tryAcquire和addWaiter两个方法都执行成功,表示该线程结点获取锁失败,并且被放入同步队列尾部了。 2、acquireQueued方法表示结点进入同步队列之后的动作,实际上就进入了一个自旋的过程,自旋过程中,当条件满足,获取到了锁,就可以从这个自旋中退出并返回,否则可能会阻塞该结点的线程,后续即使阻塞被唤醒,还是会自旋尝试获取锁,直到成功或者而抛出异常。 3、acquireQueued方法流程:
  • 开启一个死循环,如果当前结点的前驱是头结点,尝试获取锁,如果获取锁成功,那么当前结点设置为头结点head,将当前结点线程引用与前驱结点设置为空,表示当前线程已经获取到了锁,然后返回是否被中断标志,结束循环,进入finally。
  • 如果当前结点的前驱不是头结点或者尝试获取锁失败,那么判断当前线程是否应该被挂起,如果返回true,那么调用parkAndCheckInterrupt挂起当前结点的线程(LockSupport.park方法挂起线程,线程处于WAITING状态),此时不再执行后续的步骤、代码。
  • 如果当前线程不应该被挂起,即返回false,那本次循环结束,继续下一次循环。
  • 如果线程被其他线程唤醒,那么判断是否是因为中断而被唤醒并修改标志位,同时继续循环,直到在步骤1获得锁,才能跳出循环
  • 最终,线程获得了锁跳出循环,或者发生异常跳出循环,那么会执行finally语句块,finally中判断线程是否是因为发生异常而跳出循环,如果是,那么执行cancelAcquire方法取消该结点获取锁的请求;如果不是,即因为获得锁跳出循环,则finally中什么也不做。
/** * 以独占不间断模式获取已在队列中的线程 * @param node 新创建的结点 * @param arg 获取参数 * @return 如果在等待时中断,则返回true */ final boolean acquireQueued(final Node node, int arg) { // 获取锁是否失败的标记 boolean failed = true; try { // 是否被中断的标记 boolean interrupted = false; // 死循环 for (; ; ) { // 获取当前结点的前驱结点,位于Node类中的方法 final Node p = node.predecessor(); // 判断前驱结点是否为头结点,只有前驱结点是头结点的时候才能尝试获取锁,调用tryAcquire方法获取锁 if (p == head && tryAcquire(arg)) { // 获取到锁之后,将当前接结点设置为头结点(哨兵结点),线程出队列 setHead(node); // 断开p结点(原哨兵结点)的后继结点指向,由JVM回收 p.next = null; // help GC // 获取锁是否失败改成false,表示成功获取到了锁 failed = false; // 返回线程是否被中断 return interrupted; } /** * 前驱结点不是头结点,或者是tryAcquire尝试获取锁失败的情况下,执行此逻辑 * shouldParkAfterFailedAcquire检测线程是否应该被挂起 * 如果返回true,则调用parkAndCheckInterrupt用于将线程挂起,否则重新开始循环 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) /* * 到这里,说明是当前结点(线程)因为被中断而唤醒,那就改变自己的中断标志位状态信息为true * 又从新开始循环,直到获取到锁,才能返回 * */ interrupted = true; } } finally { // 线程获取到锁或者发生异常之后都会执行的finally语句块 /** * 如果failed为true,表示获取锁失败,即对应发生异常的情况 * 这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true * 那么执行cancelAcquire方法,该方法用于取消该线程获取锁的请求,将该结点的线程状态改为CANCELLED,并尝试移除 * 结点(如果是尾结点); * 如果failed为false,表示获取到了锁,那么该方法直接结束,继续往下执行 */ if (failed) // 取消获取锁请求,将当前结点从队列中移除 cancelAcquire(node); } }/** * head指向node新结点,该方法是在tryAcquire获取锁之后调用,不会产生线程安全问题 */ private void setHead(Node node) { // 头结点指向新结点 head = node; /** * 将新结点的thread和prev属性置空,成为新的哨兵结点 * 虽然线程引用置空了,但是一般在tryAcquire方法中记录了获取到锁的线程,因此不担心找不到是哪个线程获取到了锁, * 因此哨兵结点也可以叫做"获取到锁的结点" */ node.thread = null; node.prev = null; }

7、AQS的shouldParkAfterFailedAcquire方法
1、shouldParkAfterFailedAcquire方法在没有获取到锁之后调用,用于判断当前结点是否需要被挂起。 2、shouldParkAfterFailedAcquire方法流程:
  • 获取前驱结点的等待状态,如果前驱结点已经是SIGNAL(-1)状态,即表示当前结点可以挂起,返回true,方法结束。
  • 如果前驱结点状态大于0(Node.CANCELLED(1)状态),表示前驱结点放弃了锁的等待,那么由该前驱向前查找,直到找到一个状态小于等于0的结点,再将当前结点排在该结点(状态小于0的)后面,返回false,方法结束。
  • 如果前驱结点的状态既不是SIGNAL(-1),也不是CANCELLED(1),尝试CAS设置前驱结点的状态为SIGNAL(-1),返回false,方法结束。
3、注:只有前驱结点状态为SIGNAL时,当前结点才能安心挂起,否则一直自旋。 4、结论:一个结点SIGNAL状态的设置,一般都是由它的后继结点设置的,但是这个状态却是表示后继结点的状态,表示的意思就是前驱结点如果释放了锁,那么就有义务唤醒后继结点
/** * 检测当前结点(线程)是否应该被挂起 * @param pred 当前结点的前驱 * @param node 当前结点 * @return 如果前驱结点已经是SIGNAL状态,当前结点才能挂起,返回true;否则, *可能会查找新的前驱结点或者尝试将前驱结点设置为SIGNAL状态,返回false */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前驱结点的waitStatus(等待状态) // 最初创建结点时候,并没有给waitStatus赋值,因此每一个结点最开始的时候waitStatus的值都为0 int ws = pred.waitStatus; // 判断前驱结点的等待状态是否为-1,如果前驱结点已经是SIGNAL状态,即表示当前结点可以挂起 if (ws == Node.SIGNAL) // 返回true return true; // 如果前驱结点状态大于0,即Node.CANCELLED表示前驱结点放弃了锁的等待 if (ws > 0) { /** * 由该前驱向前查找,直到找到一个状态小于等于0的结点(即没有被取消的结点),当前结点成为该结点的 * 后继,这一步很重要,可能会清理一段被取消了的结点,并且如果该前驱释放了锁,还会唤醒它的后继 */ do { // 当前结点的前驱结点 = 前驱结点的前驱结点;前驱结点 = 前驱结点的前驱结点 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 直到前驱结点的等待状态小于0,结束循环 // 找到一个状态小于0的结点,并将它的后继结点指向当前结点,重新连接队列 pred.next = node; // 否则,前驱结点的状态既不是SIGNAL(-1),也不是CANCELLED(1) } else { // 通过CAS操作将当前结点的前驱结点的状态设置为SIGNAL(-1),用于后续唤醒操作; // 可能会失败,但没关系,因为失败之后acquireQueued方法会一直循环 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回false return false; }

8、AQS的parkAndCheckInterrupt方法
1、shouldParkAfterFailedAcquire方法返回true之后,将会调用parkAndCheckInterrupt方法挂起线程并且后续判断中断状态。线程中断方法参考《LockSupport与线程中断》篇 2、parkAndCheckInterrupt方法流程:
  • 使用LockSupport.park(this)挂起该线程,不再执行后续的步骤、代码。直到该线程被中断或者被唤醒(unpark)。
  • 如果该线程被中断或者唤醒,那么返回Thread.interrupted()方法的返回值,该方法用于判断前线程的中断状态,并且清除该中断状态。如果该线程因为被中断而唤醒,则中断状态为true,将中断状态重置为false,并返回true。如果该线程不是因为中断被唤醒,则中断状态为false,并返回false。
3、此时,由于A线程获取锁到释放锁的间隔时间较长,B、C线程在队列中等待,对应的哨兵结点和线程B结点的状态都为-1,并且进入阻塞状态,等待A释放锁,B再执行,队列结构如下图: JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

/** * 挂起线程 */ private final boolean parkAndCheckInterrupt() { // 使用LockSupport.park(this)挂起该线程 LockSupport.park(this); // 判断线程是否被中断,并清除当前中断标识位 return Thread.interrupted(); }

四、ReentrantLock释放锁原理及源码分析 1、unlock方法释放锁
1、JUC中只要是实现Lock的锁,那么解锁的方法,一般都统一调用开放给外部的unlock()方法。 2、ReentrantLock在公平和非公平模式下释放锁的逻辑都是一样的,该实现在Sync类中。 3、每个lock.lock()操作必然对应一个lock.unlock()操作。方法调用流程如下:
  • lock.unlock()ReentrantLock类的unlock()AQS类的release()AQS类的tryRelease()Sync类的tryRelease()
2、AQS类的release方法
1、当前线程获取到锁并执行了相应逻辑之后,就需要释放锁,使得后续结点能够继续获取锁。通过调用AQS的release(int arg)模版方法可以独占式的释放锁。 2、release方法流程:
  • 尝试使用tryRelease(arg)释放锁,该方法在Sync类中,是自己实现的方法,将state值为0或者减少、清除当前获得锁的线程,如果符合自己的逻辑,锁释放成功则返回true,否则返回false。
  • 如果tryRelease释放成功返回true,如果头结点head不为null并且head的等待状态不为0,那么尝试调用unparkSuccessor方法唤醒头结点之后的一个非取消状态(非CANCELLED状态)的后继结点,让其可以进行锁获取。返回true,方法结束。
  • 如果tryRelease释放失败,那么返回false,方法结束。
3、此时A线程逻辑执行完毕,调用释放锁逻辑。
/** * 独占式的释放同步状态 * @param arg 释放参数 * @return 释放成功返回true, 否则返回false */ public final boolean release(int arg) { // 调用ReentrantLock重写的tryRelease方法(位于内部类Sync中),释放同步状态,释放成功将返回true,否则返回false if (tryRelease(arg)) { // 获取头结点 Node h = head; // 如果头结点不为null,并且状态不等于0 if (h != null && h.waitStatus != 0) // 唤醒头结点的一个处于等待锁状态的后继结点 unparkSuccessor(h); return true; } return false; }

3、AQS的unparkSuccessor方法
1、unparkSuccessor方法用于唤醒参数结点的某个非取消的后继结点,该方法在很多地方法都被调用。 2、unparkSuccessor方法流程如下:
  • 如果当前结点的状态小于0,那么CAS设置为0,表示后继结点可以继续尝试获取锁。
  • 如果当前结点的后继s为null或者状态为取消CANCELLED(1),则将s先指向null;然后从尾结点开始到node之间倒序向前查找,找到离尾结点最远的非取消结点赋给s。需要从后向前遍历,因为同步队列只保证结点前驱关系的正确性。
  • 如果s不为null,那么状态肯定不是取消CANCELLED(1),则直接唤醒s的线程,调用LockSupport.unpark方法唤醒,被唤醒的结点将从被park的位置继续执行
3、此时线程B被唤醒了,重新执行acquireQueued方法中的逻辑,线程B仍然需要尝试获取锁,此时有两种情况:
  • 线程B尝试获取锁失败:因为是非公平的实现方式,有可能线程A刚执行完,线程D刚进入且顺利拿到锁,那么线程B仍然需要继续等待。
  • 线程B尝试获取锁成功:执行acquireQueued方法中的如下逻辑
JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

4、此时队列结构如下: JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

/** * 唤醒指定结点的后继结点 * @param node 指定结点 */ private void unparkSuccessor(Node node) { // 获取指定结点的等待状态 int ws = node.waitStatus; // 判断等待状态是否小于0,如果小于0,那么CAS设置为0,表示后继结点线程可以先尝试获锁,而不是直接挂起 if (ws < 0) // 通过CAS操作将指定结点的等待状态更新为0 compareAndSetWaitStatus(node, ws, 0); // 获取指定结点的后继结点 Node s = node.next; /** * 如果后继结点为null,或者状态为取消CANCELLED(1),则从尾结点开始到指定结点之间倒序向前查找, * 找到离尾结点最远的非取消结点赋给s */ if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果后继结点不为null,则唤醒s结点的线程,调用LockSupport.unpark方法唤醒,被唤醒的结点将从被park的位置向后执行 if (s != null) LockSupport.unpark(s.thread); }

4、AQS的finally代码块中的cancelAcquire方法
1、在acquireQueued方法中有一段finally代码块的逻辑为:
  • 如果failed为true,表示获取锁失败,即对应发生异常的情况,这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true
  • 如果failed为false,表示已经获取到了锁,那么finally中代码都不会执行,acquireQueued方法结束。
2、cancelAcquire方法用于取消结点获取锁的请求,参数为需要取消的结点node 3、cancelAcquire方法流程:
  • 将node结点记录的线程thread设置为null。
  • 由node结点向前查找,直到找到一个状态小于等于0的结点pred (即找一个没有取消的结点),更新node.prev为找到的pred。
  • 将node结点的等待状态waitStatus置为CANCELLED,即取消请求锁。
  • 如果node结点是尾结点,那么尝试CAS更新头结点指向pred,成功之后继续CAS设置pred.next为null。
  • 否则,说明node结点不是尾结点或者CAS失败可能存在对尾结点的并发操作
  • 如果node结点不是头结点,并且 (pred的状态为SIGNAL或者将pred的waitStatus置为SIGNAL成功),并且pred记录的线程不为null。那么设置pred.next指向node.next,最后node.next指向node自己。
  • 否则,说明node结点是头结点或者pred状态设置失败或者pred记录的线程为null。那么调用unparkSuccessor唤醒node的一个没取消的后继结点。最后node.next指向node自己。
/** * 取消指定结点获取锁的请求 * @param node 指定结点 */ private void cancelAcquire(Node node) { // 如果指定结点为null,则直接返回 if (node == null) return; // 将指定结点node记录的线程thread设置为null node.thread = null; // 获取指定结点的前驱结点 Node pred = node.prev; // 循环判断前驱结点的等待状态是否大于0(即取消:CANCELLED(1)) // 由指定结点node向前查找,直到找到一个状态小于等于0的结点(即没有被取消的结点),作为前驱 // 类似于shouldParkAfterFailedAcquire方法中查找有效前驱的代码 while (pred.waitStatus > 0) // 指定结点的前驱结点 = 前驱结点的前驱结点;前驱结点 = 前驱结点的前驱结点,向前排查 node.prev = pred = pred.prev; // 获取到一个非取消状态的前驱结点pred,并获取它的后继结点 Node predNext = pred.next; // 指定结点node的等待状态设置为CANCELLED,即取消请求锁 node.waitStatus = Node.CANCELLED; // 如果指定结点是尾结点,那么尝试CAS更新尾结点指向pred,成功之后继续CAS设置后继结点pred.next为null if (node == tail && compareAndSetTail(node, pred)) { // 新尾结点pred的next结点设置为null,即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新了 compareAndSetNext(pred, predNext, null); // else说明node不是尾结点或者CAS失败(可能存在对尾结点的并发操作),这种情况就要把pred和node的后继非取消结点拼起来 } else { int ws; // 判断是否为头结点 if (pred != head && // 并且pred的状态为SIGNAL或者将pred的waitStatus置为SIGNAL成功 ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 并且pred记录的线程不为null pred.thread != null) { // 指定结点的后继结点 Node next = node.next; // 指定结点的后继结点不为null,并且node的后继结点的等待状态不为取消状态(CANCELLED) if (next != null && next.waitStatus <= 0) // 通过CAS操作将指定结点前的一个非取消状态的前驱结点pred的后继结点指向next compareAndSetNext(pred, predNext, next); // 上面判断存在失败的情况,此时需要调用unparkSuccessor方法尝试唤醒node结点的后继结点 } else { unparkSuccessor(node); } // 指定结点的后继结点设置为自己,方便后续GC时直接销毁无效结点 node.next = node; // help GC } }

5、总结
1、acquire流程图 JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

2、release流程图 JUC|十一、AbstractQueuedSynchronizer源码分析
文章图片

3、cancelAcquire流程图

    推荐阅读