高并发数据结构Disruptor解析(2)

释放双眼,带上耳机,听听看~!

Sequence(续)

之前说了Sequence通过给他的核心值value添加前置无用的padding long还有后置无用的padding long来避免对于value操作的false sharing的发生。那么对于这个value的操作是怎么操作的呢?
这里我们需要先了解下Unsafe类这个东西,可以参考我的另一篇文章 – Java Unsafe 类。
Unsafe中有一些底层为C++的方法,对于Sequence,其中做了:
获取Unsafe,通过Unsafe获取Sequence中的value的地址,根据这个地址CAS更新。
com.lmax.disruptor.Sequence.java


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
1public class Sequence extends RhsPadding
2{
3    static final long INITIAL_VALUE = -1L;
4    private static final Unsafe UNSAFE;
5    private static final long VALUE_OFFSET;
6
7    static
8    {
9        UNSAFE = Util.getUnsafe();
10        try
11        {
12            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
13        }
14        catch (final Exception e)
15        {
16            throw new RuntimeException(e);
17        }
18    }
19
20    /**
21     * 默认初始value为-1
22     */
23    public Sequence()
24    {
25        this(INITIAL_VALUE);
26    }
27
28    public Sequence(final long initialValue)
29    {
30        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
31    }
32
33    public long get()
34    {
35        return value;
36    }
37
38    /**
39     * 利用Unsafe更新value的地址内存上的值从而更新value的值
40     */
41    public void set(final long value)
42    {
43        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
44    }
45
46    /**
47     * 利用Unsafe原子更新value
48     */
49    public void setVolatile(final long value)
50    {
51        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
52    }
53
54    /**
55     * 利用Unsafe CAS
56     */
57    public boolean compareAndSet(final long expectedValue, final long newValue)
58    {
59        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
60    }
61
62
63    public long incrementAndGet()
64    {
65        return addAndGet(1L);
66    }
67
68    public long addAndGet(final long increment)
69    {
70        long currentValue;
71        long newValue;
72
73        do
74        {
75            currentValue = get();
76            newValue = currentValue + increment;
77        }
78        while (!compareAndSet(currentValue, newValue));
79
80        return newValue;
81    }
82
83    @Override
84    public String toString()
85    {
86        return Long.toString(get());
87    }
88}
89

Producer

SingleProducerSequencer

接下来我们先从Producer看起。Disruptor分为单生产者和多生产者,先来关注下单生产者的核心类SingleProducerSequencer,类结构如下:
针对这些接口做一下简单的描述:
**Cursored接口:**实现此接口的类,可以理解为,记录某个sequence的类。例如,生产者在生产消息时,需要知道当前ringBuffer下一个生产的位置,这个位置需要更新,每次更新,需要访问getCursor来定位。
**Sequenced接口:**实现此接口类,可以理解为,实现一个有序的存储结构,也就是RingBuffer的一个特性。一个Producer,在生产Event时,先获取下一位置的Sequence,之后填充Event,填充好后再publish,这之后,这个Event就可以被消费处理了

  • getBufferSize获取ringBuffer的大小
  • hasAvailableCapacity判断空间是否足够
  • remainingCapacity获取ringBuffer的剩余空间
  • next申请下一个或者n个sequence(value)作为生产event的位置
  • tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException
  • publish发布Event

