Java读源码之Netty深入剖析2-1-1 netty的设计模式

Java读源码之Netty深入剖析2-1-1  netty的设计模式

如果要阅读源码,首先就要学会基本的设计模式。

设计模式是前人总结出来的软件设计方法,有利于使代码更加简洁优雅。

了解了netty的设计模式,再去看源码,会有一种焕然大悟的感觉。

一、单例模式

单例模式是最常见的设计模式:

1、忽略反射的影响,全局只有一个实例

2、有可能会出现延迟创建实例对象,要使用的时候才创建3、这种设计模式能够避免线程安全问题

netty单例模式分析详情-----------------------------------------------------------------------------------------------

 

netty设计模式-单例模式


单例模式是最常见的设计模式:

1、忽略反射的影响,全局只有一个实例

2、有可能会出现延迟创建实例对象,要使用的时候才创建

3、这种设计模式能够避免线程安全问题

 

最常见的单例模式实现方法:

    懒汉模式(单例对象 volatile + 双重检测机制 -> 禁止指令重排)

    public class SingletonExample5 {
        // 私有构造函数
        private SingletonExample5() {
        }
        // 1、memory = allocate() 分配对象的内存空间
        // 2、ctorInstance() 初始化对象
        // 3、instance = memory 设置instance指向刚分配的内存
        // 单例对象 volatile + 双重检测机制 -> 禁止指令重排
        private volatile static SingletonExample5 instance = null;
     
        // 静态的工厂方法
        public static SingletonExample5 getInstance() {
            if (instance == null) { // 双重检测机制        // B
                synchronized (SingletonExample5.class) { // 同步锁
                    if (instance == null) {
                        instance = new SingletonExample5(); // A - 3
                    }
                }
            }
            return instance;
        }

用私有构造函数实现全局一个对象实例,用静态工厂方法实现延迟加载,用各种锁的机制,实现线程安全。

netty里面的单例模式有MqttEncoder和ReadTimeoutException,代码如下:

    @ChannelHandler.Sharable
    public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
     
        public static final MqttEncoder INSTANCE = new MqttEncoder();
     
        private MqttEncoder() { }

    public final class ReadTimeoutException extends TimeoutException {
     
        private static final long serialVersionUID = 169287984113283421L;
     
        public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();
     
        private ReadTimeoutException() { }
    }

他们都是使用私有构造函数实现全局单个对象实例,用public static final实现延迟加载和线程安全。

 

 

 

 

二、策略模式

1、封装一系列可替换的算法家族

2、支持动态选择某一个策略

netty策略模式分析详情-------------------------------------------------------------------------------------------------------------------

 

netty设计模式-策略模式

策略模式的特点大致如下:

1、封装一系列可替换的算法家族

2、支持动态选择某一个策略

 

常见的策略模式实现方式

    /**
     * @see DefaultEventExecutorChooserFactory#newChooser(EventExecutor[])
     */
    public class Strategy {
        private Cache cacheMemory = new CacheMemoryImpl();
        private Cache cacheRedis = new CacheRedisImpl();
     
        public interface Cache {
            boolean add(String key, Object object);
        }
     
        public class CacheMemoryImpl implements Cache {
            @Override
            public boolean add(String key, Object object) {
                // 保存到map
                return false;
            }
        }
     
        public class CacheRedisImpl implements Cache {
            @Override
            public boolean add(String key, Object object) {
                // 保存到redis
                return false;
            }
        }
     
        public Cache getCache(String key) {
            if (key.length() < 10) {
                return cacheRedis;
            }
            return cacheMemory;
        }
    }

在netty的DefaultEventExecutorChooserFactory 的里面,就是如此实现策略模式的:

    @UnstableApi
    public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
     
        public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
     
        private DefaultEventExecutorChooserFactory() { }
     
        @SuppressWarnings("unchecked")
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTowEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
     
        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
     
        private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
     
            PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
     
            @Override
            public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
     
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
     
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
     
            @Override
            public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }
    }

 

 

 

 

三、装饰器模式

装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。

1、装饰者和被装饰者继承同一个接口

2、装饰者给被装饰者动态修改行为

netty装饰器模式分析详情---------------------------------------------------------------------------------------------------------

 

netty设计模式-装饰器模式
 

装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。

1、装饰者和被装饰者继承同一个接口

2、装饰者给被装饰者动态修改行为

 

