Java高并发(六):ReentrantLock

释放双眼,带上耳机,听听看~!

目录

  • Lock

  • ReentrantLock
    * ReentrantReadWriteLock
    * StampedLock

    
    
    1
    2
    1  * ReentrantLock
    2
  • ReentrantLock实现原理

  • AQS
    * AQS内部实现
    * 线程同步
    * 锁的释放

    
    
    1
    2
    1  * Condition
    2
  • await
    * signal
    * Condition 总结

Lock

  • Java中的Lock是J.U.C中的一个核心组件,J.U.C(java.util.concurrent)是在并发编程中常用的工具类,里面包含了很多在并发场景中使用的组件,比如线程池、阻塞队列、计时器、同步器、并发集合等。
  • 在J.U.C中的大部分组件中都用到了Lock,在Lock出现之前,只能使用synchronized来处理并发编程中的线程安全问题。但是synchronized不是特别灵活,它是基于jvm层面的锁,锁的释放由jvm自行完成,这样虽然方便了,但是增加了不可控的问题,不够灵活,锁的升级也很难人为控制,另外,synchronized无法实现公平锁。为了弥补synchronized的短处,Java5以后出现了Lock。
  • Lock其实本质上是一个接口,它定义了锁的获取和释放的抽象方法,形成了锁的一个规范,意味着可以实现不同的锁,比如ReentrantLock、ReentrantReadWriteLock、StampedLock。

ReentrantLock

  • 顾名思义,ReentrantLock就是可重入锁(synchronized也是可重入锁),它是唯一一个实现了Lock接口的类,重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数,释放锁时计数器减一,计数器为0则表示锁没有被线程持有。

ReentrantReadWriteLock

  • 重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。

StampedLock

  • stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程。

ReentrantLock

  • 可重入锁,设计的目的就是为了避免线程的死锁。

  • ReentrantLock的简单实用方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1public class StudyReentrantLock {
2    static Lock lock = new ReentrantLock();
3    public static void main(String[] args) {
4        lock.lock();
5        try {
6            TimeUnit.SECONDS.sleep(1);
7        } catch (Exception e) {
8            e.printStackTrace();
9        } finally {
10            lock.unlock();
11        }
12
13    }
14}
15
16

ReentrantLock实现原理

  • 锁的基本原理是,将多线程并行任务通过某一种机制实现线程的串行执行,从而达到线程安全性的目的。在 synchronized 中,我们分析了偏向锁、轻量级锁、乐观锁。基于乐观锁以及自旋锁来优化了 synchronized 的加锁开销,同时在重量级锁阶段,通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。那么在 ReentrantLock 中,也一定会存在这样的需要去解决的问题。就是在多线程竞争重入锁时,竞争失败的线程是如何实现阻塞以及被唤醒的呢?

AQS

  • AQS(AbstractQueuedSynchronizer),是一个同步队列,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。
  • 从使用层面来说,AQS 的功能分为两种:独占和共享。独占锁,每次只能有一个线程持有锁,比如ReentrantLock 就是以独占方式实现的互斥锁;共 享 锁 ,允 许多个线程同时获取锁 ,并发访问共享资 源 ,比 如ReentrantReadWriteLock。

AQS内部实现

  • AQS 队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。下面是源码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1static final class Node {
2        static final Node SHARED = new Node();
3        static final Node EXCLUSIVE = null;
4        static final int CANCELLED =  1;
5        static final int SIGNAL    = -1;
6        static final int CONDITION = -2;
7        static final int PROPAGATE = -3;
8        volatile int waitStatus;
9        volatile Node next;
10        volatile Thread thread;
11        Node nextWaiter;//存储在condition中的后继节点
12      //是否是共享锁
13        final boolean isShared() {
14            return nextWaiter == SHARED;
15        }
16        final Node predecessor() throws NullPointerException {
17            Node p = prev;
18            if (p == null)
19                throw new NullPointerException();
20            else
21                return p;
22        }
23
24        Node() {    // Used to establish initial head or SHARED marker
25        }
26      //把线程封装为一个Node节点,添加到等待队列
27        Node(Thread thread, Node mode) {     // Used by addWaiter
28            this.nextWaiter = mode;
29            this.thread = thread;
30        }
31      //把线程封装为一个Node节点,放入condition队列
32        Node(Thread thread, int waitStatus) { // Used by Condition
33            this.waitStatus = waitStatus;
34            this.thread = thread;
35        }
36    }
37
38
  • 其中,设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可。

