spark2.1.0之源码分析——RPC传输管道处理器详解

spark2.1.0之源码分析——RPC传输管道处理器详解

提示:阅读本文前最好先阅读:

  1. 《Spark2.1.0之内置RPC框架》
  2. 《spark2.1.0之源码分析——RPC配置TransportConf》
  3. 《spark2.1.0之源码分析——RPC客户端工厂TransportClientFactory》
  4. spark2.1.0之源码分析——RPC服务器TransportServer》
  5. 《spark2.1.0之源码分析——RPC管道初始化》

         TransportChannelHandler实现了Netty的ChannelInboundHandler[1],以便对Netty管道中的消息进行处理。《spark2.1.0之源码分析——RPC管道初始化》一文所展示的图1中的Handler(除了MessageEncoder)由于都实现了ChannelInboundHandler接口,作为自定义的ChannelInboundHandler,因而都要重写channelRead方法。Netty框架使用工作链模式来对每个ChannelInboundHandler的实现类的channelRead方法进行链式调用。TransportChannelHandler实现的channelRead方法见代码清单1。

代码清单1         TransportChannelHandler的channelRead实现

 
  1. @Override

  2. public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {

  3. if (request instanceof RequestMessage) {

  4. requestHandler.handle((RequestMessage) request);

  5. } else if (request instanceof ResponseMessage) {

  6. responseHandler.handle((ResponseMessage) request);

  7. } else {

  8. ctx.fireChannelRead(request);

  9. }

  10. }

从代码清单1看到,当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。

MessageHandler的继承体系

         TransportRequestHandler与TransportResponseHandler都继承自抽象类MessageHandler,MessageHandler定义了子类的规范,详细定义见代码清单2。

代码清单2         MessageHandler规范

 
  1. public abstract class MessageHandler<T extends Message> {

  2. public abstract void handle(T message) throws Exception;

  3. public abstract void channelActive();

  4. public abstract void exceptionCaught(Throwable cause);

  5. public abstract void channelInactive();

  6. }

MessageHandler中定义的各个方法的作用分别为:

  • handle:用于对接收到的单个消息进行处理;
  • channelActive:当channel**时调用;
  • exceptionCaught:当捕获到channel发生异常时调用;
  • channelInactive:当channel非**时调用;

Spark中MessageHandler类的继承体系如图1所示。

spark2.1.0之源码分析——RPC传输管道处理器详解

图1       MessageHandler类的继承体系

Message的继承体系

根据代码清单2,我们知道MessageHandler同时也是一个Java泛型类,其子类能处理的消息都派生自接口Message。Message的定义见代码清单3。

