高并发数据结构Disruptor解析(4)

释放双眼,带上耳机,听听看~!

RingBuffer

RingBuffer类是Disruptor核心的数据结构类。它是一个环状的Buffer,上面的槽(slot)可以保存一个个Event。下面是Disruptor中RingBuffer类继承关系:

除了实现之前提到过的Sequenced和Cursored接口,这里还涉及到了DataProvider这个接口。


1
2
3
4
5
1public interface DataProvider<T>
2{
3    T get(long sequence);
4}
5

它只有一个方法get,这个方法就是获取某个sequence对应的对象,对象类型在这里是抽象的(T)。这个方法对于RingBuffer会在两个地方调用,第一个是在生产时,这个Event对象需要被生产者获取往里面填充数据。第二个是在消费时,获取这个Event对象用于消费。
EventSequencer接口没有自己的方法,只是为了将Sequencer和DataProvider合起来。
EventSink代表RingBuffer是一个以Event槽为基础的数据结构。同时实现EventSequencer和EventSink代表RingBuffer是一个以Event槽为基础元素保存的数据结构。
EventSink接口的主要方法都是发布Event,发布一个Event的流程是:申请下一个Sequence->申请成功则获取对应槽的Event->初始化并填充对应槽的Event->发布Event。这里,初始化,填充Event是通过实现EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg,EventTranslatorThreeArg,EventTranslatorVararg这些EventTranslator来做的。我们看下EventTranslator,EventTranslatorOneArg和EventTranslatorVararg的源码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1public interface EventTranslator<T>
2{
3    /**
4     * Translate a data representation into fields set in given event
5     *
6     * @param event    into which the data should be translated.
7     * @param sequence that is assigned to event.
8     */
9    void translateTo(final T event, long sequence);
10}
11
12public interface EventTranslatorOneArg<T, A>
13{
14    /**
15     * Translate a data representation into fields set in given event
16     *
17     * @param event    into which the data should be translated.
18     * @param sequence that is assigned to event.
19     * @param arg0     The first user specified argument to the translator
20     */
21    void translateTo(final T event, long sequence, final A arg0);
22}
23
24public interface EventTranslatorVararg<T>
25{
26    /**
27     * Translate a data representation into fields set in given event
28     *
29     * @param event    into which the data should be translated.
30     * @param sequence that is assigned to event.
31     * @param args     The array of user arguments.
32     */
33    void translateTo(final T event, long sequence, final Object... args);
34}
35

他们由生产者用户实现,将Event初始化并填充。在发布一条Event的时候,这些Translator的translate方法会被调用。在translate方法初始化并填充Event。对于EventTranslator,translate方法只接受Event和Sequence作为参数,对于其他的,都还会接受一个或多个参数用来初始化并填充Event。
EventSink接口是用来发布Event的,在发布的同时,调用绑定的Translator来初始化并填充Event。EventSink接口的大部分方法接受不同的Translator来处理Event:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
1public interface EventSink<E> {
2    /**
3     * 申请下一个Sequence->申请成功则获取对应槽的Event->利用translator初始化并填充对应槽的Event->发布Event
4     * @param translator translator用户实现,用于初始化Event,这里是不带参数Translator
5     */
6     void publishEvent(EventTranslator<E> translator);
7
8    /**
9     * 尝试申请下一个Sequence->申请成功则获取对应槽的Event->利用translator初始化并填充对应槽的Event->发布Event
10     * 若空间不足,则立即失败返回
11     * @param translator translator用户实现,用于初始化Event,这里是不带参数Translator
12     * @return 成功true,失败false
13     */
14     boolean tryPublishEvent(EventTranslator<E> translator);
15
16     <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0);
17
18     <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0);
19
20     <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);
21
22     <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);
23
24     <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);
25
26     <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);
27
28     void publishEvent(EventTranslatorVararg<E> translator, Object... args);
29
30     boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args);
31
32    /**
33     * 包括申请多个Sequence->申请成功则获取对应槽的Event->利用每个translator初始化并填充每个对应槽的Event->发布Event
34     * @param translators
35     */
36     void publishEvents(EventTranslator<E>[] translators);
37
38     void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);
39
40     boolean tryPublishEvents(EventTranslator<E>[] translators);
41
42     boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);
43
44     <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);
45
46     <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);
47
48     <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);
49
50     <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);
51
52     <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);
53
54     <A, B> void publishEvents(
55            EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0,
56            B[] arg1);
57
58     <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);
59
60     <A, B> boolean tryPublishEvents(
61            EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize,
62            A[] arg0, B[] arg1);
63
64     <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);
65
66     <A, B, C> void publishEvents(
67            EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize,
68            A[] arg0, B[] arg1, C[] arg2);
69
70     <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);
71
72     <A, B, C> boolean tryPublishEvents(
73            EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt,
74            int batchSize, A[] arg0, B[] arg1, C[] arg2);
75
76     void publishEvents(EventTranslatorVararg<E> translator, Object[]... args);
77
78     void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);
79
80     boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args);
81
82     boolean tryPublishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);
83}
84
85

