文章目录
-
ReentrantLock
-
热身
-
- 公平锁和非公平锁
-
- 常用方法
-
ReentrantLock 继承结构
-
- lock
-
修改同步状态state(锁状态)
-
等待队列
-
线程中断
-
节点取消
-
2.unlock
-
总结
-
AQS
-
Node
-
构造函数
-
waitStatus
-
其他
-
队列
ReentrantLock
代码的分析以非公平锁为例,有兴趣可以自己走一遍公平锁的流程。它们大致一样,只是在获取锁的时候有些许差别。
代码主要分为两部分
- 获取锁的过程 :锁状态修改、 等待队列、线程的挂起(WAITING 状态)、线程中断
- 释放锁的过程: 线程的唤醒
热身
ReentrantLock的重入锁的实现是通过内部子类Sync来实现的。Sync 继承自AQS,ReentrantLock通过操作Sync来获取和控制同步状态。
工具类的最大好处就是使用起来比较简单。比如如下操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public class LockReentrantDemo {
2 public static void main(String[] args) {
3 //定义一个重入锁
4 Lock lock=new ReentrantLock();
5 //开启两个线程
6 MyThread thread1= new MyThread(lock,"thread1....");
7 MyThread thread2= new MyThread(lock,"thread2...");
8 thread1.start();
9 thread2.start();
10 }
11 //定义一个内部类
12 static class MyThread extends Thread{
13 private Lock lock;
14 public MyThread(Lock lock,String name){
15 super(name);
16 this.lock=lock;
17 }
18 public void run() {
19 try {
20 lock.lock();
21 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) +".........."+Thread.currentThread().getName());
22 Thread.sleep(2000);
23 } catch (Exception e) {
24 e.printStackTrace();
25 }finally {
26 lock.unlock();
27 }
28 }
29 }
30}
31
32
33
2019-08-08 16:01:25…thread1…
2019-08-08 16:01:27…thread2…
从上面可以看出 thread1 释放锁之后 thread2才能继续执行。
1. 公平锁和非公平锁
默认情况下就是非公平锁。非公平锁(就是插队)与公平锁的不同就在与不管队列中是否有节点在等待总是试图立即获取锁。
1
2
3
4
5 1 public ReentrantLock() {
2 sync = new NonfairSync();
3 }
4
5
调用有参构造函数,可以指定创建的是公平锁还是非公平锁。
1
2
3
4
5 1 public ReentrantLock(boolean fair) {
2 sync = fair ? new FairSync() : new NonfairSync();
3 }
4
5
2. 常用方法
ReentrantLock 中常用的方法就是lock,tryLock,unlock。暂时不涉及Condition
简单介绍一下它们的使用:
- lcok 尝试获取锁,获取成功则持有独占锁,重入则计数器加一,代码执行执行。否则进行AQS等待队列挂起
- tryLock 尝试获取锁,同上面不同的是会立即返回结果。获取到锁 返回true,未获取到返回false,如果是重入返回true。
- unlock 释放锁。如果该线程没有获取锁但是调用该方法则会抛出异常,否则重入计数器减一。
注意:
- tryLock 方法以非公平方式获取锁。
- tryLock(long timeout, TimeUnit unit) 的公平性和ReentrantLock 对象的构造有关。
ReentrantLock 继承结构
在AQS中存在一个变量 state,用来表示同步状态,不同的实现类含义可能不同。
在Reentrant中:
-
state >=1 表示有锁。
-
state= 1
- state >1 表示锁重入,即当前线程多次调用了lock 方法。
-
state=0 ,表示无锁。
1. lock
多线程获取锁的过程:
- 判断当前锁状态,如果是无锁状态则尝试获取。获取成功返回,否则进入步骤2
- 未获取到锁就会将当前线程封装为Node节点,并加入等待队列挂起线程,直到其他线程调用unlock完全释放锁。
修改同步状态state(锁状态)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1 /**
2 * Acquires the lock.
3 *
4 * <p>Acquires the lock if it is not held by another thread and returns
5 * immediately, setting the lock hold count to one.
6 *
7 * <p>If the current thread already holds the lock then the hold
8 * count is incremented by one and the method returns immediately.
9 *
10 * <p>If the lock is held by another thread then the
11 * current thread becomes disabled for thread scheduling
12 * purposes and lies dormant until the lock has been acquired,
13 * at which time the lock hold count is set to one.
14 */
15 public void lock() {
16 sync.lock();
17 }
18
19
点击进入lock方法。会发现委派给内部类 Sync实现的。Sync有两个实现NonfairSync和FairSync。具体调用那个是根据创建ReentrantLock对象时是否指定锁类型确定的。
默认情况下就是非公平锁。非公平锁(就是插队)与公平锁的不同就在与不管队列中是否有节点在等待总是试图立即获取锁。
1
2
3
4
5 1 public ReentrantLock() {
2 sync = new NonfairSync();
3 }
4
5
非公平模式获取锁走下面的代码
NonfairSync.java
1
2
3
4
5
6
7
8
9
10 1 //NonfairSync.java
2 final void lock() {
3 //试图立即获取锁,获取成功则设置独占锁线程为当前线程。
4 if (compareAndSetState(0, 1))
5 setExclusiveOwnerThread(Thread.currentThread());
6 else //否则和公平锁一样调用AQS模板方法acquire()
7 acquire(1);
8 }
9
10
acquire方法继承自AQS,是一个模板方法是实现锁的核心方法。
该方法给我们规范了获取锁的操作步骤,之后的方法就是围绕这一功能展开的。
- 尝试获取锁,获取成功则跳出该方法
- 获取锁失败,则会将当前线程封装为Node节点并加入等待队列。
- 在等待队列中被挂起的线程被唤醒后,会尝试获取锁。获取成功将该节点设置为头结点,否则又会加入等待队列。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1 /**
2 * Acquires in exclusive mode, ignoring interrupts. Implemented
3 * by invoking at least once {@link #tryAcquire},
4 * returning on success. Otherwise the thread is queued, possibly
5 * repeatedly blocking and unblocking, invoking {@link
6 * #tryAcquire} until success. This method can be used
7 * to implement method {@link Lock#lock}.
8 *
9 * @param arg the acquire argument. This value is conveyed to
10 * {@link #tryAcquire} but is otherwise uninterpreted and
11 * can represent anything you like.
12 */
13 public final void acquire(int arg) {
14 if (!tryAcquire(arg) &&
15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16 selfInterrupt();
17
18
19
tryAcquire 方法用于尝试获取锁,具体功能在子类中实现。当获取失败时则执行addWaiter方法将该线程加入到等待队列中。
在非公平模式下会调用nonfairTryAcquire方法
NonfairSync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1 final boolean nonfairTryAcquire(int acquires) {
2 final Thread current = Thread.currentThread();
3 int c = getState();//获取锁状态 0表示无所状态,>=1,表示有锁状态
4 //如果此时没有线程持有锁,则立即尝试获取
5 if (c == 0) {
6 if (compareAndSetState(0, acquires)) {
7 setExclusiveOwnerThread(current);
8 return true;
9 }
10 }
11 //是否当前线程已经获取到锁(重入)。如果是 将state锁计数加1
12 else if (current == getExclusiveOwnerThread()) {
13 int nextc = c + acquires;
14 if (nextc < 0) // overflow
15 //java中整型都是有符号的,所以当数量较大时就会变为负数
16 //比如 byte: 127+1 => -128
17 throw new Error("Maximum lock count exceeded");
18 setState(nextc);
19 return true;
20 }
21 return false;
22 }
23
24
等待队列
当获取锁失败的时候,就会执行addWaiter(Node.EXCLUSIVE)方法,将当前线程包装为Node节点,并添加到等待队列中。
Node是AQS中的内部类,用于构建等待链表。它包含了以下重要属性
- waitStatus:等待状态,控制等待队列中节点(线程)的唤醒与挂起
- Node prev:指向上一节点
- Node next :指向下一节点
- thread : 竞争锁的线程
在AQS中 有两个重要的属性来指示队列的头尾节点。
- head; 指向等待队列头结点
- tail; 指向等待队列尾结点
刚开始 head= tail =null;
addWaiter 方法会执行以下逻辑操作:
- 队列为空。创建一个空节点作为头结点,并将创建的节点添加到队列中。
- 队列不为空,则直接将该节点加入到队列尾部。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1 /**
2 * Creates and enqueues node for current thread and given mode.
3 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
4 * @return the new node
5 */
6 private Node addWaiter(Node mode) {
7 Node node = new Node(Thread.currentThread(), mode);
8 // Try the fast path of enq; backup to full enq on failure
9 Node pred = tail;
10 //队列不为空,则将该节点加入到队列的尾部
11 if (pred != null) {
12 node.prev = pred;
13 //将当前节点设置到队列尾部,设置失败则通过enq方法,自旋进行追加操作
14 if (compareAndSetTail(pred, node)) {
15 pred.next = node;
16 return node;
17 }
18 }
19 ////否则 创建一个空的头结点,并将则将该节点加入到队列的尾部
20 enq(node);
21 return node;
22 }
23
24
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1 private Node enq(final Node node) {
2 for (;;) {
3 Node t = tail;
4 if (t == null) { // Must initialize
5 //创建空节点 tail = head =Node
6 if (compareAndSetHead(new Node()))
7 tail = head;
8 } else {
9 node.prev = t;
10 if (compareAndSetTail(t, node)) {
11 t.next = node;
12 return t;
13 }
14 }
15 }
16 }
17
18
注意:
- enq返回的是当前节点的上一节点
现在我们假设当前有三个线程要获取重入锁(A,B,C)。当前线程A已获取到了锁,然后线程B和线程C都执行到了addWaiter 方法。
- 此时队列中没有节点,head= tail = null。假设B线程先执行就会进入enq方法,此时执行代码 compareAndSetHead(new Node())创建一个空节点(node)。此时 tail = head = node。然后进入下一次循环执行compareAndSetTail(该方法线程安全),将B线程包装的节点添加到尾部。这时候就是 head -> node -> threadB
- 如果线程C刚执行到addWaiter 方法,则满足 if (pred != null) 的分支。如果此时头结点还没有创建或者 线程B的节点此时刚插入成功,那么都会执行enq 方法,最终加入队列中(head -> node -> threadB->threadC)
线程B,C都加入后等待队列如下结构
下面的就是重头戏了,下面代码实现了节点线程的挂起和唤醒后重新获取锁的功能。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1 /**
2 * Acquires in exclusive uninterruptible mode for thread already in
3 * queue. Used by condition wait methods as well as acquire.
4 *
5 * @param node the node
6 * @param arg the acquire argument
7 * @return {@code true} if interrupted while waiting
8 */
9 final boolean acquireQueued(final Node node, int arg) {
10 boolean failed = true;
11 try {
12 //线程在挂起的过程中,是否调用了中断方法。
13 boolean interrupted = false;
14 for (;;) {
15 //获取上一个节点
16 final Node p = node.predecessor();
17 //如果上一个节点是头结点,则尝试获取锁
18 if (p == head && tryAcquire(arg)) {
19 setHead(node);
20 p.next = null; // help GC
21 failed = false;
22 return interrupted;
23 }
24 //获取失败则挂起当前线程。
25 if (shouldParkAfterFailedAcquire(p, node) &&
26 parkAndCheckInterrupt())
27 interrupted = true;
28 }
29 } finally {
30 if (failed)
31 //异常处理,取消节点
32 cancelAcquire(node);
33 }
34 }
35
36
以上面的节点为例。
节点B进入该方法,获取到上一节点 就是头结点(空节点)。
- 获取锁成功,将线程B的节点修改为头结点。线程获取锁后会继续执行 一些获取独占锁后的同步操作。
- 获取锁失败。则尝试挂起B线程
shouldParkAfterFailedAcquire 方法用于更新节点的waitStatus状态,如果当前节点的上一个节点的waitStatus状态是Node.SIGNAL,则说明当前节点的线程可以安全的挂起。(因为调用unlock方法时,只用 waitStatus <0的节点,节点线程才会被唤醒)
Node类中使用waitStatus 来控制节点的状态,定义了以下常量来标识不同状态:
- CANCELLED = 1;
- SIGNAL = -1; 标识当前节点的下一节点已经或者即将被挂起。当释放锁时需要唤醒改状态节点的下一节点。
- CONDITION = -2; condition队列节点初始状态
- PROPAGATE = -3;
- 0; AQS 等待队列节点的初始状态
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 1 /**
2 * Checks and updates status for a node that failed to acquire.
3 * Returns true if thread should block. This is the main signal
4 * control in all acquire loops. Requires that pred == node.prev.
5 *
6 * @param pred node's predecessor holding status
7 * @param node the node
8 * @return {@code true} if thread should block
9 */
10 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
11
12 int ws = pred.waitStatus;
13 if (ws == Node.SIGNAL)
14 /*
15 * This node has already set status asking a release
16 * to signal it, so it can safely park.
17 */
18 return true;
19// Node 节点的状态之后 CANCELLED 大于0 ,所以此时的操作用于取消线程执行
20 if (ws > 0) {
21 /*
22 * Predecessor was cancelled. Skip over predecessors and
23 * indicate retry.
24 */
25 //暂时不涉及此处内容,只需要了解 头结点waitStatus永远不会大于零
26 do {
27 node.prev = pred = pred.prev;
28 } while (pred.waitStatus > 0);
29 pred.next = node;
30 } else {
31 /*
32 * waitStatus must be 0 or PROPAGATE. Indicate that we
33 * need a signal, but don't park yet. Caller will need to
34 * retry to make sure it cannot acquire before parking.
35 */
36 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
37 }
38 return false;
39 }
40
41
节点状态的修改步骤:
- 节点B进入该方法,此时它的上一节点为Head节点
waitStatus=0,会执行compareAndSetWaitStatus方法。修改成功后头节点状态
waitStatus=Node.SIGNAL,第一次循环方法返回
false, 然后执行上面的acquireQueued 循环步骤。
- 如果再次循环依旧没有获取锁,则会又会执行shouldParkAfterFailedAcquire方法。此时头节点
waitStatus=Node.SIGNAL 返回
true。则会继续执行parkAndCheckInterrupt 挂起当前线程。
节点C进入acquireQueued 方法会执行上面相同的操作将它的上一节点(threadB)的waitStatus修改为 SIGNAL。
修改后的队列状态值如下(假设此时unlock方法还未被调用,此时节点B和节点C都还在等待队列中):
shouldParkAfterFailedAcquire 返回true以后,就会执行parkAndCheckInterrupt操作来挂起当前线程,该功能是通过方法LockSupport.park(this)来实现的。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7 1 private final boolean parkAndCheckInterrupt() {
2 //挂起线程
3 LockSupport.park(this);
4 return Thread.interrupted();
5 }
6
7
这样该线程就被挂起了,一直等待直到其他线程的唤醒
假设线程A执行完成调用了unlock方法完全释放了锁。这样线程B就会被唤醒,就会继续执行 接下来的代码( return Thread.interrupted()) 。这样又会继续执行acquireQueued 循环操作。此时线程A已经释放锁,如果此时没有新线程去竞争锁的话就是无锁状态,并且节点B(线程B)的上一个节点是头结点,这样线程B 通过tryAcquire方法获取锁成功,将B节点设置为头结点。线程B由此获取到了锁终止队列中的挂起,继续执行获取锁后的代码。
线程中断
线程获取被唤醒之后,并不会立即获取锁,而是先获取线程的中断状态。
因为线程被唤醒可能是以下原因:
- 调用了LockSupport.unpark(thread)方法
- 锁释放后调用的LockSupport.unpark()方法
- 其他线程调用LockSupport.unpark()方法
- 其他线程 中断了该线程(调用interrupt方法)。
因为如果在线程挂起的过程中,其他线程B调用了 threadB.interrupt()方法,后将线程B唤醒并设置中断位为true。但是此时线程B还未获取到锁所以无法响应中断请求。当线程B成功获取到锁之后就可以响应之前的中断请求。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8 1 private final boolean parkAndCheckInterrupt() {
2 //挂起线程
3 LockSupport.park(this);
4 //判断线程是否中断过,同时清除中断标志位
5 return Thread.interrupted();
6 }
7
8
在获取锁成功后会将这个中断状态返回。因所释放导致的线程唤醒interrupted=false,因线程中断导致的线程唤醒interrupted=true。
因中断被唤醒的线程很有可能不能获取到锁,这样线程就又会被挂起,但是线程的中断状态已经被修改保存。所以当线程获取锁之后就会去响应中断。
1
2
3
4
5
6
7
8 1 if (p == head && tryAcquire(arg)) {
2 setHead(node);
3 p.next = null; // help GC
4 failed = false;
5 return interrupted;
6}
7
8
如果线程被interrupt()方法唤醒并成功获取到锁,那么就会去响应中断。
AbstractQueuedSynchronizer.java
1
2
3
4
5
6 1 public final void acquire(int arg) {
2 if (!tryAcquire(arg) &&
3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4 selfInterrupt();
5
6
响应中断的方式很简单就是又调用了一次interrupt()方法。当上述的代码执行完成之后 lock()代码执行完成,线程获取到了锁,然后继续上次退出的指令出执行。
AbstractQueuedSynchronizer.java
1
2
3
4
5 1 static void selfInterrupt() {
2 Thread.currentThread().interrupt();
3 }
4
5
对于线程中断的详细内容可以参考 博客:java线程随笔
节点取消
当执行acquireQueued 方法内部循环时,出现了异常
tryAcquire 方法抛出异常
1
2
3
4
5
6
7
8
9 1 else if (current == getExclusiveOwnerThread()) {
2 int nextc = c + acquires;
3 if (nextc < 0) // overflow
4 throw new Error("Maximum lock count exceeded");
5 setState(nextc);
6 return true;
7 }
8
9
predecessor 获取上一节点
1
2
3
4
5
6
7
8
9 1 final Node predecessor() throws NullPointerException {
2 Node p = prev;
3 if (p == null)
4 throw new NullPointerException();
5 else
6 return p;
7 }
8
9
其他异常 (InterruptedException,,,)
1
2
3 1 throw new InterruptedException();
2
3
那么就会执行取消节点操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 1 /**
2 * Cancels an ongoing attempt to acquire.
3 *
4 * @param node the node
5 */
6 private void cancelAcquire(Node node) {
7 //注意该节点是线程节点
8 //这也是为什么头节点的waitStatus不可能为 1(CANCELLED)
9
10 // Ignore if node doesn't exist
11 if (node == null)
12 return;
13 //被取消的节点是能再执行线程任务,所以置位null
14 node.thread = null;
15
16 // Skip cancelled predecessors
17 Node pred = node.prev;
18 //循环,跳过waitStatus=CANCELLED 的节点。
19 while (pred.waitStatus > 0)
20 node.prev = pred = pred.prev;
21
22 // predNext is the apparent node to unsplice. CASes below will
23 // fail if not, in which case, we lost race vs another cancel
24 // or signal, so no further action is necessary.
25 Node predNext = pred.next;
26
27 // Can use unconditional write instead of CAS here.
28 // After this atomic step, other Nodes can skip past us.
29 // Before, we are free of interference from other threads.
30 //设置节点状态为取消(1)
31 node.waitStatus = Node.CANCELLED;
32
33 // If we are the tail, remove ourselves.
34 if (node == tail && compareAndSetTail(node, pred)) {
35 compareAndSetNext(pred, predNext, null);
36 } else {
37 // If successor needs signal, try to set pred's next-link
38 // so it will get one. Otherwise wake it up to propagate.
39 int ws;
40 if (pred != head &&
41 ((ws = pred.waitStatus) == Node.SIGNAL ||
42 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
43 pred.thread != null) {
44 Node next = node.next;
45 if (next != null && next.waitStatus <= 0)
46 compareAndSetNext(pred, predNext, next);
47 } else {
48 unparkSuccessor(node);
49 }
50
51 node.next = node; // help GC
52 }
53 }
54
55
2.unlock
唤醒线程的功能就是在unlock中数实现的。相较于获取锁而言,释放锁的代码和逻辑更清晰。
下面还是以非公平锁展开。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 /**
2 * Attempts to release this lock.
3 *
4 * <p>If the current thread is the holder of this lock then the hold
5 * count is decremented. If the hold count is now zero then the lock
6 * is released. If the current thread is not the holder of this
7 * lock then {@link IllegalMonitorStateException} is thrown.
8 *
9 * @throws IllegalMonitorStateException if the current thread does not
10 * hold this lock
11 */
12 public void unlock() {
13 sync.release(1);
14 }
15
16
release 方法也是一个模板方法。
主要是以下逻辑:
- 当重入了多次的时候,会将重入计数次数减一
- 释放锁成功(state==0)则会唤醒等待队列中头节点的下一节点中的线程
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1 /**
2 * Releases in exclusive mode. Implemented by unblocking one or
3 * more threads if {@link #tryRelease} returns true.
4 * This method can be used to implement method {@link Lock#unlock}.
5 *
6 * @param arg the release argument. This value is conveyed to
7 * {@link #tryRelease} but is otherwise uninterpreted and
8 * can represent anything you like.
9 * @return the value returned from {@link #tryRelease}
10 */
11 public final boolean release(int arg) {
12 if (tryRelease(arg)) {
13 Node h = head;
14 if (h != null && h.waitStatus != 0)
15 unparkSuccessor(h);
16 return true;
17 }
18 return false;
19 }
20
21
Sync继承了AQS,修改锁状态state,当state==0是说明为无锁状态。state>1 :说明当前线程多次获取了锁(锁重入)
同tryQcquire 方法一样 tryRelease方法需要在子类中实现具体功能。
Sync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 protected final boolean tryRelease(int releases) {
2 int c = getState() - releases;
3 //不合法的状态,为获取锁的线程试图调用unlock方法会抛出该异常
4 if (Thread.currentThread() != getExclusiveOwnerThread())
5 throw new IllegalMonitorStateException();
6 boolean free = false;
7 if (c == 0) {
8 //锁计数为0,exclusiveThread = null
9 free = true;
10 setExclusiveOwnerThread(null);
11 }
12 setState(c);
13 return free;
14 }
15
16
正是因为 一次unlock操作只会 state–,所以lock和unlock必须
成对出现,而且unlock释放锁操作需要放在
finally代码块中执行。
锁释放成功后,就会唤醒等待队列中 头节点的下一节点中的线程。以上面等待队列为例,此时会唤醒节点B
AbstractQueuedSynchronizer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 1 private void unparkSuccessor(Node node) {
2 /*
3 * If status is negative (i.e., possibly needing signal) try
4 * to clear in anticipation of signalling. It is OK if this
5 * fails or if status is changed by waiting thread.
6 */
7
8 int ws = node.waitStatus;
9 if (ws < 0)
10 compareAndSetWaitStatus(node, ws, 0);
11
12 /*
13 * Thread to unpark is held in successor, which is normally
14 * just the next node. But if cancelled or apparently null,
15 * traverse backwards from tail to find the actual
16 * non-cancelled successor.
17 */
18
19 Node s = node.next;
20 //取消操作,被取消的节点将不能获取锁。
21 if (s == null || s.waitStatus > 0) {
22 s = null;
23 for (Node t = tail; t != null && t != node; t = t.prev)
24 if (t.waitStatus <= 0)
25 s = t;
26 }
27 //唤醒线程
28 if (s != null)
29 LockSupport.unpark(s.thread);
30 }
31
32
看上面的操作好像waitStatus即便不修改为 SIGNAL(默认为0)。此处代码依然可以走通,为什么要多次一举呢。 AQS是模板工具类,应当具有良好的扩展性、通用性和可读性。所以waitStatus 的状态才有很多个用于明确节点所处状态。在ReentrantLock等待队列中SIGNAL状态没有起到什么实质作用,但是在Condition队列,CountDownLatch 中就真正用到了 SIGNAL状态。
总结
注意:
关于一些术语的问题
有些术语用的不是特别准确。比如线程挂起的问题,我其实不太清楚LockSupport.park()方法调用后具体应该用什么术语描述比较准确。根据java文档中给出的注释来说,说线程等待应该更准确些,但是语义上好像又没有那么通畅。所以我用的是线程挂起
1
2
3
4
5
6
7
8
9
10
11
12
13 1 /**
2 * Thread state for a waiting thread.
3 * A thread is in the waiting state due to calling one of the
4 * following methods:
5 * <ul>
6 * <li>{@link Object#wait() Object.wait} with no timeout</li>
7 * <li>{@link #join() Thread.join} with no timeout</li>
8 * <li>{@link LockSupport#park() LockSupport.park}</li>
9 * </ul>
10 */
11 WAITING
12
13
上面介绍的ReentrantLock 获取锁的流程。如果是考虑到Condition队列就要复杂些了。
小结:
- lock、unlock方法必须成对出现
- unlock 方法应当放在 finally代码块中,以保证锁一定会释放。
下面重新回顾获取锁的流程:
- 查看当前锁标识,如果是无锁状态,则通过CAS操作原子获取。
- 当前已经是有锁状态
- 获取锁的线程和当前线程是同一个,则重入 state++
- 如果不同则将线程包装为Node节点加入到队列中,并将符合条件的线程挂起。
不是时序图的时序图
释放锁步骤:
- 检查锁状态和拥有锁的线程。
- 重入则将state–,否则唤醒等待队列中的下一节点中的线程。
AQS
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues
AbstractQueuedSynchronizer(AQS) 是一个基于先进先出队列(FIFO)的同步框架,框架用于实现 阻塞锁和一些相关的同步器。
AQS会维护一个同步状态(state),在不同的实现类中有不同的含义。 JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
AQS中有几个重要的属性
- Thread exclusiveOwnerThread:保存当前获取锁的线程。该属性继承自AbstractOwnableSynchronizer
- int state : 同步状态。不同实现类中state的含义也是不同的。
- CountDownLatch: 计数器
- ReentrantLock: 重入锁,也可以说是锁计数器
- Node tail: 指向队列尾部节点
- Node head:指向队列的头部节点
Node
AQS中使用内部类Node来维护先进先出的等待队列。
构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1 //用于创建头节点
2 Node() { // Used to establish initial head or SHARED marker
3 }
4
5 // 调用addWaiter方法用于添加一个节点
6 Node(Thread thread, Node mode) { // Used by addWaiter
7 this.nextWaiter = mode;
8 this.thread = thread;
9 }
10 //构造Condition 队列中的节点
11 Node(Thread thread, int waitStatus) { // Used by Condition
12 this.waitStatus = waitStatus;
13 this.thread = thread;
14 }
15
16
waitStatus
waitStatus 设置节点的状态 用来控制节点的挂起,唤醒和移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1 /** waitStatus value to indicate thread has cancelled */
2 //节点取消
3 static final int CANCELLED = 1;
4 /** waitStatus value to indicate successor's thread needs unparking */
5 //表示当前节点下一节点可以安全挂起
6 static final int SIGNAL = -1;
7 /** waitStatus value to indicate thread is waiting on condition */
8 //condition 队列中的初始状态
9 static final int CONDITION = -2;
10 /**
11 * waitStatus value to indicate the next acquireShared should unconditionally propagate
12 */
13 //
14 static final int PROPAGATE = -3;
15
16 /**
17 * Status field, taking on only the values:
18 * SIGNAL: The successor of this node is (or will soon be)
19 * blocked (via park), so the current node must
20 * unpark its successor when it releases or
21 * cancels. To avoid races, acquire methods must
22 * first indicate they need a signal,
23 * then retry the atomic acquire, and then,
24 * on failure, block.
25 * CANCELLED: This node is cancelled due to timeout or interrupt.
26 * Nodes never leave this state. In particular,
27 * a thread with cancelled node never again blocks.
28 * CONDITION: This node is currently on a condition queue.
29 * It will not be used as a sync queue node
30 * until transferred, at which time the status
31 * will be set to 0. (Use of this value here has
32 * nothing to do with the other uses of the
33 * field, but simplifies mechanics.)
34 * PROPAGATE: A releaseShared should be propagated to other
35 * nodes. This is set (for head node only) in
36 * doReleaseShared to ensure propagation
37 * continues, even if other operations have
38 * since intervened.
39 * 0: None of the above
40 *
41 * The values are arranged numerically to simplify use.
42 * Non-negative values mean that a node doesn't need to
43 * signal. So, most code doesn't need to check for particular
44 * values, just for sign.
45 *
46 * The field is initialized to 0 for normal sync nodes, and
47 * CONDITION for condition nodes. It is modified using CAS
48 * (or when possible, unconditional volatile writes).
49 */
50 volatile int waitStatus;
51
52
其他
thread: 保存操作的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 1 /** Marker to indicate a node is waiting in shared mode */
2 //共享锁模式
3 static final Node SHARED = new Node();
4 /** Marker to indicate a node is waiting in exclusive mode */
5 //独占锁模式
6 static final Node EXCLUSIVE = null;
7
8
9 /**
10 * Link to predecessor node that current node/thread relies on
11 * for checking waitStatus. Assigned during enqueuing, and nulled
12 * out (for sake of GC) only upon dequeuing. Also, upon
13 * cancellation of a predecessor, we short-circuit while
14 * finding a non-cancelled one, which will always exist
15 * because the head node is never cancelled: A node becomes
16 * head only as a result of successful acquire. A
17 * cancelled thread never succeeds in acquiring, and a thread only
18 * cancels itself, not any other node.
19 */
20 volatile Node prev;
21
22 /**
23 * Link to the successor node that the current node/thread
24 * unparks upon release. Assigned during enqueuing, adjusted
25 * when bypassing cancelled predecessors, and nulled out (for
26 * sake of GC) when dequeued. The enq operation does not
27 * assign next field of a predecessor until after attachment,
28 * so seeing a null next field does not necessarily mean that
29 * node is at end of queue. However, if a next field appears
30 * to be null, we can scan prev's from the tail to
31 * double-check. The next field of cancelled nodes is set to
32 * point to the node itself instead of null, to make life
33 * easier for isOnSyncQueue.
34 */
35 volatile Node next;
36
37 /**
38 * The thread that enqueued this node. Initialized on construction and nulled out after use.
39 */
40 //进入此节点队列的线程
41 volatile Thread thread;
42
43 /**
44 * Link to next node waiting on condition, or the special
45 * value SHARED. Because condition queues are accessed only
46 * when holding in exclusive mode, we just need a simple
47 * linked queue to hold nodes while they are waiting on
48 * conditions. They are then transferred to the queue to
49 * re-acquire. And because conditions can only be exclusive,
50 * we save a field by using special value to indicate shared
51 * mode.
52 */
53 //Condition 队列用于指向下一节点
54 Node nextWaiter;
55
56 /**
57 * Returns true if node is waiting in shared mode.
58 */
59 //是否是共享模式(共享锁)
60 final boolean isShared() {
61 return nextWaiter == SHARED;
62 }
63
64 /**
65 * Returns previous node, or throws NullPointerException if null.
66 * Use when predecessor cannot be null. The null check could
67 * be elided, but is present to help the VM.
68 *
69 * @return the predecessor of this node
70 */
71 //获取当前节点的上一节点
72 final Node predecessor() throws NullPointerException {
73 Node p = prev;
74 if (p == null)
75 throw new NullPointerException();
76 else
77 return p;
78 }
79
80 }
81
82
队列
初始状态,head=tail = null
当有线程进入队列时,会先创建头节点