文章目录
-
LinkedBlockingQueue原理探究
-
(1). 结构
* (2). LinkedBlockingQueue原理介绍 -
1). offer操作
* 2). put操作
* 3). poll操作
* 4). tack操作
* 5). peek操作
* 6). remove操作1
21 * (3). 小结
2
LinkedBlockingQueue原理探究
LinkedBlockingQueue是使用独占锁实现的阻塞队列.
(1). 结构
有单向链表实现,有两个Node,分别用来存放首尾节点,还有一个原子变量Count用来记录队列元素个数.
还有两个ReentrantLock实例,分别用来空值元素入队和出队的原子性.
tackLock控制出队操作,putLock控制入队操作.
另外使用了两个条件变量,notEmpty(由tackLock锁获得,在出队是判断队列是否为空)和notFull(由putLock锁获得,在入队是判断队列是否已满).
(2). LinkedBlockingQueue原理介绍
1). offer操作
如果有空闲,插入元素并返回true.没有则返回false.
如果传入元素为null,则抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 1public boolean offer(E e) {
2 // 传入元素为null抛出异常
3 if (e == null) throw new NullPointerException();
4
5 // 队列判满
6 final AtomicInteger count = this.count;
7 if (count.get() == capacity)// capacity默认为MAX_VALUE,可在构造中传参设置
8 return false;
9
10 // 构造新节点,获取入队锁对象
11 int c = -1;
12 Node<E> node = new Node<E>(e);
13 final ReentrantLock putLock = this.putLock;
14 putLock.lock();// 加锁
15 try {
16 // 如果队列不满,进队,并递增元素数
17 if (count.get() < capacity) {
18 // 将node节点链接到队列尾
19 enqueue(node);
20 // count自增1,并返回修改前的值
21 c = count.getAndIncrement();
22 // 如果添加后还有空间,唤醒之前条件阻塞的入队线程
23 if (c + 1 < capacity)
24 notFull.signal();
25 }
26 } finally {
27 putLock.unlock();// 解锁
28 }
29 if (c == 0)
30 // c为入队前队列中的元素数,c==0说明此时队列中至少有一个元素
31 // 唤醒其他所有因为不能出队条件阻塞的线程
32 signalNotEmpty();
33 return c >= 0;
34}
35
36
2). put操作
向队列尾插入一个元素,如果队列有空闲则插入,队列已满就阻塞当前线程,直到队列有空闲插入成功后返回.
当被阻塞是其他线程设置了中断,抛出InterruptedExecption异常.
如果传入元素为null,抛出空指针异常.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 1public void put(E e) throws InterruptedException {
2 // 判断传入元素是不是null
3 if (e == null) throw new NullPointerException();
4
5 // 构建新节点,获取入队锁对象和计数器
6 int c = -1;
7 Node<E> node = new Node<E>(e);
8 final ReentrantLock putLock = this.putLock;
9 final AtomicInteger count = this.count;
10
11 // 可响应中断式的加锁
12 putLock.lockInterruptibly();
13 try {
14 // 队列已满,则使当前入队线程条件阻塞,等待出队线程的条件唤醒
15 while (count.get() == capacity) {
16 notFull.await();
17 }
18 // 设置尾节点
19 enqueue(node);
20 // 修改计数器,并返回计数前的值
21 c = count.getAndIncrement();
22 // 判断入队后是否满,如果不满,唤醒其他的入队线程
23 if (c + 1 < capacity)
24 notFull.signal();
25 } finally {
26 putLock.unlock();
27 }
28 // 如果至少有一个元素(入队前为空,入队后至少有一个元素),唤醒其他的出队线程
29 if (c == 0)
30 signalNotEmpty();
31}
32
33
3). poll操作
从队列头获取一个并移除一个元素,如果队列为空返回null.该方法并不等待其他线程入队元素.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 1public E poll() {
2 // 队列为空返回null
3 final AtomicInteger count = this.count;
4 if (count.get() == 0)
5 return null;
6 // 获取出队锁对象
7 E x = null;
8 int c = -1;
9 final ReentrantLock takeLock = this.takeLock;
10 takeLock.lock();
11 try {
12 if (count.get() > 0) {
13 // 队列不空,则出队并递减计数器
14 x = dequeue();
15 c = count.getAndDecrement();
16 // 出队后队列不为空,则唤醒其他出队线程
17 if (c > 1)
18 notEmpty.signal();
19 }
20 } finally {
21 takeLock.unlock();
22 }
23 // 出队前队列满,则出队后队列有空隙,唤醒其他入队线程
24 if (c == capacity)
25 signalNotFull();
26 return x;
27}
28
29
4). tack操作
获取队列的头部元素,并从队列中移除,如果队列为空,阻塞当前线程,直到队列不为空后返回元素.
该方法响应中断,会抛出异常.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 1public E take() throws InterruptedException {
2 // 获取出队锁对象和计数器,并响应中断式的进行加锁
3 E x;
4 int c = -1;
5 final AtomicInteger count = this.count;
6 final ReentrantLock takeLock = this.takeLock;
7 takeLock.lockInterruptibly();
8
9 try {
10 while (count.get() == 0) {
11 // 如果队列为空,循环条件挂起
12 notEmpty.await();
13 }
14
15 // 出队并递减计数器
16 x = dequeue();
17 c = count.getAndDecrement();
18 if (c > 1)
19 // 如果出队后还有元素,唤醒其他出队线程
20 notEmpty.signal();
21 } finally {
22 takeLock.unlock();
23 }
24 // 如果出队前队列已满,那么出队后出现了空位,唤醒其他入队线程
25 if (c == capacity)
26 signalNotFull();
27 return x;
28}
29
30
5). peek操作
获取头部节点元素,但不移除.
加出队锁的目的是防止获取了元素,但其他线程在该方法返回前将它出队了,造成不一致,和空指针异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1public E peek() {
2 if (count.get() == 0)
3 return null;
4 final ReentrantLock takeLock = this.takeLock;
5 takeLock.lock();
6 try {
7 Node<E> first = head.next;
8 if (first == null)
9 return null;
10 else
11 return first.item;
12 } finally {
13 takeLock.unlock();
14 }
15}
16
17
6). remove操作
删除队列中指定的元素,有则删除,没有返回false.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1public boolean remove(Object o) {
2 // 删除null元素直接返回false
3 if (o == null) return false;
4
5 // 同时加入队锁和出队锁
6 fullyLock();
7 try {
8 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
9 // 找到目标节点,删除节点
10 if (o.equals(p.item)) {
11 unlink(p, trail);
12 return true;
13 }
14 }
15 return false;
16 } finally {
17 // 解入队锁和出队锁
18 fullyUnlock();
19 }
20}
21
22// 删除trail节点后的p节点
23void unlink(Node<E> p, Node<E> trail) {
24 p.item = null;
25 trail.next = p.next;
26 // 如果p是尾节点,那么重新设置尾节点
27 if (last == p)
28 last = trail;
29 // 如果当前队列满,删除元素后队列不满,唤醒入队线程
30 if (count.getAndDecrement() == capacity)
31 notFull.signal();
32}
33
34
(3). 小结
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
- 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。