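HDFS Read Path Analysis
A single HDFS read involves three parties: the Client, the Namenode, and the Datanode. The Client talks to the Namenode to find out which blocks make up the file and which datanodes hold them, then exchanges data with the Datanodes directly to complete the read.
Start with the Client. It reads data through the FileSystem API in three main steps: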
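// uri is a String holding the URI of the file to read
Configuration conf = new Configuration();
// 1. Get the file system
FileSystem fs = FileSystem.get(URI.create(uri), conf);
InputStream in = null;
try {
    // 2. Open an input stream on the file
    in = fs.open(new Path(uri));
    // 3. Read from the stream (here: copy it to stdout)
    IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
    IOUtils.closeStream(in);
}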
First, step one. The relevant FileSystem source is:
/** Returns the FileSystem for this URI's scheme and authority.  The scheme
 * of the URI determines a configuration property name,
 * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
 * The entire URI is passed to the FileSystem instance's initialize method.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    if (scheme == null) {                       // no scheme: use default FS
        return get(conf);
    }
    if (authority == null) {                    // no authority
        URI defaultUri = getDefaultUri(conf);
        if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
            && defaultUri.getAuthority() != null) {  // & default has authority
            return get(defaultUri, conf);            // return default
        }
    }
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
        return createFileSystem(uri, conf);
    }
    return CACHE.get(uri, conf);
}
private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    if (clazz == null) {
        throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
}
//FileSystem.Cache
synchronized FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    FileSystem fs = map.get(key);
    if (fs == null) {
        fs = createFileSystem(uri, conf);
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
            Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
    }
    return fs;
}
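The scheme of the supplied URI decides which kind of file system is created: the implementing class is looked up under fs.<scheme>.impl in the configuration and instantiated through reflection. Unless fs.<scheme>.impl.disable.cache is set, the instance comes from a cache and is created only on the first request for its key. The rest of this analysis assumes the scheme is hdfs, which is the common case.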
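The lookup, reflective creation, and caching described above can be modeled without Hadoop. The following is a minimal sketch under stated assumptions, not Hadoop's implementation: SchemeCacheSketch, MiniFs, and MemFs are hypothetical names, the configuration is a plain map, the cache key is just scheme plus authority, and failures are reported with unchecked exceptions to keep the example short.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types are Hadoop classes.
class SchemeCacheSketch {

    /** Minimal "file system" contract for the sketch. */
    interface MiniFs {
        void initialize(URI uri);
        String describe();
    }

    /** Toy implementation, registered below under the key fs.mem.impl. */
    public static class MemFs implements MiniFs {
        private URI uri;
        public MemFs() {}
        @Override public void initialize(URI uri) { this.uri = uri; }
        @Override public String describe() { return "MemFs@" + uri.getAuthority(); }
    }

    private final Map<String, String> conf = new HashMap<>();   // property -> class name
    private final Map<String, MiniFs> cache = new HashMap<>();  // scheme://authority -> instance

    SchemeCacheSketch() {
        conf.put("fs.mem.impl", MemFs.class.getName());
    }

    /** Resolve the class from the scheme, instantiate it reflectively, then initialize it. */
    private MiniFs create(URI uri) {
        String className = conf.get("fs." + uri.getScheme() + ".impl");
        if (className == null) {
            throw new IllegalArgumentException("No FileSystem for scheme: " + uri.getScheme());
        }
        try {
            MiniFs fs = (MiniFs) Class.forName(className).getDeclaredConstructor().newInstance();
            fs.initialize(uri);
            return fs;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    /** Return the cached instance for this scheme and authority, creating it on first use. */
    synchronized MiniFs get(URI uri) {
        String key = uri.getScheme() + "://" + uri.getAuthority();
        MiniFs fs = cache.get(key);
        if (fs == null) {
            fs = create(uri);
            cache.put(key, fs);
        }
        return fs;
    }

    public static void main(String[] args) {
        SchemeCacheSketch registry = new SchemeCacheSketch();
        MiniFs first = registry.get(URI.create("mem://nn1/a.txt"));
        MiniFs second = registry.get(URI.create("mem://nn1/b.txt"));
        System.out.println(first.describe() + " cached=" + (first == second));
    }
}
```

Two URIs with the same scheme and authority share one instance, which is why repeated FileSystem.get calls in a client are cheap when the cache is enabled.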
Next, an input stream (in) is created. The code is:
/**
 * Opens an FSDataInputStream at the indicated Path.
 * @param f the file to open
 */
public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
/**
 * Opens an FSDataInputStream at the indicated Path.
 * @param f the file name to open
 * @param bufferSize the size of the buffer to be used.
 */
public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
//DistributedFileSystem
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    return new DFSClient.DFSDataInputStream(
        dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
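The two-argument open in FileSystem is abstract; the HDFS implementation is the last method above, in DistributedFileSystem. It creates a DFSDataInputStream through DFSClient, and that object takes a DFSInputStream as its constructor argument. This mirrors the way Java IO streams are built by wrapping one stream in another (the decorator pattern). Look first at how the DFSInputStream is created. It comes from
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics)
and the source for that creation path is: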
/**
 * Create an input stream that obtains a nodelist from the
 * namenode, and then reads from all the right places.  Creates
 * inner subclass of InputStream that does the right out-of-band
 * work.
 */
DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                    FileSystem.Statistics stats
    ) throws IOException {
    checkOpen();
    // Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
}
DFSInputStream(String src, int buffersize, boolean verifyChecksum
    ) throws IOException {
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
    prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
    openInfo();
}
/**
 * Grab the open-file info from namenode
 */
synchronized void openInfo() throws IOException {
    LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
    if (newInfo == null) {
        throw new IOException("Cannot open filename " + src);
    }
    if (locatedBlocks != null) {
        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
                throw new IOException("Blocklist for " + src + " has changed!");
            }
        }
    }
    this.locatedBlocks = newInfo;
    this.currentNode = null;
}
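The key work happens in openInfo. callGetBlockLocations returns a LocatedBlocks object, which records the blocks that make up the file src and, for each block, the DatanodeInfo of the datanodes holding it. The call asks only for the range starting at offset 0 with length prefetchSize, so it covers the beginning of the file rather than necessarily the whole file. This method talks to the Namenode; the Namenode side is skipped for now. Once the DFSInputStream has been created, it is used to construct the DFSDataInputStream. That construction is simple, and the constructor source is easy to read on its own.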
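To make the shape of that metadata concrete, here is a minimal sketch of a block list and an offset lookup. It is an illustration only: BlockLocationSketch, Block, findBlock, and sampleFile are hypothetical names, datanodes are plain strings, and the 64-byte block size is chosen for readability rather than taken from HDFS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "which block holds this offset, and where are its replicas".
class BlockLocationSketch {

    /** One block of a file: start offset, length, and the nodes holding a replica. */
    static final class Block {
        final long startOffset;
        final long length;
        final List<String> datanodes;

        Block(long startOffset, long length, String... datanodes) {
            this.startOffset = startOffset;
            this.length = length;
            this.datanodes = Arrays.asList(datanodes);
        }

        boolean contains(long pos) {
            return pos >= startOffset && pos < startOffset + length;
        }
    }

    /** Returns the block holding byte position pos, or null when pos is past the end. */
    static Block findBlock(List<Block> blocks, long pos) {
        for (Block b : blocks) {
            if (b.contains(pos)) {
                return b;
            }
        }
        return null;
    }

    /** A 150-byte file split into 64-byte blocks; the last block is short. */
    static List<Block> sampleFile() {
        List<Block> blocks = new ArrayList<>();
        blocks.add(new Block(0, 64, "dn1", "dn2"));
        blocks.add(new Block(64, 64, "dn2", "dn3"));
        blocks.add(new Block(128, 22, "dn1", "dn3"));
        return blocks;
    }

    public static void main(String[] args) {
        Block b = findBlock(sampleFile(), 100);
        System.out.println("offset 100 -> block at " + b.startOffset + " on " + b.datanodes);
    }
}
```

A reader positioned at some offset needs exactly this lookup before it can pick a datanode to connect to.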
Now for the most important step, the read itself. The Client calls IOUtils.copyBytes to read the data; its source is:
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
    throws IOException {
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    byte buf[] = new byte[buffSize];
    try {
        int bytesRead = in.read(buf);
        while (bytesRead >= 0) {
            out.write(buf, 0, bytesRead);
            if ((ps != null) && ps.checkError()) {
                throw new IOException("Unable to write to output stream.");
            }
            bytesRead = in.read(buf);
        }
    } finally {
        if(close) {
            out.close();
            in.close();
        }
    }
}
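The loop above depends on one property of InputStream.read(buf): it may return fewer bytes than the buffer holds, and only a negative return value means end of stream. The sketch below reproduces the same loop with plain java.io classes to show that short reads are handled correctly. CopyLoopSketch, copy, shortReads, and copyToString are hypothetical helper names, and the short-read wrapper only imitates a source that hands back data in small pieces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical re-creation of the copy loop, without any Hadoop dependency.
class CopyLoopSketch {

    /** Copy until read() signals end of stream; returns the number of bytes copied. */
    static long copy(InputStream in, OutputStream out, int buffSize) throws IOException {
        byte[] buf = new byte[buffSize];
        long total = 0;
        int bytesRead = in.read(buf);
        while (bytesRead >= 0) {
            out.write(buf, 0, bytesRead);  // write only what was actually read
            total += bytesRead;
            bytesRead = in.read(buf);
        }
        return total;
    }

    /** Wrap a stream so that each read returns at most maxPerRead bytes. */
    static InputStream shortReads(InputStream in, int maxPerRead) {
        return new FilterInputStream(in) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(len, maxPerRead));
            }
        };
    }

    /** Round-trip a string through the copy loop using short reads. */
    static String copyToString(String text, int buffSize, int maxPerRead) {
        byte[] source = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copy(shortReads(new ByteArrayInputStream(source), maxPerRead), out, buffSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(copyToString("hello hdfs", 4, 3));
    }
}
```

This matters for HDFS because, as the read method later in this analysis shows, a single read call never returns data from more than one block.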
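copyBytes does the actual reading through in.read(buf). The in object here is really a DFSDataInputStream, whose inheritance chain is DFSDataInputStream -> FSDataInputStream -> java.io.DataInputStream -> java.io.FilterInputStream -> java.io.InputStream.
DFSDataInputStream.read(buf) therefore resolves to DataInputStream.read(buf), whose source is:
public final int read(byte b[]) throws IOException {
    return in.read(b, 0, b.length);
}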
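The delegation in DataInputStream.read(byte[]) can be observed with plain java.io classes. The sketch below wraps a recording stream in a DataInputStream and checks which method on the inner stream gets called. DelegationSketch, RecordingStream, and callThroughWrapper are hypothetical names, and the inner stream simply fills the buffer with a constant byte.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demonstration of wrapper-to-inner-stream delegation.
class DelegationSketch {

    /** Inner stream that records the arguments of the last bulk read. */
    static final class RecordingStream extends InputStream {
        int lastOff = -1;
        int lastLen = -1;

        @Override
        public int read() {
            return -1;  // single-byte reads are not used in this sketch
        }

        @Override
        public int read(byte[] b, int off, int len) {
            lastOff = off;
            lastLen = len;
            Arrays.fill(b, off, off + len, (byte) 'x');
            return len;
        }
    }

    /** Calls read(byte[]) on the wrapper; returns {bytesRead, off, len} as seen by the inner stream. */
    static int[] callThroughWrapper(int bufSize) {
        RecordingStream inner = new RecordingStream();
        try (DataInputStream wrapper = new DataInputStream(inner)) {
            int n = wrapper.read(new byte[bufSize]);
            return new int[] { n, inner.lastOff, inner.lastLen };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(callThroughWrapper(8)));
    }
}
```

The wrapper adds nothing to the read itself; all the real work is in whatever stream it wraps.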
The in field here is in fact the DFSInputStream, so the call that actually runs is DFSInputStream.read(b, 0, b.length). Its source is:
/**
 * Read the entire buffer.
 */
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
    checkOpen();
    if (closed) {
        throw new IOException("Stream closed");
    }
    failures = 0;
    if (pos < getFileLength()) {
        int retries = 2;
        while (retries > 0) {
            try {
                if (pos > blockEnd) {
                    currentNode = blockSeekTo(pos);
                }
                int realLen = Math.min(len, (int) (blockEnd - pos + 1));
                int result = readBuffer(buf, off, realLen);
                if (result >= 0) {
                    pos += result;
                } else {
                    // got a EOS from reader though we expect more data on it.
                    throw new IOException("Unexpected EOS from the reader");
                }
                if (stats != null && result != -1) {
                    stats.incrementBytesRead(result);
                }
                return result;
            } catch (ChecksumException ce) {
                throw ce;
            } catch (IOException e) {
                if (retries == 1) {
                    LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
                }
                blockEnd = -1;
                if (currentNode != null) { addToDeadNodes(currentNode); }
                if (--retries == 0) {
                    throw e;
                }
            }
        }
    }
    return -1;
}
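Two things in the method above are easy to miss: realLen caps every read at the end of the current block, and a failure resets blockEnd to -1 so that the next attempt re-seeks and can pick a different node. The following is a simplified in-memory model of that control flow, not the DFSClient implementation. BlockReadSketch, ReadFailed, and drain are hypothetical names, every block is assumed to have the same replica list, a "broken" node fails on every read, and the 4-byte block size is only there to make the boundaries visible.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a block-bounded read with one retry on another replica.
class BlockReadSketch {
    static final int BLOCK_SIZE = 4;  // tiny on purpose

    static final class ReadFailed extends RuntimeException {
        ReadFailed(String node) { super("read failed on " + node); }
    }

    private final byte[] data;
    private final List<String> replicas;  // assumed identical for every block
    private final Set<String> broken;     // nodes that fail on every read
    private final Set<String> deadNodes = new HashSet<>();
    private long pos = 0;
    private long blockEnd = -1;
    private String currentNode = null;

    BlockReadSketch(byte[] data, List<String> replicas, Set<String> broken) {
        this.data = data;
        this.replicas = replicas;
        this.broken = broken;
    }

    /** Set blockEnd for the block holding target and pick the first node not marked dead. */
    private String blockSeekTo(long target) {
        long start = (target / BLOCK_SIZE) * BLOCK_SIZE;
        blockEnd = Math.min(start + BLOCK_SIZE, data.length) - 1;
        for (String node : replicas) {
            if (!deadNodes.contains(node)) {
                return node;
            }
        }
        throw new IllegalStateException("no live replica left");
    }

    /** Read at most len bytes, never crossing the end of the current block. */
    int read(byte[] buf, int off, int len) {
        if (pos >= data.length) {
            return -1;
        }
        int retries = 2;
        while (true) {
            try {
                if (pos > blockEnd) {
                    currentNode = blockSeekTo(pos);
                }
                int realLen = Math.min(len, (int) (blockEnd - pos + 1));
                if (broken.contains(currentNode)) {
                    throw new ReadFailed(currentNode);
                }
                System.arraycopy(data, (int) pos, buf, off, realLen);
                pos += realLen;
                return realLen;
            } catch (ReadFailed e) {
                blockEnd = -1;  // forces a re-seek, and so a new node choice
                if (currentNode != null) {
                    deadNodes.add(currentNode);
                }
                if (--retries == 0) {
                    throw e;
                }
            }
        }
    }

    /** Read to end of data; returns the size of each read and appends the bytes to sink. */
    List<Integer> drain(int bufSize, StringBuilder sink) {
        List<Integer> sizes = new ArrayList<>();
        byte[] buf = new byte[bufSize];
        int n;
        while ((n = read(buf, 0, buf.length)) >= 0) {
            sizes.add(n);
            sink.append(new String(buf, 0, n, StandardCharsets.US_ASCII));
        }
        return sizes;
    }

    public static void main(String[] args) {
        byte[] file = "abcdefghij".getBytes(StandardCharsets.US_ASCII);
        BlockReadSketch reader =
            new BlockReadSketch(file, Arrays.asList("dn1", "dn2"), Collections.singleton("dn1"));
        StringBuilder sink = new StringBuilder();
        System.out.println(reader.drain(10, sink) + " " + sink);
    }
}
```

With a 10-byte buffer over this 10-byte file, the reads come back as 4, 4, and 2 bytes, one per block, even though the first node tried is broken. This is why callers such as copyBytes must loop instead of assuming one read fills the buffer.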