SequenceBarrier
SequenceBarrier是消费者与Ringbuffer之间建立消费关系的桥梁,同时也是消费者与消费者之间消费依赖的抽象。
SequenceBarrier只有一个实现类,就是ProcessingSequenceBarrier。ProcessingSequenceBarrier由生产者Sequencer,消费定位cursorSequence,等待策略waitStrategy还有一组依赖sequence:dependentSequence组成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 1public ProcessingSequenceBarrier(
2 final Sequencer sequencer,
3 final WaitStrategy waitStrategy,
4 final Sequence cursorSequence,
5 final Sequence[] dependentSequences)
6{
7 this.sequencer = sequencer;
8 this.waitStrategy = waitStrategy;
9 this.cursorSequence = cursorSequence;
10 if (0 == dependentSequences.length)
11 {
12 dependentSequence = cursorSequence;
13 }
14 else
15 {
16 dependentSequence = new FixedSequenceGroup(dependentSequences);
17 }
18}
19
首先,为了实现消费依赖,SequenceBarrier肯定有一个获取可以消费的sequence方法,就是
1
2 1long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
2
实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1@Override
2 public long waitFor(final long sequence)
3 throws AlertException, InterruptedException, TimeoutException
4 {
5 //检查是否alerted
6 checkAlert();
7 //通过等待策略获取下一个可消费的sequence,这个sequence通过之前的讲解可以知道,需要大于cursorSequence和dependentSequence,我们可以通过dependentSequence实现先后消费
8 long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
9 //等待可能被中断,所以检查下availableSequence是否小于sequence
10 if (availableSequence < sequence)
11 {
12 return availableSequence;
13 }
14 //如果不小于,返回所有sequence(可能多生产者)和availableSequence中最大的
15 return sequencer.getHighestPublishedSequence(sequence, availableSequence);
16 }
17
其他方法实现很简单,功能上分别有:
-
获取当前cursorSequence(并没有什么用,就是为了监控)
-
负责中断和恢复的alert标记
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 1 @Override
2 public long getCursor()
3 {
4 return dependentSequence.get();
5 }
6
7 @Override
8 public boolean isAlerted()
9 {
10 return alerted;
11 }
12
13 @Override
14 public void alert()
15 {
16 alerted = true;
17 waitStrategy.signalAllWhenBlocking();
18 }
19
20 @Override
21 public void clearAlert()
22 {
23 alerted = false;
24 }
25
26 @Override
27 public void checkAlert() throws AlertException
28 {
29 if (alerted)
30 {
31 throw AlertException.INSTANCE;
32 }
33 }
34
构造SequenceBarrier在框架中只有一个入口,就是AbstractSequencer的:
1
2
3
4
5 1public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
2 {
3 return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
4 }
5
SequenceProcessor
通过SequenceBarrier,我们可以实现消费之间的依赖关系,但是,消费方式(比如广播,群组消费等等),需要通过SequenceProcessor的实现类实现:
通过类依赖关系我们发现,EventProcessor都是拓展了Runnable接口,也就是我们可以把它们当做线程处理。
1. BatchEventProcessor:
它的构造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 1/**
2 * 构造一个消费者之间非互斥消费的消费者
3 *
4 * @param dataProvider 对应的RingBuffer
5 * @param sequenceBarrier 依赖关系,通过构造不同的sequenceBarrier用互相的dependentsequence,我们可以构造出先后消费关系
6 * @param eventHandler 用户实现的处理消费的event的业务消费者.
7 */
8 public BatchEventProcessor(
9 final DataProvider<T> dataProvider,
10 final SequenceBarrier sequenceBarrier,
11 final EventHandler<? super T> eventHandler)
12 {
13 this.dataProvider = dataProvider;
14 this.sequenceBarrier = sequenceBarrier;
15 this.eventHandler = eventHandler;
16
17 if (eventHandler instanceof SequenceReportingEventHandler)
18 {
19 ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
20 }
21
22 timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
23 }
24
线程为一个死循环:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 1 @Override
2 public void run()
3 {
4 //检查状态
5 if (!running.compareAndSet(false, true))
6 {
7 throw new IllegalStateException("Thread is already running");
8 }
9 //清理
10 sequenceBarrier.clearAlert();
11 //如果用户实现的EventHandler继承了LifecycleAware,则执行其onStart方法
12 notifyStart();
13
14 T event = null;
15 //sequence初始值为-1,设计上当前值是已经消费过的
16 long nextSequence = sequence.get() + 1L;
17 try
18 {
19 while (true)
20 {
21 try
22 {
23 //获取当前可以消费的最大sequence
24 final long availableSequence = sequenceBarrier.waitFor(nextSequence);
25 while (nextSequence <= availableSequence)
26 {
27 //获取并处理
28 event = dataProvider.get(nextSequence);
29 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
30 nextSequence++;
31 }
32 //设置当前sequence,注意,出现异常需要特殊处理,防止重复消费
33 sequence.set(availableSequence);
34 }
35 catch (final TimeoutException e)
36 {
37 //wait超时异常
38 notifyTimeout(sequence.get());
39 }
40 catch (final AlertException ex)
41 {
42 //中断异常
43 if (!running.get())
44 {
45 break;
46 }
47 }
48 catch (final Throwable ex)
49 {
50 exceptionHandler.handleEventException(ex, nextSequence, event);
51 //如果出现异常则设置为nextSequence
52 sequence.set(nextSequence);
53 nextSequence++;
54 }
55 }
56 }
57 finally
58 {
59 //如果用户实现的EventHandler继承了LifecycleAware,则执行其onShutdown方法
60 notifyShutdown();
61 running.set(false);
62 }
63 }
64
可以看出:
- BatchEventProcessor可以处理超时,可以处理中断,可以通过用户实现的异常处理类处理异常,同时,发生异常之后再次启动,不会漏消费,也不会重复消费。
- 不同的BatchEventProcessor之间通过SequenceBarrier进行依赖消费。原理如下图所示:
假设我们有三个消费者BatchEventProcessor1,BatchEventProcessor2,BatchEventProcessor3. 1需要先于2和3消费,那么构建BatchEventProcessor和SequenceBarrier时,我们需要让BatchEventProcessor2和BatchEventProcessor3的SequenceBarrier的dependentSequence中加入SequenceBarrier1的sequence。
其实这里2和3共用一个SequenceBarrier就行。
2. WorkProcessor
另一种消费者是WorkProcessor。利用它,可以实现互斥消费,同样的利用SequenceBarrier可以实现消费顺序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 1public void run()
2 {
3 if (!running.compareAndSet(false, true))
4 {
5 throw new IllegalStateException("Thread is already running");
6 }
7 sequenceBarrier.clearAlert();
8
9 notifyStart();
10
11 boolean processedSequence = true;
12 long cachedAvailableSequence = Long.MIN_VALUE;
13 long nextSequence = sequence.get();
14 T event = null;
15 while (true)
16 {
17 try
18 {
19 if (processedSequence)
20 {
21 processedSequence = false;
22 //获取下一个可以消费的Sequence
23 do
24 {
25 nextSequence = workSequence.get() + 1L;
26 sequence.set(nextSequence - 1L);
27 }
28 //多个WorkProcessor之间,如果共享一个workSequence,那么,可以实现互斥消费,因为只有一个线程可以CAS更新成功
29 while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
30 }
31
32 if (cachedAvailableSequence >= nextSequence)
33 {
34 event = ringBuffer.get(nextSequence);
35 workHandler.onEvent(event);
36 processedSequence = true;
37 }
38 else
39 {
40 cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
41 }
42 }
43 catch (final TimeoutException e)
44 {
45 notifyTimeout(sequence.get());
46 }
47 catch (final AlertException ex)
48 {
49 if (!running.get())
50 {
51 break;
52 }
53 }
54 catch (final Throwable ex)
55 {
56 // handle, mark as processed, unless the exception handler threw an exception
57 exceptionHandler.handleEventException(ex, nextSequence, event);
58 processedSequence = true;
59 }
60 }
61
62 notifyShutdown();
63
64 running.set(false);
65 }
66
3. WorkerPool
多个WorkerProcessor可以组成一个WorkerPool:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1public WorkerPool(
2 final RingBuffer<T> ringBuffer,
3 final SequenceBarrier sequenceBarrier,
4 final ExceptionHandler<? super T> exceptionHandler,
5 final WorkHandler<? super T>... workHandlers)
6 {
7 this.ringBuffer = ringBuffer;
8 final int numWorkers = workHandlers.length;
9 workProcessors = new WorkProcessor[numWorkers];
10
11 for (int i = 0; i < numWorkers; i++)
12 {
13 workProcessors[i] = new WorkProcessor<T>(
14 ringBuffer,
15 sequenceBarrier,
16 workHandlers[i],
17 exceptionHandler,
18 workSequence);
19 }
20 }
21
里面的 workHandlers[i]共享同一个workSequence,所以,同一个WorkerPool内,是互斥消费。