代码清单3         Message的定义

 
  1. public interface Message extends Encodable {

  2. Type type();

  3. ManagedBuffer body();

  4. boolean isBodyInFrame();

Message中定义的三个接口方法的作用分别为:

  • type:返回消息的类型;
  • body:返回消息中可选的内容体;
  • isBodyInFrame:用于判断消息的主体是否包含在消息的同一帧中。

Message接口继承了Encodable接口,Encodable的定义见代码清单4。

代码清单4         Encodable的定义

 
  1. public interface Encodable {

  2. int encodedLength();

  3. void encode(ByteBuf buf);

  4. }

实现Encodable接口的类将可以转换到一个ByteBuf中,多个对象将被存储到预先分配的单个ByteBuf,所以这里的encodedLength用于返回转换的对象数量。下面一起来看看Message的类继承体系,如图2所示。

spark2.1.0之源码分析——RPC传输管道处理器详解

图2       Message的类继承体系

从图2看到最终的消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口,其中RequestMessage的具体实现有四种,分别是:

  • ChunkFetchRequest:请求获取流的单个块的序列。
  • RpcRequest:此消息类型由远程的RPC服务端进行处理,是一种需要服务端向客户端回复的RPC请求信息类型。
  • OneWayMessage:此消息也需要由远程的RPC服务端进行处理,与RpcRequest不同的是不需要服务端向客户端回复。
  • StreamRequest:此消息表示向远程的服务发起请求,以获取流式数据。

由于OneWayMessage 不需要响应,所以ResponseMessage的对于成功或失败状态的实现各有三种,分别是:

  • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息;
  • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息;
  • RpcResponse:处理RpcRequest成功后返回的消息;
  • RpcFailure:处理RpcRequest失败后返回的消息;
  • StreamResponse:处理StreamRequest成功后返回的消息;
  • StreamFailure:处理StreamRequest失败后返回的消息;

ManagedBuffer的继承体系

回头再看看代码清单3中对body接口的定义,可以看到其返回内容体的类型为ManagedBuffer。ManagedBuffer提供了由字节构成数据的不可变视图(也就是说ManagedBuffer并不存储数据,也不是数据的实际来源,这同关系型数据库的视图类似)。我们先来看看抽象类ManagedBuffer中对行为的定义,见代码清单5。

代码清单5         ManagedBuffer的定义

 
  1. public abstract class ManagedBuffer {

  2. public abstract long size();

  3. public abstract ByteBuffer nioByteBuffer() throws IOException;

  4. public abstract InputStream createInputStream() throws IOException;

  5. public abstract ManagedBuffer retain();

  6. public abstract ManagedBuffer release();

  7. public abstract Object convertToNetty() throws IOException;

  8. }

ManagedBuffer中定义了六个方法,分别为:

  • size:返回数据的字节数。
  • nioByteBuffer:将数据按照Nio的ByteBuffer类型返回。
  • createInputStream:将数据按照InputStream返回。
  • retain:当有新的使用者使用此视图时,增加引用此视图的引用数。
  • release:当有使用者不再使用此视图时,减少引用此视图的引用数。当引用数为0时释放缓冲区。
  • convertToNetty:将缓冲区的数据转换为Netty的对象,用来将数据写到外部。此方法返回的数据类型要么是io.netty.buffer.ByteBuf,要么是io.netty.channel.FileRegion。

ManagedBuffer的具体实现有很多,我们可以通过图3来了解。

spark2.1.0之源码分析——RPC传输管道处理器详解

图3       ManagedBuffer的继承体系

图3中列出了ManagedBuffer的五个实现类,其中TestManagedBuffer和RecordingManagedBuffer用于测试。NettyManagedBuffer中的缓冲为io.netty.buffer.ByteBuf,NioManagedBuffer中的缓冲为java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的实现都非常简单,留给读者自行阅读。本节挑选FileSegmentManagedBuffer作为ManagedBuffer具体实现的例子进行介绍。

     FileSegmentManagedBuffer的作用为获取一个文件中的一段,它一共有四个由final修饰的属性,全部都通过FileSegmentManagedBuffer的构造器传入属性值,这四个属性为:

  • conf:即TransportConf。
  • file:所要读取的文件。
  • offset:所要读取文件的偏移量。
  • length:所要读取的长度。

下面将逐个介绍FileSegmentManagedBuffer对于ManagedBuffer的实现。

NIO方式读取文件

FileSegmentManagedBuffer实现的nioByteBuffer方法见代码清单6。

代码清单6         nioByteBuffer方法的实现

 
  1. @Override

  2. public ByteBuffer nioByteBuffer() throws IOException {

  3. FileChannel channel = null;

  4. try {

  5. channel = new RandomAccessFile(file, "r").getChannel();

  6. if (length < conf.memoryMapBytes()) {

  7. ByteBuffer buf = ByteBuffer.allocate((int) length);

  8. channel.position(offset);

  9. while (buf.remaining() != 0) {

  10. if (channel.read(buf) == -1) {

  11. throw new IOException(String.format("Reached EOF before filling buffer\n" +

  12. "offset=%s\nfile=%s\nbuf.remaining=%s",

  13. offset, file.getAbsoluteFile(), buf.remaining()));

  14. }

  15. }

  16. buf.flip();

  17. return buf;

  18. } else {

  19. return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);

  20. }

  21. } catch (IOException e) {

  22. try {

  23. if (channel != null) {

  24. long size = channel.size();

  25. throw new IOException("Error in reading " + this + " (actual file length " + size + ")",

  26. e);

  27. }

  28. } catch (IOException ignored) {

  29. // ignore

  30. }

  31. throw new IOException("Error in opening " + this, e);

  32. } finally {

  33. JavaUtils.closeQuietly(channel);

  34. }

  35. }

nioByteBuffer的实现还是很简单的,主要利用RandomAccessFile获取FileChannel,然后使用java.nio.ByteBuffer和FileChannel的API将数据写入缓冲区java.nio.ByteBuffer中。

文件流方式读取文件

FileSegmentManagedBuffer实现的createInputStream方法见代码清单7。

代码清单7         createInputStream的实现

 
  1. @Override

  2. public InputStream createInputStream() throws IOException {

  3. FileInputStream is = null;

  4. try {

  5. is = new FileInputStream(file);

  6. ByteStreams.skipFully(is, offset);

  7. return new LimitedInputStream(is, length);

  8. } catch (IOException e) {

  9. try {

  10. if (is != null) {

  11. long size = file.length();

  12. throw new IOException("Error in reading " + this + " (actual file length " + size + ")",

  13. e);

  14. }

  15. } catch (IOException ignored) {

  16. // ignore

  17. } finally {

  18. JavaUtils.closeQuietly(is);

  19. }

  20. throw new IOException("Error in opening " + this, e);

  21. } catch (RuntimeException e) {

  22. JavaUtils.closeQuietly(is);

  23. throw e;

  24. }

  25. }

createInputStream的实现还是很简单的,这里不多作介绍。

将数据转换为Netty对象

FileSegmentManagedBuffer实现的convertToNetty方法见代码清单8。

代码清单8         convertToNetty的实现

 
  1. @Override

  2. public Object convertToNetty() throws IOException {

  3. if (conf.lazyFileDescriptor()) {

  4. return new DefaultFileRegion(file, offset, length);

  5. } else {

  6. FileChannel fileChannel = new FileInputStream(file).getChannel();

  7. return new DefaultFileRegion(fileChannel, offset, length);

  8. }

  9. }

其他方法的实现

其他方法由于实现非常简单,所以这里就不一一列出了,感兴趣的读者可以自行查阅。