Java并发编程–并发队列原理之PriorityBlockingQueue

释放双眼,带上耳机,听听看~!

文章目录

  • PriorityBlockingQueue原理探究

  • (1). 结构
    * (2). PriorityBlockingQueue原理介绍

  • 1). offer操作
    * 2). poll操作
    * 3). take操作

    
    
    1
    2
    1  * (3). 小结
    2

PriorityBlockingQueue原理探究

​  PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素,内部使用平衡二叉树堆实现,所以直接遍历队列元素不保证有序.在构造函数需传入comparator,用于插入元素时继续排序,若没有传入comparator,则插入的元素必须实现Comparatable接口.

(1). 结构

Java并发编程--并发队列原理之PriorityBlockingQueue

​  PriorityBlockingQueue内部有一个数组queue,用来存放队列元素,size存放元素个数.allocationSpinLock是一个自旋锁,其使用CAS操作保证只有一个线程可以对队列进行操作,状态为0或1.

​  没有notFull条件变量是因为这个队列是无界的,入队操作是非阻塞的.

(2). PriorityBlockingQueue原理介绍

1). offer操作

​  在队列中插入一个元素,因为是无界队列,所以一定会返回true.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
1public boolean offer(E e) {
2    // 对入队元素进行非空判断
3    if (e == null)
4        throw new NullPointerException();
5    // 加锁
6    final ReentrantLock lock = this.lock;
7    lock.lock();
8    int n, cap;
9    Object[] array;
10    // 如果当前元素个数>=队列容量,则扩容
11    while ((n = size) >= (cap = (array = queue).length))
12        tryGrow(array, cap);
13    try {
14        // 比较器,如果有传入比较器的话使用自定义的比较器,如果没有使用默认的
15        Comparator<? super E> cmp = comparator;
16        if (cmp == null)
17            // n是原队列第一个空位,e是入队元素,array是队列
18            siftUpComparable(n, e, array);
19        else
20            siftUpUsingComparator(n, e, array, cmp);
21        // 队列元素数+1
22        size = n + 1;
23        // 解锁所有因为空队列挂起的条件阻塞
24        notEmpty.signal();
25    } finally {
26        lock.unlock();
27    }
28    return true;
29}
30// 扩容操作
31private void tryGrow(Object[] array, int oldCap) {
32    // 释放锁
33    // 使用CAS控制只能有一个线程成功扩容,释放锁让其他线程进行入队出队操作,降低并发性
34    lock.unlock();
35    Object[] newArray = null;
36    // 这也是一个锁,只让一个线程进行扩容
37    if (allocationSpinLock == 0 &&
38        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
39        try {
40            // 如果oldCap小于64,扩容为2倍+2,如果大于,扩容50%
41            int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1));
42            // 按照之前算法扩容后的容量如果溢出,则最小扩容量为原容量+1
43            if (newCap - MAX_ARRAY_SIZE > 0) {
44                int minCap = oldCap + 1;
45                // 如果最小扩容量溢出或者小于0,那么扩容失败
46                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
47                    throw new OutOfMemoryError();
48                newCap = MAX_ARRAY_SIZE; // 扩容为极限大小
49            }
50            // 如果正常扩容情况下没有溢出,创建一个新数组,大小为扩容后的数组
51            if (newCap > oldCap && queue == array)
52                newArray = new Object[newCap];
53        } finally {
54            allocationSpinLock = 0;// 解锁
55        }
56    }
57    // 第一个线程获取锁之后第二个线程会直接来到这里,让出CPU资源给第一个线程
58    if (newArray == null)
59        Thread.yield();
60    // 加锁,判断并拷贝数组
61    lock.lock();
62    if (newArray != null && queue == array) {
63        queue = newArray;
64        System.arraycopy(array, 0, newArray, 0, oldCap);
65    }
66}
67// 建立堆
68private static <T> void siftUpComparable(int k, T x, Object[] array) {
69    Comparable<? super T> key = (Comparable<? super T>) x;
70    while (k > 0) {
71        // 找到k的父节点,如果k小于父节点的值,将父节点置换为k
72        // 直到k大于等于父节点的值,这样就构造了一个极小堆(所有父节点小于子节点)
73        int parent = (k - 1) >>> 1;
74        Object e = array[parent];
75        if (key.compareTo((T) e) >= 0)
76            break;
77        array[k] = e;
78        k = parent;
79    }
80    array[k] = key;
81}
82
83

2). poll操作

​  获取内部根节点的元素,如果队列为空,返回null


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
1public E poll() {
2    final ReentrantLock lock = this.lock;
3    lock.lock();
4    try {
5        return dequeue();
6    } finally {
7        lock.unlock();
8    }
9}
10// 获取内部根节点的元素,如果队列为空,返回null
11private E dequeue() {
12    // 判断队列是否为空
13    int n = size - 1;
14    if (n < 0)
15        return null;
16    else {
17        // 将第n个元素取出为x
18        // 第0个元素取出为result
19        Object[] array = queue;
20        E result = (E) array[0];
21        E x = (E) array[n];
22        array[n] = null;
23        Comparator<? super E> cmp = comparator;
24        if (cmp == null)
25            siftDownComparable(0, x, array, n);
26        else
27            siftDownUsingComparator(0, x, array, n, cmp);
28        size = n;
29        return result;
30    }
31}
32// k为空闲位置,x为尾元素,array为堆,n为堆大小
33// 一直用小的孩子向上弥补父节点,直到最后一层,用最后一个节点补上
34private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
35    if (n > 0) {
36        Comparable<? super T> key = (Comparable<? super T>)x;
37        int half = n >>> 1;// 无符号右移
38        while (k < half) {
39            // 子节点默认为左孩子
40            int child = (k << 1) + 1;
41            Object c = array[child];
42            // 右孩子
43            int right = child + 1;
44            // 如果 (右孩子在堆内 && 左孩子大于右孩子) 那么右孩子代替左孩子作为孩子节点,并且c为孩子的值
45            if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
46                c = array[child = right];
47            // 当尾元素小于孩子时,退出
48            if (key.compareTo((T) c) <= 0)
49                break;
50            // 用孩子节点向上替补空闲的父节点
51            array[k] = c;
52            k = child;
53        }
54        array[k] = key;
55    }
56}
57
58

3). take操作

​  获取根节点元素,如果队列为空阻塞


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1public E take() throws InterruptedException {
2    final ReentrantLock lock = this.lock;
3    lock.lockInterruptibly();
4    E result;
5    try {
6        // 循环获取根节点元素,如果队列为空,挂起
7        // 循环防止多线程同时被挂起
8        while ( (result = dequeue()) == null)
9            notEmpty.await();
10    } finally {
11        lock.unlock();
12    }
13    return result;
14}
15
16
17

(3). 小结

​  内部使用二叉树堆维护元素优先级,使用可扩容的数组作为元素存储的数据结构.出队时保证出队元素是根节点,并重置整个堆为极小堆.

​  内部使用了一个独占锁来控制并发.只使用了一个条件变量notEmpty而没有使用notFull是因为这个队列是无界的,不存在满队列情况.

给TA打赏
共{{data.count}}人
人已打赏
安全技术

用node.js做cluster,监听异常的邮件提醒服务

2021-12-21 16:36:11

安全技术

从零搭建自己的SpringBoot后台框架(二十三)

2022-1-12 12:36:11

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索