link-JAVA多线程与高并发系列[前言,大纲,目录]
LockSupport
关键方法:
- park(): 当前线程阻塞(如果当前线程没有被unpark)
- unpark(Thread thread):如果入参的线程正在park(),则让它恢复运行;否则,就保证下一次该线程park()时不会阻塞.
unpark(t)必须在线程t启动(start)后才有效果.
可以这么理解:unpark相当于是一张通行证,park是一道关卡;先拿到通行证了,遇到关卡也不会被阻拦;没有通行证时遇到关卡会被阻拦,直到有通行证了才能继续走.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1public class T13_TestLockSupport {
2 public static void main(String[] args) {
3 Thread t = new Thread(()->{
4 for (int i = 0; i < 10; i++) {
5 System.out.println(i + "|" + Instant.now());
6 if(i == 5) {
7 LockSupport.park();
8 }
9 try {
10 TimeUnit.SECONDS.sleep(1);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 });
16
17 t.start();
18
19 // 下面相当于提前unpark了,因为线程t 5秒后才会park
20 // 必须在t.start()后才有效果
21// LockSupport.unpark(t);
22
23 // 下面是先park后unpark,因为8秒后会unpark线程t
24 try {
25 TimeUnit.SECONDS.sleep(8);
26 } catch (InterruptedException e) {
27 e.printStackTrace();
28 }
29 System.out.println("after 8 senconds! | " + Instant.now());
30 LockSupport.unpark(t);
31 }
32}
33
34
AQS
简介
AQS:AbstractQueuedSynchronizer
JAVA的几乎所有锁(除synchronized,LockSupport外)都是用AQS实现的,比如ReentrantLock,CountdownLatch,CyclicBarrier,Semaphore等;
可以说AQS的底层就是CAS+volatile state:
- 属性private volatile int state;同步状态
这个state在不同的子类实现中有不同的含义,但都是通过这个state来控制"同步"的行为.这个state通过CAS来改变.
- AQS维护着Node的双向链表作为等待队列,这个等待队列是"CLH"(Craig, Landin, and Hagersten)队列的变体
Node是AQS的内部类,Node里面的属性装着线程volatile Thread thread,和一个状态volatile int waitStatus.
各个线程去争抢的过程其实就是这个CLH队列的入列和出列,这个过程的关键操作都是通过CAS来实现的
通过ReentrantLock的lock()方法,去阅读理解AQS
AbstractQueuedSynchronizer的state属性对于ReentrantLock的意义:
state表示线程持有锁的次数:
state=0表示没有线程持有锁;
state>0表示有一个线程持有锁,重入了state次.
起初我是看的JDK8的源码,但是它和JDK13上有一些细节上的区别,JDK13更好理解一些.我还记下了JDK8和JDK11在AQS实现上的区别,后来想想没啥意义,还容易记忆混乱,就干脆只记录JDK13的实现方式了.既然学习嘛,那就学新的~
JDK8和新版的JDK(以11为例)实现的细节上有些不同,整体思路是一样的:
CAS修改state和Node节点入队列时,JDK8通过unsafe获取变量的内存地址偏移量,和JDK11通过变量句柄指向变量的地址.
按我的理解,这俩效果是一样的,JDK11的效率应该更高一些
以state为例:
JDK8用到了stateOffset.(为了允许将来的优化没有用AtomicInteger),这个stateOffset可以理解为state的"指针"(地址),可以通过stateOffsetCAS改变state
1
2
3
4 1long stateOffset=unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
2unsafe.compareAndSwapInt(this, stateOffset, expect, update);
3
4
JDK11用到了VarHandle,可以直接CAS改变state
1
2
3
4
5 1VarHandle STATE=MethodHandles.lookup()
2.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
3STATE.compareAndSet(this, expect, update)
4
5
Unsafe是实现CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。Unsafe类提供了硬件级别的原子操作。
lock()的大致流程如下:
下面简称AbstractQueuedSynchronizer为AQS,AbstractOwnableSynchronizer为AOS
ReentrantLock
Sync
AQS
LockSupport
调用acquire(1)
调用acquire(1)
tryAcquire,再CAS获取一次锁
如果获取到了锁,就直接返回
如果没获取到锁,就把当前线程加入等待队列addWaiter
加入等待队列后再CAS尝试从队列拿到当前线程acquireQueued
如果CAS了几次仍拿不到当前线程,就park
LockSupport被唤醒后返回
selfInterrupt,唤醒线程,也就是拿到了锁
线程lock()结束
ReentrantLock
Sync
AQS
LockSupport
尝试阅读ReentrantLock的lock()方法
下面尝试着通过debugReentrantLock.lock()方法,去阅读源码.
-
首先随便写个方法,在lock.lock()处打断点,然后debug运行,追踪下去
1
2
3
4
5
6
7
8 1 public static void main(String[] args) {
2 ReentrantLock lock = new ReentrantLock();
3 lock.lock();
4 System.out.println("hahaha");
5 lock.unlock();
6 }
7
8
- 看到ReentrantLock的lock()方法
ReentrantLock.lock()方法注释的大致意思:
这个方法用于获得该锁;
如果该锁没有被线程持有,那么把锁的hold count设为1后直接返回;
如果当前线程已经持有该锁,则hold count加一,然后返回;
如果该锁被其他线程持有,那么当前线程将"阻塞",直到获得锁,然后把hold count设为1后返回
这个lock hold count应该就是AQS的state
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1 /**
2 * Acquires the lock.
3 *
4 * <p>Acquires the lock if it is not held by another thread and returns
5 * immediately, setting the lock hold count to one.
6 *
7 * <p>If the current thread already holds the lock then the hold
8 * count is incremented by one and the method returns immediately.
9 *
10 * <p>If the lock is held by another thread then the
11 * current thread becomes disabled for thread scheduling
12 * purposes and lies dormant until the lock has been acquired,
13 * at which time the lock hold count is set to one.
14 */
15 public void lock() {
16 sync.acquire(1);
17 }
18
19
- 看到ReentrantLock.lock()其实是调用了sync.acquire(1),sync变量是Sync类型.Sync是何许人也?
Sync,NonfairSync,FairSync都是ReentrantLock的内部类
Sync 继承了 AbstractQueuedSynchronizer
NonfairSync(非公平锁的实现) 和 FairSync(公平锁的实现) 都继承了 Sync
Sync的acquire方法继承自AQS,自己没有实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1 /**
2 * Acquires in exclusive mode, ignoring interrupts. Implemented
3 * by invoking at least once {@link #tryAcquire},
4 * returning on success. Otherwise the thread is queued, possibly
5 * repeatedly blocking and unblocking, invoking {@link
6 * #tryAcquire} until success. This method can be used
7 * to implement method {@link Lock#lock}.
8 *
9 * @param arg the acquire argument. This value is conveyed to
10 * {@link #tryAcquire} but is otherwise uninterpreted and
11 * can represent anything you like.
12 */
13 public final void acquire(int arg) {
14 if (!tryAcquire(arg) &&
15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16 selfInterrupt();
17 }
18
19
3.1 AQS的acquire中的tryAcquire是由子类实现的,
这个tryAcquire是NonfairSync实现的,调用了Sync的nonfairTryAcquire方法,因为ReentrantLock是默认是非公平锁
下面代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1/**
2 * Performs non-fair tryLock. tryAcquire is implemented in
3 * subclasses, but both need nonfair try for trylock method.
4 */
5 @ReservedStackAccess
6 final boolean nonfairTryAcquire(int acquires) {
7 final Thread current = Thread.currentThread();
8 /*
9 getState()调用了AQS的方法,直接返回了state属性.
10 如果state等于0,则CAS设置stateOffset为acquires;
11 如果state!=0,则判断拥有锁的线程是不是当前线程,如果不是则返回false;如果是当前线程已经持有锁,则更新state值为state+acquire(可重入)
12 可见ReentrantLock把state作为是否锁定的判断标准,如果state=0则表示没有线程持有锁.如果state值>0,则表示已有线程持有了锁,并且持有了state次(可重入).
13 */
14 int c = getState();
15 if (c == 0) {
16 // compareAndSetState是AQS的方法,CAS修改state值
17 if (compareAndSetState(0, acquires)) {
18 //AQS继承了AbstractOwnableSynchronizer
19 //设置状态后,则进入AbstractOwnableSynchronizer的setExclusiveOwnerThread方法.表示已经拿到锁.
20 setExclusiveOwnerThread(current);
21 return true;
22 }
23 }
24 // 没拿到锁,则判断是否锁的持有者就是当前线程
25 else if (current == getExclusiveOwnerThread()) {
26 // 当前线程重入
27 int nextc = c + acquires;
28 if (nextc < 0) // overflow
29 throw new Error("Maximum lock count exceeded");
30 setState(nextc);
31 return true;
32 }
33 return false;
34 }
35
36
3.2 如果tryAcquire没有成功获得锁,则把当前线程加入等待队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1 /**
2 * Creates and enqueues node for current thread and given mode.
3 *
4 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
5 * @return the new node
6 */
7 private Node addWaiter(Node mode) {
8 Node node = new Node(mode);
9 // 自旋把前节点加入等待队列
10 for (;;) {
11 Node oldTail = tail;
12 if (oldTail != null) {
13 node.setPrevRelaxed(oldTail);
14 if (compareAndSetTail(oldTail, node)) {
15 oldTail.next = node;
16 return node;
17 }
18 } else {
19 initializeSyncQueue();
20 }
21 }
22 }
23
24
然后调用acquireQueued方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1 /**
2 * Acquires in exclusive uninterruptible mode for thread already in
3 * queue. Used by condition wait methods as well as acquire.
4 *
5 * @param node the node
6 * @param arg the acquire argument
7 * @return {@code true} if interrupted while waiting
8 */
9 final boolean acquireQueued(final Node node, int arg) {
10 boolean interrupted = false;
11 try {
12 for (;;) {
13 final Node p = node.predecessor();
14 // 如果上一个节点是头节点,则说明下一个就该当前线程了
15 // 那就试着去拿一下锁,tryAcquire仍是调用的子类(Sync)的方法
16 if (p == head && tryAcquire(arg)) {
17 setHead(node);
18 p.next = null; // help GC
19 return interrupted;
20 }
21 // 判断是否需要park,需要的话就调用LockSupport的park阻塞线程
22 if (shouldParkAfterFailedAcquire(p, node))
23 interrupted |= parkAndCheckInterrupt();
24 }
25 } catch (Throwable t) {
26 cancelAcquire(node);
27 if (interrupted)
28 selfInterrupt();
29 throw t;
30 }
31 }
32
33