Java并发编程(12)-AQS及其组件介绍

释放双眼,带上耳机,听听看~!

文章目录

  • 一.AQS相关

  • 1.什么是AQS?
    * 2.AQS的核心原理是什么?
    * 3.使用AQS实现一个互斥锁
    * 4.AQS定义了哪两种对资源的共享方式?

  • 1.==Exclusive(独占)==:只有一个线程能够获取到该资源,如ReentrantLock。对于独占锁,又可分为公平锁和非公平锁:
    * 2.==Share(共享)==:可以有多个线程获取到该资源,如Semaphore、CountDownLatch、CyclicBarrier和ReadWriteLock。

    
    
    1
    2
    1  * 二.信号量Semaphore相关
    2
  • 1,什么是信号量?
    * 2. acquire()和tryAcquire()有何不同?
    * 3.Semaphore的公平模式和非公平模式了解吗?

  • 1.公平模式
    * 2.非公平模式

    
    
    1
    2
    1  * 三.倒计时器(闭锁)CountDownLatch
    2
  • 1.什么是CountDownLatch
    * 2.CountDownLatch有什么作用
    * 3.一个计数器使用的例子:主线程等待其他所有线程执行完毕在执行
    * 4.一个并发测试的例子:将所有的子线程全部阻塞在一个入口,等待主线程统一放行
    * 5.CountDownLatch的常用方法

  • 1.await() vs wait()
    * 2.countDown() vs getCount()

    
    
    1
    2
    1  * 四.循环栅栏CyclicBarrier
    2
  • 1.什么是循环栅栏?
    * 2,有什么作用
    * 3.具体使用

一.AQS相关

https://www.cnblogs.com/waterystone/p/4920797.html

1.什么是AQS?

AQS全称AbstactQueueSynchronizer,即队列同步器。它使用int类型作为线程的状态,通过名为CLH的FIFO双向队列来管理等待资源的线程。是JUC包中locks包下所有锁实现的基础。

2.AQS的核心原理是什么?

AQS的核心原理是当线程尝试获取共享资源时,若共享资源空闲,则将该线程作为工作线程,并且将共享资源设置为锁定状态。当共享资源处于锁定状态时,则将该线程添加到CLH队列末尾(通过CAS的方式)。

Java并发编程(12)-AQS及其组件介绍

3.使用AQS实现一个互斥锁

AQS使用的是
模板方法模式,所以我们在自己实现一个互斥锁的时候,只需要重写指定的方法即可:

  • Mutex.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
1package cocurrency;
2
3import org.jetbrains.annotations.NotNull;
4
5import java.util.concurrent.TimeUnit;
6import java.util.concurrent.locks.AbstractQueuedSynchronizer;
7import java.util.concurrent.locks.Condition;
8import java.util.concurrent.locks.Lock;
9
10public class Mutex implements Lock {
11    private static class MyAQS extends AbstractQueuedSynchronizer {
12        /*
13         * @Author ARong
14         * @Description 是否处于占用状态
15         * @Date 2019/11/23 8:53 下午
16         * @Param
17         * @return
18         **/
19        @Override
20        protected boolean isHeldExclusively() {
21             return getState() == 1;
22        }
23
24        /*
25         * @Author ARong
26         * @Description 尝试获取锁
27         * @Date 2019/11/23 8:52 下午
28         * @Param [arg]
29         * @return boolean
30         **/
31        @Override
32        protected boolean tryAcquire(int arg) {
33            // 状态为0才可获取锁
34            if (compareAndSetState(0, 1)) {
35                // 使用CAS设置锁状态为1成功,设置锁的所有者
36                setExclusiveOwnerThread(Thread.currentThread());
37                return true;
38            } else {
39                return false;
40            }
41        }
42        /*
43         * @Author ARong
44         * @Description 尝试释放锁
45         * @Date 2019/11/23 8:52 下午
46         * @Param
47         * @return
48         **/
49        @Override
50        protected boolean tryRelease(int arg) {
51            // 释放锁,将状态设置为0
52            if (getState() == 0) {
53                throw new IllegalMonitorStateException("当前资源没有被锁定");
54            } else {
55                // 将当前线程的锁解除
56                setExclusiveOwnerThread(null);
57                setState(0);
58                return true;
59            }
60        }
61        Condition newCondition() {
62            return new ConditionObject();
63        }
64    }
65
66
67    // 以下操作只需要把AQS的内部类实现类代理操作即可
68    private final MyAQS aqs = new MyAQS();
69    
70    @Override
71    public void lock() {
72        aqs.acquire(1);
73    }
74    @Override
75    public void lockInterruptibly() throws InterruptedException {
76        aqs.acquireInterruptibly(1);
77    }
78    @Override
79    public boolean tryLock() {
80        return aqs.tryAcquire(1);
81    }
82    @Override
83    public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
84        return aqs.tryAcquireNanos(1, unit.toNanos(time));
85    }
86    @Override
87    public void unlock() {
88        aqs.release(1);
89    }
90    @NotNull
91    @Override
92    public Condition newCondition() {
93        return aqs.newCondition();
94    }
95}
96
97
98

