文章目录
-
ArrayBlockingQueue原理探究
-
(1). 结构
* (2). ArrayBlockingQueue原理介绍 -
1). offer操作
* 2). put操作
* 3). poll操作
* 4). take操作
* 5). peek操作1
21 * (3). 小结
2
ArrayBlockingQueue原理探究
ArrayBlockingQueue是使用有界数组方式实现的阻塞队列.
(1). 结构
ArrayBlockingQueue内部有一个数组items,用来存放队列元素.outIndex用来存放入队元素下标,tackIndex用来存放出队元素下标.这些变量并没有使用volatile修饰,因为加锁已经保证了这些变量在锁内的可见性了.
独占锁lock用来保证出入队操作的原子性,notEmpty,notFull连个条件变量用来进行出入队的同步.
ArrayBlockingQueue是有界的,所以构造必须传入队列大小为参数.默认情况下使用非公平锁.
(2). ArrayBlockingQueue原理介绍
1). offer操作
插入元素,如果队列已满,则丢弃元素,不会阻塞线程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1public boolean offer(E e) {
2 // 判断元素是否为空,为空抛出异常
3 checkNotNull(e);
4 // 加锁
5 final ReentrantLock lock = this.lock;
6 lock.lock();
7 try {
8 // 判断队列是否满,count为队列中已填充元素数量
9 // items.length为数组长度,也就是队列的最大值
10 if (count == items.length)
11 return false;
12 else {
13 enqueue(e);
14 return true;
15 }
16 } finally {
17 lock.unlock();
18 }
19}
20// 方法内部将在putIndex位置上放置新元素,并将putIndex++,如果越界重置为0(循环数组)
21private void enqueue(E x) {
22 final Object[] items = this.items;
23 items[putIndex] = x;
24 if (++putIndex == items.length)
25 putIndex = 0;
26 count++;
27 notEmpty.signal();
28}
29
30
2). put操作
向尾部插入一个元素,如果有空闲,插入,如果队列已满,阻塞等待队列出现空闲.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 1public void put(E e) throws InterruptedException {
2 // 判断传入元素的非空
3 checkNotNull(e);
4 // 加锁
5 final ReentrantLock lock = this.lock;
6 lock.lockInterruptibly();
7 try {
8 // 循环判断队列是否已满,如果已满,条件挂起
9 while (count == items.length)
10 notFull.await();
11 // 走到这里说明队列中出现了空闲,插入元素
12 enqueue(e);
13 } finally {
14 lock.unlock(); //解锁
15 }
16}
17
18
3). poll操作
从队列头部移除一个元素,如果队列为空,返回null,不会阻塞等待队列不为空
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public E poll() {
2 // 加锁
3 final ReentrantLock lock = this.lock;
4 lock.lock();
5 try {
6 // 如果队列为空,返回null
7 // 如果不为空,进行删除元素的操作,并将该元素返回
8 return (count == 0) ? null : dequeue();
9 } finally {
10 lock.unlock();//解锁
11 }
12}
13private E dequeue() {
14 final Object[] items = this.items;
15 @SuppressWarnings("unchecked")
16 // 获取头部的元素,并将队列中的元素置空
17 E x = (E) items[takeIndex];
18 items[takeIndex] = null;
19 // 循环队列
20 if (++takeIndex == items.length)
21 takeIndex = 0;
22 // 调整计数器的值
23 count--;
24 // itrs是当前活动迭代器的共享状态;如果已知没有状态,则为null。
25 if (itrs != null)
26 // 更新迭代器中的元素数据
27 itrs.elementDequeued();
28 // 唤醒因为队列满导致没有入队成功的入队线程
29 notFull.signal();
30 return x;
31}
32
33
4). take操作
获取当前队列头部元素,并删除它,如果队列为空,会阻塞等待队列不为空时进行操作
可响应中断.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public E take() throws InterruptedException {
2 // 获取锁对象,并响应中断式的加锁
3 final ReentrantLock lock = this.lock;
4 lock.lockInterruptibly();
5 try {
6 // 循环判断队列是否为空,为空则条件阻塞
7 while (count == 0)
8 notEmpty.await();
9 // 删除并返回头部元素
10 return dequeue();
11 } finally {
12 lock.unlock();
13 }
14}
15
16
5). peek操作
获取头部元素,但是不移除,如果队列为空,返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 1public E peek() {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 // 直接返回出队下标处的元素
6 return itemAt(takeIndex);
7 } finally {
8 lock.unlock();
9 }
10}
11// 获取对应下表处的元素
12final E itemAt(int i) {
13 return (E) items[i];
14}
15
16
(3). 小结
ArrayBlockingQueue使用一个独占锁来实现只能有一个线程进行入队和出队操作,这个锁的粒度比较大,类似于在方法上加synchronized.