How Netty Is Used in Dubbo
Dubbo's underlying communication is implemented with Netty. Curious about how, I recorded the process of finding out.
First, pull down the Dubbo source code. It contains a dubbo-demo module with dubbo-demo-provider and dubbo-demo-consumer; the provider exposes a DemoServiceImpl, which we need to start. The first attempt failed with an error about not being able to reach the registry; after starting ZooKeeper and fixing the registry address in provider.xml, it came up fine. The consumer started the same way.
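For reference, the registry entry in provider.xml ends up looking something like the line below, assuming ZooKeeper runs locally on its default port 2181:

<dubbo:registry address="zookeeper://127.0.0.1:2181" />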
To learn how Dubbo uses Netty, we first need to know where it is used. From the framework diagram on the official site, combined with the documentation, we can roughly infer that Netty lives in the Exchange and Transport layers:
http://dubbo.apache.org/docs/zh-cn/dev/sources/images/dubbo-framework.jpg
In the Exchange layer there is ExchangeClient. Stepping into its parent Client and looking at the implementations, we find NettyClient, which has doOpen and doConnect methods. This looks like where Netty gets initialized, so set a breakpoint on the first line of each and debug the provider. Surprisingly, neither breakpoint is hit during startup. Then it clicks: this is a Client, so it should be initialized when the consumer starts. Debug the consumer, and sure enough the breakpoints are hit.
Let's look at the doOpen method first:
@Override
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
            .channel(NioSocketChannel.class);

    if (getTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("handler", nettyClientHandler);
        }
    });
}
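This is essentially textbook Netty client bootstrapping. For comparison, here is a self-contained client of the same shape; the host, port, and string codec are placeholders standing in for Dubbo's codec adapter and handler:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class PlainNettyClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            // Dubbo wires in adapter.getDecoder()/getEncoder() and
                            // NettyClientHandler here; strings suffice for a demo.
                            ch.pipeline()
                                    .addLast("decoder", new StringDecoder())
                                    .addLast("encoder", new StringEncoder());
                        }
                    });
            // the doConnect() counterpart: actually establish the connection
            Channel channel = bootstrap.connect("127.0.0.1", 20880).sync().channel();
            channel.writeAndFlush("hello");
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}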
From doOpen we can see that the consumer creates the channel with Netty during its initialization. Tracing back up the call stack:
The stack shows the rough flow of fetching the bean: Dubbo's ReferenceBean implements Spring's FactoryBean, so fetching the bean from the ApplicationContext calls ReferenceBean.getObject, which inits the ReferenceConfig and creates the proxy. There is a stretch in the middle I'll set aside for now and study later; roughly, creating the proxy requires fetching the provider's information from the registry. Once the provider information has been pulled, a channel has to be opened, which lands us in NettyClient.doOpen above.
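A minimal sketch of the FactoryBean hook Dubbo relies on; DemoServiceFactoryBean and its stub proxy are illustrative, while the real class is ReferenceBean, whose getObject() triggers init() and returns the service proxy:

import java.lang.reflect.Proxy;
import org.springframework.beans.factory.FactoryBean;

interface DemoService {
    String sayHello(String name);
}

public class DemoServiceFactoryBean implements FactoryBean<DemoService> {

    @Override
    public DemoService getObject() {
        // Spring hands out whatever is returned here instead of the bean itself;
        // Dubbo uses this hook to build the remote-call proxy lazily.
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                (proxy, method, args) -> {
                    // in Dubbo, this is where the Invoker chain and client.request live
                    return "stubbed response for " + method.getName();
                });
    }

    @Override
    public Class<?> getObjectType() {
        return DemoService.class;
    }
}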
Once the channel is open, the proxy for the service has been created. As the architecture diagram shows, an actual call to the service goes through DubboInvoker, which in turn calls client.request. Find DubboInvoker and look at its doInvoke method:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout);
            // For compatibility
            FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
            RpcContext.getContext().setFuture(futureAdapter);

            Result result;
            if (isAsyncFuture) {
                // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            } else {
                result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            }
            return result;
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
The invocation argument carries the details of this call: interface name, method name, parameters, and so on. After the attachments are set, a client has to be picked; put a breakpoint here.
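The selection above, clients[index.getAndIncrement() % clients.length], is plain round-robin over the connection pool. A standalone sketch of the same pattern, with illustrative names:

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobin<T> {
    private final T[] items;
    private final AtomicInteger index = new AtomicInteger();

    public RoundRobin(T[] items) {
        this.items = items;
    }

    public T next() {
        // Math.abs guards against the (very rare) int overflow of getAndIncrement();
        // DubboInvoker's one-liner skips that guard.
        return items[Math.abs(index.getAndIncrement() % items.length)];
    }
}

With an ExchangeClient[] clients in hand, new RoundRobin<>(clients).next() picks the next connection on each call, just as the doInvoke line does.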
The debugger shows that the client inside currentClient is the Netty one, so currentClient.send(inv, isSent) sends through Netty. NettyClient itself has no send method; it is on the parent class AbstractClient:
@Override
public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}
There is even a TODO left in here. This is where the message is sent; of course, the message is still serialized by the pipeline's encoder before it actually goes out on the wire.
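That serialization happens in the codec that NettyCodecAdapter wired into the pipeline earlier (hessian2 by default). As a rough illustration of the "serialize before send" step only, and not Dubbo's actual codec, here is a toy encoder using plain Java serialization with a length prefix:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SimpleObjectEncoder extends MessageToByteEncoder<Serializable> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);          // turn the message object into bytes
        }
        byte[] bytes = bos.toByteArray();
        out.writeInt(bytes.length);        // length prefix so the peer can frame it
        out.writeBytes(bytes);             // payload
    }
}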
How the provider uses Netty
The Transport layer of the architecture diagram points the way: the server is created at Transporter.bind(). Find the Transporter interface and then its Netty implementation, NettyTransporter:
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}
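As an aside on how this class gets picked: Transporter is a Dubbo SPI extension point whose default is "netty" (note the NAME constant above). A sketch of the lookup, assuming the 2.6.x com.alibaba package layout:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.Transporter;

public class TransporterLookup {
    public static void main(String[] args) {
        // "netty" is resolved to NettyTransporter through META-INF/dubbo SPI files
        Transporter transporter = ExtensionLoader
                .getExtensionLoader(Transporter.class)
                .getExtension("netty");
        System.out.println(transporter.getClass());
    }
}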
The bind here creates a NettyServer. Step into it:
@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
NettyServer has its own doOpen(), and the Netty code in it looks familiar: it is basically the same as what we would write to stand up a Netty server by hand. Set a breakpoint here and trace the call stack.
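To make that comparison concrete, here is a minimal hand-written server of the same shape. The echo handler is a placeholder (Dubbo plugs in its decoder/encoder plus NettyServerHandler instead), and 20880 just happens to be Dubbo's default port:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.NioSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class PlainNettyServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);  // accepts connections
        EventLoopGroup worker = new NioEventLoopGroup(); // handles I/O
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ctx.writeAndFlush(msg); // echo back
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.bind(20880).sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}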
It turns out Dubbo's ServiceBean implements ApplicationListener: when the Spring container starts, ServiceBean.onApplicationEvent is called, and that path eventually creates the NettyServer. From then on, Netty listens on the port and receives messages; an incoming message is handed to the corresponding Dubbo handler, decoded, routed to the target service, and finally invoked via reflection.
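A minimal sketch of that startup hook; ExportingBean and openServer are illustrative names, and Dubbo's real guard logic in ServiceBean is more involved:

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

public class ExportingBean implements ApplicationListener<ContextRefreshedEvent> {

    private volatile boolean exported;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!exported) {      // export once, as ServiceBean's exported checks do
            exported = true;
            openServer();     // in Dubbo this path ends in NettyServer.doOpen()
        }
    }

    private void openServer() {
        // bind the port and register with the registry; received requests are then
        // decoded and dispatched to the target service via reflection, e.g.:
        //   Method m = service.getClass().getMethod(methodName, paramTypes);
        //   Object result = m.invoke(service, args);
    }
}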