首页 > 编程语言 >Hadoop:java使用HDFS API实现基本操作工具类

Hadoop:Java 使用 HDFS API 实现基本操作工具类

时间:2024-08-03 10:50:18 浏览次数:18
标签:HDFS java Path new IOException remotePath fileSystem 基本操作 String

1、引入库

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.0</version>
</dependency>

 

2、Java 实现工具类

package com.example.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

/**
 * Static helper methods for basic HDFS operations (create, delete, upload,
 * download, read, append, list) on top of a single shared {@link FileSystem}.
 *
 * <p>The shared {@link #fileSystem} is created once when this class is loaded,
 * using the hard-coded {@code fs.defaultFS} below. All paths passed to the
 * helpers are plain strings that are wrapped in {@link Path}.
 */
public class HDFSUtils {
    /** Shared file system client used by every helper in this class. */
    public static FileSystem fileSystem;

    static {
        Configuration conf = new Configuration();
        // Default file system URI that the client connects to.
        conf.set("fs.defaultFS","hdfs://127.0.0.1:9000");
        // Write-pipeline datanode replacement settings used for append/write.
        // NOTE(review): policy NEVER presumably targets small (single-datanode)
        // clusters - confirm against the Hadoop hdfs-default.xml documentation.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);

        try {
            fileSystem = FileSystem.get(conf);
        } catch (IOException e) {
            // Fail fast with the real cause instead of leaving fileSystem null,
            // which would surface later as an unrelated NullPointerException.
            throw new IllegalStateException("Failed to initialize HDFS FileSystem", e);
        }
    }

    /**
     * Creates a directory (including missing parents) if it does not exist yet.
     *
     * @param path directory path on the file system
     * @return {@code true} if the directory was created by this call;
     *         {@code false} if the path already existed or creation failed
     * @throws IOException if the file system reports an error
     */
    public static boolean createDir(String path) throws IOException {
        Path dir = new Path(path);
        if (fileSystem.exists(dir)) {
            return false;
        }
        return fileSystem.mkdirs(dir);
    }

    /**
     * Deletes a file or directory; directories are deleted recursively.
     *
     * @param path path to delete
     * @return {@code true} if the path existed and was deleted;
     *         {@code false} if it did not exist or deletion failed
     * @throws IOException if the file system reports an error
     */
    public static boolean delete(String path) throws IOException {
        Path target = new Path(path);
        return fileSystem.exists(target) && fileSystem.delete(target, true);
    }

    /**
     * Copies a local file to the remote file system.
     *
     * @param localPath  source path on the local disk
     * @param remotePath destination path on the remote file system
     * @param override   whether an existing destination may be overwritten
     * @param delSrc     whether to delete the local source after copying
     * @throws IOException if the destination exists and {@code override} is
     *                     {@code false}, or if the copy fails
     */
    public static void uploadToHdfs(String localPath,String remotePath,boolean override,boolean delSrc) throws IOException{
        Path target = new Path(remotePath);
        if (!override && fileSystem.exists(target)) {
            throw new IOException(remotePath+" already exist");
        }

        // Pass the overwrite flag through so the copy itself also refuses to
        // clobber a destination that appears after the check above.
        fileSystem.copyFromLocalFile(delSrc, override, new Path(localPath), target);
    }

    /**
     * Copies a remote file to the local disk.
     *
     * @param localPath  destination path on the local disk
     * @param remotePath source path on the remote file system
     * @param override   whether an existing local file may be replaced
     * @param delSrc     whether to delete the remote source after copying
     * @throws IOException if the local path exists and {@code override} is
     *                     {@code false}, or if the copy fails
     */
    public static void downloadFromHdfs(String localPath,String remotePath,boolean override,boolean delSrc) throws IOException{
        File localFile = new File(localPath);
        if (localFile.exists()) {
            if (!override) {
                throw new IOException(localPath+" already exist");
            }
            // Best-effort removal of the old local file; the result is
            // intentionally not checked, the copy below is still attempted.
            localFile.delete();
        }

        // Passing true as the last argument (use the raw local file system)
        // prevents a local .crc checksum file from being produced.
        fileSystem.copyToLocalFile(delSrc, new Path(remotePath), new Path(localPath), true);
    }