4.AQS定义了哪两种对资源的共享方式?

1.

Exclusive(独占):只有一个线程能够获取到该资源,如ReentrantLock。对于独占锁,又可分为公平锁和非公平锁:

  1. 公平锁:以排队队列中的线程先后顺序为为准,先到先得资源的锁。
  2. 非公平锁:无数排队顺序,谁抢占到了资源就是谁的。

2.

Share(共享):可以有多个线程获取到该资源,如Semaphore、CountDownLatch、CyclicBarrier和ReadWriteLock。

二.信号量Semaphore相关

https://blog.csdn.net/qq_19431333/article/details/70212663

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

1,什么是信号量?

信号量规定了一定数量的许可证,在许可证不够用的情况下,获取许可证就会阻塞直到有可用的许可证;再使用完许可证后,可以释放掉许可证。信号量最大的作用就是限制获取资源的线程数量。

  • 一个信号量的使用Demo


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1public class SemaphoreDemo {
2
3    // 请求的数量
4    private static final int threadCount = 550;
5
6    public static void main(String[] args) throws InterruptedException {
7        // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
8        ExecutorService threadPool = Executors.newFixedThreadPool(300);
9        // 一次只能允许执行的线程数量。
10        final Semaphore semaphore = new Semaphore(20);
11
12        for (int i = 0; i < threadCount; i++) {
13            final int threadnum = i;
14            threadPool.execute(() -> {// Lambda 表达式的运用
15                try {
16                    semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
17                    test(threadnum);
18                    semaphore.release();// 释放一个许可
19                } catch (InterruptedException e) {
20                    // TODO Auto-generated catch block
21                    e.printStackTrace();
22                }
23
24            });
25        }
26        threadPool.shutdown();
27        System.out.println("finish");
28    }
29
30    public static void test(int threadnum) throws InterruptedException {
31        Thread.sleep(1000);// 模拟请求的耗时操作
32        System.out.println("threadnum:" + threadnum);
33        Thread.sleep(1000);// 模拟请求的耗时操作
34    }
35}
36
37

2. acquire()和tryAcquire()有何不同?

这两个方法都可以用于许可证,不同之处在于acquire在许可证不足的时候会阻塞,而tryAcquire会在许可证不足的时候直接返回false。

3.Semaphore的公平模式和非公平模式了解吗?

1.公平模式

根据acquire的顺序来获取许可证,遵循FIFO。

2.非公平模式

即抢占式获取许可证。这是默认情况。

可在构造信号量时通过构造函数去设置这两种模式:


1
2
3
4
5
6
7
8
9
10
1    // 默认是非公平的
2    public Semaphore(int permits) {
3        sync = new NonfairSync(permits);
4    }
5
6    public Semaphore(int permits, boolean fair) {
7        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
8    }
9
10

三.倒计时器(闭锁)CountDownLatch

1.什么是CountDownLatch

CountDownLatch位于java.util.concurrent下,它的作用是使得规定数量的线程在某个位置等待,在数量满足要求后,会在同一时刻并发执行。

2.CountDownLatch有什么作用

  1. 可以使得某个线程等待其他线程全部执行完毕再执行
  2. 可以用作并发模拟
  3. 死锁检测

3.一个计数器使用的例子:主线程等待其他所有线程执行完毕在执行


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1public class CountDownLatchDemo {
2    private static int counter = 0;
3
4    public static void main(String[] args) throws InterruptedException {
5        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
6                0, // corePoolSize
7                Integer.MAX_VALUE, // maxPoolSize
8                60L,
9                TimeUnit.SECONDS,
10                new SynchronousQueue<Runnable>()
11        );
12        int count = 5;
13        CountDownLatch latch = new CountDownLatch(count);
14        for (int i = 0; i < count; i++) {
15            threadPoolExecutor.execute(() -> {
16                try {
17                    run();
18                } catch (Exception e) {
19                }finally {
20                    latch.countDown();
21                }
22            });
23        }
24        latch.await();
25        threadPoolExecutor.shutdown();
26        System.out.println("线程执行完毕");
27    }
28
29    private static void run() throws InterruptedException {
30        Thread.sleep(1000);
31        System.out.println(Thread.currentThread());
32
33    }
34}
35
36

