目录
- Hadoop系列文章目录
- 一、pom.xml
- 二、junit测试类
- 三、操作类(帮助类)
- 四、高可用环境的操作类
本文编写了java对HDFS的常见操作,并且均测试通过。其功能包含构造conf、设置系统环境变量、创建目录、判断文件是否存在、获取文件/目录的大小等,具体见下文的操作类方法列表。
本文分为四部分,即pom.xml、junit测试类、操作类、高可用环境操作类。
- pom.xml描述本代码运行需要哪些maven依赖
- junit测试类是对操作类的单元测试
- 操作类就是对hdfs api的使用
- 高可用环境操作类就是操作类在高可用环境中的配置
一、pom.xml
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.okcard</groupId>
<artifactId>bigdata-component</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.okcard</groupId>
<artifactId>hadoop</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>hadoop</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
二、junit测试类
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author alanchan HDFSUtil测试用例
*/
/**
 * Unit (integration) tests for {@link HDFSUtil}.
 *
 * <p>Every test method declares {@code throws Exception} instead of wrapping its
 * body in try/catch with {@code printStackTrace()}: a swallowed exception lets a
 * broken test pass silently, while with JUnit an uncaught exception correctly
 * fails the test.
 *
 * <p>NOTE(review): these tests require a reachable HDFS cluster (the default
 * {@code fs.defaultFS} configured in {@link HDFSUtil}) — confirm connectivity
 * before running.
 *
 * @author alanchan HDFSUtil测试用例
 */
