首页 > 其他分享 >CompletableFuture之批量上传

CompletableFuture之批量上传

时间:2023-06-30 16:45:47浏览次数:43  
标签:String 批量 UPLOAD 任务 CompletableFuture scheduleKey 上传

前言

最近接到一个需求,批量上传图片到服务器及实时更新上传进度。当处理大量文件上传任务时,效率是一个关键因素。传统的串行方式会导致任务耗时较长,而使用并发处理可以极大地提高上传效率。想到很久之前用CompletableFuture优化过一些多统计的业务场景,效果都还不错,因此在这里也使用它来优化一下上传的效率。

 

CompletableFuture简介

CompletableFuture类是Java 8引入的,它实现了FutureCompletionStage接口,提供了更强大和灵活的异步编程功能。CompletableFuture除了具有Future的特性外,还提供了更多的操作和组合方式来处理异步任务。它可以更方便地处理异步任务,实现并发编程,并提供更好的异常处理和结果转换机制。在进行异步编程时,CompletableFuture是一个更为强大和推荐的选择。

主要特点:

  1. 异步执行:允许将任务提交给后台线程,在任务执行期间不会阻塞主线程。这样可以提高应用程序的响应性能,特别是在处理I/O密集型操作时,如网络请求或数据库查询。

  2. 链式调用和组合操作:支持链式调用,可以将多个异步任务按照顺序连接起来形成一个任务流水线。每个任务的执行依赖于前一个任务的结果,这种串行的处理方式可以简化异步任务的编写和管理。

  3. 异常处理:提供了异常处理的机制,可以通过异常回调方法来捕获和处理任务执行过程中的异常情况。这样可以更好地控制和处理任务执行过程中的异常,提供更健壮的代码。

  4. 转换和合并结果:提供了一系列的转换和合并操作,可以对任务的结果进行映射、转换和合并。这样可以方便地对任务的结果进行处理和转换,得到最终期望的结果。

  5. 多任务并行执行:支持等待多个任务并行执行,并等待它们全部完成或任意一个完成。这种能力使得在处理并发任务时可以更好地利用系统资源,提高任务执行的效率。

 

串行和并行的效率对比

测试批量上传了1000张图片,每张图片在579KB,一共564MB。使用串行方式上传,总时长为501秒,使用并行方式上传,总时长是108秒,通过对比优化前后的代码,可以明显看出使用CompletableFuture并发处理方式的效率更高。由于任务是并行执行的,多核处理器的能力得到了充分的利用,从而大大提高了批量上传的速度。

 

串行处理方式

/**
 * describe: 批量上传图片
 *
 * @param files 图片文件集合
 * @param fileId 文件夹id
 * @param scheduleKey 上传进度key
 * @date 2023年06月28日 11:42:03
 * @author Tang
 */
@Override
public BatchUploadVO batchUpload2(MultipartFile[] files, Long fileId, String scheduleKey) {
	//取上传配置
	String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
	ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
	List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));

	List<String> errorNames = Lists.newCopyOnWriteArrayList();
	String userName = SecurityAuthorHolder.getSecurityUser().getUsername();

	for(MultipartFile file : files){
		try {
			RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
			String suffix = Objects.requireNonNull(file.getOriginalFilename()).substring(file.getOriginalFilename().lastIndexOf(".") + 1);
			ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
			ServerException.Assert(file.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
			//上传
			ImgResourceEntity saveData = upload(file, config);
			saveData.setFileId(fileId);
			saveData.setCreator(userName);
			baseMapper.insert(saveData);
			//缓存自增  供轮询查询实时进度
			RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
		} catch (Exception e) {
			errorNames.add(file.getOriginalFilename());
			RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
		}
	}

	BatchUploadVO vo = schedule(scheduleKey);
	vo.setErrFileNames(errorNames);
	return vo;
}

  

串行处理调用时间

 

并行处理方式