接下来到我们的主要环节,RingBuffer类。与之前相似,RingBuffer也是做了缓冲行填充。
RingBuffer类中保存了整个RingBuffer每个槽(entry或者slot)的Event对象,对应的field是private final Object[] entries;,这些对象只在RingBuffer初始化时被建立,之后就是修改这些对象(初始化Event和填充Event),并不会重新建立新的对象。RingBuffer可以有多生产者和消费者,所以这个entries会被多线程访问频繁的,但不会修改(因为不会重新建立新的对象,这个数组保存的是对对象的具体引用,所以不会变)。但是我们要避免他们和被修改的对象读取到同一个缓存行,避免缓存行失效重新读取。
我们看源代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
1abstract class RingBufferPad
2{
3    protected long p1, p2, p3, p4, p5, p6, p7;
4}
5
6abstract class RingBufferFields<E> extends RingBufferPad
7{
8    //Buffer数组填充
9    private static final int BUFFER_PAD;
10    //Buffer数组起始基址
11    private static final long REF_ARRAY_BASE;
12    //2^n=每个数组对象引用所占空间,这个n就是REF_ELEMENT_SHIFT
13    private static final int REF_ELEMENT_SHIFT;
14    private static final Unsafe UNSAFE = Util.getUnsafe();
15
16    static
17    {
18        //Object数组引用长度,32位为4字节,64位为8字节
19        final int scale = UNSAFE.arrayIndexScale(Object[].class);
20        if (4 == scale)
21        {
22            REF_ELEMENT_SHIFT = 2;
23        }
24        else if (8 == scale)
25        {
26            REF_ELEMENT_SHIFT = 3;
27        }
28        else
29        {
30            throw new IllegalStateException("Unknown pointer size");
31        }
32        //需要填充128字节,缓存行长度一般是128字节
33        BUFFER_PAD = 128 / scale;
34        // Including the buffer pad in the array base offset
35        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
36    }
37
38    private final long indexMask;
39    private final Object[] entries;
40    protected final int bufferSize;
41    protected final Sequencer sequencer;
42
43    RingBufferFields(
44        EventFactory<E> eventFactory,
45        Sequencer sequencer)
46    {
47        this.sequencer = sequencer;
48        this.bufferSize = sequencer.getBufferSize();
49        //保证buffer大小不小于1
50        if (bufferSize < 1)
51        {
52            throw new IllegalArgumentException("bufferSize must not be less than 1");
53        }
54        //保证buffer大小为2的n次方
55        if (Integer.bitCount(bufferSize) != 1)
56        {
57            throw new IllegalArgumentException("bufferSize must be a power of 2");
58        }
59        //m % 2^n  <=>  m & (2^n - 1)
60        this.indexMask = bufferSize - 1;
61        /**
62         * 结构:缓存行填充,避免频繁访问的任一entry与另一被修改的无关变量写入同一缓存行
63         * --------------
64         * *   数组头   * BASE
65         * *   Padding  * 128字节
66         * * reference1 * SCALE
67         * * reference2 * SCALE
68         * * reference3 * SCALE
69         * ..........
70         * *   Padding  * 128字节
71         * --------------
72         */
73        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
74        //利用eventFactory初始化RingBuffer的每个槽
75        fill(eventFactory);
76    }
77
78    private void fill(EventFactory<E> eventFactory)
79    {
80        for (int i = 0; i < bufferSize; i++)
81        {
82            entries[BUFFER_PAD + i] = eventFactory.newInstance();
83        }
84    }
85
86    @SuppressWarnings("unchecked")
87    protected final E elementAt(long sequence)
88    {
89        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
90    }
91}
92
93

