购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.4 HDFS Java API操作

使用HDFS Java API可以远程对HDFS系统中的文件进行新建、删除、读取等操作。本节主要介绍如何在Eclipse中使用HDFS Java API与HDFS文件系统进行交互。

在使用Java API之前,首先需要新建一个Hadoop项目。Hadoop项目的结构与普通的Java项目一样,只是所需的依赖包不同。

在Eclipse(或IDEA)中新建一个Maven项目“hdfs_demo”(Maven项目的搭建此处不做过多讲解),然后在该项目的pom.xml文件中添加以下代码,以引入Hadoop的Java API依赖包:

     <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.1</version>
     </dependency>

配置好pom.xml后,即可使用HDFS Java API进行程序的编写。

4.4.1 读取数据

FileSystem是HDFS Java API的核心工具类,该类是一个抽象类,其中封装了很多操作文件的方法,使用这些方法可以很轻松地操作HDFS中的文件。例如,在HDFS文件系统的根目录下有一个文件file.txt,可以直接使用FileSystem API读取该文件内容,具体操作步骤如下。

1.编写程序

在新建的hdfs_demo项目中新建Java类FileSystemCat.java,写入查询显示HDFS中的/file.txt文件内容的代码,代码编写步骤如下:

01 创建Configuration对象。

02 得到FileSystem对象。

03 进行文件操作。

完整代码如下所示:

代码分析:

在运行HDFS程序之前,需要先初始化Configuration对象,该对象的主要作用是读取HDFS的系统配置信息,也就是安装Hadoop时候的配置文件,例如core-site.xml、hdfs-site.xml、mapred-site.xml等文件。

FileSystem是一个普通的文件系统API,可以使用静态工厂方法取得FileSystem实例,并传入Configuration对象参数。FileSystem类的继承结构如图4-9所示。

图4-9 FileSystem类的继承结构

通过调用FileSystem对象的open()方法,取得文件的输入流。该方法实际上返回的是一个FSDataInputStream对象,而不是标准的java.io类对象。FSDataInputStream类是继承了java.io.DataInputStream类的一个特殊类,支持随机访问,因此可以从流的任意位置读取数据。FSDataInputStream类的主要作用是使用DataInputStream包装一个输入流,并且使用BufferedInputStream实现对输入的缓冲。FSDataInputStream类的部分定义源码如下:

     public class FSDataInputStream extends DataInputStream{
     }

FSDataInputStream类的继承结构如图4-10所示。

图4-10 FSDataInputStream类的继承结构

2.运行程序

直接在Eclipse中右击代码空白处,选择【Run As】|【Java Application】命令运行该程序即可,若控制台中能正确输出文件file.txt的内容,说明代码编写正确。

当然,也可以将项目导出为jar包,然后上传jar包到Hadoop集群的任意一个节点上,执行以下命令运行该程序:

     $ hadoop jar hdfs_demo.jar hdfs.demo.FileSystemCat

上述命令需要到$HADOOP_HOME/bin目录中执行,若配置了该目录的系统PATH变量,则可以在任意目录执行。其中的hdfs_demo.jar为项目导出的jar包名称,此处为相对路径,hdfs.demo为类FileSystemCat所在的包名。

4.4.2 创建目录

使用FileSystem的创建目录方法mkdirs(),可以创建未存在的父目录,就像java.io.File的mkdirs()方法一样。如果目录创建成功,它会返回true。

下面这个例子是在HDFS文件系统根目录下创建一个名为mydir的文件夹。

1.编写程序

在新建的hdfs_demo项目中新建Java类CreateDir.java,该类的完整代码如下所示:

2.运行程序

代码的运行参考4.4.1节,若控制台中能正确输出“创建目录成功!”,说明代码编写正确。

4.4.3 创建文件

使用FileSystem的创建文件方法create(),可以在HDFS文件系统的指定路径创建一个文件,并向其写入内容。例如,在HDFS系统根目录创建一个文件newfile2.txt,并写入内容“我是文件内容”,代码如下:

     /**
      * 定义创建文件方法
      */
     public static void createFile() throws Exception {
          Configuration conf = new Configuration();
          conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
          FileSystem fs = FileSystem.get(conf);
          //打开一个输出流
          FSDataOutputStream outputStream = fs.create(new
                Path("hdfs:/newfile2.txt"));
        //写入文件内容
        outputStream.write("我是文件内容".getBytes());
        outputStream.close();
        fs.close();
        System.out.println("文件创建成功!");
     }

