高并发数据结构Disruptor解析(6)

释放双眼,带上耳机,听听看~!

SequenceBarrier

SequenceBarrier是消费者与Ringbuffer之间建立消费关系的桥梁,同时也是消费者与消费者之间消费依赖的抽象。

SequenceBarrier只有一个实现类,就是ProcessingSequenceBarrier。ProcessingSequenceBarrier由生产者Sequencer,消费定位cursorSequence,等待策略waitStrategy还有一组依赖sequence:dependentSequence组成:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1public ProcessingSequenceBarrier(
2        final Sequencer sequencer,
3        final WaitStrategy waitStrategy,
4        final Sequence cursorSequence,
5        final Sequence[] dependentSequences)
6{
7   this.sequencer = sequencer;
8   this.waitStrategy = waitStrategy;
9   this.cursorSequence = cursorSequence;
10   if (0 == dependentSequences.length)
11   {
12       dependentSequence = cursorSequence;
13   }
14   else
15   {
16       dependentSequence = new FixedSequenceGroup(dependentSequences);
17   }
18}
19

首先,为了实现消费依赖,SequenceBarrier肯定有一个获取可以消费的sequence方法,就是


1
2
1long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
2

实现为:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1@Override
2    public long waitFor(final long sequence)
3        throws AlertException, InterruptedException, TimeoutException
4    {
5        //检查是否alerted
6        checkAlert();
7        //通过等待策略获取下一个可消费的sequence,这个sequence通过之前的讲解可以知道,需要大于cursorSequence和dependentSequence,我们可以通过dependentSequence实现先后消费
8        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
9        //等待可能被中断,所以检查下availableSequence是否小于sequence
10        if (availableSequence < sequence)
11        {
12            return availableSequence;
13        }
14        //如果不小于,返回所有sequence(可能多生产者)和availableSequence中最大的
15        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
16    }
17

其他方法实现很简单,功能上分别有:

  1. 获取当前cursorSequence(并没有什么用,就是为了监控)

  2. 负责中断和恢复的alert标记


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1    @Override
2    public long getCursor()
3    {
4        return dependentSequence.get();
5    }
6
7    @Override
8    public boolean isAlerted()
9    {
10        return alerted;
11    }
12
13    @Override
14    public void alert()
15    {
16        alerted = true;
17        waitStrategy.signalAllWhenBlocking();
18    }
19
20    @Override
21    public void clearAlert()
22    {
23        alerted = false;
24    }
25
26    @Override
27    public void checkAlert() throws AlertException
28    {
29        if (alerted)
30        {
31            throw AlertException.INSTANCE;
32        }
33    }
34

构造SequenceBarrier在框架中只有一个入口,就是AbstractSequencer的:


1
2
3
4
5
1public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
2    {
3        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
4    }
5

SequenceProcessor

通过SequenceBarrier,我们可以实现消费之间的依赖关系,但是,消费方式(比如广播,群组消费等等),需要通过SequenceProcessor的实现类实现:
通过类依赖关系我们发现,EventProcessor都是拓展了Runnable接口,也就是我们可以把它们当做线程处理。

1. BatchEventProcessor:

它的构造方法:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1/**
2     * 构造一个消费者之间非互斥消费的消费者
3     *
4     * @param dataProvider    对应的RingBuffer
5     * @param sequenceBarrier 依赖关系,通过构造不同的sequenceBarrier用互相的dependentsequence,我们可以构造出先后消费关系
6     * @param eventHandler    用户实现的处理消费的event的业务消费者.
7     */
8    public BatchEventProcessor(
9        final DataProvider<T> dataProvider,
10        final SequenceBarrier sequenceBarrier,
11        final EventHandler<? super T> eventHandler)
12    {
13        this.dataProvider = dataProvider;
14        this.sequenceBarrier = sequenceBarrier;
15        this.eventHandler = eventHandler;
16
17        if (eventHandler instanceof SequenceReportingEventHandler)
18        {
19            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
20        }
21
22        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
23    }
24

