高并发数据结构Disruptor解析(5)

释放双眼,带上耳机,听听看~!

WaitStrategy

在Disruptor中,有很多需要等待的情况。例如:使用了SequenceBarrier的消费者需要在某种条件下等待,比如A消费者和B消费者,假设A消费者必须消费B消费者消费完的。
这些等待,还有唤醒等待的方法,由如下的WaitStrategy实现:
我们先来看接口类:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1public interface WaitStrategy {
2    /**
3     * @param sequence 需要等待available的sequence
4     * @param cursor 对应RingBuffer的Cursor
5     * @param dependentSequence 需要等待(依赖)的Sequence
6     * @param barrier 多消费者注册的SequenceBarrier
7     * @return 已经available的sequence
8     * @throws AlertException
9     * @throws InterruptedException
10     * @throws TimeoutException
11     */
12    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
13            throws AlertException, InterruptedException, TimeoutException;
14
15    /**
16     * 唤醒所有等待的消费者
17     */
18    void signalAllWhenBlocking();
19}
20

我们在生产上主要用到三个实现:
BlockingWaitStrategy:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
1public final class BlockingWaitStrategy implements WaitStrategy
2{
3    private final Lock lock = new ReentrantLock();
4    private final Condition processorNotifyCondition = lock.newCondition();
5
6    @Override
7    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
8        throws AlertException, InterruptedException
9    {
10        long availableSequence;
11        if (cursorSequence.get() < sequence)
12        {
13            lock.lock();
14            try
15            {
16                while (cursorSequence.get() < sequence)
17                {
18                    //检查是否Alert,如果Alert,则抛出AlertException
19                    //Alert在这里代表对应的消费者被halt停止了
20                    barrier.checkAlert();
21                    //在processorNotifyCondition上等待唤醒
22                    processorNotifyCondition.await();
23                }
24            }
25            finally
26            {
27                lock.unlock();
28            }
29        }
30
31        while ((availableSequence = dependentSequence.get()) < sequence)
32        {
33            barrier.checkAlert();
34        }
35
36        return availableSequence;
37    }
38
39    @Override
40    public void signalAllWhenBlocking()
41    {
42        lock.lock();
43        try
44        {
45            //生产者生产消息后,会唤醒所有等待的消费者线程
46            processorNotifyCondition.signalAll();
47        }
48        finally
49        {
50            lock.unlock();
51        }
52    }
53}
54

所有的WaitStrategy都需要考虑两个情况:

  1. 线程或者消费者被停止,或者被中断。由于消费者实现了可以被停止,所以,WaitStrategy也需要能检测到停止信号
  2. 等待与唤醒:在不能消费时,等待,在有新消息时,唤醒检查是否满足可以消费的条件。

BlockingWaitStrategy是一种利用锁和等待机制的WaitStrategy,CPU消耗少,但是延迟比较高。
原理是,所有消费者首先查是否Alert,如果是,则抛AlertException从等待中返回。之后检查sequence是否已经被发布,就是当前cursorSequence是否大于想消费的sequence。若不满足,利用processorNotifyCondition.await()等待。
按照之前生产者解析所述,由于生产者在生产Event之后会调用signalAllWhenBlocking()唤醒等待,让所有消费者重新检查。所以,之前的waitFor方法中lock await中只检查cursorSequence.get() < sequence而不是dependentSequence.get() < sequence.
同时,我们之前提到,WaitStrategy也需要能检测到停止信号。而await()并不响应线程interrupt。所以,终止消费者时,也需要调用signalAllWhenBlocking来唤醒await。

SleepingWaitStrategy:
SleepingWaitStrategy是另一种较为平衡CPU消耗与延迟的WaitStrategy,在不同次数的重试后,采用不同的策略选择继续尝试或者让出CPU或者sleep。这种策略延迟不均匀。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
1public class SleepingWaitStrategy implements WaitStrategy {
2    //重试200次
3    private static final int DEFAULT_RETRIES = 200;
4    private final int retries;
5
6    public SleepingWaitStrategy() {
7        this(DEFAULT_RETRIES);
8    }
9
10    public SleepingWaitStrategy(int retries) {
11        this.retries = retries;
12    }
13
14    @Override
15    public long waitFor(
16            final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
17            throws AlertException, InterruptedException {
18        long availableSequence;
19        int counter = retries;
20        //直接检查dependentSequence.get() &lt; sequence
21        while ((availableSequence = dependentSequence.get()) &lt; sequence) {
22            counter = applyWaitMethod(barrier, counter);
23        }
24
25        return availableSequence;
26    }
27
28    @Override
29    public void signalAllWhenBlocking() {
30    }
31
32    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
33            throws AlertException {
34        //检查是否需要终止
35        barrier.checkAlert();
36        //如果在200~100,重试
37        if (counter &gt; 100) {
38            --counter;
39        }
40        //如果在100~0,调用Thread.yield()让出CPU
41        else if (counter &gt; 0) {
42            --counter;
43            Thread.yield();
44        }
45        //&lt;0的话,利用LockSupport.parkNanos(1L)来sleep最小时间
46        else {
47            LockSupport.parkNanos(1L);
48        }
49        return counter;
50    }
51}
52

BusySpinWaitStrategy:
BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。通常用于消费线程数小于CPU数的场景。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1public class BusySpinWaitStrategy implements WaitStrategy {
2    @Override
3    public long waitFor(
4            final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
5            throws AlertException, InterruptedException {
6
7        long availableSequence;
8        //一直while自旋检查
9        while ((availableSequence = dependentSequence.get()) &lt; sequence) {
10            barrier.checkAlert();
11        }
12        return availableSequence;
13    }
14
15    @Override
16    public void signalAllWhenBlocking() {
17    }
18}
19

给TA打赏
共{{data.count}}人
人已打赏
安全经验

人们为何痛恨Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索