Dubbo——Transport网络传输层
Transport网络传输层
1请求/响应的处理流程
NettyHandler:继承netty对象SimpleChannelHandler,重写了channelConnected、channelDisconnected、messageReceived、writeRequested、exceptionCaught方法,当netty的通道发生连接、断开连接、收到消息、写入消息、捕获异常等事件时触发NettyHandler中的对应方法。在这些方法中后续的处理链为:NettyServer/NettyClient—>MultiMessageHandler-->HeartbeatHandler—> AllChannelHandleràDecodeHandlerà
HeaderExchangerHandleràDubboProtocol$requestHandler,此链条为回调工作的处理流程。
MultiMessageHandler 对于消息的received事件,如果消息是批量传输的,解码后会由MultiMessage承载,并遍历其中的消息,交由下一个handler处理。
HeartbeatHandler:设置通道的读写时间戳
1)对于connected事件,设置通道的读写时间戳为当前时间;
2)对于disconnected事件,清空读写时间戳;
3)对于sent事件,设置通道的写时间戳为当前时间;
4)对于received事件,设置通道的读时间戳为当前时间;若消息是心跳消息请求(mEvent=true and mData=null),如果此心跳事件需要响应(mTwoWay=true),则构建心跳响应对象Response,并发送响应消息,不做后续channel处理;若消息是心跳响应(mEvent=true andmData=null),记录日志直接返回,不在做后续的channel处理;若不是心跳消息,则交由下一个处理器处理。
由Dispatcher创建的各类Handler处理器:根据URL参数dispather选择Dispatcher类,不同的Dispatcher创建不同的Handler,表示通道的事件(connected、disconnected、received、caught、sent)的处理模式。在这些Handler中构建异步执行任务ChannelEventRunnable,异步执行connected、disconnected、received、caught、sent等操作。目前支持四种Handler处理器:
1)AllChannelHandler:除发送消息之外,所有的事件都采用线程池处理;
2)ExecutionChannelHandler:除发送消息之外,其他全部使用线程池处理;与AllChannelHandler不同之处在于,若创建的线程池ExecutorService不可用,AllChannelHandler将使用共享线程池,而ExecutionChannelHandler只有报错了。
3)ChannelHandler:所有事件都直接有ChannelHandler处理,不采用线程池;
4)MessageOnlyChannelHandler:只有接受消息采用线程池;
5)ConnectionOrderedDispatcher:没搞明白;
在上述Handler的初始化过程中,会根据URL的参数threadpool来创建线程池,目前支持的线程池类型有三种,默认选择FixedThreadPool。
1)FixedThreadPool:此线程池启动时即创建固定大小的线程数,不做任何伸缩;
2)CachedThreadPool:此线程池可伸缩,线程空闲一分钟后回收,新请求重新创建线程;
3)LimitedThreadPool:此线程池一直增长,直到上限,增长后不收缩;
DecodeHandler:对读取的消息进行decode操作。
HeaderExchangeHandler:处理connected、disconnected、sent、received、caught等事件,并调用DubboProtocol$requestHandler内部类响应的事件方法。
1)connected事件:更新通道的读取时间戳和写入时间戳。调用DubboProtocol$requestHandler匿名内部类的connected事件方法。
2)disconnected事件:更新通道的读取时间戳和写入时间戳。调用DubboProtocol$requestHandler匿名内部类的disconnected事件方法(由ondisconnect属性配置)。
3)sent事件:更新通道的写入时间戳。回调DubboProtocol$requestHandler匿名内部类的sent,由于该内部类未重写sent方法,即调用父类的该方法,父类未实现任何处理逻辑。如果消息是Request,即是发送请求,则根据请求的唯一标识ID获取DefaultFuture对象,并设置发送时间为当前时间,用于在超时之后分析客户端的耗时、服务端的耗时。
4)received事件:更新通道的读取时间戳。如果是Request请求,并且需要对请求做回执时,则首先调用DubboProtocol$requestHandler匿名内部类的replay方法,在该方法中完成服务端本地方法调用并返回结果;然后根据Request对象的ID构建Response对象,将回调服务的执行结果作为Response对象的结果字段;最后调用channel.send方法向客户端发送响应消息。如果是Request请求并且不需要对请求做回执时,则调用DubboProtocol$requestHandler匿名内部类的received方法。 如果是Response且不是心跳响应,则根据response中的ID获取DefaultFuture对象,若DefaultFuture对象不存在,则该响应已经超时并且已经被删除了,若还存在,则设置response的值,唤醒线程,DefaultFuture对象的get方法返回调用值。
DubboProtocol$requestHandler:匿名内部类,实现了抽象类ExchangeHandlerAdapter类。
1)connected方法:检查URL中是否有“onconnect”参数,若有则回调该参数指定的方法;
2)disconnected方法:检查URL中是否有“ondisconnect”参数,若有则回调该参数指定的方法;
3)received方法:若消息是Invocation对象则调用reply方法,根据生成的serviceKey在AbstractProtocol.exporterMap中查找DubboExporter对象,然后根据此exporter对象获取服务端创建的invoker对象,从而完成服务端的本地方法调用;若不是则调用父类的received方法,不进行任何逻辑处理。
一、消费端发起远程调用的底层通信活动图
二、服务提供端接收请求并响应的底层通信
2 回调服务的处理逻辑
一、服务器的处理方式
以Netty为例,首先在初始化NettyClient或者NettyServer的时候,根据URL中的codec参数选择具体的codec类,默认使用DubboCountCodec类。
如下代码所示,在启动Netty客户端或者服务端的时候在Netty的pipeline中添加了编解码器。该编解码器就是上面根据URL的参数选择出来的。decoder为解码器,是一个SimpleChannelUpstreamHandler,从Socket到Netty中的时候,需要解码,也就是接收消息的时候,需要解码。
encoder是编码器,是OneToOneEncoder,这个类实现了ChannelDownstreamHandler,从发送消息的时候,需要编码。
nettyHandler实现了ChannelUpstreamHandler, ChannelDownstreamHandler两个,上下的时候都需要处理。
接收消息的时候,会先执行decoder,然后执行nettyHandler。
发送消息的时候,会先执行nettyHandler,然后执行encoder。
二、客户端发送调用请求的逻辑
1、先执行nettyHandler的writeRquested方法,从而decodeHandler的sent方法,该handler未实现sent方法,因而调用父类的方法透传请求。
2、执行encoder,调用链为:NettyCodecAdapter$InternalEncoder.encode --> DubboCountCodec.encode-->DubboCodec.encode-->ExchangeCodec.encode à…à DubboCodec.encodeRequestData,在该调用链中,最终调用CallbackServiceCodec.encodeInvocationArgument方法,在该方法中检查远程方法的参数是否为回调函数,若是则将客户端中调用远程方法时传入的回调函数参数暴露出来提供服务端使用,在CallbackServiceCodec.exportOrunexportCallbackService方法中完成回调函数是服务暴露,大致逻辑是:
1)生产一个唯一实例ID;
2)代理工厂创建Invoker代理;
3)调用代理工厂Protocol$Adpative根据URL中的协议选择具体的Protocol处理类,调用export方法,具体逻辑与《各类型暴露服务逻辑》一致。
三、服务端接受客户端请求的处理逻辑
1、先执行decoder,调用链如下:
NettyCodecAdapter$InternalDecoder.messageReceived –>DubboCountCodec.decode --> DubboCodec.decode--> DubboCodec.decodeBody -->DecodeableRpcInvocation.decode -->…-->CallbackServiceCodec.decodeInvocationArgument,在decodeInvocationArgument方法中判断该调用的方法中参数是否为回调函数,若是则调用CallbackServiceCodec.referOrdestroyCallbackService方法完成对客户端暴露的回调函数的服务引用。大致逻辑:
1)初始化ChannelWrappedInvoker 对象,其中将与客户端建立的通道Channel对象中参数之一,此Channel参数是回调函数消息发送的重要通道,在触发回调工作时,利用此Channel创建HeaderExchangeClient对象,由该对象完成回调的请求工作,具体逻辑与消费端发起远程调用的逻辑基本一致。
2)创建服务类的本地代理,即为ChannelWrappedInvoker 对象创建代理,将此代理作为服务端方法中回调参数的请求值,从而在服务端的方式实现中就可以使用该代理完成远程回调工作。
3)将DecodeableRpcInvocation类中的hasDecoded变量设置为true,以免在DecodeHandler中重复执行此解码工作。