Sequencer接口:**Sequencer接口,扩展了Cursored和Sequenced接口。在前两者的基础上,增加了消费与生产相关的方法。其中一个比较重要的设计是关于**GatingSequence的设计:
之后我们会提到,RingBuffer的头由一个名字为Cursor的Sequence对象维护,用来协调生产者向RingBuffer中填充数据。表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的话,队列尾的维护就是无锁的。但是,在生产者方确定RingBuffer是否已满就需要跟踪更多信息。为此,GatingSequence用来跟踪相关Sequence

  • INITIAL_CURSOR_VALUE: -1 为 sequence的起始值
  • claim: 申请一个特殊的Sequence,只有设定特殊起始值的ringBuffer时才会使用(一般是多个生产者时才会使用)
  • isAvailable:非阻塞,验证一个sequence是否已经被published并且可以消费
  • addGatingSequences:将这些sequence加入到需要跟踪处理的gatingSequences中
  • removeGatingSequence:移除某个sequence
  • newBarrier:给定一串需要跟踪的sequence,创建SequenceBarrier。SequenceBarrier是用来给多消费者确定消费位置是否可以消费用的
  • getMinimumSequence:获取这个ringBuffer的gatingSequences中最小的一个sequence
  • getHighestPublishedSequence:获取最高可以读取的Sequence
  • newPoller:目前没用,不讲EventPoller相关的内容(没有用到)

之后,抽象类AbstractSequencer实现Sequencer这个接口:定义了5个域:


1
2
3
4
5
6
1    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
2    protected final int bufferSize;
3    protected final WaitStrategy waitStrategy;
4    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
5    protected volatile Sequence[] gatingSequences = new Sequence[0];
6
  • SEQUENCE_UPDATER 是用来原子更新gatingSequences 的工具类
  • bufferSize记录生产目标RingBuffer的大小
  • waitStrategy表示这个生产者的等待策略(之后会讲)
  • cursor:生产定位,初始为-1
  • **gatingSequences **:前文已讲

构造方法增加了一些对于这个类的限制:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
2{
3    if (bufferSize < 1)
4    {
5        throw new IllegalArgumentException("bufferSize must not be less than 1");
6    }
7    if (Integer.bitCount(bufferSize) != 1)
8    {
9        throw new IllegalArgumentException("bufferSize must be a power of 2");
10    }
11
12    this.bufferSize = bufferSize;
13    this.waitStrategy = waitStrategy;
14}
15

bufferSize不能小于1并且bufferSize必须是2的n次方。原因我的第一篇文章已经讲述。
对于getCursor和getBufferSize的实现,这里仅仅是简单的getter:


1
2
3
4
5
6
7
8
9
10
11
12
1@Override
2public final long getCursor()
3{
4    return cursor.get();
5}
6
7@Override
8public final int getBufferSize()
9{
10    return bufferSize;
11}
12

对于addGatingSequences和removeGatingSequence,则是原子更新:


1
2
3
4
5
6
7
8
9
10
1public final void addGatingSequences(Sequence... gatingSequences)
2{
3    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
4}
5
6public boolean removeGatingSequence(Sequence sequence)
7{
8    return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
9}
10

原子更新工具类静态方法代码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
1/**
2 * 原子添加sequences
3 *
4 * @param holder 原子更新的域所属的类对象
5 * @param updater 原子更新的域对象
6 * @param cursor 定位
7 * @param sequencesToAdd 要添加的sequences
8 * @param <T>
9 */
10static <T> void addSequences(
11        final T holder,
12        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
13        final Cursored cursor,
14        final Sequence... sequencesToAdd)
15{
16    long cursorSequence;
17    Sequence[] updatedSequences;
18    Sequence[] currentSequences;
19    //在更新成功之前,一直重新读取currentSequences,扩充为添加所有sequence之后的updatedSequences
20    do
21    {
22        currentSequences = updater.get(holder);
23        updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
24        cursorSequence = cursor.getCursor();
25
26        int index = currentSequences.length;
27        //将新的sequences的值设置为cursorSequence
28        for (Sequence sequence : sequencesToAdd)
29        {
30            sequence.set(cursorSequence);
31            updatedSequences[index++] = sequence;
32        }
33    }
34    while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
35
36    cursorSequence = cursor.getCursor();
37    for (Sequence sequence : sequencesToAdd)
38    {
39        sequence.set(cursorSequence);
40    }
41}
42
43/**
44 * 原子移除某个指定的sequence
45 *
46 * @param holder 原子更新的域所属的类对象
47 * @param sequenceUpdater 原子更新的域对象
48 * @param sequence 要移除的sequence
49 * @param <T>
50 * @return
51 */
52static <T> boolean removeSequence(
53        final T holder,
54        final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
55        final Sequence sequence)
56{
57    int numToRemove;
58    Sequence[] oldSequences;
59    Sequence[] newSequences;
60
61    do
62    {
63        oldSequences = sequenceUpdater.get(holder);
64
65        numToRemove = countMatching(oldSequences, sequence);
66
67        if (0 == numToRemove)
68        {
69            break;
70        }
71
72        final int oldSize = oldSequences.length;
73        newSequences = new Sequence[oldSize - numToRemove];
74
75        for (int i = 0, pos = 0; i < oldSize; i++)
76        {
77            final Sequence testSequence = oldSequences[i];
78            if (sequence != testSequence)
79            {
80                newSequences[pos++] = testSequence;
81            }
82        }
83    }
84    while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));
85
86    return numToRemove != 0;
87}
88
89private static <T> int countMatching(T[] values, final T toMatch)
90{
91    int numToRemove = 0;
92    for (T value : values)
93    {
94        if (value == toMatch) // Specifically uses identity
95        {
96            numToRemove++;
97        }
98    }
99    return numToRemove;
100}
101