Java并发编程(12)-AQS及其组件介绍

4.一个并发测试的例子:将所有的子线程全部阻塞在一个入口,等待主线程统一放行


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
1public static void main(String[] args) throws InterruptedException {
2        new CountDownLatchDemo().test2();
3    }
4
5    private static int counter = 0;
6    private static void run() throws InterruptedException {
7        System.out.println(Thread.currentThread() + ":" + (++counter));
8    }
9
10    private void test2() throws InterruptedException {
11        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
12                0, // corePoolSize
13                Integer.MAX_VALUE, // maxPoolSize
14                60L,
15                TimeUnit.SECONDS,
16                new SynchronousQueue<Runnable>()
17        );
18        int count = 50, i = 0;
19        CountDownLatch latch = new CountDownLatch(1);
20        for (i = 0; i <= count; i++) {
21            threadPoolExecutor.execute(() -> {
22                try {
23                    run();
24                } catch (Exception e) {
25                }finally {
26                    latch.countDown();
27                }
28            });
29        }
30        // 所有线程启动完成,统一放行
31        while (i == count) {
32            latch.countDown();
33        }
34        threadPoolExecutor.shutdown();
35        System.out.println("线程执行完毕");
36    }
37
38

可以看到执行结果中,counter的值出现了并发冲突:
Java并发编程(12)-AQS及其组件介绍

这时可以使用原子类AtomicInteger,通过CAS去自增计数器的值(为什么不使用volatile呢?因为volatile虽然能保证多线程读写共享变量的可见性,但修改一个变量并写入毕竟是分为读-改-写这三个过程的,所以仍需要使用CAS来保障)。


1
2
3
4
5
6
7
1private static AtomicInteger counter = new AtomicInteger();
2    private static void run() throws InterruptedException {
3        // 使用原子类 CAS更新值
4        System.out.println(Thread.currentThread() + ":" + counter.incrementAndGet());
5    }
6
7

5.CountDownLatch的常用方法

1.await() vs wait()

await是CountDownLatch的方法,它的作用是在指定位置阻塞线程,直到闭锁中的计数为0;而wait()是Object类的方法。

2.countDown() vs getCount()

countDown()方法用于为计数器减少计数,而getCount()可以获取到当前的计数值。

四.循环栅栏CyclicBarrier

https://blog.csdn.net/u010185262/article/details/54692886

1.什么是循环栅栏?

CyclicBarrier和CountDownLatch技术差不多,它们都是为了实现线程的等待以及同时执行,但是CyclicBarrier比CountDownLatch更加复杂,它们之间的不同点有:

  1. CyclicBarrier可以循环使用,当计数达到标准时即全部执行,而CountDownLatch只可使用一次。
  2. CyclicBarrier的实现中,计数器是随着等待的线程数量的增加二增加的,当加到一定值时即全部放行;而CountDownLatch中的计数器是通过countDown一个一个地去减少的,减少到0才全部放行。

2,有什么作用

将大任务划分为小任务完成,然后汇集成大任务,并且可以循环这个过程。

3.具体使用


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
2
3    public static void main(String[] args) {
4        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5                0, // corePoolSize
6                Integer.MAX_VALUE, // maxPoolSize
7                60L,
8                TimeUnit.SECONDS,
9                new SynchronousQueue<Runnable>()
10        );
11        int count = 50, i = 0;
12        for (i = 0; i <= count; i++) {
13            threadPoolExecutor.execute(() -> {
14                try {
15                    runTask();
16                } catch (BrokenBarrierException e) {
17                    e.printStackTrace();
18                } catch (InterruptedException e) {
19                    e.printStackTrace();
20                }
21            });
22        }
23    }
24    private static void runTask() throws BrokenBarrierException, InterruptedException {
25        System.out.println(Thread.currentThread().getName() + "-Ready");
26        cyclicBarrier.await();
27        System.out.println(Thread.currentThread().getName() + "-Finished");
28    }
29
30

从打印结果中可以看到,当等待的任务达到5个时,这5个任务就会立刻开始执行,然后开始下一轮的等待:
Java并发编程(12)-AQS及其组件介绍

给TA打赏
共{{data.count}}人
人已打赏
安全技术

详解Node.js API系列 Crypto加密模块(2) Hmac

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索