Java并发编程(11)-使用ReentrantLock与Condition实现生产者-消费者队列

释放双眼,带上耳机,听听看~!

文章目录

  • 一、并发编程中的条件变量

  • 1.1、从生产者-消费者模型理解条件变量
    * 1.2、Condition接口
    * 1.3、Condition接口方法

    
    
    1
    2
    1  * 二、实现一个生产者-消费者中的条件队列
    2
  • 2.1、条件变量的一般使用模式
    * 2.2、使用条件变量实现一个生产者-消费者模式的队列



Condition在Java并发编程中是一个十分常用的接口,被称为条件变量,常与显式锁和可重入锁一起使用,它可以在某些条件下使线程进行休眠或者唤醒,本文将以实现生产者-消费者模式的队列作为demo让大家对条件变量 有初步的了解。

一、并发编程中的条件变量

1.1、从生产者-消费者模型理解条件变量

生产者-消费者模型是一种常见的模型,在《Java并发编程实践》中有一个例子很好地解释了这种模式:

两个洗盘子的劳动分工也是一个与生产者-消费者设计相类似的例子:一个人洗盘子,并把洗好的盘子放到盘子架上,另一个人从盘子架上得到盘子,并把它烘干。在这个场景中,盘子架充当了阻塞队列;如果架子上没有盘子,消费者会一直等待,直到有盘子需要烘干,如果盘子架被放满了,那么生产者会停止洗盘子,直到架上有新的空间可用。

在这个例子中哪里看出了条件变量呢?
这个条件变量就是盘子架是否满了,当盘子架满了,那么生产者等待盘子架有空余的空间时才开始工作,当盘子架空了,消费者等待其有碗了才开始工作,这就可以很好地理解条件变量了:条件变量就是在某些条件下使线程进行休眠或者唤醒的一种工作机制。

1.2、Condition接口

Condition接口是Java并发编程中的环境变量,它位于java.util.concurrent.locks包下,常与显式锁一起使用,使用Lock.newCondition()获得Condition对象。


1
2
3
4
5
6
7
8
9
10
1public interface Condition {
2   void await() throws InterruptedException;
3   void awaitUninterruptibly();
4    long awaitNanos(long nanosTimeout) throws InterruptedException;
5    boolean await(long time, TimeUnit unit) throws InterruptedException;
6    void signal();
7    void signalAll();  
8}
9
10

1.3、Condition接口方法

以上是Condition接口定义的方法,await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。

  • await()方法

造成当前线程在接到信号或被中断之前一直处于等待状态。

  • signal()方法

唤醒特定的等待线程。

  • signalAll()方法

唤醒所有的等待线程。

二、实现一个生产者-消费者中的条件队列

2.1、条件变量的一般使用模式


1
2
3
4
5
6
7
8
9
10
11
12
13
1 lock.lock();
2    try{
3       while(条件判断){
4           acondition.await();
5       }
6
7           //dosomething...
8           bconditon.signal();
9 }finally{
10  lock.unlock();
11}
12
13

