WaitStrategy
在Disruptor中,有很多需要等待的情况。例如:使用了SequenceBarrier的消费者需要在某种条件下等待,比如A消费者和B消费者,假设A消费者必须消费B消费者消费完的。
这些等待,还有唤醒等待的方法,由如下的WaitStrategy实现:
我们先来看接口类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 1public interface WaitStrategy {
2 /**
3 * @param sequence 需要等待available的sequence
4 * @param cursor 对应RingBuffer的Cursor
5 * @param dependentSequence 需要等待(依赖)的Sequence
6 * @param barrier 多消费者注册的SequenceBarrier
7 * @return 已经available的sequence
8 * @throws AlertException
9 * @throws InterruptedException
10 * @throws TimeoutException
11 */
12 long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
13 throws AlertException, InterruptedException, TimeoutException;
14
15 /**
16 * 唤醒所有等待的消费者
17 */
18 void signalAllWhenBlocking();
19}
20
我们在生产上主要用到三个实现:
BlockingWaitStrategy:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 1public final class BlockingWaitStrategy implements WaitStrategy
2{
3 private final Lock lock = new ReentrantLock();
4 private final Condition processorNotifyCondition = lock.newCondition();
5
6 @Override
7 public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
8 throws AlertException, InterruptedException
9 {
10 long availableSequence;
11 if (cursorSequence.get() < sequence)
12 {
13 lock.lock();
14 try
15 {
16 while (cursorSequence.get() < sequence)
17 {
18 //检查是否Alert,如果Alert,则抛出AlertException
19 //Alert在这里代表对应的消费者被halt停止了
20 barrier.checkAlert();
21 //在processorNotifyCondition上等待唤醒
22 processorNotifyCondition.await();
23 }
24 }
25 finally
26 {
27 lock.unlock();
28 }
29 }
30
31 while ((availableSequence = dependentSequence.get()) < sequence)
32 {
33 barrier.checkAlert();
34 }
35
36 return availableSequence;
37 }
38
39 @Override
40 public void signalAllWhenBlocking()
41 {
42 lock.lock();
43 try
44 {
45 //生产者生产消息后,会唤醒所有等待的消费者线程
46 processorNotifyCondition.signalAll();
47 }
48 finally
49 {
50 lock.unlock();
51 }
52 }
53}
54
所有的WaitStrategy都需要考虑两个情况:
- 线程或者消费者被停止,或者被中断。由于消费者实现了可以被停止,所以,WaitStrategy也需要能检测到停止信号
- 等待与唤醒:在不能消费时,等待,在有新消息时,唤醒检查是否满足可以消费的条件。
BlockingWaitStrategy是一种利用锁和等待机制的WaitStrategy,CPU消耗少,但是延迟比较高。
原理是,所有消费者首先查是否Alert,如果是,则抛AlertException从等待中返回。之后检查sequence是否已经被发布,就是当前cursorSequence是否大于想消费的sequence。若不满足,利用processorNotifyCondition.await()等待。
按照之前生产者解析所述,由于生产者在生产Event之后会调用signalAllWhenBlocking()唤醒等待,让所有消费者重新检查。所以,之前的waitFor方法中lock await中只检查cursorSequence.get() < sequence而不是dependentSequence.get() < sequence.
同时,我们之前提到,WaitStrategy也需要能检测到停止信号。而await()并不响应线程interrupt。所以,终止消费者时,也需要调用signalAllWhenBlocking来唤醒await。
SleepingWaitStrategy:
SleepingWaitStrategy是另一种较为平衡CPU消耗与延迟的WaitStrategy,在不同次数的重试后,采用不同的策略选择继续尝试或者让出CPU或者sleep。这种策略延迟不均匀。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 1public class SleepingWaitStrategy implements WaitStrategy {
2 //重试200次
3 private static final int DEFAULT_RETRIES = 200;
4 private final int retries;
5
6 public SleepingWaitStrategy() {
7 this(DEFAULT_RETRIES);
8 }
9
10 public SleepingWaitStrategy(int retries) {
11 this.retries = retries;
12 }
13
14 @Override
15 public long waitFor(
16 final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
17 throws AlertException, InterruptedException {
18 long availableSequence;
19 int counter = retries;
20 //直接检查dependentSequence.get() < sequence
21 while ((availableSequence = dependentSequence.get()) < sequence) {
22 counter = applyWaitMethod(barrier, counter);
23 }
24
25 return availableSequence;
26 }
27
28 @Override
29 public void signalAllWhenBlocking() {
30 }
31
32 private int applyWaitMethod(final SequenceBarrier barrier, int counter)
33 throws AlertException {
34 //检查是否需要终止
35 barrier.checkAlert();
36 //如果在200~100,重试
37 if (counter > 100) {
38 --counter;
39 }
40 //如果在100~0,调用Thread.yield()让出CPU
41 else if (counter > 0) {
42 --counter;
43 Thread.yield();
44 }
45 //<0的话,利用LockSupport.parkNanos(1L)来sleep最小时间
46 else {
47 LockSupport.parkNanos(1L);
48 }
49 return counter;
50 }
51}
52
BusySpinWaitStrategy:
BusySpinWaitStrategy是一种延迟最低,最耗CPU的策略。通常用于消费线程数小于CPU数的场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1public class BusySpinWaitStrategy implements WaitStrategy {
2 @Override
3 public long waitFor(
4 final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
5 throws AlertException, InterruptedException {
6
7 long availableSequence;
8 //一直while自旋检查
9 while ((availableSequence = dependentSequence.get()) < sequence) {
10 barrier.checkAlert();
11 }
12 return availableSequence;
13 }
14
15 @Override
16 public void signalAllWhenBlocking() {
17 }
18}
19