首先我们一生活中的例子来看一看装饰器模式:

    /**
     * @see io.netty.buffer.WrappedByteBuf;
     * @see io.netty.buffer.UnreleasableByteBuf
     * @see io.netty.buffer.SimpleLeakAwareByteBuf
     */
    public class Decorate {
     
        // 优惠方案
        public interface OnSalePlan {
            float getPrice(float oldPrice);
        }
     
        // 无优惠
        public static class NonePlan implements OnSalePlan {
            static final OnSalePlan INSTANCE = new NonePlan();
     
            private NonePlan() {
     
            }
     
            public float getPrice(float oldPrice) {
                return oldPrice;
            }
        }
     
        // 立减优惠
        public static class KnockPlan implements OnSalePlan {
            // 立减金额
            private float amount;
     
            public KnockPlan(float amount) {
                this.amount = amount;
            }
     
            public float getPrice(float oldPrice) {
                return oldPrice - amount;
            }
        }
     
     
        // 打折优惠
        public static class DiscountPlan implements OnSalePlan {
            // 折扣
            public int discount;
            private OnSalePlan previousPlan;
     
            public DiscountPlan(int discount, OnSalePlan previousPlan) {
                this.discount = discount;
                this.previousPlan = previousPlan;
            }
     
            public DiscountPlan(int discount) {
                this(discount, NonePlan.INSTANCE);
            }
     
            public float getPrice(float oldPrice) {
                return previousPlan.getPrice(oldPrice) * discount / 10;
            }
        }
     
        public static void main(String[] args) {
            DiscountPlan simpleDiscountPlan = new DiscountPlan(5);
            System.out.println(simpleDiscountPlan.getPrice(100));
     
            KnockPlan previousPlan = new KnockPlan(50);
            DiscountPlan complexDiscountPlan = new DiscountPlan(5, previousPlan);
            System.out.println(complexDiscountPlan.getPrice(100));
        }
     
    }

我们以OnSalePlan 作为接口,装饰者或者被装饰者都继承或者实现这个接口。

NonePlan是一种实现方案、KnockPlan也是一种实现方案、DiscountPlan也是一种实现方案,但是它可以实现我们的装饰器模式。

在main函数里面,我们首先实现了一个简单的打折方案,这里没有用到装饰器模式,打印出50;重点在第二个方案,首先实例出一个立减方案,立减方案KnockPlan作为被装饰者传入到DiscountPlan打折方案里面,DiscountPlan就是装饰者,它装饰了KnockPlan的行为,使得它的行为得以改变。

在netty里面,我们以WrappedByteBuf、UnreleasableByteBuf 、SimpleLeakAwareByteBuf这三个类讲解分析里面用到的设计模式。

WrappedByteBuf是所有装饰器的基类,它继承自ByteBuf

    class WrappedByteBuf extends ByteBuf {
     
        protected final ByteBuf buf;
     
        protected WrappedByteBuf(ByteBuf buf) {
            if (buf == null) {
                throw new NullPointerException("buf");
            }
            this.buf = buf;
        }
     
        @Override
        public final boolean hasMemoryAddress() {
            return buf.hasMemoryAddress();
        }
     
        @Override
        public final long memoryAddress() {
            return buf.memoryAddress();
        }
        ...
    }

首先看构造函数,传入一个ByteBuf实例,这个传入的实例就是被装饰者,它的行为可以被当前这个类,也就是WrappedByteBuf(也就是装饰者)动态改变。因为这个WrappedByteBuf它只是装饰器的基类,所以他只对传入的被装饰者的行为做简单的返回,没做任何修改,...后面是更多的方法,都是直接调用被装饰者的方法。

 

接下来看WrappedBuf的一个子类UnreleasableByteBuf,这个类一看就是什么什么不释放的类,具体什么我们先不管,我们找到源代码:

    final class UnreleasableByteBuf extends WrappedByteBuf {
     
        private SwappedByteBuf swappedBuf;
     
        UnreleasableByteBuf(ByteBuf buf) {
            super(buf);
        }
     
        ...
     
        @Override
        public boolean release() {
            return false;
        }
     
        ...
    }

首先调用父类WrappedByteBuf的构造方法,前面我们分析过,就是把被装饰者传进来,以供以后使用。然后看里面有一行 public boolean release() { return false;},我们不管它release释放了什么,反正它release就是返回false,装饰或者说是改变了被装饰者buf的行为。