线程同步

  • 首先找到ReentrantLock 获取锁的入口,lock()方法


1
2
3
4
5
1public void lock() {
2        sync.lock();
3    }
4
5
  • 接下来看sync

Java高并发(六):ReentrantLock

  • sync是一个静态的抽象内部类,继承了AbstractQueuedSynchronizer来实现重入锁的逻辑,我们前面说过 AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中,会继承 AQS 来实现对应场景的功能。

  • Sync 有两个具体的实现类,分别是:NofairSync,表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁;FairSync,表示所有线程严格按照 FIFO 来获取锁。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1/**
2     * Sync object for non-fair locks
3     */
4    static final class NonfairSync extends Sync {
5        private static final long serialVersionUID = 7316153563782823691L;
6
7        /**
8         * Performs lock.  Try immediate barge, backing up to normal
9         * acquire on failure.
10         */
11        final void lock() {
12            if (compareAndSetState(0, 1))
13                setExclusiveOwnerThread(Thread.currentThread());
14            else
15                acquire(1);
16        }
17
18        protected final boolean tryAcquire(int acquires) {
19            return nonfairTryAcquire(acquires);
20        }
21    }
22
23

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1/**
2     * Sync object for fair locks
3     */
4    static final class FairSync extends Sync {
5        private static final long serialVersionUID = -3000897897090466540L;
6
7        final void lock() {
8            acquire(1);
9        }
10
11        /**
12         * Fair version of tryAcquire.  Don't grant access unless
13         * recursive call or no waiters or is first.
14         */
15        protected final boolean tryAcquire(int acquires) {
16            final Thread current = Thread.currentThread();
17            int c = getState();
18            if (c == 0) {
19                if (!hasQueuedPredecessors() &&
20                    compareAndSetState(0, acquires)) {
21                    setExclusiveOwnerThread(current);
22                    return true;
23                }
24            }
25            else if (current == getExclusiveOwnerThread()) {
26                int nextc = c + acquires;
27                if (nextc < 0)
28                    throw new Error("Maximum lock count exceeded");
29                setState(nextc);
30                return true;
31            }
32            return false;
33        }
34    }
35
36
  • 从以上代码可以初步看出,ReentrantLock的公平锁和非公平锁的实现逻辑,非公平锁的情况下,线程在抢占锁时会先通过CAS抢占锁,失败则调用acquire(1)走锁竞争逻辑;公平锁则是在线程抢占锁时,直接调用acquire(1)走锁竞争逻辑。

  • CAS 的实现原理


1
2
3
4
5
6
1protected final boolean compareAndSetState(int expect, int update) {
2        // See below for intrinsics setup to support this
3        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
4    }
5
6
  • 通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false.这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,以及涉及到 state 这个属性的意义。state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入锁的实现来说,表示一个同步状态。它有两个含义的表示:1. 当 state=0 时,表示无锁状态;2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入 5 次,那么state=5。而在释放锁的时候,同样需要释放 5 次直到 state=0其他线程才有资格获得锁。

  • 另外,Unsafe 类是在 sun.misc 包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、Hadoop、Kafka 等;Unsafe 可认为是 Java 中留下的后门,提供了一些底层操作,如直接内存访问、线程的挂起和恢复、CAS、线程同步、内存屏障。而 CAS 就是 Unsafe 类中提供的一个原子操作,第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的 headOffset 的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5,如果更新成功,则返回 true,否则返回 false;

  • stateOffset:一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节偏移。用于在后面的 compareAndSwapInt 中,去根据偏移量找到对象在内存中的具体位置所以 stateOffset 表示 state 这个字段在 AQS 类的内存中相对于该类首地址的偏移量。

  • 继续看acquire(1)


