首页 > 编程语言 >4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)

4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)

时间:2023-05-15 19:06:32浏览次数:36  
标签:HDFS Exception java String HDFSUtil srcPath hdfsUtil test path

目录

  • Hadoop系列文章目录
  • 一、pom.xml
  • 二、junit测试类
  • 三、操作类(帮助类)
  • 四、高可用环境的操作类



本文编写了java对HDFS的常见操作,并且均测试通过。其功能包含构造conf、设置系统环境变量、创建目录、判断文件是否存在、获取文件/目录的大小等,具体见下图。

4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置)_hdfs

本文分为四部分,即pom.xml、junit测试类、操作类、高可用环境操作类。

  • pom.xml描述本代码运行需要哪些maven依赖
  • junit测试类是对操作类的单元测试
  • 操作类就是对hdfs api的使用
  • 高可用环境操作类就是操作类在高可用环境中的配置

一、pom.xml

<?xml version="1.0"?>
<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>com.okcard</groupId>
		<artifactId>bigdata-component</artifactId>
		<version>0.0.1-SNAPSHOT</version>
	</parent>
	<groupId>com.okcard</groupId>
	<artifactId>hadoop</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>hadoop</name>
	<url>http://maven.apache.org</url>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
  		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.13</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.22</version>
		</dependency>
	</dependencies>
</project>

二、junit测试类

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author alanchan HDFSUtil测试用例
 */
public class HDFSUtilTest {
	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		hdfsUtil = HDFSUtil.getHDFSUtilInstance();
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
	}

//	assertTrue(String message, boolean condition) 
//	assertFalse(String message, boolean condition)
//	assertNotEquals(String message, Object unexpected,Object actual)
//	assertArrayEquals(String message, Object[] expecteds,Object[] actuals)
//	assertNotNull(String message, Object object)
//	assertNull(String message, Object object)
//	assertSame(String message, Object expected, Object actual)
	@Test
	public void testMkdir() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@Test
	public void testExists() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testGetHdfsFileSize() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			hdfsUtil.writeHdfsFile(path, fileContent);
			long fileSize = hdfsUtil.getHdfsFileSize(path);