另外一个WrappedByteBuf的子类SimpleLeakAwareByteBuf,这个类顾名思义就是内存泄漏自动感知的一个ByteBuf,源码:

    final class SimpleLeakAwareByteBuf extends WrappedByteBuf {
     
        private final ResourceLeak leak;
     
        SimpleLeakAwareByteBuf(ByteBuf buf, ResourceLeak leak) {
            super(buf);
            this.leak = leak;
        }
     
        ...
     
        @Override
        public boolean release() {
            boolean deallocated =  super.release();
            if (deallocated) {
                leak.close();
            }
            return deallocated;
        }
     
        ...
    }

构造器还是调用父类的方法,在release 这个方法里面,如果发现内存泄漏了,就执行leak.close()这个方法,然后在返回,其实也是修饰了被装饰者,动态改变了被装饰着的行为。

当然WrappedByteBuf还有很多子类的,我们目前只分析这两个来实现分析netty装饰器模式。

 

 

四、观察者模式

1、观察者和被观察者

2、观察者订阅消息,被观察者发布消息

3、订阅则能收到,取消订阅收不到

netty观察者模式分析详情---------------------------------------------------------------------------------------------------------------

 

netty设计模式-观察者模式
 

这个模式可以说在java的源码里面应用很广泛了,各种addListener,future这些,最终都是观察者模式的体现。

1、观察者和被观察者

2、观察者订阅消息,被观察者发布消息

3、订阅则能收到,取消订阅收不到

首先我们看看观察者模式的一个生活中的例子。

        /**
         * 被观察者
         */
        public interface Observerable {
            void registerObserver(Observer o);
     
            void removeObserver(Observer o);
     
            void notifyObserver();
        }
     
        /**
         * 观察者
         */
        public interface Observer {
            void notify(String message);
        }
     
        public static class Girl implements Observerable {
            private String message;
     
            List<Observer> observerList;
     
            public Girl() {
                observerList = new ArrayList<>();
            }
     
     
            @Override
            public void registerObserver(Observer o) {
                observerList.add(o);
            }
     
            @Override
            public void removeObserver(Observer o) {
                observerList.remove(o);
            }
     
            @Override
            public void notifyObserver() {
                for (Observer observer : observerList) {
                    observer.notify(message);
                }
            }
     
            public void hasBoyFriend() {
                message = "女神有男朋友了";
                notifyObserver();
            }
     
            public void getMarried() {
                message = "女神结婚了,你们都死心吧!";
                notifyObserver();
            }
     
            public void getSingled() {
                message = "女神单身了,你们有机会了!";
                notifyObserver();
            }
        }
     
        /**
         * 男孩
         */
        public static class Boy implements Observer {
            public void notify(String message) {
                System.out.println("男孩收到消息:" + message);
            }
        }
     
        /**
         * 男人
         */
        public static class Man implements Observer {
            public void notify(String message) {
                System.out.println("男人收到消息:" + message);
            }
        }
     
        /**
         * 老男人
         */
        public static class OldMan implements Observer {
            public void notify(String message) {
                System.out.println("老男人收到消息:" + message);
            }
        }
     
        public static void main(String[] args) {
            Girl girl = new Girl();
            Boy boy = new Boy();
            Man man = new Man();
            OldMan oldMan = new OldMan();
     
            // 通知男孩、男人、老男人,女神有男朋友了
            girl.registerObserver(boy);
            girl.registerObserver(man);
            girl.registerObserver(oldMan);
            girl.hasBoyFriend();
            System.out.println("====================");
     
            // 通知男孩,男人,女神结婚了
            girl.removeObserver(oldMan);
            girl.getMarried();
            System.out.println("====================");
     
     
            girl.registerObserver(oldMan);
            girl.getSingled();
        }

流程就是,观察者注册到被观察者(也就是被观察者的列表里面添加观察者),然后发生了事情,被观察者就调用列表里面观察者的方法。

对于netty,我们首先查看一下我们使用的例子

        public void write(NioSocketChannel channel, Object object) {
            ChannelFuture channelFuture = channel.writeAndFlush(object);
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
     
                } else {
     
                }
            });
            channelFuture.addListener(future -> {
                if (future.isSuccess()) {
     
                } else {
     
                }
            });
        }

首先创建一个channelFuture,这个就是被观察者,addListener把观察者都加进去。

首先查看被观察者创建的过程,writeAndFlush一直跟进去,可以看到这样一段代码:

        @Override
        public ChannelFuture writeAndFlush(Object msg) {
            return writeAndFlush(msg, newPromise());
        }

newPromise()这个函数其实就是创建一个被观察者。newPromise()跟进去就是这样一行代码:

        @Override
        public ChannelPromise newPromise() {
            return new DefaultChannelPromise(channel(), executor());
        }