代码分析如下:

FileSystem实例的create()方法返回一个文件输出流对象FSDataOutputStream,该类继承了java.io.DataOutputStream类。与FSDataInputStream类不同的是,FSDataOutputStream类不支持随机访问,因此不能从文件的任意位置写入数据,只能从文件末尾追加数据。

create()方法有多个重载方法,允许指定是否强制覆盖已有文件(默认覆盖)、文件副本数量、写入文件的缓冲大小、文件块大小、文件权限许可等。create()方法的其中几个重载方法的定义源码如下:

     public FSDataOutputStream create(Path f) throws IOException {
     }
     public FSDataOutputStream create(Path f, boolean overwrite)
             throws IOException {
     }
     public FSDataOutputStream create(Path f, short replication)
             throws IOException {
     }
     public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize)
             throws IOException {
     }
     public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
                         short replication, long blockSize)
             throws IOException {
     }
     public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
                         short replication, long blockSize, Progressable progress)
             throws IOException {
     }

在调用create()方法时,还可以传入一个Progressable对象,该Progressable是一个接口,其中定义了一个progress()回调方法,使用该方法可以得知数据被写入数据节点的进度。Progressable接口的源码如下:

     public interface Progressable {
       void progress();
     }

例如,上传文件D:/soft/test.zip到HDFS根目录,并通过在控制台打印“.”显示上传进度,代码如下:

     public static void main(String[] args) throws IOException {
          Configuration conf = new Configuration();
          conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
          InputStream in=new BufferedInputStream(
                  new FileInputStream("D:/soft/test.zip"));
          FileSystem fs = FileSystem.get(conf);
          //上传文件并监控上传进度
          FSDataOutputStream outputStream = fs.create(new Path("hdfs:/test.zip"),
                  new Progressable() {
            @Override
            public void progress() {//回调方法显示进度
              System.out.print(".");
            }
          });
         IOUtils.copyBytes(in, outputStream, 4096, false);
        }
     }

运行上述代码,每当上传64KB数据包到Hadoop数据节点时将调用一次progress()方法。

此外,还可以使用FileSystem的append()方法在文件末尾追加数据。这对于日志文件需要持续写入数据非常有用。append()方法的调用源码如下:

     FSDataOutputStream outputStream = fs.append(new Path("hdfs:/newfile2.txt"));

4.4.4 删除文件

使用FileSystem的deleteOnExit()方法,可以删除HDFS文件系统中已经存在的文件。例如,删除HDFS系统根目录下的文件newfile.txt,代码如下:

     /**
      * 定义删除文件方法
      */
     public static void deleteFile() throws Exception{
          Configuration conf = new Configuration();
          conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
          FileSystem fs = FileSystem.get(conf);
          Path path = new Path("hdfs:/newfile.txt");
          //删除文件
          boolean isok = fs.deleteOnExit(path);
          if(isok){
              System.out.println("删除成功!");
          }else{
              System.out.println("删除失败!");
          }
          fs.close();
     }

4.4.5 遍历文件和目录

使用FileSystem的listStatus()方法,可以遍历HDFS文件系统中指定路径下的所有目录和文件。例如,递归遍历HDFS系统根目录下的所有文件和目录并输出路径信息,代码如下:

上述代码通过调用FileSystem的listStatus()方法获得指定路径下的一级子目录及文件,并将结果存储于FileStatus类型的数组中;然后循环遍历该数组,当遇到目录时,再次调用listStatus()方法取得该目录下的所有子目录及文件,从而能够递归取得指定路径下的所有目录及文件。

假设HDFS文件系统的根目录有文件夹input、文件newfile.txt,文件夹input中有文件test.txt,则上述代码的输出结果为:

     hdfs://192.168.170.133:9000/input
     hdfs://192.168.170.133:9000/input/test.txt
     hdfs://192.168.170.133:9000/newfile.txt

4.4.6 获取文件或目录的元数据

使用FileSystem的getFileStatus ()方法,可以获得HDFS文件系统中的文件或目录的元数据信息,包括文件路径、文件修改日期、文件上次访问日期、文件长度、文件备份数、文件大小等。getFileStatus()方法返回一个FileStatus对象,元数据信息则封装在了该对象中。

