Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)

这篇文章不是讲ReentrantLock的使用,而是通过调试,分析其实现原理。
非公平锁 测试代码如下:
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLockExample {private ReentrantLock lock = new ReentrantLock(); public void doSomething() {lock.lock(); try { System.out.println(Thread.currentThread().getName() + " has been acquired the lock..."); } finally { lock.unlock(); System.out.println(Thread.currentThread().getName() + " has been released the lock..."); }}public static void main(String[] args) {ReentrantLockExample lockExample = new ReentrantLockExample(); Thread A = new Thread(()->{ lockExample.doSomething(); }, "thread-A"); Thread B = new Thread(()->{ lockExample.doSomething(); }, "thread-B"); Thread C = new Thread(()->{ lockExample.doSomething(); }, "thread-C"); Thread D = new Thread(()->{ lockExample.doSomething(); }, "thread-D"); A.start(); B.start(); C.start(); D.start(); } }

代码测试的是4个线程争抢一把锁。
1. 加锁流程解析 通过IDEA多线程调试工具进行调试,调试模拟情景为:A-B-C-D依次获取锁,等所有的获取请求结束,再依次释放锁。
第一步:线程A请求锁
调用ReentrantLock的lock方法:
public void lock() { sync.lock(); }

执行到此出,将处理交给sync,我们这里是非公平锁,则sync为NonfairSync,调用方法:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

通过CAS设置state变量,期望当前值是0,更新值是1。state初始值为0,无其他线程更改过,所以这里CAS设置成功,state值变为1。并且exclusiveOwnerThread设置为thread-A对象。
执行图:
thread-A加锁.png lock对象内容:
thread-A加锁后ReentrantLock对象内容.png 第二步:线程B请求锁
挂起A线程,执行B线程。
线程B执行到此代码:
ReentrantLock.java
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

此时state=1,此处CAS执行失败,接下来执行else语句代码。
【Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)】acquire方法在AbstractQueuedSynchronizer类中定义:
AbstractQueuedSynchronizer.java
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

if条件中有两处,若第一次为false,则第二处会执行。
先来看tryAcquire, tryAcquire在AbstractQueuedSynchronizer重定义,但是没有定义具体实现,具体实现交给子类,我们这里就是NonfairSync类实现。
NonfairSync
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }

nonfairTryAcquire在NonfairSync直接父类也就是Sync中定义:
Sync
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

此时state=1,exclusiveOwnerThread=thread-A, current = thread-B。此方法内逻辑不执行,返回false。
返回执行acquireQueued方法。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

执行这个方法前要先执行addWaiter方法,此方法用于创建即将入同步队列的Node(节点)。
AbstractQueueSynchronizer.java
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

此时tail = null,执行enq方法。
AbstractQueueSynchronizer.java
private Node enq(final Node node) { for (; ; ) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

  • 第一次循环:创建一个Node对象,设置给head。同时将head赋值给tail,这样head和tail指向同一个对象。两者不再为null。
head_tail-0.png
  • 第二次循环:将B节点的前置设置为t(t是中间变量指向的就是head/tail节点),然后将B设置为tail。执行完成后返回t,此时t和head执行同一个对象,其实返回的就是head节点。
执行完成后,head和tail的内容如下:
Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)
文章图片
head_tail-1.png 注意一下,虽然addWaiter方法返回的是头节点,但是node还是指向的B节点。接着执行acquireQueued方法。
AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {// Node此时是B节点 boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

  • 第一次循环:p为head节点,tryAcquire和前面一样返回false,第一个if语句不行。紧接着执行shouldParkAfterFailedAcquire方法,方法代码如下:
AbstractQueuedSynchronzier.java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE.Indicate that we * need a signal, but don't park yet.Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

此时pred=head, node = B, ws = 0, 执行:
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

这个执行完成后,head的waitStatus被设置成Node.SINGAL,这个表示head节点的后继者也就是B节点代表的线程需要被通知。返回false。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;

if代码块不执行。
  • 第二次循环: p还是head节点,tryAcquire和第一次循环一样,第一个if依然不执行,继续执行shouldParkAfterFailedAcquire,此时shouldParkAfterFailedAcquire方法内pre是head节点,而head节点经过第一次循环waitStatus被设置为-1。shouldParkAfterFailedAcquire(p, node)返回true, 则接下来执行parkAndCheckInterrupt()。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

线程B执行到LockSupport.park(this)处被挂起。
执行图:
thread-B加锁.png 第三步:线程C请求锁
注意:此时线程A是我们主动(通过IDE操作)挂起的(RUNNING),线程B通过上一步的操作被动(park操作)挂起的(WAITTING)。
因为前面已经见到一些逻辑的执行了,这里就不再重复,直接讲关系到Node的操作。
通过addWaiter操作,为线程C创建一个Node节点:
AbstractQueuedSynchronzier.java
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

此时tail为B节点,然后将C节点的prev设置为B,通过CAS操作将C节点设置为tail,设置成功将B的next设置为C。然后直接返回C节点。
执行完成后head和tail的内容如下:
head_tail-2.png addWaiter返回C节点后,执行acquireQueued方法:
AbstractQueuedSynchronzier.java
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (; ; ) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

此时node=C
  • 第一次循环: C的前置节点是B,这里p=B,第一个if不执行,执行
shouldParkAfterFailedAcquire:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE.Indicate that we * need a signal, but don't park yet.Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

pre=B,B的waitStatus=0,则执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL),B的waitStatus被设为-1,表示后继节点,即C节点所代表的线程需要被唤醒。继续执行线程C被挂起。
执行图:
thread-C加锁.png 第四步:线程D请求锁
线程D请求锁和上面的流程几乎是一样了,这里就不再分别调试,执行图:
Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)
文章图片
thread-D加锁.png

    推荐阅读