    /**
     * Reads a remote text file as UTF-8 and returns its lines concatenated.
     *
     * <p>Line terminators are not included in the result: each line is
     * appended directly after the previous one.
     *
     * @param remotePath path of the file to read
     * @return the file's lines joined without separators
     * @throws IOException if the path does not exist or reading fails
     */
    public static String readFile(String remotePath) throws IOException{
        Path source = new Path(remotePath);
        if (!fileSystem.exists(source)) {
            throw new IOException(remotePath+" not exist");
        }

        StringBuilder content = new StringBuilder();
        // Decode explicitly as UTF-8 (matching appendFile) rather than with the
        // platform default charset; both resources are closed automatically.
        try (FSDataInputStream in = fileSystem.open(source);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                content.append(line);
            }
        }

        return content.toString();
    }

    /**
     * Appends text, encoded as UTF-8, to the end of an existing remote file.
     *
     * @param remotePath path of the file to append to
     * @param content    text to append
     * @throws IOException if the path does not exist or the append fails
     */
    public static void appendFile(String remotePath,String content) throws IOException{
        Path target = new Path(remotePath);
        if (!fileSystem.exists(target)) {
            throw new IOException(remotePath+" not exist");
        }
        try (FSDataOutputStream out = fileSystem.append(target)) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Collects the entries produced by {@link FileSystem#listFiles(Path, boolean)}.
     *
     * @param remotePath path to list
     * @param recursive  whether to descend into sub-directories
     * @return the located file statuses, in iteration order
     * @throws IOException if the path does not exist or listing fails
     */
    public static List<LocatedFileStatus> listFiles(String remotePath,boolean recursive) throws IOException{
        Path root = new Path(remotePath);
        if (!fileSystem.exists(root)) {
            throw new IOException(remotePath+" not exist");
        }

        List<LocatedFileStatus> result = new LinkedList<>();
        RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(root, recursive);
        while (it.hasNext()) {
            result.add(it.next());
        }

        return result;
    }

    /**
     * Returns the statuses reported by {@link FileSystem#listStatus(Path)}.
     *
     * @param remotePath path to list
     * @return a fixed-size list view over the returned status array
     * @throws IOException if the path does not exist or listing fails
     */
    public static List<FileStatus> listStatus(String remotePath) throws IOException{
        Path root = new Path(remotePath);
        if (!fileSystem.exists(root)) {
            throw new IOException(remotePath+" not exist");
        }

        return Arrays.asList(fileSystem.listStatus(root));
    }
}

 

3、测试

try {
    // Other operations to try; uncomment one at a time:
    //HDFSUtils.createDir("/t1/t1_1");
    //HDFSUtils.uploadToHdfs("D:\\tmp\\hello.txt","/t1/t1_1/hello.txt",false,false);
    //HDFSUtils.downloadFromHdfs("D:\\tmp\\hello.txt","/t1/t1_1/hello.txt",true,false);
    //System.out.println(HDFSUtils.readFile("/t1/t1_1/hello.txt"));
    // Appends non-ASCII sample text to check UTF-8 handling:
    //HDFSUtils.appendFile("/t1/t1_1/hello.txt","追加中文看看");

    // Alternative listing through listFiles (non-recursive):
    //List<LocatedFileStatus> list=HDFSUtils.listFiles("/",false);
    //for(LocatedFileStatus lfs:list){
    //    System.out.println(lfs.getPath().toString());
    //}

    // Print the path of every entry returned by listStatus for the root.
    for (FileStatus entry : HDFSUtils.listStatus("/")) {
        System.out.println(entry.getPath().toString());
    }
} catch (IOException e) {
    e.printStackTrace();
}

 

以上代码仅供各位及我本人进行参考。最后附上HDFS API地址,更多的高级功能,大家可以多多试验:

https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html

标签:HDFS,java,Path,new,IOException,remotePath,fileSystem,基本操作,String
From: https://www.cnblogs.com/ddcoder/p/18340172

相关文章