HDFS Java API操作
1.
在 https://mvnrepository.com/ 网站可以找到Maven中配置的jar包
2.
以下是处于完全分布式
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* Description: HDFSAPI<br/>
* Copyright (c) , 2018, xlj <br/>
* This program is protected by copyright laws. <br/>
* Program Name:HDFSAPI.java <br/>
* @version : 1.0
*/
public class HDFSAPI {
public static void main(String[] args) throws IOException {
//读取HDFS的文件到控制台,参数是你上传到HDFS的路径
catFileToConsole("/data.txt");
//读取本地文件中的内容存储到本地
catFileToLocal("/data.txt");
}
/**
* @param filePath
* @throws IOException
*/
public static void catFileToConsole(String filePath) throws IOException {
/*
* 任何文件系统都是与当前的环境变量紧密联系,对于当前的HDFS来说
* 我们需要在创建当前文件系统的实例之前,必须获得当前环境变量
* Configuration
* 为用户提供当前环境变量的一个实例。其中封装了当前搭建环境的配置
* 有了这个配置实例,才能继续调用FileSystem类
*/
//1.获取Configuration对象
Configuration conf = new Configuration();
/*2.需要设置当前相关属性---->设置core-site.xml对应相关属性
* 因为现在是要连接HDFS分布式文件系统,所以要配置一个连接属性
* 9000--->是一个端口,这是一个内部通讯端口号
* 50070-->HDFS WebUI界面的端口号
* 50090-->SecondaryNamenode端口号
*
*/
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//需要都文件系统进行连接访问,FileSystem这是一个抽象类
//提供N个get方法来获取当前的连接
//当前会抛出一个IOException的异常,就是一个流操作
FileSystem fs = FileSystem.get(conf);
//open方法连接HDFS 参数:一个path--》HDFS分布式文件系统要访问的具体的地址
FSDataInputStream fis = fs.open(new Path(filePath));
//推荐一个工具类IOUtils
IOUtils.copyBytes(fis, System.out, 4096, true);;
}
/**
* 读取文件中的内容存储到本地
* @param filePath
* @throws IOException
*/
public static void catFileToLocal(String filePath) throws IOException {
//1.创建连接配置
Configuration conf = new Configuration();
//2.设计相关属性
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//3.获取FileSystem对象
FileSystem fs = FileSystem.get(conf);
//4.获取一个输入流对象,读取当前HDFS分布式文件系统中文件夹的内容
FSDataInputStream fis = fs.open(new Path(filePath));
//5.创建一个输出流将内容写到本地文件
OutputStream os = new FileOutputStream(new File("dir/file.txt"));
IOUtils.copyBytes(fis, os, 4096, true);
}
/**
* 在文件系统中创建文件夹
* @param dirPath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void mkdir(String dirPath) throws IOException, InterruptedException, URISyntaxException {
//1.获取configuration对象
Configuration conf = new Configuration();
//2.直接创建FileSystem
//三个参数 :
/*
* 第一个参数:URI --hdfs的内部通信网站 URI ---net包
* 第二个参数:Configuration对象
* 第三个参数:用户---root
*/
FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), conf,"root");
//创建当前路径下的文件夹,这里需要注意的是方法名多一个s
//就意味着即可以创建一个文件夹,也可以创建多文件夹
boolean result = fs.mkdirs(new Path(dirPath));
if(result) {
System.out.println("文件夹创建成功");
}else {
System.out.println("文件夹创建失败");
}
}
/**
* 创建空文件
* @param filePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void touchFile(String filePath) throws IOException, InterruptedException, URISyntaxException {
//1.创建Configuration对象
Configuration conf = new Configuration();
//设置属性
//conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
//有设置过副本数量 conf.set设置副本数量--在不设置副本数量的前提下默认是3
//所以这里创建出来的文件使用的是默认值
//若需要自己的副本数量 conf.set需要设置副本属性
// <name>dfs.replication</name> <value>2</value>
// 3.获取FileSystem对象
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),conf,"root");
//4.创建空文件
fs.create(new Path(filePath));
System.out.println("创建成功");
}
/**
* 显示文件信息
* @param dirPath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void list(String dirPath) throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root");
/*
* FileSystem 类似于Java中的File
* 所有的FileSystem 中提供了一个遍历文件夹的方式 listStatus 返回值是一个数组,数组元素的数据类型是FileStatus
*/
FileStatus [] fss = fs.listStatus(new Path(dirPath));
for (FileStatus f : fss) {
System.out.println("文件名字:"+f.getPath().getName());
System.out.println("文件的所属者:"+f.getOwner());
System.out.println("文件的所属组: "+f.getGroup());
System.out.println("文件的大小: "+f.getLen());
System.out.println("文件的副本数:"+f.getReplication());
System.out.println("是否是目录:"+f.isDir());
}
}
/**
* 获取HDFS中资源情况
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void getSource() throws IOException, InterruptedException, URISyntaxException {
//通过FileSystem获取连接
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
//通过其子类转换成当前子类对象并代用其获取状态的方法getStatus
DistributedFileSystem dfs = (DistributedFileSystem)fs;
FsStatus fss = dfs.getStatus();
System.out.println("总量:"+(fss.getCapacity() / 1024 / 1024)+"GB");
System.out.println("使用的量:"+(fss.getUsed() / 1024 / 1024)+"MB");
System.out.println("w维持总量:"+(fss.getRemaining() / 1024 / 1024)+"GB");
}
/**
* 获取单台节点的信息
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void getNodeInfos() throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
DistributedFileSystem dfs = (DistributedFileSystem)fs;
DatanodeInfo[] dis = dfs.getDataNodeStats();
for (DatanodeInfo datanodeInfo : dis) {
System.out.println("当前节点的总容量:"+(datanodeInfo.getCapacity()));
System.out.println("HostName:"+(datanodeInfo.getHostName()));
System.out.println("IP地址:"+(datanodeInfo.getIpAddr()));
System.out.println(datanodeInfo.getName());
}
}
/**
* 块信息
* @param filePath
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
public static void getBlockInfos(String filePath) throws IOException, InterruptedException, URISyntaxException {
//通过FileSystem获取连接
FileSystem fs = FileSystem.get(new URI("hdfs://hadoo1:9000"), new Configuration(), "root");
//拿到当前文件的描述信息
FileStatus fss = fs.getFileStatus(new Path(filePath));
//获取文件中块的信息-->返回值是一个数组,为什么
//因为只要文件大于128M就会被切分为其他块
BlockLocation[] bls = fs.getFileBlockLocations(fss, 0, fss.getLen());
for (BlockLocation bl : bls) {
//BlockLocation方法:返回值是字符串数组,将bl对象中的属性值存在数组中
for (int i = 0; i < bl.getHosts().length; i++) {
System.out.println(bl.getHosts()[i]);
}
}
}
/**
* 带进度的文件上传
* @param filePath
* @param filepath2
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void upLoadProcess(String filePath,String filepath2) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
FSDataOutputStream fos = fs.create(new Path(filePath),new Progressable() {
//Progressable接口中提供了一个progress这个方法,每次在写文件的时候写64K
@Override
public void progress() {
//循环 获取文件大小等等
System.out.println("*");
}
});
//获取一个文件
InputStream fis = new FileInputStream(new File(filepath2));
IOUtils.copyBytes(fis, fos, 1024,true);
}
/**
* 文件的上传 相当于 -put
* @param paths
* @param HDFSfilePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void upload(String localFilePath,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root");
/*
* 第一个 参数:本地文件的位置
* 第二个参数:HDFS文件系统中存储文件的位置
*/
fs.copyFromLocalFile(new Path(localFilePath), new Path(HDFSfilePath));
}
public static void uploads(Path []paths ,String HDFSfilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(), "root");
/*1.上传成功后是否要删除本地文件
*2.上传后若有相同文件是否要覆盖
*这两个参数都是boolean类型
*true---上传成功后要删除和覆盖
*false--不删除也不覆盖
*3.删除是多个文件路径
*4.HDFS文件系统中要存储的路径
*/
fs.copyFromLocalFile(false, true,paths,new Path(HDFSfilePath));
}
public static void download(String HDFSfilePath,String localFilePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), new Configuration(), "root");
/*
* 1.下载完后是否要删除HDFS文件 boolean
* 2.HDFS文件系统的路径
* 3.当前下载到本地的路径
* 4.是否要使用本地的文件系统,改用为java的IO流
*/
fs.copyToLocalFile(false, new Path(HDFSfilePath),new Path(localFilePath),true);
}
/**
* 删除HDFS文件系统中的文件
* @param filePath
* @throws URISyntaxException
* @throws InterruptedException
* @throws IOException
*/
public static void deleteFile(String filePath) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root");
Path path = new Path(filePath);
if(fs.exists(path)) {
//检查是不是一个目录,是就递归删除,不是就直接删除
if(fs.isDirectory(path)) {
fs.delete(path,true);
}else {
fs.delete(path,false);
}
}else {
System.out.println("删除的文件不存在");
}
}
/**
* 移动HDFS系统中的文件,集移动剪切于一身
* @param src
* @param dsc
* @throws IOException
* @throws InterruptedException
* @throws URISyntaxException
*/
public static void rename(String src,String dsc) throws IOException, InterruptedException, URISyntaxException {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),new Configuration(),"root");
fs.rename(new Path(src), new Path(dsc));
}
}