Java读源码之Netty深入剖析2-1-1 netty的设计模式
Java读源码之Netty深入剖析2-1-1 netty的设计模式
如果要阅读源码,首先就要学会基本的设计模式。
设计模式是前人总结出来的软件设计方法,有利于使代码更加简洁优雅。
了解了netty的设计模式,再去看源码,会有一种焕然大悟的感觉。
一、单例模式
单例模式是最常见的设计模式:
1、忽略反射的影响,全局只有一个实例
2、有可能会出现延迟创建实例对象,要使用的时候才创建3、这种设计模式能够避免线程安全问题
netty设计模式-单例模式
单例模式是最常见的设计模式:
1、忽略反射的影响,全局只有一个实例
2、有可能会出现延迟创建实例对象,要使用的时候才创建
3、这种设计模式能够避免线程安全问题
最常见的单例模式实现方法:
懒汉模式(单例对象 volatile + 双重检测机制 -> 禁止指令重排)
public class SingletonExample5 {
// 私有构造函数
private SingletonExample5() {
}
// 1、memory = allocate() 分配对象的内存空间
// 2、ctorInstance() 初始化对象
// 3、instance = memory 设置instance指向刚分配的内存
// 单例对象 volatile + 双重检测机制 -> 禁止指令重排
private volatile static SingletonExample5 instance = null;
// 静态的工厂方法
public static SingletonExample5 getInstance() {
if (instance == null) { // 双重检测机制 // B
synchronized (SingletonExample5.class) { // 同步锁
if (instance == null) {
instance = new SingletonExample5(); // A - 3
}
}
}
return instance;
}
用私有构造函数实现全局一个对象实例,用静态工厂方法实现延迟加载,用各种锁的机制,实现线程安全。
netty里面的单例模式有MqttEncoder和ReadTimeoutException,代码如下:
@ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
public static final MqttEncoder INSTANCE = new MqttEncoder();
private MqttEncoder() { }
public final class ReadTimeoutException extends TimeoutException {
private static final long serialVersionUID = 169287984113283421L;
public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();
private ReadTimeoutException() { }
}
他们都是使用私有构造函数实现全局单个对象实例,用public static final实现延迟加载和线程安全。
二、策略模式
1、封装一系列可替换的算法家族
2、支持动态选择某一个策略
netty设计模式-策略模式
策略模式的特点大致如下:
1、封装一系列可替换的算法家族
2、支持动态选择某一个策略
常见的策略模式实现方式
/**
* @see DefaultEventExecutorChooserFactory#newChooser(EventExecutor[])
*/
public class Strategy {
private Cache cacheMemory = new CacheMemoryImpl();
private Cache cacheRedis = new CacheRedisImpl();
public interface Cache {
boolean add(String key, Object object);
}
public class CacheMemoryImpl implements Cache {
@Override
public boolean add(String key, Object object) {
// 保存到map
return false;
}
}
public class CacheRedisImpl implements Cache {
@Override
public boolean add(String key, Object object) {
// 保存到redis
return false;
}
}
public Cache getCache(String key) {
if (key.length() < 10) {
return cacheRedis;
}
return cacheMemory;
}
}
在netty的DefaultEventExecutorChooserFactory 的里面,就是如此实现策略模式的:
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
三、装饰器模式
装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。
1、装饰者和被装饰者继承同一个接口
2、装饰者给被装饰者动态修改行为
netty设计模式-装饰器模式
装饰器模式被大量地使用在各种框架的源码里面,真正学会了对看源码和设计软件受益匪浅。
1、装饰者和被装饰者继承同一个接口
2、装饰者给被装饰者动态修改行为
首先我们一生活中的例子来看一看装饰器模式:
/**
* @see io.netty.buffer.WrappedByteBuf;
* @see io.netty.buffer.UnreleasableByteBuf
* @see io.netty.buffer.SimpleLeakAwareByteBuf
*/
public class Decorate {
// 优惠方案
public interface OnSalePlan {
float getPrice(float oldPrice);
}
// 无优惠
public static class NonePlan implements OnSalePlan {
static final OnSalePlan INSTANCE = new NonePlan();
private NonePlan() {
}
public float getPrice(float oldPrice) {
return oldPrice;
}
}
// 立减优惠
public static class KnockPlan implements OnSalePlan {
// 立减金额
private float amount;
public KnockPlan(float amount) {
this.amount = amount;
}
public float getPrice(float oldPrice) {
return oldPrice - amount;
}
}
// 打折优惠
public static class DiscountPlan implements OnSalePlan {
// 折扣
public int discount;
private OnSalePlan previousPlan;
public DiscountPlan(int discount, OnSalePlan previousPlan) {
this.discount = discount;
this.previousPlan = previousPlan;
}
public DiscountPlan(int discount) {
this(discount, NonePlan.INSTANCE);
}
public float getPrice(float oldPrice) {
return previousPlan.getPrice(oldPrice) * discount / 10;
}
}
public static void main(String[] args) {
DiscountPlan simpleDiscountPlan = new DiscountPlan(5);
System.out.println(simpleDiscountPlan.getPrice(100));
KnockPlan previousPlan = new KnockPlan(50);
DiscountPlan complexDiscountPlan = new DiscountPlan(5, previousPlan);
System.out.println(complexDiscountPlan.getPrice(100));
}
}
我们以OnSalePlan 作为接口,装饰者或者被装饰者都继承或者实现这个接口。
NonePlan是一种实现方案、KnockPlan也是一种实现方案、DiscountPlan也是一种实现方案,但是它可以实现我们的装饰器模式。
在main函数里面,我们首先实现了一个简单的打折方案,这里没有用到装饰器模式,打印出50;重点在第二个方案,首先实例出一个立减方案,立减方案KnockPlan作为被装饰者传入到DiscountPlan打折方案里面,DiscountPlan就是装饰者,它装饰了KnockPlan的行为,使得它的行为得以改变。
在netty里面,我们以WrappedByteBuf、UnreleasableByteBuf 、SimpleLeakAwareByteBuf这三个类讲解分析里面用到的设计模式。
WrappedByteBuf是所有装饰器的基类,它继承自ByteBuf
class WrappedByteBuf extends ByteBuf {
protected final ByteBuf buf;
protected WrappedByteBuf(ByteBuf buf) {
if (buf == null) {
throw new NullPointerException("buf");
}
this.buf = buf;
}
@Override
public final boolean hasMemoryAddress() {
return buf.hasMemoryAddress();
}
@Override
public final long memoryAddress() {
return buf.memoryAddress();
}
...
}
首先看构造函数,传入一个ByteBuf实例,这个传入的实例就是被装饰者,它的行为可以被当前这个类,也就是WrappedByteBuf(也就是装饰者)动态改变。因为这个WrappedByteBuf它只是装饰器的基类,所以他只对传入的被装饰者的行为做简单的返回,没做任何修改,...后面是更多的方法,都是直接调用被装饰者的方法。
接下来看WrappedBuf的一个子类UnreleasableByteBuf,这个类一看就是什么什么不释放的类,具体什么我们先不管,我们找到源代码:
final class UnreleasableByteBuf extends WrappedByteBuf {
private SwappedByteBuf swappedBuf;
UnreleasableByteBuf(ByteBuf buf) {
super(buf);
}
...
@Override
public boolean release() {
return false;
}
...
}
首先调用父类WrappedByteBuf的构造方法,前面我们分析过,就是把被装饰者传进来,以供以后使用。然后看里面有一行 public boolean release() { return false;},我们不管它release释放了什么,反正它release就是返回false,装饰或者说是改变了被装饰者buf的行为。
另外一个WrappedByteBuf的子类SimpleLeakAwareByteBuf,这个类顾名思义就是内存泄漏自动感知的一个ByteBuf,源码:
final class SimpleLeakAwareByteBuf extends WrappedByteBuf {
private final ResourceLeak leak;
SimpleLeakAwareByteBuf(ByteBuf buf, ResourceLeak leak) {
super(buf);
this.leak = leak;
}
...
@Override
public boolean release() {
boolean deallocated = super.release();
if (deallocated) {
leak.close();
}
return deallocated;
}
...
}
构造器还是调用父类的方法,在release 这个方法里面,如果发现内存泄漏了,就执行leak.close()这个方法,然后在返回,其实也是修饰了被装饰者,动态改变了被装饰着的行为。
当然WrappedByteBuf还有很多子类的,我们目前只分析这两个来实现分析netty装饰器模式。
四、观察者模式
1、观察者和被观察者
2、观察者订阅消息,被观察者发布消息
3、订阅则能收到,取消订阅收不到
netty设计模式-观察者模式
这个模式可以说在java的源码里面应用很广泛了,各种addListener,future这些,最终都是观察者模式的体现。
1、观察者和被观察者
2、观察者订阅消息,被观察者发布消息
3、订阅则能收到,取消订阅收不到
首先我们看看观察者模式的一个生活中的例子。
/**
* 被观察者
*/
public interface Observerable {
void registerObserver(Observer o);
void removeObserver(Observer o);
void notifyObserver();
}
/**
* 观察者
*/
public interface Observer {
void notify(String message);
}
public static class Girl implements Observerable {
private String message;
List<Observer> observerList;
public Girl() {
observerList = new ArrayList<>();
}
@Override
public void registerObserver(Observer o) {
observerList.add(o);
}
@Override
public void removeObserver(Observer o) {
observerList.remove(o);
}
@Override
public void notifyObserver() {
for (Observer observer : observerList) {
observer.notify(message);
}
}
public void hasBoyFriend() {
message = "女神有男朋友了";
notifyObserver();
}
public void getMarried() {
message = "女神结婚了,你们都死心吧!";
notifyObserver();
}
public void getSingled() {
message = "女神单身了,你们有机会了!";
notifyObserver();
}
}
/**
* 男孩
*/
public static class Boy implements Observer {
public void notify(String message) {
System.out.println("男孩收到消息:" + message);
}
}
/**
* 男人
*/
public static class Man implements Observer {
public void notify(String message) {
System.out.println("男人收到消息:" + message);
}
}
/**
* 老男人
*/
public static class OldMan implements Observer {
public void notify(String message) {
System.out.println("老男人收到消息:" + message);
}
}
public static void main(String[] args) {
Girl girl = new Girl();
Boy boy = new Boy();
Man man = new Man();
OldMan oldMan = new OldMan();
// 通知男孩、男人、老男人,女神有男朋友了
girl.registerObserver(boy);
girl.registerObserver(man);
girl.registerObserver(oldMan);
girl.hasBoyFriend();
System.out.println("====================");
// 通知男孩,男人,女神结婚了
girl.removeObserver(oldMan);
girl.getMarried();
System.out.println("====================");
girl.registerObserver(oldMan);
girl.getSingled();
}
流程就是,观察者注册到被观察者(也就是被观察者的列表里面添加观察者),然后发生了事情,被观察者就调用列表里面观察者的方法。
对于netty,我们首先查看一下我们使用的例子
public void write(NioSocketChannel channel, Object object) {
ChannelFuture channelFuture = channel.writeAndFlush(object);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
} else {
}
});
channelFuture.addListener(future -> {
if (future.isSuccess()) {
} else {
}
});
}
首先创建一个channelFuture,这个就是被观察者,addListener把观察者都加进去。
首先查看被观察者创建的过程,writeAndFlush一直跟进去,可以看到这样一段代码:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
newPromise()这个函数其实就是创建一个被观察者。newPromise()跟进去就是这样一行代码:
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}
ChannelPromise就是被观察者。
创建被观察者之后,就要把观察者添加进去。跟进去ChannelPromise的实现者DefaultChannelPromise的addListener方法,一直跟进去,会看到这样一串有趣的代码:
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
添加listener就添加listener,为什么还要分三段?netty在这里做了一个优化。首先我们要知道的是,listeners定义为一个Object,意味着它可以装换成任意对象。
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private Object listeners;
那么,如果第一个listener进来,直接定义listeners为listener;第二个进来,就把它定义为DefaultFutureListeners,把之前的listener和现在进来的包装进去,其实这个时候listeners就变成了一个列表,点进去DefaultFutureListeners就能看到如下代码:
@SuppressWarnings("unchecked")
DefaultFutureListeners(
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
第三次就把新加进来的添加到DefaultFutureListeners里面就行了。有兴趣自己看源码。
把观察者添加进去之后,什么时候通知呢。netty的NioSocketChannel的WriteAndFlush执行完,成功或者失败,都会通知到listener。继续看writeAndFlush方法,跟进去会看到:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
首先看invokeWrite0,这个代码中可能会有失败,会告知listener;最终如果成功在invokeFlush0里面体现,也会通知给listener;
先看invokeWrite0,跟进去看到 write方法,用HeadContext的实现,再跟进去,调用DefaultPromiser的tryFailure方法就能看到notifyListener()了。notifyListeners();
notifyListeners();这个才是是通知linstener的方法。
再一直跟进去会看到这段代码:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
l.operationComplete(future)这个方法就会通知到观察者。
同理,invokeFlush0进去,使用HeadContext的flush方法,一直进去,使用NioSocketChannel的doWrite方法,一直进去,看到这样的代码:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transferred() >= region.count();
if (!done) {
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transferred() >= region.count()) {
done = true;
break;
}
}
in.progress(flushedAmount);
}
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
// Should not reach here.
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
这里有个in.remove()的代码,当读完了之后,就可以删除Buffer,然后通知listener成功了。
in.remove()进去,有个safeSuccess()方法,一直trySuccess(),使用DefaultPromise的trySuccess(),最后又看到这个熟悉的方法了:
@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
最后都同 invokeWrite0一样。
netty就通过Promise模式或者说ChannelFuture模式实现了观察者模式。netty就通过实现观察者模式达到异步通知,每次写成功写失败,都会回调给listener,也就是回调到future.isSuccess()等方法。其实就是变种的观察者模式。
五、迭代器模式
1、有一个迭代器接口。
2、对容器里面各个对象进行访问。
迭代器模式使用很多,但是被我们经常忽略它居然也是一种模式。
1、有一个迭代器接口。
2、对容器里面各个对象进行访问。
netty里面的CompositeByteBuf这个零拷贝的实现,就使用了迭代器模式。首先看一段代码:
public static void main(String[] args) {
ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});
ByteBuf merge = merge(header, body);
merge.forEachByte(value -> {
System.out.println(value);
return true;
});
}
public static ByteBuf merge(ByteBuf header, ByteBuf body) {
CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
byteBuf.addComponent(true, header);
byteBuf.addComponent(true, body);
return byteBuf;
}
这段代码是把两个ByteBuf添加到一起,forEachByte就是实现了迭代器模式。那么怎么说它是零拷贝呢?
找forEachByte的实现,在AbstractByteBuf里面。
有这样一段代码:
@Override
public int forEachByte(ByteProcessor processor) {
ensureAccessible();
try {
return forEachByteAsc0(readerIndex, writerIndex, processor);
} catch (Exception e) {
PlatformDependent.throwException(e);
return -1;
}
}
从readerIndex开始读,读到writeIndex。继续点进去查看:
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) {
if (!processor.process(_getByte(start))) {
return start;
}
}
return -1;
}
查看_getByte的实现,当然是找CompositeByteBuf的实现了:
@Override
protected byte _getByte(int index) {
Component c = findComponent(index);
return c.buf.getByte(index - c.offset);
}
先找出是哪个componet,然后迭代的时候其实是直接返回这个componet的byte内容就可以,这样就实现了零拷贝。别的类迭代的话,可能会把所有的数据都复制一遍。
六、责任链模式
1、责任处理器接口
2、创建链,添加删除责任处理器接口
3、上下文
4、责任链终止机制。
netty设计模式-责任链模式
责任链模式的定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。
首先来看看责任链模式的四个要素:
1、责任处理器接口
2、创建链,添加删除责任处理器接口
3、上下文
4、责任链终止机制。
在netty里面,很明显channelHandler和Pipeline构成了责任链模式。让我们通过上面的要素,一个一个分析
1、首先是责任处理器接口:
ChannelHandler就是责任处理器接口,ChannelInboundHandler、ChannelOuntboundHandler是它的两个增强。
2、找到创建链,添加删除责任处理器接口ChannelPipeline:
里面有各种add和remove的方法
3、然后是上下文ChannelHandlerContext:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
*/
Channel channel();
/**
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
*/
EventExecutor executor();
...
}
里面有两个最重要的方法,一个返回绑定的channel,一个返回executor来执行任务。
4、最后我们看责任链终止机制
现在我们自定义一个InBoundHandlerC
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
ctx.fiteChannelRead方法就是为了把责任传递下去。如果注释掉了,消息就不会传递。另外如果不重写channelRead方法,默认会传递,让我们查看ChannelInboundHandlerAdapter的channelRead方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
默认会传递。这里另外提一句,java里面的很多filter,是否继续向下传递,都是return true还是return 方法来实现的。
最后,消息是如何一步步向下传递的呢,让我们看AbstractChannelHandlerContext的fireChannelRead方法:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
继续点击findContextInbound(),可以看到:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
也就是,这里是不停地指向下一个对象实现的。