HDFS读写文件操作流程
1、读取文件:
1、客户端首先创建DistributedFileSystem对象
2、向NameNode发出下载请求
3、NameNode根据举例优先选择合适的节点以及数据块信息响应给客户端
4、客户端根据响应创建输出流下载数据,此时会先读取nn1数据,读取结束如果数据不够再建立下一个通道读取下一个节点数据,直到读取完毕。
案例:从hdfs上截取方式下载数据
@Test //文件截取,下载块文件(比如查日志的时候只查看最后一块) //查第一块 public void readFileSeek1() throws URISyntaxException, IOException, InterruptedException { //获取对象 Configuration conf = new Configuration (); FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root"); //获取输入流 FSDataInputStream fis = fs.open (new Path ("/hadoop-2.9.2.tar.gz")); //获取输出流 FileOutputStream fos = new FileOutputStream (new File ("D:/hadoop-2.9.2.part1")); //文件拷贝(此时只拷128M) byte[] bytes = new byte[1024]; for (int i=0;i<1024*128;i++){ fis.read(bytes); fos.write (bytes); } //关闭资源 IOUtils.closeStream (fos); IOUtils.closeStream (fis); fs.close (); } //下载最后一块 @Test public void readFileSeek2() throws URISyntaxException, IOException, InterruptedException { //获取对象 Configuration conf = new Configuration (); FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root"); //获取输入流 FSDataInputStream fis = fs.open (new Path ("/hadoop-2.9.2.tar.gz")); //设置指定读取的节点 fis.seek (1024*1024*256); //获取输出流 FileOutputStream fos = new FileOutputStream (new File ("D:/hadoop-2.9.2.part1")); //流拷贝 IOUtils.copyBytes (fis,fos,conf); //关闭资源 IOUtils.closeStream (fos); IOUtils.closeStream (fis); fs.close (); } }
写入文件:
1、客户端首先会创建一个DistributedFileSystem的实例对象
2、然后向NameNode请求上传文件,NameNode检查路径确认文件路径然后响应
3、客户端接收到可以上传指令后向NameNode请求上传到的具体位置
4、NameNode根据距离和负载情况选择出相应的DataNode节点
5、客户端拿到位置创建流开始建立通道,DataNode进行序列化写入数据
6、DataNode写入数据结束后最终响应给客户端
例: @Test public void testCopyFromlFile() throws URISyntaxException, IOException, InterruptedException { //获取fs对象 Configuration conf = new Configuration (); FileSystem fs = FileSystem.get (new URI ("hdfs://xmaster:9000"), conf, "root"); //执行上传API\ fs.copyFromLocalFile (new Path ("D:/test.txt"),new Path ("/test.txt")); //关闭资源 fs.close (); }
@Test //将本地的文件上传到HDFS上 public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException { //获取对象 Configuration conf = new Configuration (); FileSystem fs = FileSystem.get (new URI ("hdfs://hadoop01:9000"), conf, "root"); //获取输入流 FileInputStream fis = new FileInputStream ("D:/test.txt"); //获取输出流 FSDataOutputStream fos = fs.create (new Path ("/test.txt")); //拷贝 IOUtils.copyBytes (fis, fos, conf); //关闭资源 IOUtils.closeStream (fos); IOUtils.closeStream (fis); fs.close (); }