JAVA多线程与高并发(四)[LockSupport,AQS解读]

释放双眼,带上耳机,听听看~!

link-JAVA多线程与高并发系列[前言,大纲,目录]

LockSupport

关键方法:

  1. park(): 当前线程阻塞(如果当前线程没有被unpark)
  2. unpark(Thread thread):如果入参的线程正在park(),则让它恢复运行;否则,就保证下一次该线程park()时不会阻塞.

unpark(t)必须在线程t启动(start)后才有效果.

可以这么理解:unpark相当于是一张通行证,park是一道关卡;先拿到通行证了,遇到关卡也不会被阻拦;没有通行证时遇到关卡会被阻拦,直到有通行证了才能继续走.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1public class T13_TestLockSupport {
2    public static void main(String[] args) {
3        Thread t = new Thread(()->{
4            for (int i = 0; i < 10; i++) {
5                System.out.println(i + "|" + Instant.now());
6                if(i == 5) {
7                    LockSupport.park();
8                }
9                try {
10                    TimeUnit.SECONDS.sleep(1);
11                } catch (InterruptedException e) {
12                    e.printStackTrace();
13                }
14            }
15        });
16
17        t.start();
18
19        // 下面相当于提前unpark了,因为线程t 5秒后才会park
20        // 必须在t.start()后才有效果
21//        LockSupport.unpark(t);
22
23        // 下面是先park后unpark,因为8秒后会unpark线程t
24        try {
25            TimeUnit.SECONDS.sleep(8);
26        } catch (InterruptedException e) {
27            e.printStackTrace();
28        }
29        System.out.println("after 8 senconds! | " + Instant.now());
30        LockSupport.unpark(t);
31    }
32}
33
34

AQS

简介

AQS:AbstractQueuedSynchronizer
JAVA的几乎所有锁(除synchronized,LockSupport外)都是用AQS实现的,比如ReentrantLock,CountdownLatch,CyclicBarrier,Semaphore等;

可以说AQS的底层就是CAS+volatile state:

  1. 属性private volatile int state;同步状态

这个state在不同的子类实现中有不同的含义,但都是通过这个state来控制"同步"的行为.这个state通过CAS来改变.

  1. AQS维护着Node的双向链表作为等待队列,这个等待队列是"CLH"(Craig, Landin, and Hagersten)队列的变体

Node是AQS的内部类,Node里面的属性装着线程volatile Thread thread,和一个状态volatile int waitStatus.
各个线程去争抢的过程其实就是这个CLH队列的入列和出列,这个过程的关键操作都是通过CAS来实现的

通过ReentrantLock的lock()方法,去阅读理解AQS

AbstractQueuedSynchronizer的state属性对于ReentrantLock的意义:
state表示线程持有锁的次数:
state=0表示没有线程持有锁;
state>0表示有一个线程持有锁,重入了state次.

起初我是看的JDK8的源码,但是它和JDK13上有一些细节上的区别,JDK13更好理解一些.我还记下了JDK8和JDK11在AQS实现上的区别,后来想想没啥意义,还容易记忆混乱,就干脆只记录JDK13的实现方式了.既然学习嘛,那就学新的~

JDK8和新版的JDK(以11为例)实现的细节上有些不同,整体思路是一样的:
CAS修改state和Node节点入队列时,JDK8通过unsafe获取变量的内存地址偏移量,和JDK11通过变量句柄指向变量的地址.
按我的理解,这俩效果是一样的,JDK11的效率应该更高一些
以state为例:
JDK8用到了stateOffset.(为了允许将来的优化没有用AtomicInteger),这个stateOffset可以理解为state的"指针"(地址),可以通过stateOffsetCAS改变state


1
2
3
4
1long stateOffset=unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
2unsafe.compareAndSwapInt(this, stateOffset, expect, update);
3
4

JDK11用到了VarHandle,可以直接CAS改变state


1
2
3
4
5
1VarHandle STATE=MethodHandles.lookup()
2.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
3STATE.compareAndSet(this, expect, update)
4
5

Unsafe是实现CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。Unsafe类提供了硬件级别的原子操作。

lock()的大致流程如下:

下面简称AbstractQueuedSynchronizer为AQS,AbstractOwnableSynchronizer为AOS

ReentrantLock

Sync

AQS

LockSupport

调用acquire(1)

调用acquire(1)

tryAcquire,再CAS获取一次锁

如果获取到了锁,就直接返回

如果没获取到锁,就把当前线程加入等待队列addWaiter

加入等待队列后再CAS尝试从队列拿到当前线程acquireQueued

如果CAS了几次仍拿不到当前线程,就park

LockSupport被唤醒后返回

selfInterrupt,唤醒线程,也就是拿到了锁

线程lock()结束

ReentrantLock

Sync

AQS

LockSupport

尝试阅读ReentrantLock的lock()方法

下面尝试着通过debugReentrantLock.lock()方法,去阅读源码.

  1. 首先随便写个方法,在lock.lock()处打断点,然后debug运行,追踪下去


1
2
3
4
5
6
7
8
1   public static void main(String[] args) {
2        ReentrantLock lock = new ReentrantLock();
3        lock.lock();
4        System.out.println("hahaha");
5        lock.unlock();
6    }
7
8
  1. 看到ReentrantLock的lock()方法