1
2
3
4
5
6
7
1 public final void acquire(int arg) {
2        if (!tryAcquire(arg) &&
3            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4            selfInterrupt();
5    }
6
7
  • 这个方法的主要逻辑是:1. 通过 tryAcquire 尝试获取独占锁,如果成功返回true,失败返回 false;2. 如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加到 AQS 队列尾部;3. acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。

  • tryAcquire 最终调用的是nonfairTryAcquire(以非公平锁为例):


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1final boolean nonfairTryAcquire(int acquires) {
2            final Thread current = Thread.currentThread();//获取当前线程
3            int c = getState();//获取state的值
4            if (c == 0) {//c==0表示无锁状态
5                if (compareAndSetState(0, acquires)) {//CAS替换state,成功表示获取锁成功
6                    setExclusiveOwnerThread(current);//保存当前获得锁的线程,实现可重入
7                    return true;
8                }
9            }
10            else if (current == getExclusiveOwnerThread()) {
11            //同一个线程获得锁,增加重入次数
12                int nextc = c + acquires;
13                if (nextc < 0) // overflow
14                    throw new Error("Maximum lock count exceeded");
15                setState(nextc);
16                return true;
17            }
18            return false;
19        }
20
21
  • 这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。

  • 当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成Node。入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状态。意味着重入锁用到了 AQS 的独占锁功能:1. 将当前线程封装成 Node;2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的node 添加到 AQS 队列;3. 如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1private Node addWaiter(Node mode) {
2        Node node = new Node(Thread.currentThread(), mode);//把当前线程封装成Node
3        Node pred = tail;
4        //tail不为空的情况下把新封装的node节点设置为新的tail
5        if (pred != null) {
6            node.prev = pred;
7            if (compareAndSetTail(pred, node)) {
8                pred.next = node;
9                return node;
10            }
11        }
12        enq(node);//tail为null,通过自旋操作把当前节点加入到队列中
13        return node;
14    }
15
16
  • 通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给acquireQueued 方法,去竞争锁:1. 获取当前节点的 prev 节点;2. 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁;3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head节点;4. 如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程;5. 最后,通过 cancelAcquire 取消获得锁的操作。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1final boolean acquireQueued(final Node node, int arg) {
2        boolean failed = true;
3        try {
4            boolean interrupted = false;
5            for (;;) {
6                final Node p = node.predecessor();//获取当前节点的prev节点
7                if (p == head && tryAcquire(arg)) {//p为head表示有资格去竞争锁
8                    setHead(node);//获取锁成功,设置新的head
9                    p.next = null; // help GC,移除原head
10                    failed = false;
11                    return interrupted;
12                }
13                //前一个线程还未释放锁,则tryAcquire 时会返回 false
14                if (shouldParkAfterFailedAcquire(p, node) &&
15                    parkAndCheckInterrupt())
16                    interrupted = true;//返回当前线程在等待过程中有没有中断过            }
17        } finally {
18            if (failed)
19                cancelAcquire(node);
20        }
21    }
22
23
  • 线程的挂起使用的是LockSupport.park()


1
2
3
4
5
6
1private final boolean parkAndCheckInterrupt() {
2        LockSupport.park(this);
3        return Thread.interrupted();
4    }
5
6
  • LockSupport类是 Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数unpark 函数为线程提供“许可(permit)”,线程调用 park 函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。permit 相当于 0/1 的开关,默认是 0,调用一次 unpark 就加 1 变成了 1.调用一次park 会消费 permit,又会变成 0。 如果再调用一次 park 会阻塞,因为 permit 已经是 0 了。直到 permit 变成 1.这时调用 unpark 会把 permit 设置为 1.每个线程都有一个相关的 permit,permit 最多只有一个,重复调用 unpark 不会累积。

锁的释放

  • 在 unlock 中,会调用 release 方法来释放锁


1
2
3
4
5
1public void unlock() {
2        sync.release(1);
3    }
4
5

1
2
3
4
5
6
7
8
9
10
11
1public final boolean release(int arg) {
2        if (tryRelease(arg)) {//成功释放锁
3            Node h = head;//获取head节点
4            if (h != null && h.waitStatus != 0)//head不为空且状态不为0,则唤醒后续节点
5                unparkSuccessor(h);
6            return true;
7        }
8        return false;
9    }
10
11
  • tryRelease方法可以认为是一个设置锁状态的操作,通过将 state 状态减掉传入的参数值(参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将Owner 线程设置为空,而且也只有这种情况下才会返回 true。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
1protected final boolean tryRelease(int releases) {
2            int c = getState() - releases;
3            if (Thread.currentThread() != getExclusiveOwnerThread())
4                throw new IllegalMonitorStateException();
5            boolean free = false;
6            if (c == 0) {
7                free = true;
8                setExclusiveOwnerThread(null);
9            }
10            setState(c);
11            return free;
12        }
13
14
  • unparkSuccessor


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1private void unparkSuccessor(Node node) {
2        int ws = node.waitStatus;//获得head节点的状态
3        if (ws < 0)
4            compareAndSetWaitStatus(node, ws, 0);//设置head节点状态为0
5        Node s = node.next;//获得head节点的下一个节点
6        if (s == null || s.waitStatus > 0) {
7        //如果下一个节点为 null 或者 status>0 表示 cancelled 状态。通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
8            s = null;
9            for (Node t = tail; t != null && t != node; t = t.prev)
10                if (t.waitStatus <= 0)
11                    s = t;
12        }
13        if (s != null)
14            LockSupport.unpark(s.thread);//next 节点不为空,直接唤醒这个线程即可
15    }
16
17

Condition

  • synchronized可以基于wait/notify实现线程间通信,Lock同样可以基于Condition来实现线程间的通信功能。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1public class StudyReentrantLock {
2    static Lock lock = new ReentrantLock();
3    static Condition condition = lock.newCondition();
4    public static void main(String[] args) throws InterruptedException {
5         test1();
6         TimeUnit.SECONDS.sleep(1);
7         test2();
8    }
9    public static void test1() {
10        new Thread(() -> {
11            lock.lock();
12            try {
13                System.out.println(Thread.currentThread().getName() + "获得锁并执行等待");
14                condition.await();
15            } catch (Exception e) {
16                e.printStackTrace();
17            } finally {
18                lock.unlock();
19                System.out.println(Thread.currentThread().getName() + "执行完成");
20            }
21        },"test1").start();
22    }
23    public static void test2() {
24        new Thread(() -> {
25            lock.lock();
26            try {
27                System.out.println(Thread.currentThread().getName() + "获得锁并唤醒等待的线程");
28                condition.signal();
29            } catch (Exception e) {
30                e.printStackTrace();
31            } finally {
32                lock.unlock();
33                System.out.println(Thread.currentThread().getName() + "执行完成");
34            }
35        },"test2").start();
36    }
37}
38
39
40

Java高并发(六):ReentrantLock

await

  • 通过这个案例简单实现了 wait 和 notify 的功能,当调用 await 方法后,当前线程会释放锁并等待,而其他线程调用 condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行 unlock 释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。所以,condition 中两个最重要的方法,一个是 await,一个是 signal 方法:await:把当前线程阻塞挂起;signal:唤醒阻塞的线程。

  • 调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1public final void await() throws InterruptedException {
2            if (Thread.interrupted())
3                throw new InterruptedException();
4            Node node = addConditionWaiter();//创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
5            int savedState = fullyRelease(node);//释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
6            int interruptMode = 0;
7            while (!isOnSyncQueue(node)) {//如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
8                LockSupport.park(this);// 第一次总是 park 自己,开始阻塞等待
9                // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上.
10                //isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了,就继续阻塞.
11                //isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.
12                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
13                    break;
14            }
15            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
16                interruptMode = REINTERRUPT;
17                // 如果 node 的下一个等待者不是 null, 则进行清理Condition 队列上的节点.
18            if (node.nextWaiter != null) // clean up if cancelled
19                unlinkCancelledWaiters();
20            if (interruptMode != 0)
21                reportInterruptAfterWait(interruptMode);
22        }
23
24

signal

  • 调用 Condition 的 signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。


1
2
3
4
5
6
7
8
9
1public final void signal() {
2            if (!isHeldExclusively())//当前线程是否获得了锁
3                throw new IllegalMonitorStateException();
4            Node first = firstWaiter;//获得condition队列上的首节点
5            if (first != null)
6                doSignal(first);
7        }
8
9

1
2
3
4
5
6
7
8
9
10
11
1 private void doSignal(Node first) {
2            do {
3                if ( (firstWaiter = first.nextWaiter) == null)
4                // 如果第一个节点的下一个节点是 null, 那么, 最后一个节点也是 null
5                    lastWaiter = null;//把下一个节点设置为null
6                first.nextWaiter = null;
7            } while (!transferForSignal(first) &&
8                     (first = firstWaiter) != null);
9        }
10
11
  • 该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒

Condition 总结

  • 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
  • 释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程

给TA打赏
共{{data.count}}人
人已打赏
安全经验

独立博客怎样申请谷歌Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索