获取文件或目录元数据的代码如下:

     import java.sql.Timestamp;
     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.fs.FileStatus;
     import org.apache.hadoop.fs.FileSystem;
     import org.apache.hadoop.fs.Path;
     /**
      * 获取文件或目录的元数据信息
      */
     public class FileStatusCat {
       public static void main(String[] args) throws Exception {
         //创建Configuration对象
         Configuration conf = new Configuration();
         //设置HDFS访问地址
         conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
         //取得FileSystem文件系统实例
         FileSystem fs = FileSystem.get(conf);
         FileStatus fileStatus = fs.getFileStatus(new Path("hdfs:/file.txt"));
         //判断是文件夹还是文件
         if (fileStatus.isDirectory()) {
           System.out.println("这是一个文件夹");
         } else {
           System.out.println("这是一个文件");
         }
         //输出元数据信息
         System.out.println("文件路径:" + fileStatus.getPath());
         System.out.println("文件修改日期:"
             + new Timestamp(fileStatus.getModificationTime()).toString());
         System.out.println("文件上次访问日期:"
             + new Timestamp(fileStatus.getAccessTime()).toString());
         System.out.println("文件长度:" + fileStatus.getLen());
         System.out.println("文件备份数:" + fileStatus.getReplication());
         System.out.println("文件块大小:" + fileStatus.getBlockSize());
         System.out.println("文件所有者:" + fileStatus.getOwner());
         System.out.println("文件所在分组:" + fileStatus.getGroup());
         System.out.println("文件的权限:" + fileStatus.getPermission().toString());
       }
     }

上述代码的输出结果如下:

     这是一个文件
     文件路径:hdfs://192.168.170.133:9000/file.txt
     文件修改日期:2018-07-19 15:40:13.533
     文件上次访问日期:2018-07-19 15:40:13.016
     文件长度:40
     文件备份数:2
     文件块大小:134217728
     文件所有者:hadoop
     文件所在分组:supergroup
     文件的权限:rw-r--r--

4.4.7 上传本地文件

使用FileSystem的copyFromLocalFile()方法,可以将操作系统本地的文件上传到HDFS文件系统中,该方法需要传入两个Path类型的参数,分别代表本地目录/文件和HDFS目录/文件。

例如,将Windows系统中D盘的copy_test.txt文件上传到HDFS文件系统的根目录,代码如下:

     /**
      * 定义方法,上传本地文件到HDFS
      */
     public static void uploadFileToHDFS() throws Exception {
         //1.创建配置器
         Configuration conf = new Configuration();
         conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
         //2.取得FileSystem文件系统实例
         FileSystem fs = FileSystem.get(conf);
         //3.创建可供hadoop使用的文件系统路径
         Path src = new Path("D:/copy_test.txt");             //本地目录/文件
         Path dst = new Path("hdfs:/");                       //HDFS目录/文件
         //4.复制上传本地文件至HDFS文件系统中
         fs.copyFromLocalFile(src, dst);
         System.out.println("文件上传成功!");
     }

4.4.8 下载文件到本地

使用FileSystem的copyToLocalFile()方法,可以将HDFS文件系统中的文件下载到操作系统本地,该方法需要传入两个Path类型的参数,分别代表HDFS目录/文件和本地目录/文件。

例如,将HDFS文件系统根目录的文件newfile2.txt下载到Windows系统中D盘根目录,并重命名为new.txt,代码如下: c5B896QP7xYvLF1UFViyLUYT7sWP0xZMsg4dughIqhrwga6Dp0JmAZj8n3hlwYc+

     /**
      * 定义方法,下载文件到本地
      */
     public static void downloadFileToLocal() throws Exception{
       //1.创建配置器
       Configuration conf = new Configuration();
       conf.set("fs.default.name", "hdfs://192.168.170.133:9000");
       //2.取得FileSystem文件系统实例
       FileSystem fs = FileSystem.get(conf);
       //3.创建可供hadoop使用的文件系统路径
       Path src = new Path("hdfs:/newfile2.txt");                  //HDFS目录/文件
       Path dst = new Path("D:/new.txt");                          //本地目录/文件
       //4.从HDFS文件系统中复制下载文件至本地
       fs.copyToLocalFile(false,src,dst,true);
       System.out.println("文件下载成功!");
     }
点击中间区域
呼出菜单
上一章
目录
下一章
×