使用HDFS Java API可以远程对HDFS系统中的文件进行新建、删除、读取等操作。本节主要介绍如何在Eclipse中使用HDFS Java API与HDFS文件系统进行交互。
在使用Java API之前,首先需要新建一个Hadoop项目。Hadoop项目的结构与普通的Java项目一样,只是所需的依赖包不同。
在Eclipse(或IDEA)中新建一个Maven项目“hdfs_demo”(Maven项目的搭建此处不做过多讲解),然后在该项目的pom.xml文件中添加以下代码,以引入Hadoop的Java API依赖包:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.1</version> </dependency>
配置好pom.xml后,即可使用HDFS Java API进行程序的编写。
FileSystem是HDFS Java API的核心工具类,该类是一个抽象类,其中封装了很多操作文件的方法,使用这些方法可以很轻松地操作HDFS中的文件。例如,在HDFS文件系统的根目录下有一个文件file.txt,可以直接使用FileSystem API读取该文件内容,具体操作步骤如下。
在新建的hdfs_demo项目中新建Java类FileSystemCat.java,写入查询显示HDFS中的/file.txt文件内容的代码,代码编写步骤如下:
01 创建Configuration对象。
02 得到FileSystem对象。
03 进行文件操作。
完整代码如下所示:
代码分析:
在运行HDFS程序之前,需要先初始化Configuration对象,该对象的主要作用是读取HDFS的系统配置信息,也就是安装Hadoop时候的配置文件,例如core-site.xml、hdfs-site.xml、mapred-site.xml等文件。
FileSystem是一个普通的文件系统API,可以使用静态工厂方法取得FileSystem实例,并传入Configuration对象参数。FileSystem类的继承结构如图4-9所示。
图4-9 FileSystem类的继承结构
通过调用FileSystem对象的open()方法,取得文件的输入流。该方法实际上返回的是一个FSDataInputStream对象,而不是标准的java.io类对象。FSDataInputStream类是继承了java.io.DataInputStream类的一个特殊类,支持随机访问,因此可以从流的任意位置读取数据。FSDataInputStream类的主要作用是使用DataInputStream包装一个输入流,并且使用BufferedInputStream实现对输入的缓冲。FSDataInputStream类的部分定义源码如下:
public class FSDataInputStream extends DataInputStream{ }
FSDataInputStream类的继承结构如图4-10所示。
图4-10 FSDataInputStream类的继承结构
直接在Eclipse中右击代码空白处,选择【Run As】|【Java Application】命令运行该程序即可,若控制台中能正确输出文件file.txt的内容,说明代码编写正确。
当然,也可以将项目导出为jar包,然后上传jar包到Hadoop集群的任意一个节点上,执行以下命令运行该程序:
$ hadoop jar hdfs_demo.jar hdfs.demo.FileSystemCat
上述命令需要到$HADOOP_HOME/bin目录中执行,若配置了该目录的系统PATH变量,则可以在任意目录执行。其中的hdfs_demo.jar为项目导出的jar包名称,此处为相对路径,hdfs.demo为类FileSystemCat所在的包名。
使用FileSystem的创建目录方法mkdirs(),可以创建未存在的父目录,就像java.io.File的mkdirs()方法一样。如果目录创建成功,它会返回true。
下面这个例子是在HDFS文件系统根目录下创建一个名为mydir的文件夹。
在新建的hdfs_demo项目中新建Java类CreateDir.java,该类的完整代码如下所示:
代码的运行参考4.4.1节,若控制台中能正确输出“创建目录成功!”,说明代码编写正确。
使用FileSystem的创建文件方法create(),可以在HDFS文件系统的指定路径创建一个文件,并向其写入内容。例如,在HDFS系统根目录创建一个文件newfile2.txt,并写入内容“我是文件内容”,代码如下:
/** * 定义创建文件方法 */ public static void createFile() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); FileSystem fs = FileSystem.get(conf); //打开一个输出流 FSDataOutputStream outputStream = fs.create(new Path("hdfs:/newfile2.txt")); //写入文件内容 outputStream.write("我是文件内容".getBytes()); outputStream.close(); fs.close(); System.out.println("文件创建成功!"); }
代码分析如下:
FileSystem实例的create()方法返回一个文件输出流对象FSDataOutputStream,该类继承了java.io.DataOutputStream类。与FSDataInputStream类不同的是,FSDataOutputStream类不支持随机访问,因此不能从文件的任意位置写入数据,只能从文件末尾追加数据。
create()方法有多个重载方法,允许指定是否强制覆盖已有文件(默认覆盖)、文件副本数量、写入文件的缓冲大小、文件块大小、文件权限许可等。create()方法的其中几个重载方法的定义源码如下:
public FSDataOutputStream create(Path f) throws IOException { } public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { } public FSDataOutputStream create(Path f, short replication) throws IOException { } public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException { } public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException { } public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { }
在调用create()方法时,还可以传入一个Progressable对象,该Progressable是一个接口,其中定义了一个progress()回调方法,使用该方法可以得知数据被写入数据节点的进度。Progressable接口的源码如下:
public interface Progressable { void progress(); }
例如,上传文件D:/soft/test.zip到HDFS根目录,并通过在控制台打印“.”显示上传进度,代码如下:
public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); InputStream in=new BufferedInputStream( new FileInputStream("D:/soft/test.zip")); FileSystem fs = FileSystem.get(conf); //上传文件并监控上传进度 FSDataOutputStream outputStream = fs.create(new Path("hdfs:/test.zip"), new Progressable() { @Override public void progress() {//回调方法显示进度 System.out.print("."); } }); IOUtils.copyBytes(in, outputStream, 4096, false); } }
运行上述代码,每当上传64KB数据包到Hadoop数据节点时将调用一次progress()方法。
此外,还可以使用FileSystem的append()方法在文件末尾追加数据。这对于日志文件需要持续写入数据非常有用。append()方法的调用源码如下:
FSDataOutputStream outputStream = fs.append(new Path("hdfs:/newfile2.txt"));
使用FileSystem的deleteOnExit()方法,可以删除HDFS文件系统中已经存在的文件。例如,删除HDFS系统根目录下的文件newfile.txt,代码如下:
/** * 定义删除文件方法 */ public static void deleteFile() throws Exception{ Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); FileSystem fs = FileSystem.get(conf); Path path = new Path("hdfs:/newfile.txt"); //删除文件 boolean isok = fs.deleteOnExit(path); if(isok){ System.out.println("删除成功!"); }else{ System.out.println("删除失败!"); } fs.close(); }
使用FileSystem的listStatus()方法,可以遍历HDFS文件系统中指定路径下的所有目录和文件。例如,递归遍历HDFS系统根目录下的所有文件和目录并输出路径信息,代码如下:
上述代码通过调用FileSystem的listStatus()方法获得指定路径下的一级子目录及文件,并将结果存储于FileStatus类型的数组中;然后循环遍历该数组,当遇到目录时,再次调用listStatus()方法取得该目录下的所有子目录及文件,从而能够递归取得指定路径下的所有目录及文件。
假设HDFS文件系统的根目录有文件夹input、文件newfile.txt,文件夹input中有文件test.txt,则上述代码的输出结果为:
hdfs://192.168.170.133:9000/input hdfs://192.168.170.133:9000/input/test.txt hdfs://192.168.170.133:9000/newfile.txt
使用FileSystem的getFileStatus ()方法,可以获得HDFS文件系统中的文件或目录的元数据信息,包括文件路径、文件修改日期、文件上次访问日期、文件长度、文件备份数、文件大小等。getFileStatus()方法返回一个FileStatus对象,元数据信息则封装在了该对象中。
获取文件或目录元数据的代码如下:
import java.sql.Timestamp; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; /** * 获取文件或目录的元数据信息 */ public class FileStatusCat { public static void main(String[] args) throws Exception { //创建Configuration对象 Configuration conf = new Configuration(); //设置HDFS访问地址 conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); //取得FileSystem文件系统实例 FileSystem fs = FileSystem.get(conf); FileStatus fileStatus = fs.getFileStatus(new Path("hdfs:/file.txt")); //判断是文件夹还是文件 if (fileStatus.isDirectory()) { System.out.println("这是一个文件夹"); } else { System.out.println("这是一个文件"); } //输出元数据信息 System.out.println("文件路径:" + fileStatus.getPath()); System.out.println("文件修改日期:" + new Timestamp(fileStatus.getModificationTime()).toString()); System.out.println("文件上次访问日期:" + new Timestamp(fileStatus.getAccessTime()).toString()); System.out.println("文件长度:" + fileStatus.getLen()); System.out.println("文件备份数:" + fileStatus.getReplication()); System.out.println("文件块大小:" + fileStatus.getBlockSize()); System.out.println("文件所有者:" + fileStatus.getOwner()); System.out.println("文件所在分组:" + fileStatus.getGroup()); System.out.println("文件的权限:" + fileStatus.getPermission().toString()); } }
上述代码的输出结果如下:
这是一个文件 文件路径:hdfs://192.168.170.133:9000/file.txt 文件修改日期:2018-07-19 15:40:13.533 文件上次访问日期:2018-07-19 15:40:13.016 文件长度:40 文件备份数:2 文件块大小:134217728 文件所有者:hadoop 文件所在分组:supergroup 文件的权限:rw-r--r--
使用FileSystem的copyFromLocalFile()方法,可以将操作系统本地的文件上传到HDFS文件系统中,该方法需要传入两个Path类型的参数,分别代表本地目录/文件和HDFS目录/文件。
例如,将Windows系统中D盘的copy_test.txt文件上传到HDFS文件系统的根目录,代码如下:
/** * 定义方法,上传本地文件到HDFS */ public static void uploadFileToHDFS() throws Exception { //1.创建配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); //2.取得FileSystem文件系统实例 FileSystem fs = FileSystem.get(conf); //3.创建可供hadoop使用的文件系统路径 Path src = new Path("D:/copy_test.txt"); //本地目录/文件 Path dst = new Path("hdfs:/"); //HDFS目录/文件 //4.复制上传本地文件至HDFS文件系统中 fs.copyFromLocalFile(src, dst); System.out.println("文件上传成功!"); }
使用FileSystem的copyToLocalFile()方法,可以将HDFS文件系统中的文件下载到操作系统本地,该方法需要传入两个Path类型的参数,分别代表HDFS目录/文件和本地目录/文件。
例如,将HDFS文件系统根目录的文件newfile2.txt下载到Windows系统中D盘根目录,并重命名为new.txt,代码如下:
/** * 定义方法,下载文件到本地 */ public static void downloadFileToLocal() throws Exception{ //1.创建配置器 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.170.133:9000"); //2.取得FileSystem文件系统实例 FileSystem fs = FileSystem.get(conf); //3.创建可供hadoop使用的文件系统路径 Path src = new Path("hdfs:/newfile2.txt"); //HDFS目录/文件 Path dst = new Path("D:/new.txt"); //本地目录/文件 //4.从HDFS文件系统中复制下载文件至本地 fs.copyToLocalFile(false,src,dst,true); System.out.println("文件下载成功!"); }