ReentrantLock.lock()方法注释的大致意思:
这个方法用于获得该锁;
如果该锁没有被线程持有,那么把锁的hold count设为1后直接返回;
如果当前线程已经持有该锁,则hold count加一,然后返回;
如果该锁被其他线程持有,那么当前线程将"阻塞",直到获得锁,然后把hold count设为1后返回
这个lock hold count应该就是AQS的state


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1    /**
2     * Acquires the lock.
3     *
4     * <p>Acquires the lock if it is not held by another thread and returns
5     * immediately, setting the lock hold count to one.
6     *
7     * <p>If the current thread already holds the lock then the hold
8     * count is incremented by one and the method returns immediately.
9     *
10     * <p>If the lock is held by another thread then the
11     * current thread becomes disabled for thread scheduling
12     * purposes and lies dormant until the lock has been acquired,
13     * at which time the lock hold count is set to one.
14     */
15    public void lock() {
16        sync.acquire(1);
17    }
18
19
  1. 看到ReentrantLock.lock()其实是调用了sync.acquire(1),sync变量是Sync类型.Sync是何许人也?

Sync,NonfairSync,FairSync都是ReentrantLock的内部类
Sync 继承了 AbstractQueuedSynchronizer
NonfairSync(非公平锁的实现) 和 FairSync(公平锁的实现) 都继承了 Sync
Sync的acquire方法继承自AQS,自己没有实现:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1    /**
2     * Acquires in exclusive mode, ignoring interrupts.  Implemented
3     * by invoking at least once {@link #tryAcquire},
4     * returning on success.  Otherwise the thread is queued, possibly
5     * repeatedly blocking and unblocking, invoking {@link
6     * #tryAcquire} until success.  This method can be used
7     * to implement method {@link Lock#lock}.
8     *
9     * @param arg the acquire argument.  This value is conveyed to
10     *        {@link #tryAcquire} but is otherwise uninterpreted and
11     *        can represent anything you like.
12     */
13    public final void acquire(int arg) {
14        if (!tryAcquire(arg) &&
15            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16            selfInterrupt();
17    }
18
19

3.1 AQS的acquire中的tryAcquire是由子类实现的,
这个tryAcquire是NonfairSync实现的,调用了Sync的nonfairTryAcquire方法,因为ReentrantLock是默认是非公平锁

下面代码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1/**
2         * Performs non-fair tryLock.  tryAcquire is implemented in
3         * subclasses, but both need nonfair try for trylock method.
4         */
5        @ReservedStackAccess
6        final boolean nonfairTryAcquire(int acquires) {
7            final Thread current = Thread.currentThread();
8            /*
9            getState()调用了AQS的方法,直接返回了state属性.
10          如果state等于0,则CAS设置stateOffset为acquires;
11          如果state!=0,则判断拥有锁的线程是不是当前线程,如果不是则返回false;如果是当前线程已经持有锁,则更新state值为state+acquire(可重入)
12          可见ReentrantLock把state作为是否锁定的判断标准,如果state=0则表示没有线程持有锁.如果state值>0,则表示已有线程持有了锁,并且持有了state次(可重入).
13          */
14            int c = getState();
15            if (c == 0) {
16              // compareAndSetState是AQS的方法,CAS修改state值
17                if (compareAndSetState(0, acquires)) {
18                  //AQS继承了AbstractOwnableSynchronizer
19                  //设置状态后,则进入AbstractOwnableSynchronizer的setExclusiveOwnerThread方法.表示已经拿到锁.
20                    setExclusiveOwnerThread(current);
21                    return true;
22                }
23            }
24            // 没拿到锁,则判断是否锁的持有者就是当前线程
25            else if (current == getExclusiveOwnerThread()) {
26                  // 当前线程重入
27                int nextc = c + acquires;
28                if (nextc < 0) // overflow
29                    throw new Error("Maximum lock count exceeded");
30                setState(nextc);
31                return true;
32            }
33            return false;
34        }
35
36

3.2 如果tryAcquire没有成功获得锁,则把当前线程加入等待队列


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1    /**
2     * Creates and enqueues node for current thread and given mode.
3     *
4     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
5     * @return the new node
6     */
7    private Node addWaiter(Node mode) {
8        Node node = new Node(mode);
9       // 自旋把前节点加入等待队列
10        for (;;) {
11            Node oldTail = tail;
12            if (oldTail != null) {
13                node.setPrevRelaxed(oldTail);
14                if (compareAndSetTail(oldTail, node)) {
15                    oldTail.next = node;
16                    return node;
17                }
18            } else {
19                initializeSyncQueue();
20            }
21        }
22    }
23
24

然后调用acquireQueued方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1    /**
2     * Acquires in exclusive uninterruptible mode for thread already in
3     * queue. Used by condition wait methods as well as acquire.
4     *
5     * @param node the node
6     * @param arg the acquire argument
7     * @return {@code true} if interrupted while waiting
8     */
9    final boolean acquireQueued(final Node node, int arg) {
10        boolean interrupted = false;
11        try {
12            for (;;) {
13                final Node p = node.predecessor();
14                // 如果上一个节点是头节点,则说明下一个就该当前线程了
15                // 那就试着去拿一下锁,tryAcquire仍是调用的子类(Sync)的方法
16                if (p == head && tryAcquire(arg)) {
17                    setHead(node);
18                    p.next = null; // help GC
19                    return interrupted;
20                }
21                // 判断是否需要park,需要的话就调用LockSupport的park阻塞线程
22                if (shouldParkAfterFailedAcquire(p, node))
23                    interrupted |= parkAndCheckInterrupt();
24            }
25        } catch (Throwable t) {
26            cancelAcquire(node);
27            if (interrupted)
28                selfInterrupt();
29            throw t;
30        }
31    }
32
33

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google AdSense 全面解析(申请+操作+作弊+忠告)

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索