对于newBarrier,返回的是一个ProcessingSequenceBarrier:
SequenceBarrier我们之后会详讲,这里我们可以理解为用来协调消费者消费的对象。例如消费者A依赖于消费者B,就是消费者A一定要后于消费者B消费,也就是A只能消费B消费过的,也就是A的sequence一定要小于B的。这个Sequence的协调,通过A和B设置在同一个SequenceBarrier上实现。同时,我们还要保证所有的消费者只能消费被Publish过的。这里我们先不深入


1
2
3
4
5
1public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
2{
3    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
4}
5

之后到了我们这次的核心,SingleProducerSequencer,观察它的结构,他依然利用了long冗余避免CPU的false sharing,这次的field不只有一个,而是有两个,所以,前后放上7个long类型,这样在最坏的情况下也能避免false sharing(参考我的第一篇文章)
这两个field是:


1
2
3
1protected long nextValue = Sequence.INITIAL_VALUE;
2protected long cachedValue = Sequence.INITIAL_VALUE;
3

初始值都为-1,这里强调下,由于这个类并没有实现任何的Barrier,所以在Disruptor框架中,这个类并不是线程安全的。不过由于从命名上看,就是单一生产者,所以在使用的时候也不会用多线程去调用里面的方法。
之后就是对AbstractSequencer抽象方法的实现:
hasAvailableCapacity判断空间是否足够:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1@Override
2public boolean hasAvailableCapacity(int requiredCapacity) {
3    //下一个生产Sequence位置
4    long nextValue = this.nextValue;
5    //下一位置加上所需容量减去整个bufferSize,如果为正数,那证明至少转了一圈,则需要检查gatingSequences(由消费者更新里面的Sequence值)以保证不覆盖还未被消费的
6    long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
7    //Disruptor经常用缓存,这里缓存之间所有gatingSequences最小的那个,这样不用每次都遍历一遍gatingSequences,影响效率
8    long cachedGatingSequence = this.cachedValue;
9    //只要wrapPoint大于缓存的所有gatingSequences最小的那个,就重新检查更新缓存
10    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
11    {
12        long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
13        this.cachedValue = minSequence;
14        //空间不足返回false
15        if (wrapPoint > minSequence)
16        {
17            return false;
18        }
19    }
20    //若wrapPoint小于缓存的所有gatingSequences最小的那个,证明可以放心生产
21    return true;
22}
23

