Hadoop Study Notes (1): Operating HDFS through the Java API (File Upload and Download)
package demo.hdfs;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
public class TestDemo {

    @Test
    public void test1() throws Exception {
        // List the contents of an HDFS directory.
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Get the status of every entry under /tools.
        FileStatus[] list = fs.listStatus(new Path("/tools"));
        // Print a few attributes of each entry.
        for (FileStatus status : list) {
            System.out.println(status.isDirectory() ? "directory" : "file");
            System.out.println(status.getAccessTime());
        }
    }

    @Test
    public void test2() throws Exception {
        // Inspect the data blocks of a file.
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Get the block information of /tools/hadoop-2.7.3.tar.gz:
        // first fetch the file's status ...
        FileStatus fileStatus = fs.getFileStatus(new Path("/tools/hadoop-2.7.3.tar.gz"));
        // ... then ask for the block locations over the whole file length.
        BlockLocation[] list = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        for (BlockLocation bl : list) {
            // Hosts holding a replica of this block.
            System.out.println(Arrays.toString(bl.getHosts()));
            // Names (ip:port) of the DataNodes serving this block.
            System.out.println(Arrays.toString(bl.getNames()));
        }
    }
}
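A side note on the output: getAccessTime() returns milliseconds since the epoch, so the raw number printed above is hard to read. Below is a minimal sketch that formats it, together with a few other commonly used FileStatus fields (length, replication). The class name TestListReadable is made up for illustration; it assumes the same NameNode address and /tools directory as above.
package demo.hdfs;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class TestListReadable {
    public static void main(String[] args) throws Exception {
        // Same NameNode configuration as in TestDemo (assumed).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        FileSystem fs = FileSystem.get(conf);
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Print one readable line per entry under /tools.
        for (FileStatus status : fs.listStatus(new Path("/tools"))) {
            System.out.println(status.getPath().getName()
                    + " | " + (status.isDirectory() ? "directory" : "file")
                    + " | " + status.getLen() + " bytes"
                    + " | replication=" + status.getReplication()
                    + " | accessed=" + fmt.format(new Date(status.getAccessTime())));
        }
        fs.close();
    }
}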
--------------------------------------------------------------------------------
First, the HDFS file-upload flow:
1. Configure the NameNode (the HDFS master node) so the client can communicate with it.
2. Obtain an HDFS client from that configuration.
3. Open an input stream on the local file and create an output stream to HDFS.
4. Copy the bytes, either by hand through a buffer or with the utility IOUtils.copyBytes(in, out, 1024, true), whose final argument closes both streams when the copy finishes.
package demo.hdfs;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestUpload {

    @Test
    public void testUpload1() throws Exception {
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Open a byte input stream on the local file.
        InputStream in = new FileInputStream("f:\\temp\\hadoop-2.7.3.tar.gz");
        // Create an output stream -------> HDFS
        OutputStream out = fs.create(new Path("/tools/a.tar.gz"));
        // Copy by hand through a buffer.
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = in.read(buffer)) > 0) {
            // Write however many bytes were just read.
            out.write(buffer, 0, len);
        }
        out.flush();
        // Close both streams.
        in.close();
        out.close();
    }

    @Test
    public void testUpload2() throws Exception {
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Open a byte input stream on the local file.
        InputStream in = new FileInputStream("f:\\temp\\hadoop-2.7.3.tar.gz");
        // Create an output stream -------> HDFS
        OutputStream out = fs.create(new Path("/tools/b.tar.gz"));
        // Let the utility do the copy; the trailing true also closes both streams.
        IOUtils.copyBytes(in, out, 1024, true);
    }
}
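A common snag when running these upload tests from a Windows workstation: HDFS checks the client's OS user name against the permissions on the target directory, so fs.create() may fail with an AccessControlException. One workaround is the FileSystem.get(URI, Configuration, String) overload, which names the remote user explicitly. A minimal sketch, assuming the user "root" owns /tools (substitute whatever account applies on your cluster); the class name TestUploadAsUser and the target file name c.tar.gz are made up for illustration.
package demo.hdfs;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class TestUploadAsUser {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "root" is an assumption: use the account that owns the target directory.
        FileSystem fs = FileSystem.get(
                new URI("hdfs://192.168.146.111:9000"), conf, "root");
        InputStream in = new FileInputStream("f:\\temp\\hadoop-2.7.3.tar.gz");
        OutputStream out = fs.create(new Path("/tools/c.tar.gz"));
        // The trailing true closes both streams after the copy.
        IOUtils.copyBytes(in, out, 1024, true);
        fs.close();
    }
}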
--------------------------------------------------------------------------------
Downloading mirrors the upload: open an input stream on the HDFS file with fs.open() and write it to a local FileOutputStream.
package demo.hdfs;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
public class TestDownload {

    @Test
    public void testDownload1() throws Exception {
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Open an input stream that reads from /tools/a.tar.gz on HDFS.
        InputStream input = fs.open(new Path("/tools/a.tar.gz"));
        // Create an output stream to the local directory f:\temp.
        OutputStream output = new FileOutputStream("f:\\temp\\xyz.tar.gz");
        // Copy by hand through a buffer.
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = input.read(buffer)) > 0) {
            // Write however many bytes were just read.
            output.write(buffer, 0, len);
        }
        output.flush();
        // Close both streams.
        input.close();
        output.close();
    }

    @Test
    public void testDownload2() throws Exception {
        // Configure the NameNode (the HDFS master node).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        // Obtain an HDFS client from the configuration.
        FileSystem fs = FileSystem.get(conf);
        // Open an input stream that reads from /tools/a.tar.gz on HDFS.
        InputStream input = fs.open(new Path("/tools/a.tar.gz"));
        // Create an output stream to the local directory f:\temp.
        OutputStream output = new FileOutputStream("f:\\temp\\mmmm.tar.gz");
        // Let the utility do the copy; the trailing true also closes both streams.
        IOUtils.copyBytes(input, output, 1024, true);
    }
}
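For completeness, FileSystem also ships one-call helpers that wrap exactly this stream copying: copyFromLocalFile for upload and copyToLocalFile for download. A minimal sketch, reusing the same paths as above; the class name TestCopyHelpers and the target file names d.tar.gz and copy.tar.gz are made up for illustration. Note that on Windows, copyToLocalFile may require the native Hadoop binaries (winutils); the manual fs.open() + FileOutputStream approach above avoids that dependency.
package demo.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class TestCopyHelpers {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.146.111:9000");
        FileSystem fs = FileSystem.get(conf);
        // Upload: local file -> HDFS, equivalent to the manual copy in TestUpload.
        fs.copyFromLocalFile(new Path("f:\\temp\\hadoop-2.7.3.tar.gz"),
                             new Path("/tools/d.tar.gz"));
        // Download: HDFS -> local file, equivalent to the manual copy in TestDownload.
        fs.copyToLocalFile(new Path("/tools/a.tar.gz"),
                           new Path("f:\\temp\\copy.tar.gz"));
        fs.close();
    }
}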