2.2、使用条件变量实现一个生产者-消费者模式的队列

  • MyBlockingQueue


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
1package cocurrency.product_and_consume_model;
2
3import org.junit.Test;
4
5import java.util.LinkedList;
6import java.util.concurrent.*;
7import java.util.concurrent.locks.Condition;
8import java.util.concurrent.locks.Lock;
9import java.util.concurrent.locks.ReentrantLock;
10
11/**
12 * @Auther: ARong
13 * @Date: 2019/11/30 2:06 下午
14 * @Description: 使用可重入锁和条件变量来实现生产消费者模型队列
15 */
16public class MyBlockingQueue {
17    private Lock lock = new ReentrantLock();
18    private Condition isEmpty = lock.newCondition();
19    private Condition isFull = lock.newCondition();
20    // 存放数据的队列
21    private LinkedList<Integer> queue = new LinkedList<>();
22    // 最大长度
23    private volatile int maxSize;
24
25    public MyBlockingQueue(){
26        this.maxSize = 1 << 4;
27    }
28
29    public MyBlockingQueue(int maxSize) {
30        this.maxSize = maxSize;
31    }
32
33    /*
34     * @Author ARong
35     * @Description 获取数据
36     * @Date 2019/11/30 2:17 下午
37     * @Param []
38     * @return int
39     **/
40    public int poll() {
41        Integer value = -1;
42        lock.lock();//执行到该处的线程获得锁
43        try {
44            while (queue.size() == 0){
45                //队列为空,让线程在 isEmpty 这个条件变量中等待
46                System.out.println(Thread.currentThread().getName()+"[队列为空,poll方法阻塞]");
47                isEmpty.await();
48            }
49            value = queue.poll();
50            System.out.println(Thread.currentThread().getName()+"[删除元素],当前空余容量为:"+(maxSize - queue.size()));
51            //唤醒在 isFull 条件变量下等待的线程
52            isFull.signal();
53        } catch (InterruptedException e) {
54            e.printStackTrace();
55        } finally {
56            lock.unlock();//释放锁
57        }
58        return value;
59    }
60
61    /*
62     * @Author ARong
63     * @Description 放入数据
64     * @Date 2019/11/30 2:17 下午
65     * @Param [data]
66     * @return void
67     **/
68    public void offer(int data) {
69        // 获取锁
70        lock.lock();
71        try {
72            while (queue.size() == maxSize) {
73                // 队列已满,让该线程在 isFull条件中 等待
74                System.out.println(Thread.currentThread().getName()+"[队列已满,offer方法阻塞]");
75                isFull.await();
76            }
77            // 添加数据
78            queue.offer(data);
79            System.out.println(Thread.currentThread().getName() + "[添加数据],当前空余容量为:"+(maxSize - queue.size()));
80            // 唤醒在isEmpty条件等待的线程
81            isEmpty.signal();
82        } catch (InterruptedException e) {
83            e.printStackTrace();
84        } finally {
85            // 释放锁
86            lock.unlock();
87        }
88    }
89}
90
91
  • TestBlockingQueue


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1package cocurrency.product_and_consume_model;
2
3import cocurrency.CountDownLatchDemo;
4import org.junit.Test;
5
6import java.util.concurrent.ArrayBlockingQueue;
7import java.util.concurrent.CountDownLatch;
8import java.util.concurrent.ThreadPoolExecutor;
9import java.util.concurrent.TimeUnit;
10
11/**
12 * @Auther: ARong
13 * @Date: 2019/11/30 4:46 下午
14 * @Description:
15 */
16public class TestBlockingQueue {
17
18
19    @Test
20    public void test() throws InterruptedException {
21        // 线程池
22        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0l, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5));
23        // 创建一个只能容纳5个元素的阻塞队列
24        MyBlockingQueue blockingQueue = new MyBlockingQueue(5);
25        // 闭锁,等待两个线程完成任务才终止主线程
26        CountDownLatch countDownLatch = new CountDownLatch(2);
27        // 创建调用者去offer和poll
28        threadPoolExecutor.submit(() -> {
29            // 快速添加元素
30            for (int i = 0; i < 10; i++) {
31                blockingQueue.offer(i);
32            }
33            countDownLatch.countDown();
34        });
35
36        threadPoolExecutor.submit(() -> {
37            // 缓慢获取数据
38            for (int i = 0; i < 10; i++) {
39                try {
40                    Thread.sleep(200l);
41                } catch (InterruptedException e) {
42                    e.printStackTrace();
43                }
44                blockingQueue.poll();
45            }
46            countDownLatch.countDown();
47        });
48        // main线程等待
49        countDownLatch.await();
50        System.out.println("[任务执行完毕]");
51    }
52}
53
54

执行结果:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1pool-1-thread-1[添加数据],当前空余容量为:4
2pool-1-thread-1[添加数据],当前空余容量为:3
3pool-1-thread-1[添加数据],当前空余容量为:2
4pool-1-thread-1[添加数据],当前空余容量为:1
5pool-1-thread-1[添加数据],当前空余容量为:0
6pool-1-thread-1[队列已满,offer方法阻塞]
7pool-1-thread-2[删除元素],当前空余容量为:1
8pool-1-thread-1[添加数据],当前空余容量为:0
9pool-1-thread-1[队列已满,offer方法阻塞]
10pool-1-thread-2[删除元素],当前空余容量为:1
11pool-1-thread-1[添加数据],当前空余容量为:0
12pool-1-thread-1[队列已满,offer方法阻塞]
13pool-1-thread-2[删除元素],当前空余容量为:1
14pool-1-thread-1[添加数据],当前空余容量为:0
15pool-1-thread-1[队列已满,offer方法阻塞]
16pool-1-thread-2[删除元素],当前空余容量为:1
17pool-1-thread-1[添加数据],当前空余容量为:0
18pool-1-thread-1[队列已满,offer方法阻塞]
19pool-1-thread-2[删除元素],当前空余容量为:1
20pool-1-thread-1[添加数据],当前空余容量为:0
21pool-1-thread-2[删除元素],当前空余容量为:1
22pool-1-thread-2[删除元素],当前空余容量为:2
23pool-1-thread-2[删除元素],当前空余容量为:3
24pool-1-thread-2[删除元素],当前空余容量为:4
25pool-1-thread-2[删除元素],当前空余容量为:5
26[任务执行完毕]
27
28

给TA打赏
共{{data.count}}人
人已打赏
安全技术

详解Node.js API系列 Crypto加密模块(1)

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索