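Sequence (continued)
Previously we saw that Sequence surrounds its core field, value, with unused padding longs before and after it, so that operations on value do not suffer from false sharing. So how is value itself actually read and written?
To answer that we first need to understand the Unsafe class; see my other article, Java Unsafe Class.
Unsafe exposes a number of methods that are implemented natively in C++. Sequence uses them as follows:
It obtains the Unsafe instance, uses it to look up the offset of the value field inside Sequence, and then writes or CAS-updates value through that offset.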
com.lmax.disruptor.Sequence.java

    public class Sequence extends RhsPadding
    {
        static final long INITIAL_VALUE = -1L;
        private static final Unsafe UNSAFE;
        private static final long VALUE_OFFSET;

        static
        {
            UNSAFE = Util.getUnsafe();
            try
            {
                // Offset of the value field (declared in the Value class) within the object
                VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
            }
            catch (final Exception e)
            {
                throw new RuntimeException(e);
            }
        }

        /**
         * The initial value defaults to -1
         */
        public Sequence()
        {
            this(INITIAL_VALUE);
        }

        public Sequence(final long initialValue)
        {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
        }

        public long get()
        {
            return value;
        }

        /**
         * Ordered write: updates value by writing to its memory location through Unsafe
         */
        public void set(final long value)
        {
            UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
        }

        /**
         * Volatile write of value through Unsafe
         */
        public void setVolatile(final long value)
        {
            UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
        }

        /**
         * CAS through Unsafe
         */
        public boolean compareAndSet(final long expectedValue, final long newValue)
        {
            return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
        }

        public long incrementAndGet()
        {
            return addAndGet(1L);
        }

        public long addAndGet(final long increment)
        {
            long currentValue;
            long newValue;

            // Retry until the CAS succeeds
            do
            {
                currentValue = get();
                newValue = currentValue + increment;
            }
            while (!compareAndSet(currentValue, newValue));

            return newValue;
        }

        @Override
        public String toString()
        {
            return Long.toString(get());
        }
    }

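Since sun.misc.Unsafe is awkward to use outside the JDK, the same three kinds of write can be tried out with java.util.concurrent.atomic.AtomicLong. The sketch below is only an illustration: MiniSequence is a made-up name, it has no padding, and it is not how Disruptor implements Sequence. It assumes that lazySet plays the role of the ordered write, set the role of the volatile write, and compareAndSet the role of the CAS.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for Sequence, built on AtomicLong instead of Unsafe (no padding). */
final class MiniSequence {
    private final AtomicLong value;

    MiniSequence(long initialValue) {
        value = new AtomicLong(initialValue);
    }

    long get() {
        return value.get();
    }

    /** Ordered (lazy) write, the role putOrderedLong plays in Sequence.set. */
    void set(long newValue) {
        value.lazySet(newValue);
    }

    /** Full volatile write, the role putLongVolatile plays in Sequence.setVolatile. */
    void setVolatile(long newValue) {
        value.set(newValue);
    }

    boolean compareAndSet(long expected, long newValue) {
        return value.compareAndSet(expected, newValue);
    }

    /** Same retry loop as Sequence.addAndGet: re-read and retry until the CAS wins. */
    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = get();
            next = current + increment;
        } while (!compareAndSet(current, next));
        return next;
    }
}
```

Starting from -1, a first addAndGet(1) yields 0, which is why a fresh Sequence hands out 0 as its first position.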
Producer
SingleProducerSequencer
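Let's start with the Producer. Disruptor distinguishes between a single producer and multiple producers; we first look at the core class for the single-producer case, SingleProducerSequencer.
Here is a brief description of the interfaces in its class hierarchy:
**Cursored interface:** A class implementing this interface can be thought of as one that tracks a sequence. For example, when producing messages the producer needs to know the next position to write in the ringBuffer. That position keeps advancing, and it is looked up through getCursor each time.
**Sequenced interface:** A class implementing this interface provides an ordered storage structure, which is one of the characteristics of RingBuffer. To produce an Event, a Producer first obtains the next Sequence, fills in the Event, and then calls publish; after that the Event can be consumed.
- getBufferSize: returns the size of the ringBuffer
- hasAvailableCapacity: checks whether there is enough space
- remainingCapacity: returns the remaining space in the ringBuffer
- next: claims the next sequence, or the next n sequences (values), as the position(s) for producing events
- tryNext: tries to claim the next sequence, or the next n sequences (values), as the position(s) for producing events; throws InsufficientCapacityException if capacity is insufficient
- publish: publishes the Event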
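The claim, fill, publish cycle described above can be sketched with a toy class. ToyRing and all of its members are hypothetical names for illustration; the sketch is single-threaded, ignores capacity checks, and is not the Disruptor implementation.

```java
/** Hypothetical single-threaded toy showing the claim / fill / publish cycle. */
final class ToyRing {
    private final long[] slots;
    private long nextValue = -1L; // last claimed sequence
    private long cursor = -1L;    // last published sequence

    ToyRing(int bufferSize) {
        slots = new long[bufferSize];
    }

    /** Step 1: claim the next sequence. */
    long next() {
        return ++nextValue;
    }

    /** Step 2: fill the slot that the sequence maps to. */
    void fill(long sequence, long payload) {
        slots[(int) (sequence % slots.length)] = payload;
    }

    /** Step 3: publish, marking everything up to this sequence as readable. */
    void publish(long sequence) {
        cursor = sequence;
    }

    boolean isAvailable(long sequence) {
        return sequence <= cursor;
    }

    long read(long sequence) {
        return slots[(int) (sequence % slots.length)];
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(4);
        long sequence = ring.next();
        ring.fill(sequence, 42L);
        System.out.println(ring.isAvailable(sequence)); // not yet published
        ring.publish(sequence);
        System.out.println(ring.isAvailable(sequence) + " " + ring.read(sequence));
    }
}
```

The point of the separation is that a filled but unpublished slot stays invisible to readers until publish moves the cursor.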
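**Sequencer interface:** The Sequencer interface extends both Cursored and Sequenced, and adds methods related to consumption and production. One of its more important design points is the **GatingSequence**:
As we will see later, the head of the RingBuffer is maintained by a Sequence object named Cursor, which coordinates producers as they fill data into the RingBuffer. The Sequence that marks the tail of the queue is not kept in the RingBuffer; it is maintained by the consumers, which makes maintaining the tail lock-free. The price is that the producer side has to track more information to decide whether the RingBuffer is full. The GatingSequences exist for that purpose: they track the relevant consumer Sequences.
- INITIAL_CURSOR_VALUE: -1, the starting value of a sequence
- claim: claims a specific Sequence; only used when the ringBuffer is initialised to a specific starting value (generally only with multiple producers)
- isAvailable: non-blocking check of whether a sequence has been published and can be consumed
- addGatingSequences: adds the given sequences to the gatingSequences that need to be tracked
- removeGatingSequence: removes a given sequence
- newBarrier: given a list of sequences to track, creates a SequenceBarrier. A SequenceBarrier lets multiple consumers determine whether a position can be consumed
- getMinimumSequence: returns the smallest sequence among this ringBuffer's gatingSequences
- getHighestPublishedSequence: returns the highest Sequence that can be read
- newPoller: not used here; EventPoller is not covered in this article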
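To make the role of the gating sequences concrete, here is a minimal sketch that works on plain long values instead of Sequence objects. GatingSketch and its method names are made up for illustration. The idea it assumes is the one described above: the slowest consumer bounds how far the producer may go.

```java
/** Hypothetical helper: consumer positions are passed as plain longs. */
final class GatingSketch {
    /** Smallest consumer position, or fallback when no consumer is registered. */
    static long minimumSequence(long[] gatingSequences, long fallback) {
        long minimum = fallback;
        for (long sequence : gatingSequences) {
            minimum = Math.min(minimum, sequence);
        }
        return minimum;
    }

    /** Free slots: buffer size minus the distance between producer and slowest consumer. */
    static long remainingCapacity(long produced, long[] gatingSequences, int bufferSize) {
        long consumed = minimumSequence(gatingSequences, produced);
        return bufferSize - (produced - consumed);
    }
}
```

For example, with a buffer of 4, a producer at sequence 1 and a single consumer at sequence 0, one slot is still occupied and three are free.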
Next, the abstract class AbstractSequencer implements the Sequencer interface. It defines 5 fields:

    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    protected final int bufferSize;
    protected final WaitStrategy waitStrategy;
    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    protected volatile Sequence[] gatingSequences = new Sequence[0];

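- SEQUENCE_UPDATER: the utility used to update gatingSequences atomically
- bufferSize: the size of the target RingBuffer
- waitStrategy: the wait strategy of this producer (covered later)
- cursor: the production position, initially -1
- **gatingSequences**: described above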
The constructor adds some constraints on this class:

    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy)
    {
        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.bufferSize = bufferSize;
        this.waitStrategy = waitStrategy;
    }

bufferSize must not be less than 1, and it must be a power of 2. The reason was explained in my first article.
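As a quick reminder of why a power of two is convenient: a sequence can then be mapped to a slot with a bit mask instead of a modulo. A minimal sketch follows; the class and method names are made up for illustration and are not part of Disruptor.

```java
/** Hypothetical helper illustrating the power-of-two check and mask-based indexing. */
final class PowerOfTwoSketch {
    /** A positive int is a power of two exactly when it has a single bit set. */
    static boolean isPowerOfTwo(int n) {
        return n >= 1 && Integer.bitCount(n) == 1;
    }

    /** For a power-of-two bufferSize and a non-negative sequence, this equals sequence % bufferSize. */
    static int indexOf(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }
}
```

With a bufferSize of 8, sequence 9 lands in slot 1, the same slot that 9 % 8 would give.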
getCursor and getBufferSize are implemented as simple getters:

    @Override
    public final long getCursor()
    {
        return cursor.get();
    }

    @Override
    public final int getBufferSize()
    {
        return bufferSize;
    }

addGatingSequences and removeGatingSequence, on the other hand, update gatingSequences atomically:

    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

    public boolean removeGatingSequence(Sequence sequence)
    {
        return SequenceGroups.removeSequence(this, SEQUENCE_UPDATER, sequence);
    }

The static methods of the atomic-update utility class (SequenceGroups) look like this:

    /**
     * Atomically adds sequences
     *
     * @param holder         the object that owns the atomically updated field
     * @param updater        the updater for that field
     * @param cursor         the current position
     * @param sequencesToAdd the sequences to add
     * @param <T>            the type of the holder
     */
    static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
    {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;
        // Until the update succeeds, keep re-reading currentSequences and
        // building updatedSequences, the array extended with all new sequences
        do
        {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            // Set the value of each new sequence to cursorSequence
            for (Sequence sequence : sequencesToAdd)
            {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        }
        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        // The cursor may have moved in the meantime, so set the values once more
        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
        }
    }

    /**
     * Atomically removes a given sequence
     *
     * @param holder          the object that owns the atomically updated field
     * @param sequenceUpdater the updater for that field
     * @param sequence        the sequence to remove
     * @param <T>             the type of the holder
     * @return true if at least one occurrence of the sequence was removed
     */
    static <T> boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
        final Sequence sequence)
    {
        int numToRemove;
        Sequence[] oldSequences;
        Sequence[] newSequences;

        do
        {
            oldSequences = sequenceUpdater.get(holder);

            numToRemove = countMatching(oldSequences, sequence);

            if (0 == numToRemove)
            {
                break;
            }

            final int oldSize = oldSequences.length;
            newSequences = new Sequence[oldSize - numToRemove];

            for (int i = 0, pos = 0; i < oldSize; i++)
            {
                final Sequence testSequence = oldSequences[i];
                if (sequence != testSequence)
                {
                    newSequences[pos++] = testSequence;
                }
            }
        }
        while (!sequenceUpdater.compareAndSet(holder, oldSequences, newSequences));

        return numToRemove != 0;
    }

    private static <T> int countMatching(T[] values, final T toMatch)
    {
        int numToRemove = 0;
        for (T value : values)
        {
            if (value == toMatch) // Specifically uses identity
            {
                numToRemove++;
            }
        }
        return numToRemove;
    }

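The pattern used by both methods is copy-on-write with a CAS retry: read the current array, build a new one, and swap it in only if nobody else changed the reference in the meantime. The sketch below shows the same pattern with a plain AtomicReference instead of an AtomicReferenceFieldUpdater. CopyOnWriteArraySketch is a hypothetical name and this is a simplified illustration, not the SequenceGroups code.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: append to an array held in an AtomicReference without locking. */
final class CopyOnWriteArraySketch {
    static <T> void add(AtomicReference<T[]> ref, T item) {
        T[] current;
        T[] updated;
        do {
            current = ref.get();                                  // snapshot of the current array
            updated = Arrays.copyOf(current, current.length + 1); // private, longer copy
            updated[current.length] = item;
        } while (!ref.compareAndSet(current, updated));           // retry if another thread won
    }
}
```

Readers never see a half-built array, because the array they hold is never modified after it has been published.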
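We will cover SequenceBarrier in detail later. For now it can be understood as the object that coordinates consumers. For example, if consumer A depends on consumer B, A must consume after B: A may only consume what B has already consumed, so A's sequence must never get ahead of B's. This coordination of Sequences is achieved by having A and B work through the same SequenceBarrier. At the same time, we must make sure that every consumer only consumes Events that have been published. We will not go deeper here.
newBarrier returns a ProcessingSequenceBarrier: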

    public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
    {
        return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
    }

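Now we come to the core of this article, SingleProducerSequencer. Looking at its structure, it again uses redundant long fields to avoid CPU false sharing. This time there are two fields to protect instead of one, so 7 longs are placed before and 7 after them, which avoids false sharing even in the worst case (see my first article).
The two fields are: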

    protected long nextValue = Sequence.INITIAL_VALUE;
    protected long cachedValue = Sequence.INITIAL_VALUE;

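Both start at -1. Note that these are plain long fields with no barrier protecting them, so within the Disruptor framework this class is not thread-safe. As the name says, though, it is meant for a single producer, so its methods are not supposed to be called from multiple threads in the first place.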
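The padding layout can be sketched as a three-level class hierarchy. All class and field names below are made up for illustration. The sketch also assumes 64-byte cache lines and a JVM that lays out superclass fields before subclass fields; HotSpot generally does this, but the Java language itself gives no such guarantee.

```java
/** Hypothetical names: seven longs (56 bytes) of padding in front of the hot fields. */
class LeftPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

/** The two hot fields that should not share a cache line with unrelated data. */
class HotFields extends LeftPadding {
    protected long nextValue = -1L;
    protected long cachedValue = -1L;
}

/** Seven more longs of padding behind the hot fields. */
final class PaddedProducerState extends HotFields {
    protected long p9, p10, p11, p12, p13, p14, p15;
}
```

Splitting the padding across superclass and subclass is what keeps the hot fields in the middle; declaring everything in one class would leave the field order up to the JVM.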
Next come the implementations of the abstract methods left open by AbstractSequencer.
hasAvailableCapacity checks whether there is enough space:

    @Override
    public boolean hasAvailableCapacity(int requiredCapacity) {
        // Last sequence claimed by the producer
        long nextValue = this.nextValue;
        // Claimed position plus required capacity minus bufferSize. If the result is
        // positive, the producer has wrapped around at least once and must check
        // gatingSequences (whose Sequence values are updated by the consumers) so that
        // it does not overwrite Events that have not been consumed yet
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        // Disruptor caches a lot. cachedValue holds the minimum of all gatingSequences
        // seen at the last check, so gatingSequences need not be scanned on every call
        long cachedGatingSequence = this.cachedValue;
        // Re-check and refresh the cache when wrapPoint exceeds the cached minimum
        // (or when the cached value is ahead of nextValue)
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence = Util.getMinimumSequence(gatingSequences, nextValue);
            this.cachedValue = minSequence;
            // Not enough space: return false
            if (wrapPoint > minSequence)
            {
                return false;
            }
        }
        // wrapPoint does not exceed the minimum gating sequence, so it is safe to produce
        return true;
    }

The next method claims the next sequence, or the next n sequences (values), as the position(s) for producing events:

    @Override
    public long next() {
        return next(1);
    }

    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        long nextValue = this.nextValue;
        // Same logic as hasAvailableCapacity, except that next effectively blocks
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            long minSequence;
            // While wrapPoint is greater than the minimum gating sequence, keep waking
            // the consumers and yield the CPU via LockSupport, until wrapPoint is no
            // longer greater than the minimum gating sequence
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            // As before, cache the minimum gating sequence
            this.cachedValue = minSequence;
        }

        this.nextValue = nextSequence;

        return nextSequence;
    }

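The waiting part of next can be tried in isolation with a stripped-down sketch. BlockingClaimSketch is a hypothetical class: it tracks a single consumer position in an AtomicLong, leaves out the cachedValue optimisation and the wait strategy, and only keeps the "park until the consumer has moved far enough" loop.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical single-producer claim loop with one gating sequence and no cache. */
final class BlockingClaimSketch {
    private final int bufferSize;
    private final AtomicLong consumed; // position of the only consumer
    private long nextValue = -1L;      // last claimed sequence, producer thread only

    BlockingClaimSketch(int bufferSize, AtomicLong consumed) {
        this.bufferSize = bufferSize;
        this.consumed = consumed;
    }

    long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        // Wait while claiming would overwrite a slot the consumer has not finished
        while (wrapPoint > consumed.get()) {
            LockSupport.parkNanos(1L);
        }
        nextValue = nextSequence;
        return nextSequence;
    }
}
```

With a buffer of 4, the first four sequences are claimed immediately; claiming the fifth only returns once another thread has advanced the consumer position to at least 0.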
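tryNext tries to claim the next sequence, or the next n sequences (values), as the position(s) for producing events, and throws InsufficientCapacityException if capacity is insufficient. The capacity check is done with the hasAvailableCapacity method shown earlier: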

    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        if (!hasAvailableCapacity(n)) {
            throw InsufficientCapacityException.INSTANCE;
        }

        long nextSequence = this.nextValue += n;

        return nextSequence;
    }

publish publishes the Event:

    @Override
    public void publish(long sequence) {
        // cursor marks the highest sequence that can be consumed
        cursor.set(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    @Override
    public void publish(long lo, long hi) {
        publish(hi);
    }

The remaining methods:

    @Override
    public void claim(long sequence) {
        nextValue = sequence;
    }

    @Override
    public boolean isAvailable(long sequence) {
        return sequence <= cursor.get();
    }

    @Override
    public long getHighestPublishedSequence(long nextSequence, long availableSequence) {
        return availableSequence;
    }

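Now let's walk through a simple workflow for SingleProducerSequencer.
Suppose we have a RingBuffer with a bufferSize of 4, a SingleProducerSequencer, and the consumer-side helper class SequenceBarrier. Consumers are left out of the picture; just assume there are consumers continuously consuming through the SequenceBarrier. The gatingSequences array of the SingleProducerSequencer holds a reference to one Sequence, and that same Sequence is updated on the SequenceBarrier side to show how far the consumer has got. Both the producer's Sequence and the consumer's Sequence start at zero and keep growing; even when they exceed bufferSize, the slot in the RingBuffer can be found by taking the sequence value modulo bufferSize.
Suppose the SingleProducerSequencer now wants to produce two Events and put them into the RingBuffer, and first calls hasAvailableCapacity(2). The code path is:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (-1 + 2) - 4 = -3
-3 < cachedValue (which is still -1), so gatingSequences is not checked and the method returns true right away. The producer then fills the two slots and calls publish to update cursor, so that consumers calling isAvailable can tell from cursor that sequence 0 and sequence 1 are ready to be consumed.
Suppose the consumer then consumes one Event and updates its Sequence to 0.
After that, the producer wants to produce four Events and calls hasAvailableCapacity(4). The code path is:
wrapPoint = (nextValue + requiredCapacity) - bufferSize = (1 + 4) - 4 = 1
1 > cachedValue (still -1), so the gating sequences have to be re-checked. The minimum Sequence is now 0, but wrapPoint 1 is still greater than 0, so cachedValue is updated to 0 and the method returns false.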
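The two checks above can be replayed with a small standalone sketch. CapacityWalkthrough is a hypothetical class that copies only the arithmetic of hasAvailableCapacity and replaces the gatingSequences array with a single consumerSequence field; it is not the Disruptor class itself.

```java
/** Hypothetical replay of the walk-through: one producer, one consumer, bufferSize 4. */
final class CapacityWalkthrough {
    final int bufferSize;
    long nextValue = -1L;        // last sequence claimed by the producer
    long cachedValue = -1L;      // cached minimum gating sequence
    long consumerSequence = -1L; // stands in for the single gating sequence

    CapacityWalkthrough(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    boolean hasAvailableCapacity(int requiredCapacity) {
        long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
        if (wrapPoint > cachedValue || cachedValue > nextValue) {
            long minSequence = Math.min(consumerSequence, nextValue);
            cachedValue = minSequence; // refresh the cache
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        CapacityWalkthrough producer = new CapacityWalkthrough(4);
        System.out.println(producer.hasAvailableCapacity(2)); // wrapPoint = -3
        producer.nextValue = 1L;        // sequences 0 and 1 claimed and published
        producer.consumerSequence = 0L; // consumer finished sequence 0
        System.out.println(producer.hasAvailableCapacity(4)); // wrapPoint = 1
    }
}
```

Running main prints true for the first check and false for the second, and cachedValue ends up at 0, matching the steps described above.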
That completes a simple production walk-through, and with it our look at SingleProducerSequencer.