Disruptor启动过程源码分析(二)
现在我们第一步吧Disruptor实例化好了,下面我们看看handlerEventWith(添加消费者的监听)内部都做了些什么
public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
继续追:
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers)
{
checkNotStarted();
//创建一个handler数组
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//创建一个SequenceBarrier屏障
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
//循环遍历传入的handlers
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
{
final EventHandler<? super T> eventHandler = eventHandlers[i];
//把遍历的handler用于实例化一个batchEventProcessor
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null)
{
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
//把batchEventProcessor添加到consumerRepository中
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
//把batchEventProcessor添加到handler数组当中
processorSequences[i] = batchEventProcessor.getSequence();
}
if (processorSequences.length > 0)
{
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
if (processorSequences.length > 0) {
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences) {
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
这里主要的作用就是把handler加入到一个consumerRepository中,这个consumerRepository是干啥用的目前我们还不知道,等会再回头看看。后面有一步consumerRepository.unMarkEventProcessorsAsEndOfChain操作。看看里面方法的内容,逻辑就是遍历processorSequence的handler元素加入到RingBuffer的sequencer数组中,然后遍历barrierSequences的handler的元素从中移除,注意我们刚开始传入的barrierSequences是第0个元素也就是初始位置。
这里我们或许对barrierSequences和processorSequence有一个初步的认识:barrierSequences存的是已经处理过的handler,processorSequence存的是等待处理的handler。我们直接追到addGatingSequences的中,其实现是在Sequencer的实现类AbstractSequencer中
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
可以看到这里是对一个SequenceGroups进行操作,然而这是个什么东东,我们继续点开看看
class SequenceGroups
{
static <T> void addSequences(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
final Cursored cursor,
final Sequence... sequencesToAdd)
{
long cursorSequence;
Sequence[] updatedSequences;
Sequence[] currentSequences;
do
{
得到当前handler的数组
currentSequences = updater.get(holder);
更新当前数组,把新的handler加入到新数组当中
updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
得到指向handler的指针
cursorSequence = cursor.getCursor();
int index = currentSequences.length;
//填充数组
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
updatedSequences[index++] = sequence;
}
}
//进行CAS操作更新handler数组
while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
//给新添加的handler分配指针
cursorSequence = cursor.getCursor();
for (Sequence sequence : sequencesToAdd)
{
sequence.set(cursorSequence);
}
}
static <T> boolean removeSequence(
final T holder,
final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
final Sequence sequence)
{
//...
}
private static <T> int countMatching(T[] values, final T toMatch)
{
//...
}
}
这里主要就是通过CAS更新AbstractSequencer里面的SEQUECNE_UPDATE,这个SEQUECNE_UPDATE的声明
private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
可以看出其是对gatingSequence的原子性更新。
总结一下:上面的createEventProcessors是对gatingSequence的更新,这里我们还没直接的看到gatingSequence字段的声明,但是可以作出如下推测:
1、gatingSequence也是一个Sequence序列,Sequence序列用于标记handler数组的下标
2、gatingSequence是当前线程所能执行的所有handler下标的集合。如果其他线程添加了handler,由于有cas操作,当前线程对其具有可见性。
让我们重新回回回回回到Disruptor#createEventProcessor这个方法,让我们看看最后一步:把当前disruptor、consumerRepository、processorSequence作为参数实例化EventHandlerGroup
public class EventHandlerGroup<T>
{
private final Disruptor<T> disruptor;
private final ConsumerRepository<T> consumerRepository;
private final Sequence[] sequences;
EventHandlerGroup(final Disruptor<T> disruptor,
final ConsumerRepository<T> consumerRepository,
final Sequence[] sequences)
{
this.disruptor = disruptor;
this.consumerRepository = consumerRepository;
this.sequences = Arrays.copyOf(sequences, sequences.length);
}
//...
}
EventHandlerGroup对于每一个传入的消费者,其中的handler都会对其进行消费。这里EventHandlerGroup初始化完毕了。
最后总结一句:Main里面的handlerEventWith方法的最终目的就是实例化EventHandlerGroup,EventHandlerGroup里面实例化了传入的handler用于对消费者进行消费。
本人刚接触源码不久,如有不正确的地方,请指出我会及时改正,谢谢大家。