Testing HDFS File Upload and Download in Java
HDFS Dependencies
Add the following dependencies to the Maven pom.xml:
<!-- pinned to 3.0.0 below; the original also notes 3.1.2, so match the version to your cluster -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.0.0</version>
</dependency>
Configuration (application.yml)
server:
  port: 8080
  # Note: context-path prefixes every URL in the application
  servlet:
    context-path: /file
spring:
  application:
    name: file-server
  thymeleaf:
    prefix: classpath:/templates/
    suffix: .html
    mode: HTML
    encoding: utf-8
    servlet:
      content-type: text/html
    cache: false
  servlet:
    multipart:
      # Upload size limits (set high for large-file tests); requests above these never reach the controller
      max-file-size: 6144MB
      max-request-size: 6144MB
# Address and port of the HDFS NameNode
hdfs:
  # CDH 6.3 test environment; HDFS user name: hdfs
  user: hdfs
  path: hdfs://192.168.1.15:8020
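Before wiring these values into Spring, it can help to verify them standalone. Below is a minimal connectivity check (a sketch, not part of the original post) that connects with the NameNode address and user from the YAML above and lists the HDFS root directory:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class HdfsConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Values taken from the YAML configuration above
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.1.15:8020"), new Configuration(), "hdfs");
        // List the root directory to confirm the connection works
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath() + (status.isDirectory() ? " (dir)" : " (file)"));
        }
        fs.close();
    }
}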
Code
Controller
import com.alibaba.fastjson.JSONObject;
import com.x.file.service.HdfsFileService;
import com.x.file.utils.HDFSUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;
/**
 * HDFS directory and file controller.
 */
@RestController
@RequestMapping("/hdfs")
@Slf4j
public class HdfsFileController {

    @Autowired
    HdfsFileService hdfsFileService;
    // Upload
    @RequestMapping(value = "upFileSaveToHdfs", method = RequestMethod.POST,
            consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    @ResponseBody
    public JSONObject upFileSaveToHdfs(@RequestPart(name = "upFile") MultipartFile upFile) throws Exception {
        JSONObject json = new JSONObject();
        json.put("code", "9999");
        json.put("message", "failed");
        if (upFile.getSize() > 0) {
            try {
                // Original file name as uploaded by the client
                String fileName = upFile.getOriginalFilename();
                System.out.println("uploaded file name: " + fileName);
                // Generate a unique server-side name
                String uuid = UUID.randomUUID().toString().replace("-", "");
                // File extension
                String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
                System.out.println("file extension: " + suffix);
                // Name under which the file is stored in HDFS
                String tmpHdfsFileName = "up_" + uuid + '.' + suffix;
                // Test note: a CentOS 7 install image uploaded to HDFS and downloaded back installed successfully.
                // Files larger than the multipart limit never reach this method; Spring rejects them with:
                //   the request was rejected because its size (642384174) exceeds the configured
                //   maximum (10485760)
                // (see the exception-handler sketch after this controller)
                // The original file name allows a simple whitelist of upload types;
                // the comparison is made case-insensitive:
                if (fileName.toLowerCase().endsWith("pdf")
                        || fileName.toLowerCase().endsWith("ofd")
                        || fileName.toLowerCase().endsWith("iso")
                        || fileName.toLowerCase().endsWith("rar")
                        || fileName.toLowerCase().endsWith("zip")
                        || fileName.toLowerCase().endsWith("jpg")
                        || fileName.toLowerCase().endsWith("png")
                        || fileName.toLowerCase().endsWith("txt")) {
                    // For multiple files, take @RequestParam(name = "filename") MultipartFile[] upfile
                    // and read each element's getInputStream() in a loop
                    System.out.println("storing into Hadoop HDFS");
                    String newSaveFileName = "/up/" + tmpHdfsFileName;
                    if (hdfsFileService.saveFile(newSaveFileName, upFile)) {
                        json.put("code", "0000");
                        json.put("message", "written to HDFS successfully");
                    } else {
                        json.put("code", "0002");
                        json.put("message", "service call failed to write to HDFS");
                    }
                } else {
                    json.put("code", "0001");
                    json.put("message", "unsupported file type");
                }
            } catch (Exception e) {
                json.put("code", "0003");
                json.put("message", "upload failed");
                e.printStackTrace();
            }
        }
        return json;
    }
    // Download
    @RequestMapping(value = "getFileByHdfs", method = RequestMethod.GET)
    protected void getFileByHdfs(@RequestParam("fileName") String fileName, HttpServletRequest request, HttpServletResponse response)
            throws ServletException, Exception {
        request.setCharacterEncoding("utf-8");
        response.setCharacterEncoding("utf-8");
        System.out.println("starting download");
        System.out.println("requested Hadoop HDFS file: " + fileName);
        // Plain file name, used for the Content-Disposition header
        String tmpFileName = fileName.substring(fileName.lastIndexOf("/") + 1);
        System.out.println("download file name: " + tmpFileName);
        String suffix = fileName.substring(fileName.lastIndexOf(".") + 1);
        System.out.println("download file extension: " + suffix);
        try {
            if (StringUtils.isNotBlank(fileName)) {
                if (HDFSUtils.resourceIsExist(fileName)) {
                    // The client distinguishes file types by MIME type; application/octet-stream
                    // tells it to download the body rather than render it inline
                    response.setContentType("application/octet-stream");
                    // Content-Disposition prompts the browser to save under this file name
                    response.setHeader("Content-Disposition", "attachment;fileName=" + tmpFileName);
                    hdfsFileService.getFile(fileName, response);
                } else {
                    response.getWriter().println("file does not exist: " + fileName);
                }
            }
            System.out.println("finished downloading " + fileName);
        } catch (Exception e) {
            response.getWriter().println("download failed");
            e.printStackTrace();
        }
    }
}
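Because Spring rejects oversize multipart requests before the controller method runs, the size error quoted above cannot be handled inside upFileSaveToHdfs. A sketch of a global handler (not in the original code; the class name and code value 0004 are illustrative) that maps the rejection onto the same JSON envelope:
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.multipart.MaxUploadSizeExceededException;

@RestControllerAdvice
public class UploadSizeExceptionHandler {

    // Invoked when a request exceeds spring.servlet.multipart.max-file-size / max-request-size
    @ExceptionHandler(MaxUploadSizeExceededException.class)
    public JSONObject handleMaxSize(MaxUploadSizeExceededException e) {
        JSONObject json = new JSONObject();
        json.put("code", "0004");
        json.put("message", "file exceeds the configured upload limit");
        return json;
    }
}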
Service
Interface definition
package com.x.file.service;

import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

public interface HdfsFileService {

    void getFile(String getFileName, HttpServletResponse response) throws Exception;

    Boolean saveFile(String saveFileName, MultipartFile upfile) throws IOException;
}
Interface implementation
package com.x.file.serviceImpl;

import com.x.file.service.HdfsFileService;
import com.x.file.utils.HDFSUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
@Service
public class HdfsFileServiceImpl implements HdfsFileService {

    @Override
    public void getFile(String getFileName, HttpServletResponse response) throws Exception {
        HDFSUtils.getFile(getFileName, response);
    }

    @Override
    public Boolean saveFile(String saveFileName, MultipartFile upfile) {
        try {
            InputStream inputStream = upfile.getInputStream();
            if (inputStream != null) {
                return HDFSUtils.saveFile(saveFileName, inputStream);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}
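One possible refinement to the implementation above (a sketch, not the original code): MultipartFile.getInputStream() does not return null, so the null check can be dropped, and try-with-resources guarantees the stream is closed even when the HDFS write throws:
    public Boolean saveFile(String saveFileName, MultipartFile upfile) {
        // try-with-resources closes the stream even if HDFSUtils.saveFile throws
        try (InputStream inputStream = upfile.getInputStream()) {
            return HDFSUtils.saveFile(saveFileName, inputStream);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }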
HDFSUtils
package com.x.file.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.URI;
import java.util.List;
/**
 * Utility class for HDFS directory and file operations.
 */
@Component
@Slf4j
public class HDFSUtils {
    // Address and port of the HDFS NameNode
    private static String HDFS_PATH;
    // HDFS user name
    private static String HDFS_USER;
    // HDFS FileSystem handle
    private static FileSystem fileSystem = null;
    // Hadoop configuration object
    private static Configuration configuration = null;

    @Value("${hdfs.path}")
    private String hdfsPath;
    @Value("${hdfs.user}")
    private String hdfsUser;

    @PostConstruct
    public void init() {
        HDFS_PATH = hdfsPath;
        HDFS_USER = hdfsUser;
    }
    /**
     * Initialize the Configuration and FileSystem.
     *
     * @throws Exception
     */
    private static void setUp() throws Exception {
        configuration = new Configuration();
        // First argument: the NameNode URI; second: the configuration object;
        // third: the HDFS user name
        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
    }

    /**
     * Release resources.
     *
     * @throws Exception
     */
    private static void tearDown() throws Exception {
        configuration = null;
        fileSystem.close();
    }
    /**
     * Check whether a file or directory exists.
     *
     * @param name file or directory path
     * @return
     * @throws IOException
     */
    public static boolean resourceIsExist(String name) throws Exception {
        setUp();
        return fileSystem.exists(new Path(name));
    }

    /**
     * Get the HDFS home directory of the current user.
     *
     * @return
     * @throws Exception
     */
    public static Path getHomeDirectory() throws Exception {
        setUp();
        return fileSystem.getHomeDirectory();
    }
    /**
     * Check whether the given name is an existing directory.
     *
     * @param directorName directory name
     * @return
     * @throws Exception
     */
    public static boolean isDirector(String directorName) throws Exception {
        setUp();
        return fileSystem.isDirectory(new Path(directorName));
    }

    /**
     * Recursively create a directory under the HDFS root.
     *
     * @param directorName full directory path
     * @throws Exception
     */
    public static boolean mkDirs(String directorName) throws Exception {
        if (resourceIsExist(directorName)) {
            throw new Exception("The directory to create already exists!");
        }
        return fileSystem.mkdirs(new Path(directorName));
    }
    /**
     * Delete a directory or file under the HDFS root.
     *
     * @param name           full directory or file path
     * @param isRecursionDel whether to delete recursively
     * @throws Exception
     */
    public static boolean delResources(String name, Boolean isRecursionDel) throws Exception {
        if (!resourceIsExist(name)) {
            throw new Exception("The directory or file to delete does not exist!");
        }
        // The second argument controls recursive deletion: false = no, true = yes
        return fileSystem.delete(new Path(name), isRecursionDel);
    }

    /**
     * Rename a directory or file.
     *
     * @param oldName old directory or file name
     * @param newName new directory or file name
     * @throws Exception
     */
    public static boolean renameResource(String oldName, String newName) throws Exception {
        if (!resourceIsExist(oldName)) {
            throw new Exception("The directory or file to rename does not exist!");
        }
        return fileSystem.rename(new Path(oldName), new Path(newName));
    }
    /**
     * Check whether the given name is an existing regular file.
     *
     * @param fileName file name (including path)
     * @return
     * @throws Exception
     */
    public static boolean isFile(String fileName) throws Exception {
        setUp();
        return fileSystem.isFile(new Path(fileName));
    }

    /**
     * Print the contents of a file to the console.
     *
     * @param fileName file name
     * @throws Exception
     */
    public static void cat(String fileName) throws Exception {
        if (!resourceIsExist(fileName)) {
            throw new Exception("The file to view does not exist!");
        }
        // Open the file for reading
        FSDataInputStream in = fileSystem.open(new Path(fileName));
        // Copy the contents to the console; the third argument is the copy buffer size in bytes
        IOUtils.copyBytes(in, System.out, 1024);
        System.out.println("\n");
        IOUtils.closeStream(in);
    }
    /**
     * Create a file and write content to it.
     *
     * @param fileName    file name
     * @param fileContent file content
     * @throws Exception
     */
    public static void create(String fileName, String fileContent) throws Exception {
        if (resourceIsExist(fileName) || isFile(fileName)) {
            throw new Exception("The file to create already exists!");
        }
        // Create the file; the second argument is the overwrite flag
        FSDataOutputStream outputStream = fileSystem.create(new Path(fileName), false);
        // Write the content
        outputStream.write(fileContent.getBytes());
        outputStream.flush();
        IOUtils.closeStream(outputStream);
    }
    /**
     * Upload a local file into an HDFS directory.
     *
     * @param localFilePath  local file path
     * @param targetDirector target directory
     * @throws Exception
     */
    public static void copyFromLocalFileToDirector(String localFilePath, String targetDirector) throws Exception {
        System.out.println("copyFromLocalFile-localFilePath:" + localFilePath);
        System.out.println("copyFromLocalFile-targetDirector:" + targetDirector);
        // Create the target directory if it does not exist
        if (!resourceIsExist(targetDirector)) {
            fileSystem.mkdirs(new Path(targetDirector));
        }
        // File.separator is only correct for the local path; HDFS paths always use "/"
        String fileName = localFilePath.substring(localFilePath.lastIndexOf(File.separator) + 1);
        if (resourceIsExist(targetDirector + "/" + fileName)) {
            throw new Exception("A file with the same name already exists in the target directory!");
        }
        Path localPath = new Path(localFilePath);
        Path hdfsPath = new Path(targetDirector);
        // First argument is the local path, second the HDFS path. Author's note: if the
        // second argument is path + file name, only an empty file of that name is created,
        // with no error reported.
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
        // setReplication has no effect here because hdfsPath is a directory:
        // short count = 1;
        // fileSystem.setReplication(hdfsPath, count);
    }
    /**
     * Upload a local file to HDFS under a given target name, then set its replication factor.
     *
     * @param localFilePath  local file path
     * @param targetDirector target directory
     * @param targetFileName target file name
     * @throws Exception
     */
    public static void copyFromLocalFileName(String localFilePath, String targetDirector, String targetFileName) throws Exception {
        System.out.println("copyFromLocalFileTest-localFilePath:" + localFilePath);
        System.out.println("copyFromLocalFileTest-targetDirector:" + targetDirector);
        System.out.println("copyFromLocalFileTest-targetFileName:" + targetFileName);
        // Create the target directory if it does not exist.
        // (Author's open question: could pre-creating the target file name as a
        // directory here be the cause of the zero-byte-file problem noted above?)
        if (!resourceIsExist(targetDirector)) {
            fileSystem.mkdirs(new Path(targetDirector));
        }
        String fileName = localFilePath.substring(localFilePath.lastIndexOf(File.separator) + 1);
        if (resourceIsExist(targetDirector + "/" + fileName)) {
            throw new Exception("A file with the same name already exists in the target directory!");
        }
        Path localPath = new Path(localFilePath);
        Path hdfsPath = new Path(targetDirector + targetFileName);
        // First argument is the local path, second the HDFS target (directory + file name)
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
        // Set the replication factor on the uploaded file
        short count = 2;
        fileSystem.setReplication(hdfsPath, count);
        System.out.println("copyFromLocalFile set replication factor to: " + count);
    }
    /**
     * Upload a large local file to HDFS and print upload progress.
     *
     * @param localFilePath  local file path
     * @param targetDirector target directory
     * @throws Exception
     */
    public static void copyFromLocalFileWithProgress(String localFilePath, String targetDirector) throws Exception {
        // Create the target directory if it does not exist
        if (!resourceIsExist(targetDirector)) {
            fileSystem.mkdirs(new Path(targetDirector));
        }
        String fileName = localFilePath.substring(localFilePath.lastIndexOf(File.separator) + 1);
        if (resourceIsExist(targetDirector + "/" + fileName)) {
            throw new Exception("A file with the same name already exists in the target directory!");
        }
        // Hadoop invokes progress() roughly once per 64 KB written,
        // so the file size is expressed in 64 KB units
        final float fileSize = new File(localFilePath).length() / 65536;
        InputStream in = new BufferedInputStream(new FileInputStream(new File(localFilePath)));
        FSDataOutputStream outputStream = fileSystem.create(new Path(targetDirector + "/" + fileName),
                new Progressable() {
                    long fileCount = 0;

                    @Override
                    public void progress() {
                        fileCount++;
                        System.out.println("progress: " + (fileCount / fileSize) * 100 + "%");
                    }
                });
        IOUtils.copyBytes(in, outputStream, 4096);
        IOUtils.closeStream(in);
        IOUtils.closeStream(outputStream);
    }
    /**
     * Download a file from HDFS to the local file system.
     *
     * @param fileName       file name on HDFS (including path)
     * @param targetDirector local target directory
     * @throws Exception
     */
    public static void copyToLocalFile(String fileName, String targetDirector) throws Exception {
        if (!resourceIsExist(fileName)) {
            throw new Exception("The file to download does not exist!");
        }
        // Create the local target directory if it does not exist
        File file = new File(targetDirector);
        if (!file.exists()) {
            file.mkdirs();
        }
        String fileNameStr = fileName.substring(fileName.lastIndexOf("/") + 1);
        OutputStream outputStream = new FileOutputStream(new File(targetDirector + File.separator + fileNameStr));
        FSDataInputStream in = fileSystem.open(new Path(fileName));
        IOUtils.copyBytes(in, outputStream, 1024);
        IOUtils.closeStream(in);
        IOUtils.closeStream(outputStream);
    }
    /**
     * Upload multipart files to HDFS with progress output.
     *
     * @param targetDirector target directory
     * @param files          uploaded files
     */
    public static void copyFromLocalFileWithProgress(String targetDirector, List<MultipartFile> files)
            throws Exception {
        // Create the target directory if it does not exist
        if (!resourceIsExist(targetDirector)) {
            fileSystem.mkdirs(new Path(targetDirector));
        }
        if (files != null && files.size() > 0) {
            InputStream in = null;
            FSDataOutputStream outputStream = null;
            for (MultipartFile file : files) {
                String fileName = file.getOriginalFilename();
                if (resourceIsExist(targetDirector + "/" + fileName)) {
                    throw new Exception("A file with the same name already exists in the target directory!");
                }
                // File size in 64 KB units; progress() fires roughly once per 64 KB written
                final float fileSize = file.getSize() / 65536;
                in = new BufferedInputStream(file.getInputStream());
                outputStream = fileSystem.create(new Path(targetDirector + "/" + fileName),
                        new Progressable() {
                            long fileCount = 0;

                            @Override
                            public void progress() {
                                fileCount++;
                                System.out.println("progress: " + (fileCount / fileSize) * 100 + "%");
                            }
                        });
                IOUtils.copyBytes(in, outputStream, 4096);
            }
            IOUtils.closeStream(in);
            IOUtils.closeStream(outputStream);
        }
    }
    /**
     * Set the replication factor of an HDFS resource.
     *
     * @param resourceName resource name
     * @param count        replication factor
     * @throws Exception
     */
    public static void setReplication(String resourceName, short count) throws Exception {
        if (!resourceIsExist(resourceName)) {
            throw new Exception("The resource to set the replication factor on does not exist!");
        }
        fileSystem.setReplication(new Path(resourceName), count);
    }
    /**
     * Convert a relative path into a full HDFS URI,
     * e.g. /data becomes hdfs://192.168.1.15:8020/data.
     *
     * @author adminstrator
     * @since 1.0.0
     * @param dstPath relative path, e.g. /data
     * @return java.lang.String
     */
    private static String generateHdfsPath(String dstPath) {
        String hdfsPath = HDFS_PATH;
        if (dstPath.startsWith("/")) {
            hdfsPath += dstPath;
        } else {
            hdfsPath = hdfsPath + "/" + dstPath;
        }
        return hdfsPath;
    }
    // Could the stream be sent directly to the client instead? See getFile below.
    /**
     * Open a file on HDFS and return its contents as a byte array,
     * convenient for serving downloads from a web layer:
     * <p>
     * new ResponseEntity<byte[]>(bytes, headers, HttpStatus.CREATED);
     * </p>
     *
     * @author adminstrator
     * @since 1.0.0
     * @param path HDFS path relative to the root, e.g. /testDir/b.txt
     * @return the file contents, or null on failure
     */
    public static byte[] openWithBytes(String path) {
        Path hdfsPath = new Path(generateHdfsPath(path));
        FSDataInputStream inputStream = null;
        try {
            if (null == fileSystem) {
                setUp();
            }
            inputStream = fileSystem.open(hdfsPath);
            // org.apache.hadoop.io.IOUtils conflicts with org.apache.commons.io.IOUtils,
            // so the commons-io call is fully qualified here
            return org.apache.commons.io.IOUtils.toByteArray(inputStream);
        } catch (Exception e) {
            System.out.println("failed to read file at path " + path);
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    // ignore
                }
            }
        }
        return null;
    }
    // Writes are noticeably faster when this service runs on the HDFS NameNode host;
    // on other hosts no difference was observed.
    // Test: reuse the static fileSystem handle.
    public static Boolean saveFile(String newFile, InputStream is) throws IOException, InterruptedException {
        try {
            System.out.println("writing directly to HDFS file: " + newFile);
            if (null == is) {
                System.out.println("saveFile received a null InputStream.");
                return false;
            }
            if (null == fileSystem) {
                setUp();
            }
            if (null != fileSystem) {
                // Replication factor for the new file
                short count = 1;
                String newPathFile1 = HDFS_PATH + newFile;
                System.out.println("storing HDFS file: " + newFile);
                FSDataOutputStream outputStream = fileSystem.create(new Path(newPathFile1), count);
                IOUtils.copyBytes(is, outputStream, 1024);
                IOUtils.closeStream(is);
                IOUtils.closeStream(outputStream);
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    /**
     * Stream a file to the HTTP response without creating a temporary file.
     *
     * @param fileName file path on HDFS
     * @throws Exception
     */
    public static void getFile(String fileName, HttpServletResponse response)
            throws Exception {
        if (!resourceIsExist(fileName)) {
            throw new Exception("The requested file does not exist!");
        }
        // Open the HDFS file and copy it to the response output stream;
        // the third argument of copyBytes is the buffer size in bytes
        FSDataInputStream in = fileSystem.open(new Path(fileName));
        IOUtils.copyBytes(in, response.getOutputStream(), 1024);
        IOUtils.closeStream(in);
    }
}
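For reference, a smoke-test sketch of the utility class (hypothetical, not from the original post; the /testDir path is illustrative, and HDFS_PATH/HDFS_USER must already have been initialized, e.g. by the Spring @PostConstruct above):
public class HdfsUtilsSmokeTest {
    public static void main(String[] args) throws Exception {
        // Assumes HDFS_PATH and HDFS_USER were populated inside the Spring context
        if (!HDFSUtils.resourceIsExist("/testDir")) {
            HDFSUtils.mkDirs("/testDir");                   // create the directory
        }
        HDFSUtils.create("/testDir/b.txt", "hello hdfs");   // create a small file
        HDFSUtils.cat("/testDir/b.txt");                    // print it to the console
        HDFSUtils.setReplication("/testDir/b.txt", (short) 1);
        HDFSUtils.delResources("/testDir", true);           // recursive cleanup
    }
}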
HTML upload test page
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Upload File</title>
</head>
<body>
<div align="center">
    <h1>Test uploading a file directly into HDFS</h1>
    <form action="./hdfs/upFileSaveToHdfs" method="post" enctype="multipart/form-data">
        <input type="file" name="upFile"><br>
        <input type="submit" value="Upload file into HDFS">
    </form>
</div>
</body>
</html>
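Besides this manual form, the upload endpoint can be exercised programmatically. A sketch using Spring's MockMvc (an assumption on my part: spring-boot-starter-test is on the classpath; the file contents are dummy bytes, and MockMvc does not apply the /file context-path):
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.web.servlet.MockMvc;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.multipart;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
public class HdfsFileControllerTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void uploadsSmallTextFile() throws Exception {
        // The part name "upFile" must match @RequestPart in the controller
        MockMultipartFile file = new MockMultipartFile(
                "upFile", "test.txt", "text/plain", "hello hdfs".getBytes());
        mockMvc.perform(multipart("/hdfs/upFileSaveToHdfs").file(file))
                .andExpect(status().isOk());
    }
}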
From: https://blog.51cto.com/u_12668715/6982243