//			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertEquals("文件大小与期望一致", fileContent.length(), fileSize);
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteHdfsFile2() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(new Path(path), fileContent.getBytes()));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteUTFHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testReadHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			assertEquals("读取文件内容与预期一致", fileContent, hdfsUtil.readHdfsFile(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testReadUTFHdfsFile() {
		try {
			String path = "/test/testsub";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
			assertEquals("读取UTF文件内容与预期一致", fileContent, hdfsUtil.readUTFHdfsFile(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWriteAndAppendHdfsFile() {
		try {
			// 構造conf
			Configuration conf = new Configuration();
			conf.set("fs.defaultFS", "hdfs://server1:8020");
			conf.setBoolean("dfs.support.append", true);
			conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
			conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
			conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
			// 調用示例
			hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
			String path = "/test/testsub";
			String fileContent = "this is a testing";

			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			fileContent = "\n line this is another testing";
			assertTrue("追加写文件成功", hdfsUtil.writeAndAppendHdfsFile(path, fileContent));

			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWalkHdfsDir() {
		try {
			String dir = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			hdfsUtil.walkHdfsDir(new Path("/test"));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testWalkHdfsDir_Iter() {
		try {
			String dir = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
			hdfsUtil.walkHdfsDir_Iter(new Path("/test"), true);
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testUploadLocalFileToHdfs() {
		try {
			String localFile = "d://test.txt";
			BufferedWriter out = new BufferedWriter(new FileWriter(localFile));
			out.write("this is test uplaod file testing");
			out.close();

//			File tempFile = new File(localFile);
//			tempFile.deleteOnExit();
//			tempFile.delete();

			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			assertTrue("上传本地文件成功", hdfsUtil.uploadLocalFileToHdfs(localFile, path));

			assertTrue("刪除本地临时文件成功", new File(localFile).delete());
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testDownloadHdfsToLocal() {
		try {
			String path = "/test/testsub/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			String localFile = "d://testdown.txt";
			assertTrue("从HDFS上下载文件到本地成功", hdfsUtil.downloadHdfsToLocal(path, localFile));

			BufferedReader in = new BufferedReader(new FileReader(localFile));
			StringBuilder localFileContent = new StringBuilder();
			String line;
			while ((line = in.readLine()) != null) {
				localFileContent.append(line).append("\n");
			}
			localFileContent.deleteCharAt(localFileContent.length() - 1);

			in.close();
			assertEquals("下文件内容与hdfs文件内容一致", localFileContent.toString(), fileContent);
			assertTrue("刪除本地临时文件成功", new File(localFile).delete());
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testRename() {
		try {
			String path = "/test/test/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			String destPath = "/t/test.log";
			assertTrue("rename成功", hdfsUtil.rename(path, destPath, true));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/t", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testDeleteHdfs() {
		try {
			String path = "/test/test/testsub.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFile() {
		try {
			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test/test/dest.txt";
			assertTrue("複製文件成功", hdfsUtil.copyFile(srcPath, destpath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFile2() {
		try {
			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test/test/dest.txt";
			assertTrue("複製文件成功", hdfsUtil.copyFile2(srcPath, destpath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyDir() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test1";
			assertTrue("複製文件目錄成功", hdfsUtil.copyDir(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test1", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFilesExcludeDir() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test2";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
			assertTrue("複製文件目錄成功", hdfsUtil.copyFilesExcludeDir(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test2", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyDirOnlyFiles() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destpath = "/test3";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
			assertTrue("複製文件目錄成功", hdfsUtil.copyDirOnlyFiles(dir, destpath, false));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test3", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testSearchFile() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/testsub/src2.txt";
			fileContent = "this is a testing\n line this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String searchFileName = "src2.txt";
			assertEquals("查找文件数量与预期一致", 1, hdfsUtil.searchFile(dir, searchFileName, true).size());

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testCopyFromLocalFileWithProgress() {
		try {
			String destPath = "/test/test/t.rat";
			String localBigFile = "D:\\BaiduNetdiskDownload\\test.rar";
			assertTrue("查找文件数量与预期一致", hdfsUtil.copyFromLocalFileWithProgress(localBigFile, destPath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testMergeFile() {
		try {
			String dir = "/test/test";
			assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));

			String srcPath = "/test/test/src.txt";
			String fileContent = "this is a testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			srcPath = "/test/test/src2.txt";
			fileContent = "this is another testing";
			assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));

			String destPath = "/test/src3.txt";
			assertTrue("文件合并成功", hdfsUtil.mergeFile(dir, destPath));

			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
			assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs(destPath, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Test
	public void testTest() {
		hdfsUtil.test();
		hdfsUtil.test("123");
		hdfsUtil.test("123", "456");
		hdfsUtil.test("");
//		test();
//		test("test");
//		test("123", "321");
		assertEquals("Regular multiplication should work", "", "");
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}

三、操作类(帮助类)

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import lombok.extern.slf4j.Slf4j;

/**
 * @author alanchan hdfs常用帮助类
 */
@Slf4j
public class HDFSUtil {
	private static Configuration conf = null;
	private static FileSystem fileSystem = null;
	private static final String HADOOP_USER_NAME = "alanchan";
	private static final String DEFAULTFS = "hdfs://server1:8020";
	private static HDFSUtil hdfsInstance = null;

	/**
	 * 
	 * @param strings 默认第一个参数是defaultFS,可以传更多参数,如果传入2个参数,则第二个默认为user,其他参数暂没用
	 * @throws Exception
	 */
	private HDFSUtil(String... strings) throws Exception {
		// 设置客户端身份 以具备权限在hdfs上进行操作
		if (strings.length >= 2) {
			System.setProperty("HADOOP_USER_NAME", strings[1]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}
		conf = new Configuration();
		// 设置操作的文件系统是HDFS 并且指定HDFS操作地址。第一个参数是defaultFS
		if (strings.length >= 1) {
			conf.set("fs.defaultFS", strings[0]);
		} else {
			conf.set("fs.defaultFS", DEFAULTFS);
		}
		fileSystem = FileSystem.get(conf);
	}

	private HDFSUtil(Configuration conf, String... strings) throws Exception {
		// 设置客户端身份 以具备权限在hdfs上进行操作
		if (strings.length >= 1) {
			System.setProperty("HADOOP_USER_NAME", strings[0]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}

		this.conf = conf;
		fileSystem = FileSystem.get(this.conf);
	}

	public static HDFSUtil getHDFSUtilInstance(Configuration conf, String... strings) throws Exception {
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(conf, strings);
		}
		return hdfsInstance;
	}

	public static HDFSUtil getHDFSUtilInstance(String... strings) throws Exception {
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(strings);
		}
		return hdfsInstance;
	}

	public boolean mkdir(String path) throws Exception {
		return mkdir(new Path(path));
	}

	public boolean mkdir(Path p) throws Exception {
		if (!fileSystem.exists(p)) {
			fileSystem.mkdirs(p);
		}
		return fileSystem.exists(p);
	}

	public boolean exists(String path) throws Exception {
		return exists(new Path(path));
	}

	public boolean exists(Path p) throws Exception {
		return fileSystem.exists(p);
	}

	public long getHdfsFileSize(String path) throws Exception {
		return getHdfsFileSize(new Path(path));
	}

	public long getHdfsFileSize(Path path) throws Exception {
		return fileSystem.getContentSummary(path).getLength();
	}

	/**
	 * 
	 * @param path        写入文件的路径,含文件名称
	 * @param fileContent 写入文件的内容
	 * @return 如果文件存在,且文件大小大于0,则视为写入成功
	 * @throws Exception
	 */
	public boolean writeHdfsFile(String path, String fileContent) throws Exception {
		return writeHdfsFile(new Path(path), fileContent);
	}

	public boolean writeHdfsFile(Path path, byte[] fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.write(fileContent);
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeHdfsFile(Path path, String fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.write(fileContent.getBytes());
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	/**
	 * Writes a string to the underlying output stream usingmodified UTF-8 encoding
	 * in a machine-independent manner.
	 * 
	 * @param path
	 * @param fileContent
	 * @return
	 * @throws Exception
	 */
	public boolean writeUTFHdfsFile(Path path, String fileContent) throws Exception {
		FSDataOutputStream in = fileSystem.create(path);
		in.writeUTF(fileContent);
		in.flush();
		in.close();

		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeUTFHdfsFile(String path, String fileContent) throws Exception {
		return writeUTFHdfsFile(new Path(path), fileContent);
	}

	public String readHdfsFile(String path) throws Exception {
		return readHdfsFile(new Path(path));
	}

	public String readHdfsFile(Path path) throws Exception {
		FSDataInputStream out = fileSystem.open(path);
		BufferedReader br = new BufferedReader(new InputStreamReader(out));
		String line;
		StringBuilder result = new StringBuilder();
		while ((line = br.readLine()) != null) {
			result.append(line).append("\n");
		}
		// 去掉最后一个换行符
		result.deleteCharAt(result.length() - 1);
		br.close();
		out.close();
		return result.toString();
	}

	public String readUTFHdfsFile(String path) throws Exception {
		return readUTFHdfsFile(new Path(path));
	}

	/**
	 * 好像只能读取通过writeUTF写入的文件
	 * 
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public String readUTFHdfsFile(Path path) throws Exception {
		FSDataInputStream out = fileSystem.open(path);
		String fileContent = out.readUTF();
		out.close();
		return fileContent;
	}

	// 構造conf
//	Configuration conf = new Configuration();
//	conf.set("fs.defaultFS", DEFAULTFS);
//	conf.setBoolean("dfs.support.append", true);
//	conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
//	conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
//	conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
	// 調用示例
//	HDFSUtil.getHDFSUtilInstance(conf).writeAndAppendHdfsFile(new Path(path1), path2);
	/**
	 * 该例的前提是文件已经存在,仅仅是演示追加fileContent内容至目标文件中
	 * 
	 * @param destPath
	 * @param fileContent
	 * @return
	 * @throws Exception
	 */
	public boolean writeAndAppendHdfsFile(Path destPath, String fileContent) throws Exception {
		fileSystem = destPath.getFileSystem(conf);

		if (!fileSystem.exists(destPath)) {
			fileSystem.createNewFile(destPath);
		}

		FSDataOutputStream output = fileSystem.append(destPath);
//		FSDataOutputStreamBuilder output = fileSystem.appendFile(destPath);
		output.write(fileContent.getBytes("UTF-8"));
		// 文件最后多一个换行符
		output.write("\n".getBytes("UTF-8"));
//		fileSystem.close();
		output.close();

		return fileSystem.exists(destPath) && getHdfsFileSize(destPath) > 0;
	}

	public boolean writeAndAppendHdfsFile(String destPath, String fileContent) throws Exception {
		return writeAndAppendHdfsFile(new Path(destPath), fileContent);
	}

	public void walkHdfsDir(String path) throws Exception {
		walkHdfsDir(new Path(path));
	}

	public void walkHdfsDir(Path path) throws Exception {
		FileStatus[] fileStatuses = fileSystem.listStatus(path);
		for (FileStatus fileStatus : fileStatuses) {
			if (fileStatus.isDirectory()) {
				walkHdfsDir(fileStatus.getPath());
				log.info("目录 = {}", fileStatus.getPath());
			} else {
				log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
			}
		}

	}

	public void walkHdfsDir_Iter(String path, boolean recursive) throws Exception {
		walkHdfsDir_Iter(new Path(path), recursive);
	}

	/**
	 * 
	 * @param path
	 * @param recursive 是否递归
	 * @throws Exception
	 */
	public void walkHdfsDir_Iter(Path path, boolean recursive) throws Exception {
		RemoteIterator<LocatedFileStatus> lfsRemoteIterator = fileSystem.listFiles(path, recursive);
		while (lfsRemoteIterator.hasNext()) {
			LocatedFileStatus next = lfsRemoteIterator.next();
			log.info("文件完整路径 = {},文件名={}", next.getPath(), next.getPath().getName());
		}
	}

	public boolean uploadLocalFileToHdfs(String localFile, String dest) throws Exception {
		return uploadLocalFileToHdfs(localFile, new Path(dest));
	}

	/**
	 * 
	 * @param localFile
	 * @param dest
	 * @return 简单比较目标和源文件的大小判断是否上传成功
	 * @throws Exception
	 */
	public boolean uploadLocalFileToHdfs(String localFile, Path dest) throws Exception {
		fileSystem.copyFromLocalFile(new Path(localFile), dest);
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(dest);
	}

	public boolean downloadHdfsToLocal(String hdfsFile, String localFile) throws Exception {
		return downloadHdfsToLocal(new Path(hdfsFile), localFile);
	}

	public boolean downloadHdfsToLocal(Path hdfsFile, String localFile) throws Exception {
		fileSystem.copyToLocalFile(hdfsFile, new Path(localFile));
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(hdfsFile);
	}

	/**
	 * 
	 * @param srcPath
	 * @param destPath
	 * @param mkdirFlag 如果目标目录不存在是否创建
	 * @return
	 * @throws Exception
	 */
	public boolean rename(Path srcPath, Path destPath, boolean mkdirFlag) throws Exception {
		boolean flag = false;
		if (fileSystem.exists(srcPath)) {
			if (mkdirFlag && !fileSystem.exists(destPath)) {
				fileSystem.mkdirs(destPath);
			}
			flag = fileSystem.rename(srcPath, destPath);
		}

		return flag;
	}

	public boolean rename(String srcPath, String destPath, boolean mkdirFlag) throws Exception {
		return rename(new Path(srcPath), new Path(destPath), mkdirFlag);
	}

	/**
	 * 删除文件或目录
	 * 
	 * @param dest
	 * @param recursive 是否递归删除
	 * @return
	 * @throws Exception
	 */
	public boolean deleteHdfs(Path dest, boolean recursive) throws Exception {
		return fileSystem.delete(dest, recursive);
	}

	public boolean deleteHdfs(String dest, boolean recursive) throws Exception {
		return deleteHdfs(new Path(dest), recursive);
	}

	/**
	 * 拷贝单个文件或流?
	 * 
	 * @param srcPath
	 * @param destpath
	 * @throws Exception
	 */
	public void copyFile(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);
		IOUtils.copyBytes(in, out, conf);

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyFile(String srcPath, String destpath) throws Exception {
		copyFile(new Path(srcPath), new Path(destpath));
		return true;
	}

	public boolean copyFile2(String srcPath, String destpath) throws Exception {
		copyFile2(new Path(srcPath), new Path(destpath));
		return true;
	}

	public void copyFile2(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);

		byte[] b = new byte[1024];
		int hasRead = 0;
		while ((hasRead = in.read(b)) > 0) {
			out.write(b, 0, hasRead);
		}

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 文件夹拷贝,包含文件夾
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @return
	 * @throws Exception
	 */
	public boolean copyDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
//		  public static boolean copy(FileSystem srcFS, Path src,FileSystem dstFS, Path dst,boolean deleteSource,Configuration conf) throws IOException {
		return FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, deleteSource, conf);
	}

	/**
	 * 拷貝文件及目錄,但不包含srcPath的第一層目錄,即srcPath是一个目录,但只拷贝srcPath最后一层目录下的所有文件及目录,但不包含本身
	 * 例如:srcPath=/copyDir/src,destPath=/copyDir/dest,最终结果是将/copyDir/src下面的所有文件及目录拷贝至/copyDir/dest下,但不包含src这个文件夹
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @throws Exception
	 */
	public boolean copyFilesExcludeDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		FileStatus[] fileStatuses = fileSystem.listStatus(srcPath);
		for (FileStatus fileStatus : fileStatuses) {
			if (fileStatus.isDirectory()) {
//				copyFilesExcludeDir(fileStatus.getPath(), destpath, deleteSource);
				flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
			} else {
				flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
			}
		}
		return flag;
	}

	public boolean copyFilesExcludeDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyFilesExcludeDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 拷贝源文件夹下的所有文件到目标文件夹,不含源文件夹下的文件夹
	 * 
	 * @param srcPath
	 * @param destpath
	 * @param deleteSource
	 * @return
	 * @throws Exception
	 */
	public boolean copyDirOnlyFiles(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
		while (sourceFiles.hasNext()) {
			flag = FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, deleteSource, conf);
		}
		return flag;
	}

	public boolean copyDirOnlyFiles(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDirOnlyFiles(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 查找文件,精确查找 equals
	 * 
	 * @param srcPath
	 * @param searchFileName
	 * @param recursive
	 * @return
	 * @throws Exception
	 */
	public List<Path> searchFile(Path srcPath, String searchFileName, boolean recursive) throws Exception {
		List<Path> list = new ArrayList<Path>();

		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, recursive);
		while (sourceFiles.hasNext()) {
			Path file = sourceFiles.next().getPath();
			String srcFileName = file.getName();
			if (searchFileName.equals(srcFileName)) {
				list.add(file);
			}
		}
		return list;
	}

	public List<Path> searchFile(String srcPath, String searchFileName, boolean recursive) throws Exception {
		return searchFile(new Path(srcPath), searchFileName, recursive);
	}

	/**
	 * 通過流的方式,带进度条的本地文件拷贝至hdfs
	 * 
	 * @param fs
	 * @throws Exception
	 */
	public boolean copyFromLocalFileWithProgress(File srcPath, Path destPath) throws Exception {
		InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
		final float fileSize = srcPath.length() / 65536;
		FSDataOutputStream out = fileSystem.create(destPath, new Progressable() {
			long fileCount = 0;

			@Override
			// progress 方法在每次上传了64K(64*1024=65536)B字节大小的文件之后会自动调用一次
			// 通过累加之后的结果与整个文件大小来计算总上传文件的进度
			public void progress() {
				fileCount++;
				System.out.println("进度%:" + (fileCount / fileSize) * 100 + " %");
			}

		});
		IOUtils.copyBytes(in, out, 4096);

		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
		return getHdfsFileSize(destPath) == srcPath.length();
	}

	public boolean copyFromLocalFileWithProgress(String srcPath, String destPath) throws Exception {
		return copyFromLocalFileWithProgress(new File(srcPath), new Path(destPath));
	}

	/**
	 * 合并文件
	 * 
	 * @param srcPath  目錄
	 * @param destPath 文件名,含路徑
	 * @return
	 * @throws Exception
	 */
	public boolean mergeFile(Path srcPath, Path destPath) throws Exception {
		FSDataOutputStream fout = fileSystem.create(destPath);
		if (fileSystem.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lsfr = fileSystem.listFiles(srcPath, true);
			while (lsfr.hasNext()) {
				LocatedFileStatus next = lsfr.next();
				FSDataInputStream fin = fileSystem.open(next.getPath());
				IOUtils.copyBytes(fin, fout, 4096);
				fin.close();
			}
		}
		fout.close();
		return getHdfsFileSize(srcPath) == getHdfsFileSize(srcPath);
	}

	public boolean mergeFile(String srcPath, String destPath) throws Exception {
		return mergeFile(new Path(srcPath), new Path(destPath));
	}

	public void close() {
		if (fileSystem != null) {
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		String path1 = "/test/test.rar";
		String path2 = "This is a testing";
		String searchFileName = "D:\\BaiduNetdiskDownload\\2test.rar";

		try {

//			HDFSUtil.getHDFSUtilInstance().writeUTFHdfsFile(path1, path2);copyDir  copyFilesExcludeDir
			String srcPath = "/testhdfs_copyDir/src", destPath = "/testhdfs_copyDir/dest";
			System.out.println(HDFSUtil.getHDFSUtilInstance().copyFilesExcludeDir(srcPath, destPath, false));

		} catch (Exception e) {
			e.printStackTrace();
		}

//		test();
//		test("test");
//		test("123", "321");

	}

	static void test(String... strings) {
		if (strings.length == 1) {
			System.out.println("testing=" + strings[0]);
		} else {
			System.out.println("testing=" + strings.length);
		}
	}

	/**
	 * 没有验证,仅示例加载配置文件
	 * 
	 * @param srcPath
	 * @param destPath
	 * @throws Exception
	 */
	private void copyOrMoveFiles(Path srcPath, Path destPath) throws Exception {
		Configuration conf = new Configuration();
		conf.addResource("basedir/hdfs-site.xml");
		conf.addResource("basedir/core-site.xml");
//            FileSystem fs = FileSystem.newInstance(conf);
		FileContext fc = FileContext.getFileContext(conf);
		FileContext.Util util = fc.util();
		util.copy(srcPath, destPath, false, false);
	}

	/**
	 * 遍历目录,IOUtils.copyBytes(),顺序复制,通过获取本地文件系统或者直接new FileInputStream()的方式实现 2种
	 * 沒有驗證,主要參考其流、conf的應用
	 * 
	 * @param srcPath
	 * @param destPath
	 * @throws Exception
	 */
	private void upload(Path srcPath, Path destPath) throws Exception {
		// 获取本地文件系统
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		if (localFs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lsfr = localFs.listFiles(srcPath, true);
			while (lsfr.hasNext()) {
				LocatedFileStatus next = lsfr.next();
				FSDataInputStream fin = fileSystem.open(next.getPath());
				FSDataOutputStream fout = fileSystem.create(new Path("/test/data/" + next.getPath().getName()));
				IOUtils.copyBytes(fin, fout, 4096);
				fin.close();
				fout.close();
			}
		} else {
			FSDataInputStream fin = fileSystem.open(srcPath);
//			InputStream fin = new FileInputStream("D://a.txt");
			FSDataOutputStream fout = fileSystem.create(destPath);
			IOUtils.copyBytes(fin, fout, 4096);
			fin.close();
			fout.close();
		}

	}

	private void upload(String srcPath, String destPath) throws Exception {
		upload(new Path(srcPath), new Path(destPath));
	}

	/**
	 * 沒有驗證,主要參考其流、conf的應用
	 * 
	 * @param srcPath
	 * @param destPath
	 * @param fs
	 * @param local
	 * @param fc
	 * @throws Exception
	 */
	private void downLoad(Path srcPath, Path destPath, FileSystem fs, FileSystem local, FileContext fc)
			throws Exception {
// 2.字节流的复制
		FileOutputStream localFileOut = new FileOutputStream("D:\\test\\a.txt");
		FSDataInputStream remoteFin = fs.open(srcPath);

		IOUtils.copyBytes(remoteFin, localFileOut, 2048);
		remoteFin.close();
		localFileOut.close();

//4.下载前做合并
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		FSDataOutputStream fsdOut = localFs.create(destPath);
		if (fs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lfsr = fs.listFiles(srcPath, true);
			while (lfsr.hasNext()) {
				LocatedFileStatus next = lfsr.next();
				FSDataInputStream fdIn = fs.open(next.getPath());
				IOUtils.copyBytes(fdIn, fsdOut, 2048);
				fdIn.close();
			}
			fsdOut.close();
		} else {
			FSDataInputStream fsIn = fs.open(srcPath);
			IOUtils.copyBytes(fsIn, fsdOut, 2048);
			fsIn.close();
			fsdOut.close();
		}
	}

}

四、高可用环境的操作类

该示例仅仅是示例高可用环境下的api访问,实际的类还是第三部分的。

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @author alanchan 测试高可用环境下
 */
public class HDFSHAUtilTest {
	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
        conf.set("dfs.nameservices", "HadoopHAcluster");
        conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
        conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
        conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance();
		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
//		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf,strings);
		


	}

	@Test
	public void testMkdir() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@Test
	public void testExists() {
		try {
			String path = "/test/testsub";
			assertTrue("创建目录成功", hdfsUtil.mkdir(path));
			assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
			assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}


标签:HDFS,Exception,java,String,HDFSUtil,srcPath,hdfsUtil,test,path
From: https://blog.51cto.com/alanchan2win/6280441

相关文章

  • 12、HDFS Trash垃圾桶回收介绍与示例
    文章目录Hadoop系列文章目录一、介绍二、HDFSTrash功能开启1、关闭HDFS集群2、修改core-site.xml文件3、同步集群配置文件4、启动HDFS集群三、HDFSTrash功能验证1、删除文件并验证2、不进回收站的删除3、从Trash中恢复文件4、清空Trash本文主要介绍HDFSTrash垃圾桶回收。前提......
  • 5、zookeeper的java -Curator(服务注册与发现)
    目录Zookeeper系列文章目录一、知识介绍1、ServiceInstance2、ServiceProvider3、ServiceDiscovery1)、注册/注销服务2)、查询服务3)、服务缓存二、示例11、pom.xml2、ServiceInstance3、ServiceProvider及ServiceDiscovery4、注册服务的添加、删除、查询5、验证示例1三、示例21、Ins......
  • 4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布
    目录Zookeeper系列文章目录一、zookeeper原生JavaAPI二、ZkClient三、Apachecurator1、pom.xml2、定义常量类3、连接实例化4、事务操作示例5、CRUD示例6、监听示例7、计数器示例1)、单机原子自增性实现1、Synchronized示例2、Lock示例3、AtomicInteger示例2)、分布式线程安全原子......
  • 2、HDFS操作 - shell客户端
    目录Hadoop系列文章目录一、语法格式二、具体命令示例1、mkdir命令2、ls命令3、put命令4、rm命令5、moveFromLocal命令6、-get7、cat命令8、head命令9、tail命令10、cp拷贝命令11、appendToFile命令12、df命令13、du命令14、mv命令15、setrep命令16、checksum17、co......
  • java数组去重_JAVA数组去重常用方法
    java数组去重_JAVA数组去重常用方法发布于 2022-09-1017:18:356950举报大家好,又见面了,我是你们的朋友全栈君。packagecom.zxj.test;importjava.util.ArrayList;importjava.util.Arrays;importjava.util.HashMap;importjava.util.List;importjava.util.......
  • Java对象和json对象
    java对象和json对象转换接口classPersion1{@SerializedName("name")@ExposeStringname;publicvoidsetName(Stringname){this.name=name;}@OverridepublicStringtoString()......
  • java.lang.IllegalArgumentException: Illegal URL:
    问题描述: java.lang.IllegalArgumentException:IllegalURL:翻译为:IllegalArgumentException:非法网址:原因:网址错误,网址前面加http:// 后面加/ ;(如下图) ......
  • ChatGPT Plugin开发setup - Java(Spring Boot) Python(fastapi)
    记录一下快速模板,整体很简单,如果不接auth,只需要以下:提供一个/.well-known/ai-plugin.json接口,返回openAI所需要的格式提供openAPI规范的文档CORS设置其他的和普通的web开发类似.本地开发就直接使用localhost即可,前几天官方localhost无法联通,最近应该修复了.要让GPT......
  • 将java开发环境装入linux系统
    使用该指令下载jdkwgethttps://dragonwell.oss-cn-shanghai.aliyuncs.com/8.6.6/Alibaba_Dragonwell_8.6.6_x64_linux.tar.gz使用 tarxf+文件名 解压文件tarxfAlibaba_Dragonwell_8.6.6_x64_linux.tar.gz配置环境变量vim/etc/profile 打开环境变量文件exportJAV......
  • 直播平台搭建源码,java相册制作
    直播平台搭建源码,java相册制作 packagecn.demo2;importjavax.imageio.ImageIO;importjavax.swing.*;importjava.awt.*;importjava.awt.image.BufferedImage;importjava.io.IOException;/** *电子相册 */publicclassMyImageextendsJPanel{  //定义一个成员变量......