对于next方法:申请下一个或者n个sequence(value)作为生产event的位置


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
1@Override
2public long next() {
3    return next(1);
4}
5
6@Override
7public long next(int n) {
8    if (n < 1) {
9        throw new IllegalArgumentException("n must be > 0");
10    }
11
12    long nextValue = this.nextValue;
13    //next方法和之前的hasAvailableCapacity同理,只不过这里是相当于阻塞的
14    long nextSequence = nextValue + n;
15    long wrapPoint = nextSequence - bufferSize;
16    long cachedGatingSequence = this.cachedValue;
17
18    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
19        long minSequence;
20        //只要wrapPoint大于最小的gatingSequences,那么不断唤醒消费者去消费,并利用LockSupport让出CPU,直到wrapPoint不大于最小的gatingSequences
21        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
22            waitStrategy.signalAllWhenBlocking();
23            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
24        }
25        //同理,缓存最小的gatingSequences
26        this.cachedValue = minSequence;
27    }
28
29    this.nextValue = nextSequence;
30
31    return nextSequence;
32}
33

tryNext尝试申请下一个或者n个sequence(value)作为生产event的位置,容量不足会抛出InsufficientCapacityException。而这里的容量检查,就是通过之前的hasAvailableCapacity方法检查:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1    @Override
2    public long tryNext() throws InsufficientCapacityException {
3        return tryNext(1);
4    }
5
6    @Override
7    public long tryNext(int n) throws InsufficientCapacityException {
8        if (n < 1) {
9            throw new IllegalArgumentException("n must be > 0");
10        }
11
12        if (!hasAvailableCapacity(n)) {
13            throw InsufficientCapacityException.INSTANCE;
14        }
15
16        long nextSequence = this.nextValue += n;
17
18        return nextSequence;
19    }
20

publish发布Event:


1
2
3
4
5
6
7
8
9
10
11
12
1    @Override
2    public void publish(long sequence) {
3        //cursor代表可以消费的sequence
4        cursor.set(sequence);
5        waitStrategy.signalAllWhenBlocking();
6    }
7
8    @Override
9    public void publish(long lo, long hi) {
10        publish(hi);
11    }
12

其他:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1    @Override
2    public void claim(long sequence) {
3        nextValue = sequence;
4    }
5
6    @Override
7    public boolean isAvailable(long sequence) {
8        return sequence <= cursor.get();
9    }
10
11    @Override
12    public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
13        return availableSequence;
14    }
15

下面,我们针对SingleProducerSequencer画一个简单的工作流程:
假设有如下RingBuffer和SingleProducerSequencer,以及对应的消费者辅助类SequenceBarrier,这里不画消费者,假设有不断通过SequenceBarrier消费的消费者。SingleProducerSequencer的gatingSequences数组内保存这一个指向某个Sequence的引用,同时这个Sequence也会被SequenceBarrier更新以表示消费者消费到哪里了。这里生产的Sequence还有消费的Sequence都是从零开始不断增长的,即使大于BufferSize,也可以通过sequence的值对BufferSize取模定位到RingBuffer上。
假设SingleProducerSequencer这时生产两个Event,要放入RingBuffer。则假设先调用hasAvailableCapacity(2)判断下。代码流程是:
wrapPoint = (nextValue + requiredCapacity) – bufferSize = (-1 + 2) – 4 = -3
-3 < cachedValue所以不用检查gateSequences直接返回true。假设返回true,就开始填充,之后调用publish更新cursor,这样消费者调用isAvailable根据Cursor就可以判断,sequence:0和sequence:1可以消费了。
假设这之后,消费者消费了一个Event,更新Sequence为0.
之后,生产者要生产四个Event,调用hasAvailableCapacity(4)检查。代码流程是:
wrapPoint = (nextValue + requiredCapacity) – bufferSize = (1 + 4) – 4 = 1
1 > cachedValue所以要重新检查,这是最小的Sequence是0,但是1 > 仍然大于最小的Sequence,所以更新cachedValue,返回false。
至此,展示了一个简单的生产过程,SingleProducerSequencer也就讲完啦。

给TA打赏
共{{data.count}}人
人已打赏
安全经验

Google Adsense广告设置到位置放置的技巧

2021-10-11 16:36:11

安全经验

安全咨询服务

2022-1-12 14:11:49

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索