文章目录
-
PriorityBlockingQueue原理探究
-
(1). 结构
* (2). PriorityBlockingQueue原理介绍 -
1). offer操作
* 2). poll操作
* 3). take操作1
21 * (3). 小结
2
PriorityBlockingQueue原理探究
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部使用平衡二叉树堆实现,所以直接遍历队列元素不保证有序.在构造函数需传入comparator,用于插入元素时继续排序,若没有传入comparator,则插入的元素必须实现Comparatable接口.
(1). 结构
PriorityBlockingQueue内部有一个数组queue,用来存放队列元素,size存放元素个数.allocationSpinLock是一个自旋锁,其使用CAS操作保证只有一个线程可以对队列进行操作,状态为0或1.
没有notFull条件变量是因为这个队列是无界的,入队操作是非阻塞的.
(2). PriorityBlockingQueue原理介绍
1). offer操作
在队列中插入一个元素,因为是无界队列,所以一定会返回true.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 1public boolean offer(E e) {
2 // 对入队元素进行非空判断
3 if (e == null)
4 throw new NullPointerException();
5 // 加锁
6 final ReentrantLock lock = this.lock;
7 lock.lock();
8 int n, cap;
9 Object[] array;
10 // 如果当前元素个数>=队列容量,则扩容
11 while ((n = size) >= (cap = (array = queue).length))
12 tryGrow(array, cap);
13 try {
14 // 比较器,如果有传入比较器的话使用自定义的比较器,如果没有使用默认的
15 Comparator<? super E> cmp = comparator;
16 if (cmp == null)
17 // n是原队列第一个空位,e是入队元素,array是队列
18 siftUpComparable(n, e, array);
19 else
20 siftUpUsingComparator(n, e, array, cmp);
21 // 队列元素数+1
22 size = n + 1;
23 // 解锁所有因为空队列挂起的条件阻塞
24 notEmpty.signal();
25 } finally {
26 lock.unlock();
27 }
28 return true;
29}
30// 扩容操作
31private void tryGrow(Object[] array, int oldCap) {
32 // 释放锁
33 // 使用CAS控制只能有一个线程成功扩容,释放锁让其他线程进行入队出队操作,降低并发性
34 lock.unlock();
35 Object[] newArray = null;
36 // 这也是一个锁,只让一个线程进行扩容
37 if (allocationSpinLock == 0 &&
38 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
39 try {
40 // 如果oldCap小于64,扩容为2倍+2,如果大于,扩容50%
41 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
42 // 按照之前算法扩容后的容量如果溢出,则最小扩容量为原容量+1
43 if (newCap - MAX_ARRAY_SIZE > 0) {
44 int minCap = oldCap + 1;
45 // 如果最小扩容量溢出或者小于0,那么扩容失败
46 if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
47 throw new OutOfMemoryError();
48 newCap = MAX_ARRAY_SIZE; // 扩容为极限大小
49 }
50 // 如果正常扩容情况下没有溢出,创建一个新数组,大小为扩容后的数组
51 if (newCap > oldCap && queue == array)
52 newArray = new Object[newCap];
53 } finally {
54 allocationSpinLock = 0;// 解锁
55 }
56 }
57 // 第一个线程获取锁之后第二个线程会直接来到这里,让出CPU资源给第一个线程
58 if (newArray == null)
59 Thread.yield();
60 // 加锁,判断并拷贝数组
61 lock.lock();
62 if (newArray != null && queue == array) {
63 queue = newArray;
64 System.arraycopy(array, 0, newArray, 0, oldCap);
65 }
66}
67// 建立堆
68private static <T> void siftUpComparable(int k, T x, Object[] array) {
69 Comparable<? super T> key = (Comparable<? super T>) x;
70 while (k > 0) {
71 // 找到k的父节点,如果k小于父节点的值,将父节点置换为k
72 // 直到k大于等于父节点的值,这样就构造了一个极小堆(所有父节点小于子节点)
73 int parent = (k - 1) >>> 1;
74 Object e = array[parent];
75 if (key.compareTo((T) e) >= 0)
76 break;
77 array[k] = e;
78 k = parent;
79 }
80 array[k] = key;
81}
82
83
2). poll操作
获取内部根节点的元素,如果队列为空,返回null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 1public E poll() {
2 final ReentrantLock lock = this.lock;
3 lock.lock();
4 try {
5 return dequeue();
6 } finally {
7 lock.unlock();
8 }
9}
10// 获取内部根节点的元素,如果队列为空,返回null
11private E dequeue() {
12 // 判断队列是否为空
13 int n = size - 1;
14 if (n < 0)
15 return null;
16 else {
17 // 将第n个元素取出为x
18 // 第0个元素取出为result
19 Object[] array = queue;
20 E result = (E) array[0];
21 E x = (E) array[n];
22 array[n] = null;
23 Comparator<? super E> cmp = comparator;
24 if (cmp == null)
25 siftDownComparable(0, x, array, n);
26 else
27 siftDownUsingComparator(0, x, array, n, cmp);
28 size = n;
29 return result;
30 }
31}
32// k为空闲位置,x为尾元素,array为堆,n为堆大小
33// 一直用小的孩子向上弥补父节点,直到最后一层,用最后一个节点补上
34private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
35 if (n > 0) {
36 Comparable<? super T> key = (Comparable<? super T>)x;
37 int half = n >>> 1;// 无符号右移
38 while (k < half) {
39 // 子节点默认为左孩子
40 int child = (k << 1) + 1;
41 Object c = array[child];
42 // 右孩子
43 int right = child + 1;
44 // 如果 (右孩子在堆内 && 左孩子大于右孩子) 那么右孩子代替左孩子作为孩子节点,并且c为孩子的值
45 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
46 c = array[child = right];
47 // 当尾元素小于孩子时,退出
48 if (key.compareTo((T) c) <= 0)
49 break;
50 // 用孩子节点向上替补空闲的父节点
51 array[k] = c;
52 k = child;
53 }
54 array[k] = key;
55 }
56}
57
58
3). take操作
获取根节点元素,如果队列为空阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1public E take() throws InterruptedException {
2 final ReentrantLock lock = this.lock;
3 lock.lockInterruptibly();
4 E result;
5 try {
6 // 循环获取根节点元素,如果队列为空,挂起
7 // 循环防止多线程同时被挂起
8 while ( (result = dequeue()) == null)
9 notEmpty.await();
10 } finally {
11 lock.unlock();
12 }
13 return result;
14}
15
16
17
(3). 小结
内部使用二叉树堆维护元素优先级,使用可扩容的数组作为元素存储的数据结构.出队时保证出队元素是根节点,并重置整个堆为极小堆.
内部使用了一个独占锁来控制并发.只使用了一个条件变量notEmpty而没有使用notFull是因为这个队列是无界的,不存在满队列情况.