1、简介
分片上传通常被用于各种需要处理大文件的场景,如视频平台、云存储服务等。它不仅可以加速文件上传过程,还可以提高系统的稳定性和容错性。简单来说就是:
1)把大文件分割成多个小文件(比如每个5MB)
2)逐个/并发上传小文件
3)所有小文件都上传完后,将所有小文件合并成大文件
2、主流上传实现
当涉及到文件上传时,特别是对大文件进行上传,分片上传是一种常见的解决方案。BiliBili和腾讯云COS都采用了这种方法来优化文件上传的性能和稳定性,如下:
!
BiliBili在其上传请求中携带了start和end参数,这些参数指示了每个分片在整个文件中的位置。这样做的好处是可以精确地确定每个分片的范围,确保上传的完整性。
另一方面,腾讯云COS采用了partNumber参数,用于标识每个分片的序号。通过使用partNumber,可以确保分片的顺序正确,并且在合并分片时,可以根据partNumber进行正确的排序和组装。
还有常见的做法是将每片文件的start 和 end放到请求头的 Range上,进行请求。
由上可知,分片上传是处理大文件上传时的首要选择。它通过将文件分割成较小的部分,降低了单个请求的负担,提高了上传效率,并且在网络故障或中断时具有更好的容错性。接下来,我们将深入探讨分片上传的实现原理以及如何在实际应用中进行部署和优化。
3、实现原理
分片上传的实现原理是将大文件分割成多个较小的部分(通常称为分片),然后将这些分片分别上传到服务器上。一旦所有分片都上传完成,服务器端会将这些分片按照特定的顺序组合成完整的文件。这种方法的主要优点是可以降低单个请求的负载,提高上传速度,并且在网络中断或失败时可以更容易地恢复上传进度。
在实际应用中部署和优化分片上传涉及以下几个方面:
- 选择合适的分片大小: 分片大小的选择会影响上传的性能和效率。通常情况下,分片大小应该足够小,以避免单个分片过大导致的上传失败,同时又不要太小以减少上传请求的数量。根据文件大小和网络条件进行调整是很重要的。
- 实现断点续传功能: 断点续传功能可以在上传过程中出现网络中断或上传失败时,从上次中断的地方继续上传。这需要在客户端和服务器端都实现相应的逻辑,包括记录已上传的分片信息以及在恢复上传时正确处理已上传的分片。
- 并发上传和异步处理: 在大文件上传时,可以考虑并发上传多个分片以加快上传速度。此外,可以采用异步处理的方式,使得上传操作不会阻塞用户界面或其他重要的业务逻辑。
- 实现分片校验: 为了确保上传的完整性,可以在客户端和服务器端实现分片校验机制。这可以通过计算分片的哈希值或使用其他校验算法来实现。在上传完成后,服务器端可以验证所有分片的校验值以确保文件的完整性。
- 优化网络传输和服务器性能: 对于服务器端,可以优化网络传输和存储系统以提高分片上传的性能。这包括使用高性能的网络设备、优化服务器端的存储系统以及合理分配服务器资源等方面。
4、前端相关逻辑
当用户在前端选择要上传的文件后,浏览器会创建一个File对象来表示这个文件。File对象包含了文件的元数据(例如文件名、大小、类型等)以及文件的内容。在分片上传中,我们可以通过File对象提供的slice方法来截取文件的分片。
File对象的slice方法接受两个参数,即起始位置和结束位置,用于截取文件的指定部分。这样我们就可以将大文件分割成多个小片段,以便分片上传。
使用MD5算法给buffer加密,主要是因为MD5是一种不可逆的哈希算法。对于给定的输入数据,MD5算法会生成一个128位(16字节)的哈希值,通常表示为32个十六进制数字。这个哈希值具有以下特点:
- 唯一性:对于不同的输入数据,MD5算法生成的哈希值几乎是唯一的,即使输入数据只有微小的变化,生成的哈希值也会完全不同。
- 不可逆性:MD5算法是单向的,即无法通过哈希值逆推出原始数据。这意味着无法从MD5哈希值还原出原始文件内容,因此可以保证上传的文件内容不被泄露。
- 固定长度:MD5算法生成的哈希值长度固定为128位(16字节),无论输入数据的大小。
因此,在文件上传过程中,我们可以使用MD5算法对文件内容进行哈希运算,生成唯一的哈希值,并将这个哈希值作为文件的摘要信息一并上传到服务器端。服务器端可以使用相同的MD5算法对接收到的文件内容进行哈希运算,并与客户端上传的哈希值进行比对,以确保文件的完整性和准确性。
上传百分比这块的判断条件,需要前后端商量进行定义,保证最后一个上传的返回值和其他不一样就行。
<script setup lang="ts">
import { ref } from "vue"; // 引入vue中的ref函数,用于创建响应式数据
import { message } from "@/utils/message"; // 引入消息提示工具
import axios from "axios"; // 引入axios库,用于发送HTTP请求
import SparkMD5 from "spark-md5"; // 引入spark-md5库,用于计算文件的MD5哈希值
defineOptions({ // 定义组件的选项
name: "VideoUploadFragment" // 组件名字
});
// 是否上传的标志,初始值为true
const uploadFlag = ref(true);
// 上传视频接口地址,使用环境变量VITE_FILE_BASE_PATH拼接而成
const uploadUrl = ref(import.meta.env.VITE_FILE_BASE_PATH + "/file/upload");
const uploadByFragmentUrl = ref(
import.meta.env.VITE_FILE_BASE_PATH + "/file/fragment/upload"
);
// 存储地址,默认为"default"
const uploadType = ref("default");
// 视频上传百分比,初始值为0
const videoUploadPercent = ref(0);
// 通过分片上传文件的函数,接收data和file作为参数
const uploadFileByFragment = async ({ data, file }) => {
try {
uploadFlag.value = false; // 设置上传标志为false,表示正在上传中
// 如果文件小于等于5MB,直接上传
if (file.size <= 5 * 1024 * 1024) {
const formData = new FormData(); // 创建一个FormData对象,用于包装待上传的数据
for (const key in data) {
formData.append(key, data[key]); // 将data中的参数添加到formData中
}
formData.append("file", file); // 将文件添加到formData中
return await upload(uploadUrl.value, formData); // 调用upload函数上传文件
} else {
// 如果文件大于5MB,分片上传
data.file = file; // 将文件添加到data对象中
return await uploadByPieces(uploadByFragmentUrl.value, data); // 调用uploadByPieces函数分片上传文件
}
} catch (e) {
return e;
}
};
// 单个文件上传的函数,接收url和data作为参数
const upload = (url: string, data: FormData) => {
return new Promise((resolve, reject) => {
axios({
url,
method: "post",
data,
headers: {
"Content-Type": "multipart/form-data" // 设置请求头为multipart/form-data,用于上传文件
}
})
.then(res => {
videoUploadPercent.value = 100; // 将上传百分比设置为100
message("文件上传完成", { type: "success" }); // 提示上传完成
return resolve(res.data); // 返回上传成功的结果
})
.catch(err => {
return reject(err); // 返回上传失败的错误信息
});
});
};
// 分片上传文件的函数,接收url和data作为参数
const uploadByPieces = async (url: string, data) => {
const bytesPerPiece = 1024 * 1024 * 5; // 每片的大小为5MB
const totalPieces = Math.ceil(data.file.size / bytesPerPiece); // 计算文件切片总数
let md5String = ""; // 存储文件的MD5哈希值
// 异步函数,生成文件的MD5哈希值
const generateMD5 = async (): Promise<string> => {
const fileReader = new FileReader(); // 创建一个FileReader对象,用于读取文件内容
const file = data.file; // 获取文件对象
return new Promise((resolve, reject) => {
fileReader.onload = (event: ProgressEvent<FileReader>) => {
const arrayBuffer = event.target?.result as ArrayBuffer; // 获取文件内容的ArrayBuffer对象
const md5String = SparkMD5.ArrayBuffer.hash(arrayBuffer); // 使用spark-md5库计算文件的MD5哈希值
resolve(md5String); // 返回计算得到的MD5哈希值
};
fileReader.onerror = error => {
reject(error); // 返回错误信息
};
fileReader.readAsArrayBuffer(file); // 以ArrayBuffer的形式读取文件内容
});
};
md5String = await generateMD5(); // 调用异步函数,生成文件的MD5哈希值
console.log("MD5:", md5String); // 打印MD5哈希值
// 分片上传文件的函数,接收start和index作为参数
const uploadChunk = async (start: number, index: number) => {
if (start >= data.file.size) {
return; // 如果start大于等于文件大小,则退出函数
}
let end = start + bytesPerPiece; // 计算分片的结束位置
if (end > data.file.size) {
end = data.file.size; // 如果end大于文件大小,则将end设置为文件大小
}
const chunk = data.file.slice(start, end); // 截取文件的分片
const sliceIndex = index; // 分片索引
const formData = new FormData(); // 创建一个FormData对象,用于包装待上传的数据
formData.append("file", chunk); // 将文件分片添加到formData中
formData.append("type", uploadType.value); // 将上传类型添加到formData中
formData.append("fileName", data.file.name); // 将文件名添加到formData中
formData.append("sliceIndex", sliceIndex.toString()); // 将分片索引添加到formData中
formData.append("totalPieces", totalPieces.toString()); // 将切片总数添加到formData中
formData.append("md5", md5String); // 将文件的MD5哈希值添加到formData中
try {
const response = await axios.post(url, formData, { // 发送POST请求,上传文件分片
headers: {
"Content-Type": "multipart/form-data" // 设置请求头为multipart/form-data,用于上传文件
}
});
const data = response.data.data.index; // 获取上传结果中的分片索引
if (data === "fail" || data === "-2") {
console.error("上传失败:", data); // 如果上传失败,则打印错误信息
} else if (Number.parseInt(data) !== -1) {
const currentPercent = Number.parseInt( // 未上传完成、计算当前上传进度百分比
((index / totalPieces) * 100).toFixed(2)
);
console.log(currentPercent, "======"); // 打印当前上传进度百分比
if (videoUploadPercent.value < currentPercent) {
videoUploadPercent.value = currentPercent; // 更新上传百分比
}
} else {
videoUploadPercent.value = 100; // 如果上传完成,则将上传百分比设置为100
message("文件上传完成", { type: "success" }); // 提示文件上传完成
}
} catch (error) {
console.error("上传失败:", error); // 打印上传失败的错误信息
message("上传失败", { type: "error" }); // 提示上传失败
}
};
// 创建并发上传任务
const maxConcurrentUploads = 10; // 最大并发上传数量
const uploadTasks = [];
let currentIndex = 0;
// 同时提交最大并发上传数量的任务
async function submitNextUploadTasks() {
while (
currentIndex < totalPieces - 1 &&
uploadTasks.length < maxConcurrentUploads
) {
const start = currentIndex * bytesPerPiece; // 计算分片的起始位置
uploadTasks.push(uploadChunk(start, currentIndex)); // 将分片上传任务添加到uploadTasks数组中
currentIndex++; // 更新currentIndex的值
}
}
// 等待所有上传任务完成
while (currentIndex < totalPieces - 1 || uploadTasks.length > 0) {
// 提交下一批上传任务
await submitNextUploadTasks();
// 等待当前并发上传任务完成
await Promise.all(uploadTasks);
// 清空已完成的上传任务
uploadTasks.length = 0;
}
// 单独执行最后一个上传任务
const lastIndex = totalPieces - 1;
const lastStart = lastIndex * bytesPerPiece;
await uploadChunk(lastStart, lastIndex);
};
// 刷新页面的函数
const reloadPage = () => {
location.reload(); // 刷新页面
};
</script>
<template>
<div
class="w-[100% -48px] h-[600px] bg-white flex flex-col justify-center items-center gap-y-5"
>
<el-upload
:drag="uploadFlag" // 设置是否支持拖拽上传
:http-request="uploadFileByFragment" // 设置上传文件的函数
:show-file-list="false" // 设置是否显示文件列表
:data="{ type: uploadType }" // 设置上传时附带的额外参数
:disabled="!uploadFlag" // 设置是否禁用上传按钮
multiple // 设置是否支持多文件上传
class="w-1/2"
>
<div
:class="{
['bg-[#F0F8FFFF] py-[5.5rem] px-[0.625Srem]']: !uploadFlag,
['w-full flex flex-col justify-center items-center gap-y-5 py-10 rounded-md']: true
}"
>
<IconifyIconOnline
v-if="uploadFlag" // 根据上传标志动态显示上传图标
icon="ep:upload-filled" // 设置上传图标的图标名称
class="w-16 h-16 text-gray-200"
/>
<div v-if="uploadFlag" class="el-upload__text">
将文件拖到此处,或<em>点击上传</em> // 根据上传标志动态显示上传提示文本
</div>
<el-progress
v-if="!uploadFlag" // 根据上传标志动态显示上传进度条
type="circle" // 设置进度条的类型为圆形
:percentage="videoUploadPercent" // 设置上传进度百分比
class=""
/>
<div class="el-upload__tip">只能上传mp4/flv/avi文件,且不超过1000M</div> // 显示上传文件类型和大小限制的提示文本
</div>
</el-upload>
<el-button
v-if="!uploadFlag" // 根据上传标志动态显示返回按钮
class="w-1/2 !border-dashed" // 设置返回按钮的样式
@click="reloadPage" // 点击返回按钮时触发reloadPage函数
>返回
</el-button>
</div>
</template>
<style scoped lang="scss">
:deep(.el-upload-dragger) {
background-color: aliceblue; // 设置拖拽区域的背景颜色
}
:deep(.el-upload) {
width: 100%; // 设置上传组件的宽度为100%
}
</style>
5、后端相关逻辑
Minio相关接口:
在上传文件的过程中,通过Minio SDK的putObject方法将文件分片上传到Minio存储桶中。
在文件上传完成后,通过Minio SDK的composeObject方法将所有分片文件按顺序组合为一个完整的文件,并且删除所有分片文件,节省了存储空间和维护成本。
分片大小限制:
Minio的分片上传功能要求每个分片的大小不得小于5MB(除了最后一个分片),以保证分片上传和合并操作的正常进行。
校验文件完整性:
我们可以通过传递的MD5和合并以后计算的MD5进行比较,如果相同,证明文件完整,否则说明文件缺失上传失败。当文件较大时,MD5的计算是比较慢的,这个可以查找一些更加高效的计算方式。
秒传:
文件上传完成后,我们可以将相关信息保存数据库中,最好也将MD5进行保存。
因为我们可以在下次时上传时,检查数据库是否有存有对应MD5的文件,如果有我们可以直接使用该文件,这样就完成了秒传。
断点续传:
在上传过程中出现网络中断或上传失败时,我们可以利用Minio的方法查询是否存在相应的MD5目录,如果存在,可以通过总分片参数以及查询目录下所有的分片索引,来判断还有哪些没上传,返回相应索引继续上传,直至完成。
@Operation(description = "上传文件(分片)")
@PostMapping("/fragment/upload")
public ResponseDTO<SysFile> uploadByFragment(MultipartFile file, @RequestParam("type") String type,
@RequestParam("fileName") String fileName, @RequestParam("md5") String md5,
@RequestParam("sliceIndex") String sliceIndex, @RequestParam("totalPieces") String totalPieces) {
return ResponseDTO.success(fileService.uploadByFragment(file, fileName, type, md5, Integer.valueOf(sliceIndex), Integer.valueOf(totalPieces)));
}
@Override
public SysFile uploadByFragment(MultipartFile file, String originalFilename, String type, String md5, Integer sliceIndex, Integer totalPieces) {
log.info("UploadByFragment --- 上传文件的md5: {}", md5);
// 。。。。。。文件后缀名、文件大小、上传类型的校验,自定义 。。。。。。
try {
// 调用分片上传的逻辑
int index = minioUtil.uploadFileByFragment(file, sliceIndex, totalPieces, md5, minioProperties.getBucketName());
// 我这里返回的Index是根据已上传的分片索引和所有分片索引比较,如果全部上传,就返回-1
// 如果分片还未全部上传,并且不包含当前索引,那就返回未上传分片索引的任意一个(大于0),让前端进行后续分片请求;
// 如果包含当前索引,上传Minio,如果当前索引大于总分片-1,也返回-1,否则返回当前索引+1
if (index == -1) {
// 拼接上传路径
String fileName = System.currentTimeMillis() + "_" + originalFilename;
String filePath = getDatePath() + fileTypeEnum.getPath() + StrPool.SLASH + fileName;
// 调用分片合并的方法(合并后删除所有分片,成功返回-1,否则-2,自己定义这个值)
if (Objects.equals(minioUtil.mergeFragmentFile(filePath, totalPieces, md5, minioProperties.getBucketName()), "-1")) {
filePath = StrPool.SLASH + minioProperties.getBucketName() + StrPool.SLASH + filePath;
sysFile.setFilePath(filePath);
sysFile.setCreateTime(LocalDateTime.now());
String bucketName = extractBucketName(filePath);
filePath = filePath.replaceFirst(bucketName, "");
StatObjectResponse statObjectResponse = minioUtil.getObjectInfo(minioProperties.getBucketName(), filePath);
sysFile.setFileSize(String.valueOf(statObjectResponse.size()));
this.save(sysFile);
// 返回的东西根据自己需求定义
return sysFile;
} else {
sysFile.setIndex("-2");
}
}
} catch (Exception e) {
throw new ApiException(e.getMessage(), e);
}
return sysFile;
}
/**
* 上传
*
* @param file 文件分片
* @param sliceIndex 分片索引
* @param totalPieces 切片总数
* @param md5 整体文件MD5
* @param bucketName 存储桶名称
* @return 返回需要上传的文件序号,-1是上传完成
*/
public int uploadFileByFragment(MultipartFile file, Integer sliceIndex, Integer totalPieces, String md5, String bucketName) throws ServerException, InsufficientDataException, ErrorResponseException, IOException, NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException, XmlParserException, InternalException {
// 判断存储桶是否存在 不存在则创建
createBucket(bucketName);
// 检查还需要上传的文件序号(校验如果有问题,可以先去除一下,并发可能有问题)
Iterable<Result<Item>> results = client.listObjects(
ListObjectsArgs.builder().bucket(bucketName).prefix(md5.concat("/")).build());
Set<String> objectNames = Sets.newHashSet();
for (Result<Item> item : results) {
objectNames.add(item.get().objectName());
}
List<Integer> indexList = Stream.iterate(0, i -> ++i)
.limit(totalPieces)
.filter(i -> !objectNames.contains(md5.concat("/").concat(Integer.toString(i))))
.sorted()
.toList();
// 返回需要上传的文件序号,-1是上传完成
if (!indexList.isEmpty()) {
if (!indexList.contains(sliceIndex)) {
return indexList.get(0);
}
} else {
return -1;
}
// 写入文件
client.putObject(
PutObjectArgs.builder()
.bucket(bucketName)
// 使用 md5 + 特殊字符 + 索引值 作为分片名称
.object(md5.concat("/").concat(Integer.toString(sliceIndex)))
.stream(file.getInputStream(), file.getSize(), -1)
.contentType(file.getContentType())
.build());
if (sliceIndex < totalPieces - 1) {
return ++sliceIndex;
} else {
return -1;
}
}
/**
* 此方法将多个文件分片合并为一个完整的文件,并验证合并后的文件的 MD5 值。
*
* @param fileName 合并后文件的名称。
* @param totalPieces 分片总数。
* @param md5 合并文件的 MD5 值,用于验证文件完整性。
* @param bucketName 存储桶名称
* @return -2 md5不匹配
* @throws Exception 如果在合并过程中发生任何错误,则抛出此异常。
*/
public String mergeFragmentFile(String fileName, Integer totalPieces, String md5, String bucketName) throws Exception {
// 完成上传从缓存目录合并迁移到正式目录
List<ComposeSource> sourceObjectList = Stream.iterate(0, i -> ++i)
.limit(totalPieces)
.map(i -> ComposeSource.builder()
.bucket(bucketName)
.object(md5.concat("/").concat(Integer.toString(i)))
.build())
.toList();
// 判断存储桶是否存在 不存在则创建
createBucket(bucketName);
// 多个文件分片组合为一个文件(ObjectWriteResponse response = composeObject(minioProperties.getBucketName(), fileName, sourceObjectList);)
composeObject(bucketName, fileName, sourceObjectList);
// 删除所有的分片文件
List<DeleteObject> delObjects = Stream.iterate(0, i -> ++i)
.limit(totalPieces)
.map(i -> new DeleteObject(md5.concat("/").concat(Integer.toString(i))))
.toList();
Iterable<Result<DeleteError>> results = removeObjects(bucketName, delObjects);
for (Result<DeleteError> result : results) {
DeleteError error = result.get();
log.error("Error in deleting object {}; {}", error.objectName(), error.message());
}
/**
* 验证md5 (太慢了) <p>try (InputStream stream = getObject(response.bucket(), response.object())) {String md5Hex = DigestUtils.md5Hex(stream);if (!md5Hex.equals(md5)) {log.error("分片合并时MD5验证失败");return "-2";}}</p>
*/
log.info("文件: {} 分片合并完成", fileName);
return "-1";
}
6、其他
如果不使用Minio进行存储,其实就是多了几步对流的处理,这个可以通过其他方法进行(不再阐述)。
标签:Minio,文件,const,SpringBoot,分片,md5,上传,MD5 From: https://www.cnblogs.com/huangrx/p/18241521