ChannelPromise就是被观察者。

 

创建被观察者之后,就要把观察者添加进去。跟进去ChannelPromise的实现者DefaultChannelPromise的addListener方法,一直跟进去,会看到这样一串有趣的代码:

        private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
            if (listeners == null) {
                listeners = listener;
            } else if (listeners instanceof DefaultFutureListeners) {
                ((DefaultFutureListeners) listeners).add(listener);
            } else {
                listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
            }
        }

添加listener就添加listener,为什么还要分三段?netty在这里做了一个优化。首先我们要知道的是,listeners定义为一个Object,意味着它可以装换成任意对象。

        /**
         * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
         * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
         *
         * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
         */
        private Object listeners;

那么,如果第一个listener进来,直接定义listeners为listener;第二个进来,就把它定义为DefaultFutureListeners,把之前的listener和现在进来的包装进去,其实这个时候listeners就变成了一个列表,点进去DefaultFutureListeners就能看到如下代码:

        @SuppressWarnings("unchecked")
        DefaultFutureListeners(
                GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
            listeners = new GenericFutureListener[2];
            listeners[0] = first;
            listeners[1] = second;
            size = 2;
            if (first instanceof GenericProgressiveFutureListener) {
                progressiveSize ++;
            }
            if (second instanceof GenericProgressiveFutureListener) {
                progressiveSize ++;
            }
        }

第三次就把新加进来的添加到DefaultFutureListeners里面就行了。有兴趣自己看源码。

 

把观察者添加进去之后,什么时候通知呢。netty的NioSocketChannel的WriteAndFlush执行完,成功或者失败,都会通知到listener。继续看writeAndFlush方法,跟进去会看到:

        private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
            if (invokeHandler()) {
                invokeWrite0(msg, promise);
                invokeFlush0();
            } else {
                writeAndFlush(msg, promise);
            }
        }

首先看invokeWrite0,这个代码中可能会有失败,会告知listener;最终如果成功在invokeFlush0里面体现,也会通知给listener;

先看invokeWrite0,跟进去看到 write方法,用HeadContext的实现,再跟进去,调用DefaultPromiser的tryFailure方法就能看到notifyListener()了。notifyListeners();

notifyListeners();这个才是是通知linstener的方法。

再一直跟进去会看到这段代码:

    private static void notifyListener0(Future future, GenericFutureListener l) {
            try {
                l.operationComplete(future);
            } catch (Throwable t) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }

l.operationComplete(future)这个方法就会通知到观察者。

同理,invokeFlush0进去,使用HeadContext的flush方法,一直进去,使用NioSocketChannel的doWrite方法,一直进去,看到这样的代码:

    @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;
     
            boolean setOpWrite = false;
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    clearOpWrite();
                    // Directly return here so incompleteWrite(...) is not called.
                    return;
                }
     
                if (msg instanceof ByteBuf) {
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }
     
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        int localFlushedAmount = doWriteBytes(buf);
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
     
                        flushedAmount += localFlushedAmount;
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }
     
                    in.progress(flushedAmount);
     
                    if (done) {
                        in.remove();
                    } else {
                        // Break the loop and so incompleteWrite(...) is called.
                        break;
                    }
                } else if (msg instanceof FileRegion) {
                    FileRegion region = (FileRegion) msg;
                    boolean done = region.transferred() >= region.count();
     
                    if (!done) {
                        long flushedAmount = 0;
                        if (writeSpinCount == -1) {
                            writeSpinCount = config().getWriteSpinCount();
                        }
     
                        for (int i = writeSpinCount - 1; i >= 0; i--) {
                            long localFlushedAmount = doWriteFileRegion(region);
                            if (localFlushedAmount == 0) {
                                setOpWrite = true;
                                break;
                            }
     
                            flushedAmount += localFlushedAmount;
                            if (region.transferred() >= region.count()) {
                                done = true;
                                break;
                            }
                        }
     
                        in.progress(flushedAmount);
                    }
     
                    if (done) {
                        in.remove();
                    } else {
                        // Break the loop and so incompleteWrite(...) is called.
                        break;
                    }
                } else {
                    // Should not reach here.
                    throw new Error();
                }
            }
            incompleteWrite(setOpWrite);
        }

这里有个in.remove()的代码,当读完了之后,就可以删除Buffer,然后通知listener成功了。