注释中提到对于entries数组的缓存行填充,申请的数组大小为实际需要大小加上2 * BUFFER_PAD,所占空间就是2*128字节。由于数组中的元素经常访问,所以将数组中的实际元素两边各加上128字节的padding防止false sharing。
所以,初始化RingBuffer内所有对象时,从下标BUFFER_PAD开始,到BUFFER_PAD+bufferSize-1为止。取出某一sequence的对象,也是BUFFER_PAD开始算0,这里的:return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
代表取出entries对象,地址为REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)的对象。这里是个对象引用,地址是以REF_ARRAY_BASE 为基址(数组基址+数组头+引用偏移),每个引用占用2^REF_ELEMENT_SHIFT个字节,sequence 对bufferSize取模乘以2^REF_ELEMENT_SHIFT。
接下来看可以供用户调用的具体的构造方法,RingBuffer在Disruptor包外部不能直接调用其构造方法,用户只能用静态方法创建:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
1    /**
2     * Construct a RingBuffer with the full option set.
3     *
4     * @param eventFactory to newInstance entries for filling the RingBuffer
5     * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer.
6     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
7     */
8    RingBuffer(
9        EventFactory&lt;E&gt; eventFactory,
10        Sequencer sequencer)
11    {
12        super(eventFactory, sequencer);
13    }
14
15    /**
16     * Create a new multiple producer RingBuffer with the specified wait strategy.
17     *
18     * @param factory      used to create the events within the ring buffer.
19     * @param bufferSize   number of elements to create within the ring buffer.
20     * @param waitStrategy used to determine how to wait for new elements to become available.
21     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
22     * @see MultiProducerSequencer
23     */
24    public static &lt;E&gt; RingBuffer&lt;E&gt; createMultiProducer(
25        EventFactory&lt;E&gt; factory,
26        int bufferSize,
27        WaitStrategy waitStrategy)
28    {
29        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
30
31        return new RingBuffer&lt;E&gt;(factory, sequencer);
32    }
33
34    /**
35     * Create a new multiple producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
36     *
37     * @param factory    used to create the events within the ring buffer.
38     * @param bufferSize number of elements to create within the ring buffer.
39     * @throws IllegalArgumentException if &lt;tt&gt;bufferSize&lt;/tt&gt; is less than 1 or not a power of 2
40     * @see MultiProducerSequencer
41     */
42    public static &lt;E&gt; RingBuffer&lt;E&gt; createMultiProducer(EventFactory&lt;E&gt; factory, int bufferSize)
43    {
44        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
45    }
46
47    /**
48     * Create a new single producer RingBuffer with the specified wait strategy.
49     *
50     * @param factory      used to create the events within the ring buffer.
51     * @param bufferSize   number of elements to create within the ring buffer.
52     * @param waitStrategy used to determine how to wait for new elements to become available.
53     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
54     * @see SingleProducerSequencer
55     */
56    public static &lt;E&gt; RingBuffer&lt;E&gt; createSingleProducer(
57        EventFactory&lt;E&gt; factory,
58        int bufferSize,
59        WaitStrategy waitStrategy)
60    {
61        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
62
63        return new RingBuffer&lt;E&gt;(factory, sequencer);
64    }
65
66    /**
67     * Create a new single producer RingBuffer using the default wait strategy  {@link BlockingWaitStrategy}.
68     *
69     * @param factory    used to create the events within the ring buffer.
70     * @param bufferSize number of elements to create within the ring buffer.
71     * @throws IllegalArgumentException if &lt;tt&gt;bufferSize&lt;/tt&gt; is less than 1 or not a power of 2
72     * @see MultiProducerSequencer
73     */
74    public static &lt;E&gt; RingBuffer&lt;E&gt; createSingleProducer(EventFactory&lt;E&gt; factory, int bufferSize)
75    {
76        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
77    }
78
79    /**
80     * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
81     *
82     * @param producerType producer type to use {@link ProducerType}.
83     * @param factory      used to create events within the ring buffer.
84     * @param bufferSize   number of elements to create within the ring buffer.
85     * @param waitStrategy used to determine how to wait for new elements to become available.
86     * @throws IllegalArgumentException if bufferSize is less than 1 or not a power of 2
87     */
88    public static &lt;E&gt; RingBuffer&lt;E&gt; create(
89        ProducerType producerType,
90        EventFactory&lt;E&gt; factory,
91        int bufferSize,
92        WaitStrategy waitStrategy)
93    {
94        switch (producerType)
95        {
96            case SINGLE:
97                return createSingleProducer(factory, bufferSize, waitStrategy);
98            case MULTI:
99                return createMultiProducer(factory, bufferSize, waitStrategy);
100            default:
101                throw new IllegalStateException(producerType.toString());
102        }
103    }
104

用户组装一个RingBuffer需要如下元素:实现EventFactory的Event的工厂,实现Sequencer的生产者,等待策略waitStrategy还有bufferSize。
接下来里面方法的实现都比较简单,这里不再赘述

给TA打赏
共{{data.count}}人
人已打赏
安全经验

人们为何痛恨Google Adsense

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索