Disruptor启动过程源码分析(二)

接着 Disruptor启动过程源码分析(一)

现在我们第一步吧Disruptor实例化好了,下面我们看看handlerEventWith(添加消费者的监听)内部都做了些什么

 public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

继续追:

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers)
    {
        checkNotStarted();
        //创建一个handler数组
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        //创建一个SequenceBarrier屏障
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        //循环遍历传入的handlers
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
        {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
        //把遍历的handler用于实例化一个batchEventProcessor
            final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);

            if (exceptionHandler != null)
            {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            //把batchEventProcessor添加到consumerRepository中
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //把batchEventProcessor添加到handler数组当中
            processorSequences[i] = batchEventProcessor.getSequence();
        }

        if (processorSequences.length > 0)
        {
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }

        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }


private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

这里主要的作用就是把handler加入到一个consumerRepository中,这个consumerRepository是干啥用的目前我们还不知道,等会再回头看看。后面有一步consumerRepository.unMarkEventProcessorsAsEndOfChain操作。看看里面方法的内容,逻辑就是遍历processorSequence的handler元素加入到RingBuffer的sequencer数组中,然后遍历barrierSequences的handler的元素从中移除,注意我们刚开始传入的barrierSequences是第0个元素也就是初始位置。

这里我们或许对barrierSequences和processorSequence有一个初步的认识:barrierSequences存的是已经处理过的handler,processorSequence存的是等待处理的handler。我们直接追到addGatingSequences的中,其实现是在Sequencer的实现类AbstractSequencer中


    public final void addGatingSequences(Sequence... gatingSequences)
    {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }

可以看到这里是对一个SequenceGroups进行操作,然而这是个什么东东,我们继续点开看看

class SequenceGroups
{
    static <T> void addSequences(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
        final Cursored cursor,
        final Sequence... sequencesToAdd)
    {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;
 do
        {
            得到当前handler的数组
            currentSequences = updater.get(holder);
            更新当前数组,把新的handler加入到新数组当中
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            得到指向handler的指针
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            //填充数组
            for (Sequence sequence : sequencesToAdd)
            {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        }
        //进行CAS操作更新handler数组
        while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
        //给新添加的handler分配指针
        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd)
        {
            sequence.set(cursorSequence);
        }
    }

    static <T> boolean removeSequence(
        final T holder,
        final AtomicReferenceFieldUpdater<T, Sequence[]> sequenceUpdater,
        final Sequence sequence)
    {

       //...
    }

    private static <T> int countMatching(T[] values, final T toMatch)
    {
        //...
    }
}

这里主要就是通过CAS更新AbstractSequencer里面的SEQUECNE_UPDATE,这个SEQUECNE_UPDATE的声明

  private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");

可以看出其是对gatingSequence的原子性更新。

总结一下:上面的createEventProcessors是对gatingSequence的更新,这里我们还没直接的看到gatingSequence字段的声明,但是可以作出如下推测:

1、gatingSequence也是一个Sequence序列,Sequence序列用于标记handler数组的下标

2、gatingSequence是当前线程所能执行的所有handler下标的集合。如果其他线程添加了handler,由于有cas操作,当前线程对其具有可见性。

让我们重新回回回回回到Disruptor#createEventProcessor这个方法,让我们看看最后一步:把当前disruptor、consumerRepository、processorSequence作为参数实例化EventHandlerGroup

public class EventHandlerGroup<T>
{
    private final Disruptor<T> disruptor;
    private final ConsumerRepository<T> consumerRepository;
    private final Sequence[] sequences;

    EventHandlerGroup(final Disruptor<T> disruptor,
                      final ConsumerRepository<T> consumerRepository,
                      final Sequence[] sequences)
    {
        this.disruptor = disruptor;
        this.consumerRepository = consumerRepository;
        this.sequences = Arrays.copyOf(sequences, sequences.length);
    }
  //...
}

Disruptor启动过程源码分析(二)

EventHandlerGroup对于每一个传入的消费者,其中的handler都会对其进行消费。这里EventHandlerGroup初始化完毕了。

 

最后总结一句:Main里面的handlerEventWith方法的最终目的就是实例化EventHandlerGroup,EventHandlerGroup里面实例化了传入的handler用于对消费者进行消费。

 

本人刚接触源码不久,如有不正确的地方,请指出我会及时改正,谢谢大家。