in.remove()进去,有个safeSuccess()方法,一直trySuccess(),使用DefaultPromise的trySuccess(),最后又看到这个熟悉的方法了:

     
        @Override
        public boolean trySuccess(V result) {
            if (setSuccess0(result)) {
                notifyListeners();
                return true;
            }
            return false;
        }

最后都同 invokeWrite0一样。

 

netty就通过Promise模式或者说ChannelFuture模式实现了观察者模式。netty就通过实现观察者模式达到异步通知,每次写成功写失败,都会回调给listener,也就是回调到future.isSuccess()等方法。其实就是变种的观察者模式。

 

 

 

 

五、迭代器模式

1、有一个迭代器接口。

2、对容器里面各个对象进行访问。

netty迭代器模式分析详情---------------------------------------------------------------------------------------------------

 

 

迭代器模式使用很多,但是被我们经常忽略它居然也是一种模式。

1、有一个迭代器接口。

2、对容器里面各个对象进行访问。

netty里面的CompositeByteBuf这个零拷贝的实现,就使用了迭代器模式。首先看一段代码:

        public static void main(String[] args) {
            ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
            ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});
     
            ByteBuf merge = merge(header, body);
            merge.forEachByte(value -> {
                System.out.println(value);
                return true;
            });
        }
     
        public static ByteBuf merge(ByteBuf header, ByteBuf body) {
            CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
            byteBuf.addComponent(true, header);
            byteBuf.addComponent(true, body);
     
            return byteBuf;
        }

这段代码是把两个ByteBuf添加到一起,forEachByte就是实现了迭代器模式。那么怎么说它是零拷贝呢?

找forEachByte的实现,在AbstractByteBuf里面。

有这样一段代码:

        @Override
        public int forEachByte(ByteProcessor processor) {
            ensureAccessible();
            try {
                return forEachByteAsc0(readerIndex, writerIndex, processor);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
                return -1;
            }
        }

从readerIndex开始读,读到writeIndex。继续点进去查看:

        private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
            for (; start < end; ++start) {
                if (!processor.process(_getByte(start))) {
                    return start;
                }
            }
     
            return -1;
        }

查看_getByte的实现,当然是找CompositeByteBuf的实现了:

        @Override
        protected byte _getByte(int index) {
            Component c = findComponent(index);
            return c.buf.getByte(index - c.offset);
        }

先找出是哪个componet,然后迭代的时候其实是直接返回这个componet的byte内容就可以,这样就实现了零拷贝。别的类迭代的话,可能会把所有的数据都复制一遍。

 

 

 

 

 

六、责任链模式

1、责任处理器接口

2、创建链,添加删除责任处理器接口

3、上下文

4、责任链终止机制。

netty责任链模式分析详情-------------------------------------------------------------------------------------------------------------

 

netty设计模式-责任链模式
 

责任链模式的定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。

首先来看看责任链模式的四个要素:

1、责任处理器接口

2、创建链,添加删除责任处理器接口

3、上下文

4、责任链终止机制。

在netty里面,很明显channelHandler和Pipeline构成了责任链模式。让我们通过上面的要素,一个一个分析

 

1、首先是责任处理器接口:

ChannelHandler就是责任处理器接口,ChannelInboundHandler、ChannelOuntboundHandler是它的两个增强。

 

2、找到创建链,添加删除责任处理器接口ChannelPipeline:

Java读源码之Netty深入剖析2-1-1 netty的设计模式

里面有各种add和remove的方法

 

3、然后是上下文ChannelHandlerContext:

    public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
     
        /**
         * Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
         */
        Channel channel();
     
        /**
         * Returns the {@link EventExecutor} which is used to execute an arbitrary task.
         */
        EventExecutor executor();
     
        ...
    }

里面有两个最重要的方法,一个返回绑定的channel,一个返回executor来执行任务。

 

4、最后我们看责任链终止机制

现在我们自定义一个InBoundHandlerC

    public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InBoundHandlerC: " + msg);
            ctx.fireChannelRead(msg);
        }
    }

ctx.fiteChannelRead方法就是为了把责任传递下去。如果注释掉了,消息就不会传递。另外如果不重写channelRead方法,默认会传递,让我们查看ChannelInboundHandlerAdapter的channelRead方法:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }

默认会传递。这里另外提一句,java里面的很多filter,是否继续向下传递,都是return true还是return 方法来实现的。

 

最后,消息是如何一步步向下传递的呢,让我们看AbstractChannelHandlerContext的fireChannelRead方法:

        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }

继续点击findContextInbound(),可以看到:

        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }

也就是,这里是不停地指向下一个对象实现的。