HDFS Read Flow Analysis

      A single HDFS read involves three parties: the Client, the Namenode, and the Datanodes. The read completes through communication between the Client and the Namenode (to obtain the blocks that make up the file and the Datanodes holding them) and data transfer between the Client and the Datanodes.
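This two-phase interaction can be sketched as a minimal model in plain Java (hypothetical names throughout; plain maps stand in for the RPC calls to the Namenode and the Datanodes):

```java
import java.util.*;

// Minimal model of an HDFS read: ask the "namenode" which blocks make up a
// file, then fetch each block's contents from a "datanode". The maps below
// are stand-ins for RPC, not Hadoop's API.
public class ReadFlowSketch {
    // namenode metadata: file path -> ordered list of block ids
    static Map<String, List<String>> namenode = new HashMap<>();
    // datanode storage: block id -> block contents
    static Map<String, String> datanode = new HashMap<>();

    static String read(String file) {
        StringBuilder out = new StringBuilder();
        // 1. Client -> Namenode: get the block list for the file
        for (String blockId : namenode.get(file)) {
            // 2. Client -> Datanode: fetch each block's bytes in order
            out.append(datanode.get(blockId));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        namenode.put("/f", Arrays.asList("blk_1", "blk_2"));
        datanode.put("blk_1", "hello ");
        datanode.put("blk_2", "hdfs");
        System.out.println(read("/f")); // prints "hello hdfs"
    }
}
```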

 

      First, look at the client side. The Client reads data through the FileSystem API, which takes three main steps:

// Imports assumed: org.apache.hadoop.conf.Configuration,
// org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path,
// org.apache.hadoop.io.IOUtils, java.io.InputStream, java.net.URI;
// uri is the path of the file to read.
Configuration conf = new Configuration();
// 1. Get the file system
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try {
    // 2. Open an input stream on the file
    in = fs.open(new Path(uri));
    // 3. Read the stream (copy it to stdout)
    IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
    IOUtils.closeStream(in);
}

      Start with step 1. The relevant FileSystem source is as follows:

 

/** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null) {                       // no scheme: use default FS
      return get(conf);
    }

    if (authority == null) {                       // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }
  
  
 private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }
  
  
  
//FileSystem.Cache
 synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = map.get(key);
      if (fs == null) {
        fs = createFileSystem(uri, conf);
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
      }
      return fs;
    }

Which file system gets created is decided by the scheme of the supplied URI, and the instance is created via reflection; here we assume the scheme is hdfs, the common case.
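That lookup-plus-reflection step can be sketched in plain Java (hypothetical names; a Properties object stands in for the Hadoop Configuration):

```java
import java.util.Properties;

// Sketch of FileSystem.get's reflection step: map a URI scheme to a class
// name via a "fs.<scheme>.impl" configuration key, then instantiate that
// class reflectively, as createFileSystem does.
public class FsFactorySketch {
    public static Object create(String scheme, Properties conf) throws Exception {
        String className = conf.getProperty("fs." + scheme + ".impl");
        if (className == null) {
            throw new java.io.IOException("No FileSystem for scheme: " + scheme);
        }
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Properties conf = new Properties();
        // Any no-arg-constructible class on the classpath works for the sketch
        conf.setProperty("fs.demo.impl", "java.util.ArrayList");
        Object fs = create("demo", conf);
        System.out.println(fs.getClass().getName()); // prints java.util.ArrayList
    }
}
```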

      The next step creates a stream (the input stream in); the code is as follows:

/**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }


/**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;



//DistributedFileSystem
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }

The open method in FileSystem is abstract; the HDFS implementation is the DistributedFileSystem one above. It builds a DFSDataInputStream through DFSClient, and the DFSDataInputStream constructor takes a DFSInputStream as its argument, composed much like Java IO streams (the facade style). First, let's look at how the DFSInputStream is created, namely by

 dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)

The creation flow is as follows:

/**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                      FileSystem.Statistics stats
      ) throws IOException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
  }


DFSInputStream(String src, int buffersize, boolean verifyChecksum
                   ) throws IOException {
      this.verifyChecksum = verifyChecksum;
      this.buffersize = buffersize;
      this.src = src;
      prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
      openInfo();
    }

    /**
     * Grab the open-file info from namenode
     */
    synchronized void openInfo() throws IOException {
      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
      if (newInfo == null) {
        throw new IOException("Cannot open filename " + src);
      }

      if (locatedBlocks != null) {
        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
            throw new IOException("Blocklist for " + src + " has changed!");
          }
        }
      }
      this.locatedBlocks = newInfo;
      this.currentNode = null;
    }

      The key work is in the openInfo method: callGetBlockLocations returns a LocatedBlocks object recording the blocks that src consists of and the DatanodeInfo for each block. This call interacts with the Namenode; the Namenode-side analysis is skipped for now. Once the DFSInputStream is created, it is used to construct the DFSDataInputStream. That construction is trivial; its constructor source speaks for itself.
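The result of callGetBlockLocations can be pictured as a list of (offset, length, datanodes) entries. A toy stand-in (not the real LocatedBlocks API) that finds the block covering a given file position:

```java
import java.util.*;

// Toy stand-in for LocatedBlocks: each entry records a block's start offset
// within the file, its length, and the datanodes holding a replica.
public class LocatedBlocksSketch {
    static class Located {
        final long start, len;
        final List<String> datanodes;
        Located(long start, long len, List<String> dns) {
            this.start = start; this.len = len; this.datanodes = dns;
        }
    }

    // Find the block whose byte range covers file position pos, or null
    static Located blockAt(List<Located> blocks, long pos) {
        for (Located b : blocks) {
            if (pos >= b.start && pos < b.start + b.len) return b;
        }
        return null;
    }

    public static void main(String[] args) {
        List<Located> blocks = Arrays.asList(
            new Located(0, 64, Arrays.asList("dn1", "dn2")),
            new Located(64, 64, Arrays.asList("dn2", "dn3")));
        System.out.println(blockAt(blocks, 70).datanodes); // prints [dn2, dn3]
    }
}
```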

      Next comes the main part: the read itself. The Client calls IOUtils.copyBytes to read the data; its source is as follows:

public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
    throws IOException {

    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    try {
      int bytesRead = in.read(buf);
      while (bytesRead >= 0) {
        out.write(buf, 0, bytesRead);
        if ((ps != null) && ps.checkError()) {
          throw new IOException("Unable to write to output stream.");
        }
        bytesRead = in.read(buf);
      }
    } finally {
      if(close) {
        out.close();
        in.close();
      }
    }
  }

In this method, in.read(buf) does the actual reading. The in object is in fact a DFSDataInputStream, with the following inheritance hierarchy:

(figure: DFSDataInputStream inheritance hierarchy — DFSDataInputStream extends FSDataInputStream, which extends java.io.DataInputStream wrapping the underlying DFSInputStream)
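Since the chain passes through java.io.DataInputStream, the forwarding behaviour can be reproduced with standard java.io streams alone (a ByteArrayInputStream standing in for the DFSInputStream):

```java
import java.io.*;

// DataInputStream delegates read(byte[]) to the stream it wraps; here the
// wrapped stream is a ByteArrayInputStream instead of a DFSInputStream.
public class DelegationSketch {
    public static void main(String[] args) throws IOException {
        byte[] data = "block data".getBytes("UTF-8");
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        byte[] buf = new byte[4];
        int n = in.read(buf);                         // forwarded to the inner stream
        System.out.println(n);                        // prints 4
        System.out.println(new String(buf, "UTF-8")); // prints bloc
    }
}
```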

DFSDataInputStream.read(buf) in effect calls DataInputStream.read(buf), whose source is:

public final int read(byte b[]) throws IOException {
    return in.read(b, 0, b.length);
}

 The in field here is actually the DFSInputStream, so the call really lands in DFSInputStream.read(b, 0, b.length), whose source is:

/**
     * Read the entire buffer.
     */
    @Override
    public synchronized int read(byte buf[], int off, int len) throws IOException {
      checkOpen();
      if (closed) {
        throw new IOException("Stream closed");
      }
      failures = 0;

      if (pos < getFileLength()) {
        int retries = 2;
        while (retries > 0) {
          try {
            if (pos > blockEnd) {
              currentNode = blockSeekTo(pos);
            }
            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
            int result = readBuffer(buf, off, realLen);
            
            if (result >= 0) {
              pos += result;
            } else {
              // got a EOS from reader though we expect more data on it.
              throw new IOException("Unexpected EOS from the reader");
            }
            if (stats != null && result != -1) {
              stats.incrementBytesRead(result);
            }
            return result;
          } catch (ChecksumException ce) {
            throw ce;            
          } catch (IOException e) {
            if (retries == 1) {
              LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
            }
            blockEnd = -1;
            if (currentNode != null) { addToDeadNodes(currentNode); }
            if (--retries == 0) {
              throw e;
            }
          }
        }
      }
      return -1;
    }
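The failure handling in this read method — mark the current node dead on an IOException, retry against another replica, and surface the error once the retry budget runs out — can be sketched independently of HDFS, with hypothetical names:

```java
import java.io.IOException;
import java.util.*;

// Sketch of DFSInputStream.read's failure handling: try replicas in order,
// add a failing node to a dead set, and rethrow once retries are exhausted.
public class RetrySketch {
    // Stand-in for reading from a datanode; "broken" nodes always fail.
    static String readFrom(String node, Set<String> broken) throws IOException {
        if (broken.contains(node)) throw new IOException(node + " unreachable");
        return "data@" + node;
    }

    static String readWithRetry(List<String> replicas, Set<String> broken,
                                int retries) throws IOException {
        Set<String> deadNodes = new HashSet<>();
        while (true) {
            // Pick the first replica not yet marked dead (blockSeekTo's job)
            String node = null;
            for (String r : replicas) {
                if (!deadNodes.contains(r)) { node = r; break; }
            }
            if (node == null) throw new IOException("No live replicas");
            try {
                return readFrom(node, broken);
            } catch (IOException e) {
                deadNodes.add(node);         // like addToDeadNodes(currentNode)
                if (--retries == 0) throw e; // out of retries: surface the error
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> replicas = Arrays.asList("dn1", "dn2", "dn3");
        Set<String> broken = new HashSet<>(Arrays.asList("dn1"));
        System.out.println(readWithRetry(replicas, broken, 2)); // prints data@dn2
    }
}
```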