public class HDFSUtilTest {

	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		// Obtain the singleton helper with the default configuration.
		hdfsUtil = HDFSUtil.getHDFSUtilInstance();
		// Alternative factory overloads:
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf, strings);
	}

	// Commonly used JUnit assertions, for reference:
	// assertTrue(String message, boolean condition)
	// assertFalse(String message, boolean condition)
	// assertNotEquals(String message, Object unexpected, Object actual)
	// assertArrayEquals(String message, Object[] expecteds, Object[] actuals)
	// assertNotNull(String message, Object object)
	// assertNull(String message, Object object)
	// assertSame(String message, Object expected, Object actual)

	@Test
	public void testMkdir() throws Exception {
		String path = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testExists() throws Exception {
		String path = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(path));
		assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testGetHdfsFileSize() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		hdfsUtil.writeHdfsFile(path, fileContent);
		long fileSize = hdfsUtil.getHdfsFileSize(path);
		// The content is plain ASCII, so byte length == string length.
		assertEquals("文件大小与期望一致", fileContent.length(), fileSize);
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testWriteHdfsFile() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testWriteHdfsFile2() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		// Exercises the byte[] overload.
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(new Path(path), fileContent.getBytes()));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testWriteUTFHdfsFile() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testReadHdfsFile() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		assertEquals("读取文件内容与预期一致", fileContent, hdfsUtil.readHdfsFile(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testReadUTFHdfsFile() throws Exception {
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		// readUTFHdfsFile can only read files written via writeUTF.
		assertTrue("写文件成功", hdfsUtil.writeUTFHdfsFile(path, fileContent));
		assertEquals("读取UTF文件内容与预期一致", fileContent, hdfsUtil.readUTFHdfsFile(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testWriteAndAppendHdfsFile() throws Exception {
		// Append requires extra client-side configuration.
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://server1:8020");
		conf.setBoolean("dfs.support.append", true);
		conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
		conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
		conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
		// Re-acquire the helper with the append-capable configuration.
		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
		String path = "/test/testsub";
		String fileContent = "this is a testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		fileContent = "\n line this is another testing";
		assertTrue("追加写文件成功", hdfsUtil.writeAndAppendHdfsFile(path, fileContent));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testWalkHdfsDir() throws Exception {
		String dir = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
		String path = "/test/testsub/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		// Output is logged; this only verifies the walk completes without error.
		hdfsUtil.walkHdfsDir(new Path("/test"));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
	}

	@Test
	public void testWalkHdfsDir_Iter() throws Exception {
		String dir = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(dir));
		String path = "/test/testsub/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		hdfsUtil.walkHdfsDir_Iter(new Path("/test"), true);
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(dir, true));
	}

	@Test
	public void testUploadLocalFileToHdfs() throws Exception {
		String localFile = "d://test.txt";
		// try-with-resources guarantees the writer closes even if write fails.
		try (BufferedWriter out = new BufferedWriter(new FileWriter(localFile))) {
			out.write("this is test uplaod file testing");
		}
		String path = "/test/testsub/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		assertTrue("上传本地文件成功", hdfsUtil.uploadLocalFileToHdfs(localFile, path));
		assertTrue("刪除本地临时文件成功", new File(localFile).delete());
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
	}

	@Test
	public void testDownloadHdfsToLocal() throws Exception {
		String path = "/test/testsub/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		String localFile = "d://testdown.txt";
		assertTrue("从HDFS上下载文件到本地成功", hdfsUtil.downloadHdfsToLocal(path, localFile));
		StringBuilder localFileContent = new StringBuilder();
		try (BufferedReader in = new BufferedReader(new FileReader(localFile))) {
			String line;
			while ((line = in.readLine()) != null) {
				localFileContent.append(line).append("\n");
			}
		}
		// Drop the trailing newline added by the read loop.
		localFileContent.deleteCharAt(localFileContent.length() - 1);
		assertEquals("下文件内容与hdfs文件内容一致", localFileContent.toString(), fileContent);
		assertTrue("刪除本地临时文件成功", new File(localFile).delete());
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/test/testsub", true));
	}

	@Test
	public void testRename() throws Exception {
		String path = "/test/test/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		String destPath = "/t/test.log";
		assertTrue("rename成功", hdfsUtil.rename(path, destPath, true));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs("/t", true));
	}

	@Test
	public void testDeleteHdfs() throws Exception {
		String path = "/test/test/testsub.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(path, fileContent));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
	}

	@Test
	public void testCopyFile() throws Exception {
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destpath = "/test/test/dest.txt";
		assertTrue("複製文件成功", hdfsUtil.copyFile(srcPath, destpath));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
	}

	@Test
	public void testCopyFile2() throws Exception {
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destpath = "/test/test/dest.txt";
		assertTrue("複製文件成功", hdfsUtil.copyFile2(srcPath, destpath));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
	}

	@Test
	public void testCopyDir() throws Exception {
		String dir = "/test/test";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destpath = "/test1";
		assertTrue("複製文件目錄成功", hdfsUtil.copyDir(dir, destpath, false));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test1", true));
	}

	@Test
	public void testCopyFilesExcludeDir() throws Exception {
		String dir = "/test/test";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		srcPath = "/test/test/testsub/src2.txt";
		fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destpath = "/test2";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
		assertTrue("複製文件目錄成功", hdfsUtil.copyFilesExcludeDir(dir, destpath, false));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test2", true));
	}

	@Test
	public void testCopyDirOnlyFiles() throws Exception {
		String dir = "/test/test";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		srcPath = "/test/test/testsub/src2.txt";
		fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destpath = "/test3";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(destpath));
		assertTrue("複製文件目錄成功", hdfsUtil.copyDirOnlyFiles(dir, destpath, false));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test3", true));
	}

	@Test
	public void testSearchFile() throws Exception {
		String dir = "/test/test";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		srcPath = "/test/test/testsub/src2.txt";
		fileContent = "this is a testing\n line this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String searchFileName = "src2.txt";
		assertEquals("查找文件数量与预期一致", 1, hdfsUtil.searchFile(dir, searchFileName, true).size());
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
	}

	@Test
	public void testCopyFromLocalFileWithProgress() throws Exception {
		String destPath = "/test/test/t.rat";
		// NOTE(review): this test depends on a pre-existing local file — confirm
		// the path exists on the machine running the test.
		String localBigFile = "D:\\BaiduNetdiskDownload\\test.rar";
		assertTrue("查找文件数量与预期一致", hdfsUtil.copyFromLocalFileWithProgress(localBigFile, destPath));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
	}

	@Test
	public void testMergeFile() throws Exception {
		String dir = "/test/test";
		assertTrue("創建目錄成功", hdfsUtil.mkdir(dir));
		String srcPath = "/test/test/src.txt";
		String fileContent = "this is a testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		srcPath = "/test/test/src2.txt";
		fileContent = "this is another testing";
		assertTrue("写文件成功", hdfsUtil.writeHdfsFile(srcPath, fileContent));
		String destPath = "/test/src3.txt";
		assertTrue("文件合并成功", hdfsUtil.mergeFile(dir, destPath));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs("/test/test", true));
		assertTrue("递归刪除目录成功", hdfsUtil.deleteHdfs(destPath, true));
	}

	@Test
	public void testTest() {
		// Smoke test of the varargs demo method; output is printed, so there is
		// no return value to assert on. (A previous vacuous assertEquals("", "")
		// was removed — it could never fail and added no coverage.)
		hdfsUtil.test();
		hdfsUtil.test("123");
		hdfsUtil.test("123", "456");
		hdfsUtil.test("");
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}
三、操作类(帮助类)
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import lombok.extern.slf4j.Slf4j;
/**
* @author alanchan hdfs常用帮助类
*/
@Slf4j
/**
 * Common HDFS helper (singleton).
 *
 * <p>Wraps frequent {@link FileSystem} operations: directory creation,
 * existence checks, size queries, read/write/append, upload/download,
 * copy/rename/delete, recursive listing, search and merge.
 *
 * <p>Thread-safety: the factory methods are synchronized so only one instance
 * is ever created; the underlying shared {@link FileSystem} is used without
 * additional locking, as in the original design.
 *
 * @author alanchan hdfs常用帮助类
 */
@Slf4j
public class HDFSUtil {
	// Shared Hadoop configuration backing the FileSystem handle.
	private static Configuration conf = null;
	// Shared FileSystem handle; every operation goes through it.
	private static FileSystem fileSystem = null;
	// Default client identity used when the caller supplies none.
	private static final String HADOOP_USER_NAME = "alanchan";
	// Default NameNode address used when the caller supplies none.
	private static final String DEFAULTFS = "hdfs://server1:8020";
	// Lazily created singleton instance.
	private static HDFSUtil hdfsInstance = null;

	/**
	 * @param strings optional; strings[0] is fs.defaultFS, strings[1] (if
	 *                present) is the HDFS user name; further entries are unused
	 * @throws Exception if the FileSystem cannot be obtained
	 */
	private HDFSUtil(String... strings) throws Exception {
		// Set the client identity so operations on HDFS are authorized.
		if (strings.length >= 2) {
			System.setProperty("HADOOP_USER_NAME", strings[1]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}
		conf = new Configuration();
		// Point the client at HDFS; the first argument (if any) is fs.defaultFS.
		if (strings.length >= 1) {
			conf.set("fs.defaultFS", strings[0]);
		} else {
			conf.set("fs.defaultFS", DEFAULTFS);
		}
		fileSystem = FileSystem.get(conf);
	}

	/**
	 * @param conf    caller-supplied Hadoop configuration (must already carry
	 *                fs.defaultFS and any append-related settings)
	 * @param strings optional; strings[0] (if present) is the HDFS user name
	 */
	private HDFSUtil(Configuration conf, String... strings) throws Exception {
		// Set the client identity so operations on HDFS are authorized.
		if (strings.length >= 1) {
			System.setProperty("HADOOP_USER_NAME", strings[0]);
		} else {
			System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);
		}
		// Fix: the field is static; assigning through "this" was misleading.
		HDFSUtil.conf = conf;
		fileSystem = FileSystem.get(HDFSUtil.conf);
	}

	/** Returns the singleton, creating it from the given configuration on first call. */
	public static synchronized HDFSUtil getHDFSUtilInstance(Configuration conf, String... strings) throws Exception {
		// synchronized: prevents two threads from racing past the null check
		// and creating two instances (and two FileSystem handles).
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(conf, strings);
		}
		return hdfsInstance;
	}

	/** Returns the singleton, creating it with default configuration on first call. */
	public static synchronized HDFSUtil getHDFSUtilInstance(String... strings) throws Exception {
		if (hdfsInstance == null) {
			hdfsInstance = new HDFSUtil(strings);
		}
		return hdfsInstance;
	}

	/** Creates the directory (and parents) if absent; returns whether it exists afterwards. */
	public boolean mkdir(String path) throws Exception {
		return mkdir(new Path(path));
	}

	public boolean mkdir(Path p) throws Exception {
		if (!fileSystem.exists(p)) {
			fileSystem.mkdirs(p);
		}
		return fileSystem.exists(p);
	}

	/** Returns true if the file or directory exists. */
	public boolean exists(String path) throws Exception {
		return exists(new Path(path));
	}

	public boolean exists(Path p) throws Exception {
		return fileSystem.exists(p);
	}

	/** Total length in bytes of the file, or of all files under the directory. */
	public long getHdfsFileSize(String path) throws Exception {
		return getHdfsFileSize(new Path(path));
	}

	public long getHdfsFileSize(Path path) throws Exception {
		return fileSystem.getContentSummary(path).getLength();
	}

	/**
	 * Writes (overwrites) a text file.
	 *
	 * @param path        写入文件的路径,含文件名称
	 * @param fileContent 写入文件的内容
	 * @return 如果文件存在,且文件大小大于0,则视为写入成功
	 * @throws Exception on I/O failure
	 */
	public boolean writeHdfsFile(String path, String fileContent) throws Exception {
		return writeHdfsFile(new Path(path), fileContent);
	}

	public boolean writeHdfsFile(Path path, byte[] fileContent) throws Exception {
		// Renamed from "in": this is an OUTPUT stream to HDFS.
		FSDataOutputStream out = fileSystem.create(path);
		try {
			out.write(fileContent);
			out.flush();
		} finally {
			out.close();
		}
		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeHdfsFile(Path path, String fileContent) throws Exception {
		return writeHdfsFile(path, fileContent.getBytes());
	}

	/**
	 * Writes a string to the underlying output stream using modified UTF-8
	 * encoding in a machine-independent manner (DataOutput.writeUTF framing).
	 *
	 * @param path        target file
	 * @param fileContent content to write
	 * @return true if the file exists with size &gt; 0 afterwards
	 * @throws Exception on I/O failure
	 */
	public boolean writeUTFHdfsFile(Path path, String fileContent) throws Exception {
		FSDataOutputStream out = fileSystem.create(path);
		try {
			out.writeUTF(fileContent);
			out.flush();
		} finally {
			out.close();
		}
		return fileSystem.exists(path) && getHdfsFileSize(path) > 0;
	}

	public boolean writeUTFHdfsFile(String path, String fileContent) throws Exception {
		return writeUTFHdfsFile(new Path(path), fileContent);
	}

	/** Reads the whole file as text (lines re-joined with '\n', no trailing newline). */
	public String readHdfsFile(String path) throws Exception {
		return readHdfsFile(new Path(path));
	}

	public String readHdfsFile(Path path) throws Exception {
		FSDataInputStream in = fileSystem.open(path);
		StringBuilder result = new StringBuilder();
		try {
			BufferedReader br = new BufferedReader(new InputStreamReader(in));
			String line;
			while ((line = br.readLine()) != null) {
				result.append(line).append("\n");
			}
			br.close();
		} finally {
			in.close();
		}
		// Drop the trailing newline added by the loop; guard against an empty
		// file (fix: deleteCharAt on an empty builder would throw).
		if (result.length() > 0) {
			result.deleteCharAt(result.length() - 1);
		}
		return result.toString();
	}

	public String readUTFHdfsFile(String path) throws Exception {
		return readUTFHdfsFile(new Path(path));
	}

	/**
	 * Reads a string previously written with {@link #writeUTFHdfsFile};
	 * 好像只能读取通过writeUTF写入的文件 (readUTF expects writeUTF's length-prefixed framing).
	 *
	 * @param path source file
	 * @return the decoded string
	 * @throws Exception on I/O failure or malformed framing
	 */
	public String readUTFHdfsFile(Path path) throws Exception {
		FSDataInputStream in = fileSystem.open(path);
		try {
			return in.readUTF();
		} finally {
			in.close();
		}
	}

	// Append requires extra client configuration, e.g.:
	// Configuration conf = new Configuration();
	// conf.set("fs.defaultFS", DEFAULTFS);
	// conf.setBoolean("dfs.support.append", true);
	// conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
	// conf.set("fe.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
	// conf.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enable", true);
	// Usage example:
	// HDFSUtil.getHDFSUtilInstance(conf).writeAndAppendHdfsFile(new Path(path1), path2);
	/**
	 * Appends fileContent (plus a trailing newline) to destPath, creating the
	 * file first if it does not exist.
	 *
	 * @param destPath    target file
	 * @param fileContent content to append (UTF-8)
	 * @return true if the file exists with size &gt; 0 afterwards
	 * @throws Exception on I/O failure (e.g. append not enabled on the cluster)
	 */
	public boolean writeAndAppendHdfsFile(Path destPath, String fileContent) throws Exception {
		// Re-resolve the FileSystem from the (possibly append-enabled) conf.
		fileSystem = destPath.getFileSystem(conf);
		if (!fileSystem.exists(destPath)) {
			fileSystem.createNewFile(destPath);
		}
		FSDataOutputStream output = fileSystem.append(destPath);
		try {
			output.write(fileContent.getBytes("UTF-8"));
			// Intentionally terminate the appended record with a newline.
			output.write("\n".getBytes("UTF-8"));
		} finally {
			output.close();
		}
		return fileSystem.exists(destPath) && getHdfsFileSize(destPath) > 0;
	}

	public boolean writeAndAppendHdfsFile(String destPath, String fileContent) throws Exception {
		return writeAndAppendHdfsFile(new Path(destPath), fileContent);
	}

	/** Recursively logs every file and directory under path (recursive listStatus). */
	public void walkHdfsDir(String path) throws Exception {
		walkHdfsDir(new Path(path));
	}

	public void walkHdfsDir(Path path) throws Exception {
		FileStatus[] fileStatuses = fileSystem.listStatus(path);
		for (FileStatus fileStatus : fileStatuses) {
			if (fileStatus.isDirectory()) {
				// Descend first, then log the directory itself.
				walkHdfsDir(fileStatus.getPath());
				log.info("目录 = {}", fileStatus.getPath());
			} else {
				log.info("文件完整路径 = {},文件名={}", fileStatus.getPath(), fileStatus.getPath().getName());
			}
		}
	}

	public void walkHdfsDir_Iter(String path, boolean recursive) throws Exception {
		walkHdfsDir_Iter(new Path(path), recursive);
	}

	/**
	 * Logs every file under path using the RemoteIterator API (files only —
	 * listFiles does not return directories).
	 *
	 * @param path      directory to walk
	 * @param recursive 是否递归 (whether to descend into subdirectories)
	 * @throws Exception on I/O failure
	 */
	public void walkHdfsDir_Iter(Path path, boolean recursive) throws Exception {
		RemoteIterator<LocatedFileStatus> lfsRemoteIterator = fileSystem.listFiles(path, recursive);
		while (lfsRemoteIterator.hasNext()) {
			LocatedFileStatus next = lfsRemoteIterator.next();
			log.info("文件完整路径 = {},文件名={}", next.getPath(), next.getPath().getName());
		}
	}

	public boolean uploadLocalFileToHdfs(String localFile, String dest) throws Exception {
		return uploadLocalFileToHdfs(localFile, new Path(dest));
	}

	/**
	 * Uploads a local file to HDFS.
	 *
	 * @param localFile local source path
	 * @param dest      HDFS destination
	 * @return 简单比较目标和源文件的大小判断是否上传成功 (success = sizes match)
	 * @throws Exception on I/O failure
	 */
	public boolean uploadLocalFileToHdfs(String localFile, Path dest) throws Exception {
		fileSystem.copyFromLocalFile(new Path(localFile), dest);
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(dest);
	}

	public boolean downloadHdfsToLocal(String hdfsFile, String localFile) throws Exception {
		return downloadHdfsToLocal(new Path(hdfsFile), localFile);
	}

	/** Downloads an HDFS file to the local filesystem; success = sizes match. */
	public boolean downloadHdfsToLocal(Path hdfsFile, String localFile) throws Exception {
		fileSystem.copyToLocalFile(hdfsFile, new Path(localFile));
		File file = new File(localFile);
		return file.length() == getHdfsFileSize(hdfsFile);
	}

	/**
	 * Renames/moves srcPath to destPath.
	 *
	 * @param srcPath   source
	 * @param destPath  destination
	 * @param mkdirFlag 如果目标目录不存在是否创建 (create destination if absent)
	 * @return true if the rename was performed and succeeded
	 * @throws Exception on I/O failure
	 */
	public boolean rename(Path srcPath, Path destPath, boolean mkdirFlag) throws Exception {
		boolean flag = false;
		if (fileSystem.exists(srcPath)) {
			if (mkdirFlag && !fileSystem.exists(destPath)) {
				fileSystem.mkdirs(destPath);
			}
			flag = fileSystem.rename(srcPath, destPath);
		}
		return flag;
	}

	public boolean rename(String srcPath, String destPath, boolean mkdirFlag) throws Exception {
		return rename(new Path(srcPath), new Path(destPath), mkdirFlag);
	}

	/**
	 * 删除文件或目录 (deletes a file or directory).
	 *
	 * @param dest      target
	 * @param recursive 是否递归删除 (required true for non-empty directories)
	 * @return true on success
	 * @throws Exception on I/O failure
	 */
	public boolean deleteHdfs(Path dest, boolean recursive) throws Exception {
		return fileSystem.delete(dest, recursive);
	}

	public boolean deleteHdfs(String dest, boolean recursive) throws Exception {
		return deleteHdfs(new Path(dest), recursive);
	}

	/**
	 * Copies a single file within HDFS via IOUtils stream copy.
	 *
	 * @param srcPath  source file
	 * @param destpath destination file (overwritten)
	 * @throws Exception on I/O failure
	 */
	public void copyFile(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);
		IOUtils.copyBytes(in, out, conf);
		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyFile(String srcPath, String destpath) throws Exception {
		copyFile(new Path(srcPath), new Path(destpath));
		return true;
	}

	public boolean copyFile2(String srcPath, String destpath) throws Exception {
		copyFile2(new Path(srcPath), new Path(destpath));
		return true;
	}

	/** Same as {@link #copyFile(Path, Path)} but with a manual 1 KB buffer loop. */
	public void copyFile2(Path srcPath, Path destpath) throws Exception {
		FSDataInputStream in = fileSystem.open(srcPath);
		FSDataOutputStream out = fileSystem.create(destpath);
		byte[] b = new byte[1024];
		int hasRead;
		while ((hasRead = in.read(b)) > 0) {
			out.write(b, 0, hasRead);
		}
		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
	}

	public boolean copyDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 文件夹拷贝,包含文件夾 — copies srcPath itself (directory included) under
	 * destpath via FileUtil.copy.
	 *
	 * @param srcPath      source directory
	 * @param destpath     destination directory
	 * @param deleteSource delete the source after copying (i.e. move)
	 * @return true on success
	 * @throws Exception on I/O failure
	 */
	public boolean copyDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		// FileUtil.copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst,
		//               boolean deleteSource, Configuration conf)
		return FileUtil.copy(fileSystem, srcPath, fileSystem, destpath, deleteSource, conf);
	}

	/**
	 * 拷貝文件及目錄,但不包含srcPath的第一層目錄 — copies everything DIRECTLY under
	 * srcPath into destpath, without recreating srcPath itself.
	 * 例如:srcPath=/copyDir/src,destPath=/copyDir/dest,最终结果是将/copyDir/src下面的所有文件及目录拷贝至/copyDir/dest下,但不包含src这个文件夹
	 *
	 * @param srcPath      source directory
	 * @param destpath     destination directory
	 * @param deleteSource delete each source entry after copying
	 * @return result of the last copy performed (false if srcPath is empty)
	 * @throws Exception on I/O failure
	 */
	public boolean copyFilesExcludeDir(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		// The original if/else branches were identical: FileUtil.copy already
		// recurses into directories, so files and directories are handled alike.
		for (FileStatus fileStatus : fileSystem.listStatus(srcPath)) {
			flag = FileUtil.copy(fileSystem, fileStatus.getPath(), fileSystem, destpath, deleteSource, conf);
		}
		return flag;
	}

	public boolean copyFilesExcludeDir(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyFilesExcludeDir(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 拷贝源文件夹下的所有文件到目标文件夹,不含源文件夹下的文件夹 — copies every file
	 * found (recursively) under srcPath flat into destpath; directory structure
	 * is not recreated.
	 *
	 * @param srcPath      source directory
	 * @param destpath     destination directory
	 * @param deleteSource delete each source file after copying
	 * @return result of the last copy performed (false if no files)
	 * @throws Exception on I/O failure
	 */
	public boolean copyDirOnlyFiles(Path srcPath, Path destpath, boolean deleteSource) throws Exception {
		boolean flag = false;
		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, true);
		while (sourceFiles.hasNext()) {
			flag = FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, destpath, deleteSource, conf);
		}
		return flag;
	}

	public boolean copyDirOnlyFiles(String srcPath, String destpath, boolean deleteSource) throws Exception {
		return copyDirOnlyFiles(new Path(srcPath), new Path(destpath), deleteSource);
	}

	/**
	 * 查找文件,精确查找 equals — exact file-name match under srcPath.
	 *
	 * @param srcPath        directory to search
	 * @param searchFileName exact file name to match
	 * @param recursive      descend into subdirectories
	 * @return paths of all matching files (possibly empty, never null)
	 * @throws Exception on I/O failure
	 */
	public List<Path> searchFile(Path srcPath, String searchFileName, boolean recursive) throws Exception {
		List<Path> list = new ArrayList<Path>();
		RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(srcPath, recursive);
		while (sourceFiles.hasNext()) {
			Path file = sourceFiles.next().getPath();
			if (searchFileName.equals(file.getName())) {
				list.add(file);
			}
		}
		return list;
	}

	public List<Path> searchFile(String srcPath, String searchFileName, boolean recursive) throws Exception {
		return searchFile(new Path(srcPath), searchFileName, recursive);
	}

	/**
	 * 通過流的方式,带进度条的本地文件拷贝至hdfs — streams a local file to HDFS,
	 * printing progress as it goes.
	 *
	 * @param srcPath  local source file
	 * @param destPath HDFS destination
	 * @return true if the destination size matches the source size
	 * @throws Exception on I/O failure
	 */
	public boolean copyFromLocalFileWithProgress(File srcPath, Path destPath) throws Exception {
		InputStream in = new BufferedInputStream(new FileInputStream(srcPath));
		// progress() is invoked roughly once per 64 KB (65536 B) written, so the
		// file size in 64 KB units is the expected number of callbacks.
		// Fix: use float division and clamp to >= 1 — the original integer
		// division yielded 0 for files under 64 KB, making the progress
		// percentage a division by zero.
		final float fileSize = Math.max(1f, srcPath.length() / 65536f);
		FSDataOutputStream out = fileSystem.create(destPath, new Progressable() {
			long fileCount = 0;

			@Override
			public void progress() {
				fileCount++;
				System.out.println("进度%:" + (fileCount / fileSize) * 100 + " %");
			}
		});
		IOUtils.copyBytes(in, out, 4096);
		IOUtils.closeStream(in);
		IOUtils.closeStream(out);
		return getHdfsFileSize(destPath) == srcPath.length();
	}

	public boolean copyFromLocalFileWithProgress(String srcPath, String destPath) throws Exception {
		return copyFromLocalFileWithProgress(new File(srcPath), new Path(destPath));
	}

	/**
	 * 合并文件 — concatenates every file under srcPath into destPath.
	 *
	 * @param srcPath  目錄 (source directory)
	 * @param destPath 文件名,含路徑 (destination file, full path)
	 * @return true if the merged file's size equals the directory's total size
	 * @throws Exception on I/O failure
	 */
	public boolean mergeFile(Path srcPath, Path destPath) throws Exception {
		FSDataOutputStream fout = fileSystem.create(destPath);
		try {
			if (fileSystem.isDirectory(srcPath)) {
				RemoteIterator<LocatedFileStatus> lsfr = fileSystem.listFiles(srcPath, true);
				while (lsfr.hasNext()) {
					LocatedFileStatus next = lsfr.next();
					FSDataInputStream fin = fileSystem.open(next.getPath());
					try {
						IOUtils.copyBytes(fin, fout, 4096);
					} finally {
						fin.close();
					}
				}
			}
		} finally {
			fout.close();
		}
		// Fix: the original compared srcPath's size to itself, which is always
		// true. The merged file should be as large as the source directory's
		// total content.
		return getHdfsFileSize(destPath) == getHdfsFileSize(srcPath);
	}

	public boolean mergeFile(String srcPath, String destPath) throws Exception {
		return mergeFile(new Path(srcPath), new Path(destPath));
	}

	/** Closes the shared FileSystem handle; further operations will fail. */
	public void close() {
		if (fileSystem != null) {
			try {
				fileSystem.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	/** Ad-hoc manual driver used while writing this class; not part of the API. */
	public static void main(String[] args) {
		String path1 = "/test/test.rar";
		String path2 = "This is a testing";
		String searchFileName = "D:\\BaiduNetdiskDownload\\2test.rar";
		try {
			// HDFSUtil.getHDFSUtilInstance().writeUTFHdfsFile(path1, path2);copyDir copyFilesExcludeDir
			String srcPath = "/testhdfs_copyDir/src", destPath = "/testhdfs_copyDir/dest";
			System.out.println(HDFSUtil.getHDFSUtilInstance().copyFilesExcludeDir(srcPath, destPath, false));
		} catch (Exception e) {
			e.printStackTrace();
		}
		// test();
		// test("test");
		// test("123", "321");
	}

	/** Varargs demo: prints the single argument, or the argument count otherwise. */
	static void test(String... strings) {
		if (strings.length == 1) {
			System.out.println("testing=" + strings[0]);
		} else {
			System.out.println("testing=" + strings.length);
		}
	}

	/**
	 * 没有验证,仅示例加载配置文件 — unverified example of loading config resources
	 * and copying via FileContext.
	 *
	 * @param srcPath  source
	 * @param destPath destination
	 * @throws Exception on I/O failure
	 */
	private void copyOrMoveFiles(Path srcPath, Path destPath) throws Exception {
		Configuration conf = new Configuration();
		conf.addResource("basedir/hdfs-site.xml");
		conf.addResource("basedir/core-site.xml");
		// FileSystem fs = FileSystem.newInstance(conf);
		FileContext fc = FileContext.getFileContext(conf);
		FileContext.Util util = fc.util();
		util.copy(srcPath, destPath, false, false);
	}

	/**
	 * 沒有驗證,主要參考其流、conf的應用 — unverified example: walks a local
	 * directory (or takes a single file) and streams each entry to HDFS via
	 * IOUtils.copyBytes.
	 *
	 * @param srcPath  local source (file or directory)
	 * @param destPath HDFS destination for the single-file case
	 * @throws Exception on I/O failure
	 */
	private void upload(Path srcPath, Path destPath) throws Exception {
		// Obtain the local file system to enumerate local sources.
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		if (localFs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lsfr = localFs.listFiles(srcPath, true);
			while (lsfr.hasNext()) {
				LocatedFileStatus next = lsfr.next();
				// NOTE(review): opens via the HDFS FileSystem, not localFs —
				// kept as in the original example; verify before real use.
				FSDataInputStream fin = fileSystem.open(next.getPath());
				FSDataOutputStream fout = fileSystem.create(new Path("/test/data/" + next.getPath().getName()));
				IOUtils.copyBytes(fin, fout, 4096);
				fin.close();
				fout.close();
			}
		} else {
			FSDataInputStream fin = fileSystem.open(srcPath);
			// InputStream fin = new FileInputStream("D://a.txt");
			FSDataOutputStream fout = fileSystem.create(destPath);
			IOUtils.copyBytes(fin, fout, 4096);
			fin.close();
			fout.close();
		}
	}

	private void upload(String srcPath, String destPath) throws Exception {
		upload(new Path(srcPath), new Path(destPath));
	}

	/**
	 * 沒有驗證,主要參考其流、conf的應用 — unverified example: byte-stream download
	 * to a hard-coded local path, then a merge-on-download of a directory.
	 *
	 * @param srcPath  HDFS source
	 * @param destPath local destination for the merged output
	 * @param fs       HDFS FileSystem to read from
	 * @param local    unused in this example
	 * @param fc       unused in this example
	 * @throws Exception on I/O failure
	 */
	private void downLoad(Path srcPath, Path destPath, FileSystem fs, FileSystem local, FileContext fc)
			throws Exception {
		// 2. Plain byte-stream copy to a fixed local demo path.
		FileOutputStream localFileOut = new FileOutputStream("D:\\test\\a.txt");
		FSDataInputStream remoteFin = fs.open(srcPath);
		IOUtils.copyBytes(remoteFin, localFileOut, 2048);
		remoteFin.close();
		localFileOut.close();
		// 4. Merge all files under srcPath into one local file while downloading.
		LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
		FSDataOutputStream fsdOut = localFs.create(destPath);
		if (fs.isDirectory(srcPath)) {
			RemoteIterator<LocatedFileStatus> lfsr = fs.listFiles(srcPath, true);
			while (lfsr.hasNext()) {
				LocatedFileStatus next = lfsr.next();
				FSDataInputStream fdIn = fs.open(next.getPath());
				IOUtils.copyBytes(fdIn, fsdOut, 2048);
				fdIn.close();
			}
			fsdOut.close();
		} else {
			FSDataInputStream fsIn = fs.open(srcPath);
			IOUtils.copyBytes(fsIn, fsdOut, 2048);
			fsIn.close();
			fsdOut.close();
		}
	}
}
四、高可用环境的操作类
该示例仅演示高可用环境下的 API 访问配置,实际使用的操作类仍是第三部分的 HDFSUtil。
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @author alanchan 测试高可用环境下
*/
/**
 * Tests for {@link HDFSUtil} against an HDFS high-availability (HA) cluster.
 *
 * <p>Test methods declare {@code throws Exception} instead of swallowing
 * exceptions with try/catch + printStackTrace(): a swallowed exception would
 * let a broken test pass silently.
 *
 * <p>NOTE(review): requires a reachable HA cluster matching the nameservice
 * and NameNode addresses configured in {@link #setUp()} — confirm before running.
 *
 * @author alanchan 测试高可用环境下
 */
public class HDFSHAUtilTest {

	private HDFSUtil hdfsUtil;

	@Before
	public void setUp() throws Exception {
		// HA client configuration: logical nameservice + both NameNodes + the
		// failover proxy provider that picks the active one.
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
		conf.set("dfs.nameservices", "HadoopHAcluster");
		conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
		conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
		conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
		conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster",
				"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance();
		hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf);
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance(strings);
		// hdfsUtil = HDFSUtil.getHDFSUtilInstance(conf, strings);
	}

	@Test
	public void testMkdir() throws Exception {
		String path = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@Test
	public void testExists() throws Exception {
		String path = "/test/testsub";
		assertTrue("创建目录成功", hdfsUtil.mkdir(path));
		assertTrue("測試目錄是否存在成功", hdfsUtil.exists(path));
		assertTrue("遞歸刪除目录成功", hdfsUtil.deleteHdfs(path, true));
	}

	@After
	public void close() {
		hdfsUtil.close();
	}
}