/**
 * describe: 批量上传图片
 *
 * @param files 图片文件集合
 * @param fileId 文件夹id
 * @param scheduleKey 上传进度key
 * @date 2023年06月28日 11:42:03
 * @author Tang
 */
@Override
public BatchUploadVO batchUpload(MultipartFile[] files, Long fileId, String scheduleKey) {
	ExecutorService executor = Executors.newFixedThreadPool(10);

	//取上传配置
	String jsonStr = CacheConfigure.getValue(CacheKeyConstant.IMG_RESOURCE_UPLOAD_CONFIG, String.class);
	ImgResourceUploadConfigDTO config = JSONObject.toJavaObject(JSONObject.parseObject(jsonStr), ImgResourceUploadConfigDTO.class);
	List<String> imgTypeList = Arrays.asList(config.getImgType().split(","));

	List<String> errorNames = Lists.newCopyOnWriteArrayList();
	String userName = SecurityAuthorHolder.getSecurityUser().getUsername();

	CompletableFuture<Void> allFutures = CompletableFuture.allOf(
			Arrays.stream(files).map(v ->
					CompletableFuture.runAsync(
							() -> {
								try {
									RedisUtil.setInteger(CacheKeyConstant.UPLOAD_SCHEDULE_TOTAL + scheduleKey, files.length, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
									String suffix = Objects.requireNonNull(v.getOriginalFilename()).substring(v.getOriginalFilename().lastIndexOf(".") + 1);
									ServerException.Assert(!imgTypeList.contains(suffix), "文件格式不正确,支持" + String.join(",", imgTypeList));
									ServerException.Assert(v.getSize() > config.getMaxSize() * 1024, "文件最大不能超过" + config.getMaxSize() + "K");
									//上传
									ImgResourceEntity saveData = upload(v, config);
									saveData.setFileId(fileId);
									saveData.setCreator(userName);
									baseMapper.insert(saveData);
									//缓存自增  供轮询查询实时进度
									RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_SUCCESS + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
								} catch (Exception e) {
									errorNames.add(v.getOriginalFilename());
									RedisUtil.incrementValue(CacheKeyConstant.UPLOAD_SCHEDULE_ERROR + scheduleKey, CacheTimeConstant.BATCH_UPLOAD_EXPIRED_TIME);
								}
							}, executor)
			).toArray(CompletableFuture[]::new)
	);
	// 等待所有 CompletableFuture 完成
	allFutures.join();

	// 关闭线程池
	executor.shutdown();

	BatchUploadVO vo = schedule(scheduleKey);
	vo.setErrImgFileNames(errorNames);
	return vo;
}

  

 并行调用处理时间

 

实现过程中的注意事项

  1. 线程池的使用:为了实现并发处理,可以使用线程池来管理并执行异步任务。通过合理设置线程池的大小和参数,可以控制并发线程的数量和资源的利用率。

  2. 异常处理:在并发处理中,每个任务都是独立执行的,因此需要适当处理任务中可能出现的异常情况,避免异常的影响扩散。

  3. 进度更新:为了实时更新上传进度,可以将每个任务的进度信息保存到Redis中,并在前端通过轮询查询的方式获取最新的进度信息。

  4. 线程安全:确保上传逻辑的线程安全性,避免多线程环境下的竞态条件和数据一致性问题。

 

总结

使用CompletableFuture来优化批量上传任务是一种高效且灵活的方式。通过并发处理,我们可以充分利用多核处理器的能力,提高任务的执行效率。同时,通过实时更新上传进度并返回总体的上传结果,可以给用户更好的体验。
在实现过程中,我们需要合理使用线程池、处理异常、保证数据同步和线程安全,以确保上传任务的稳定性和性能。同时,我们还可以利用CompletableFuture提供的方法来处理任务的结果、异常和其他相关操作,以满足具体的业务需求。
通过使用CompletableFuture进行批量上传任务的优化,可以显著提高系统的性能和用户体验,适用于需要处理大量并发任务的场景。

 

标签:String,批量,UPLOAD,任务,CompletableFuture,scheduleKey,上传
From: https://www.cnblogs.com/-tang/p/17517164.html

相关文章

  • Nginx 报错 504 Gateway Time-out 和无法上传大于1M文件的解决方法
    Nginx报错504GatewayTime-out的解决方法修改nginx.conf配置文件。keepalive_timeout600;fastcgi_connect_timeout600;fastcgi_send_timeout600;fastcgi_read_timeout600;proxy_connect_timeout600;proxy_send_timeout600;proxy_read_timeou......
  • 前端 http大文件断点续传上传
    ​IE的自带下载功能中没有断点续传功能,要实现断点续传功能,需要用到HTTP协议中鲜为人知的几个响应头和请求头。 一. 两个必要响应头Accept-Ranges、ETag        客户端每次提交下载请求时,服务端都要添加这两个响应头,以保证客户端和服务端将此下载识别为可以断点续传......
  • 零代码编程:用ChatGPT来批量合并多个PDF文件
    一个文件夹里面有38个PDF文件,现在想合并成一个PDF文件。用ChatGPT可以非常简单的实现。在ChatGPT中输入提示词如下:这两个文件夹里面有多个PDF文件,写一段Python程序,将文件夹里面的PDF文件按照文件标题名合并成一个PDF文件,然后保存到文件夹中F:\BaiduNetdiskDownload\小兔兵兵第1季绘......
  • 解决minio使用nginx代理出现问题(上传文件可以,创建桶404)
    minio使用nginx代理出现问题(上传文件可以,创建桶失败)server{listen80;#error_page500502503504404/404.html;注释掉location/{proxy_set_headerX-Forwarded-Proto$scheme;proxy_set_headerX-Real-IP$remote_addr;......
  • WEB安全之:文件上传
    郑重声明:本笔记编写目的只用于安全知识提升,并与更多人共享安全知识,切勿使用笔记中的技术进行违法活动,利用笔记中的技术造成的后果与作者本人无关。倡导维护网络安全人人有责,共同维护网络文明和谐。目录WEB安全之:文件上传1文件上传过程1.1浏览器打开上传页面1.2用户提交上传......
  • windows上传app到构建版本的方法
    ios打包好ipa文件后,ipa文件需要上架到appstore,用户才能安装。而在appstore里,无法直接将ipa上传,需要使用工具上传,但是官方提供的工具,比如xcode等只能安装在苹果电脑上。我们这篇文章,重点将介绍如何使用windows电脑将ipa文件上传appstore的构建版本里和上架的基本流程。上架ipa......
  • php通过Curl给接口上传文件。
    在PHP中使用cURL上传文件至接口,你可以通过CURLOPT_POSTFIELDS选项来设置文件的内容。以下是一个示例:functionuploadFile($url,$filePath,$fieldName){$ch=curl_init($url);$postData=array($fieldName=>newCURLFile($filePath));c......
  • PHP文件上传封装
    classFileUploader{private$targetDirectory;private$allowedExtensions;private$maxFileSize;publicfunction__construct($targetDirectory,$allowedExtensions=array(),$maxFileSize=1048576){$this->targetDirectory=$ta......
  • wifi智能计量插座-10A 定时上传插座状态,电压,电流有功功率,视在功率,功率因数电量,温度 至
    wifi智能计量插座-10A定时上传插座状态,电压,电流有功功率,视在功率,功率因数电量,温度至MQTT服务器wifi智能计量插座-10A定时上传wifi智能计量插座-10A定时上传插座状态,电压,电流有功功率,视在功率,功率因数电量,温度至MQTT服务器插座状态,电压,电流有功功率,视在功率,功率因数电量,温度......
  • 在element-ui视频上传使用canvas截取视频帧数,并且转为视频封面。
    <el-upload:http-request="getFile"//自定义上传action:on-change="handleFileChange"//监听文件上传ref="upload"accept="video/*"//定义格式为视频><el-butto......