线程为一个死循环:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
1 @Override
2    public void run()
3    {
4        //检查状态
5        if (!running.compareAndSet(false, true))
6        {
7            throw new IllegalStateException("Thread is already running");
8        }
9        //清理
10        sequenceBarrier.clearAlert();
11        //如果用户实现的EventHandler继承了LifecycleAware,则执行其onStart方法
12        notifyStart();
13
14        T event = null;
15        //sequence初始值为-1,设计上当前值是已经消费过的
16        long nextSequence = sequence.get() + 1L;
17        try
18        {
19            while (true)
20            {
21                try
22                {
23                    //获取当前可以消费的最大sequence
24                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);
25                    while (nextSequence <= availableSequence)
26                    {
27                        //获取并处理
28                        event = dataProvider.get(nextSequence);
29                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
30                        nextSequence++;
31                    }
32                    //设置当前sequence,注意,出现异常需要特殊处理,防止重复消费
33                    sequence.set(availableSequence);
34                }
35                catch (final TimeoutException e)
36                {
37                    //wait超时异常
38                    notifyTimeout(sequence.get());
39                }
40                catch (final AlertException ex)
41                {
42                    //中断异常
43                    if (!running.get())
44                    {
45                        break;
46                    }
47                }
48                catch (final Throwable ex)
49                {
50                    exceptionHandler.handleEventException(ex, nextSequence, event);
51                    //如果出现异常则设置为nextSequence
52                    sequence.set(nextSequence);
53                    nextSequence++;
54                }
55            }
56        }
57        finally
58        {
59            //如果用户实现的EventHandler继承了LifecycleAware,则执行其onShutdown方法
60            notifyShutdown();
61            running.set(false);
62        }
63    }
64

可以看出:

  1. BatchEventProcessor可以处理超时,可以处理中断,可以通过用户实现的异常处理类处理异常,同时,发生异常之后再次启动,不会漏消费,也不会重复消费。
  2. 不同的BatchEventProcessor之间通过SequenceBarrier进行依赖消费。原理如下图所示:

假设我们有三个消费者BatchEventProcessor1,BatchEventProcessor2,BatchEventProcessor3. 1需要先于2和3消费,那么构建BatchEventProcessor和SequenceBarrier时,我们需要让BatchEventProcessor2和BatchEventProcessor3的SequenceBarrier的dependentSequence中加入SequenceBarrier1的sequence。
其实这里2和3共用一个SequenceBarrier就行。

2. WorkProcessor

另一种消费者是WorkProcessor。利用它,可以实现互斥消费,同样的利用SequenceBarrier可以实现消费顺序


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
1public void run()
2    {
3        if (!running.compareAndSet(false, true))
4        {
5            throw new IllegalStateException("Thread is already running");
6        }
7        sequenceBarrier.clearAlert();
8
9        notifyStart();
10
11        boolean processedSequence = true;
12        long cachedAvailableSequence = Long.MIN_VALUE;
13        long nextSequence = sequence.get();
14        T event = null;
15        while (true)
16        {
17            try
18            {
19                if (processedSequence)
20                {
21                    processedSequence = false;
22                    //获取下一个可以消费的Sequence
23                    do
24                    {
25                        nextSequence = workSequence.get() + 1L;
26                        sequence.set(nextSequence - 1L);
27                    }
28                    //多个WorkProcessor之间,如果共享一个workSequence,那么,可以实现互斥消费,因为只有一个线程可以CAS更新成功
29                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
30                }
31
32                if (cachedAvailableSequence >= nextSequence)
33                {
34                    event = ringBuffer.get(nextSequence);
35                    workHandler.onEvent(event);
36                    processedSequence = true;
37                }
38                else
39                {
40                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
41                }
42            }
43            catch (final TimeoutException e)
44            {
45                notifyTimeout(sequence.get());
46            }
47            catch (final AlertException ex)
48            {
49                if (!running.get())
50                {
51                    break;
52                }
53            }
54            catch (final Throwable ex)
55            {
56                // handle, mark as processed, unless the exception handler threw an exception
57                exceptionHandler.handleEventException(ex, nextSequence, event);
58                processedSequence = true;
59            }
60        }
61
62        notifyShutdown();
63
64        running.set(false);
65    }
66

3. WorkerPool

多个WorkerProcessor可以组成一个WorkerPool:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1public WorkerPool(
2        final RingBuffer<T> ringBuffer,
3        final SequenceBarrier sequenceBarrier,
4        final ExceptionHandler<? super T> exceptionHandler,
5        final WorkHandler<? super T>... workHandlers)
6    {
7        this.ringBuffer = ringBuffer;
8        final int numWorkers = workHandlers.length;
9        workProcessors = new WorkProcessor[numWorkers];
10
11        for (int i = 0; i < numWorkers; i++)
12        {
13            workProcessors[i] = new WorkProcessor<T>(
14                ringBuffer,
15                sequenceBarrier,
16                workHandlers[i],
17                exceptionHandler,
18                workSequence);
19        }
20    }
21

里面的 workHandlers[i]共享同一个workSequence,所以,同一个WorkerPool内,是互斥消费。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense(Google网站联盟)广告申请指南

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索