异步 Servlet 与 Disruptor 融合构建文件中心
通过请求异步化带来其他明显优点:
- 可以处理更高并发连接数,提高系统整体吞吐量
- 请求解析与业务处理完全分离,职责单一
- 自定义业务线程池,我们可以更容易地对其进行监控、降级等处理
- 可以根据不同业务,自定义不同线程池,相互隔离,不用互相影响
所以在具体使用过程中,我们还需要进行相应的压测,观察响应时间、吞吐量等指标,综合选择。
1.构建文件上传下载事件处理类
上传处理类
package com.open.capacity.file.event;
import java.util.List;
import org.springframework.web.multipart.MultipartFile;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.file.constant.FileType;
import lombok.Builder;
import lombok.Data;
/**
 * Event payload describing an upload request.
 *
 * <p>Lombok generates the accessors ({@code @Data}) and a fluent builder
 * ({@code @Builder}); the class carries data only and has no behavior of its own.
 */
@Data
@Builder
public class UploadEvent extends BaseEvent {

    /** File storage type. */
    private FileType fileType;

    /** Command type (single / batch upload). */
    private String commandType;

    /** Tenant. */
    private String tenant;

    /** Chunk (shard) id. */
    private String guid;

    /** Current chunk. */
    private Integer chunk;

    /** Total number of chunks. */
    private Integer chunks;

    /** Files. */
    private List<MultipartFile> files;
}
下载处理类
package com.open.capacity.file.event;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.file.constant.FileType;
import lombok.Builder;
import lombok.Data;
/**
 * Event payload describing a download request.
 *
 * <p>Lombok generates the accessors ({@code @Data}) and a fluent builder
 * ({@code @Builder}); the class carries data only and has no behavior of its own.
 */
@Data
@Builder
public class DownloadEvent extends BaseEvent {

    /** File storage type. */
    private FileType fileType;

    /** Command type identifying the requested operation. */
    private String commandType;

    /** Tenant. */
    private String tenant;

    /** Identifier of the file. */
    private String fileId;
}
2.文件上传之核心接口
package com.open.capacity.file.controller;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.beust.jcommander.internal.Lists;
import com.open.capacity.common.context.TenantContextHolder;
import com.open.capacity.common.disruptor.DisruptorTemplate;
import com.open.capacity.common.dto.PageResult;
import com.open.capacity.common.dto.ResponseEntity;
import com.open.capacity.common.exception.BusinessException;
import com.open.capacity.file.constant.CommandType;
import com.open.capacity.file.constant.FileType;
import com.open.capacity.file.context.UploadContext;
import com.open.capacity.file.entity.FileInfo;
import com.open.capacity.file.entity.MergeFileDTO;
import com.open.capacity.file.event.DownloadEvent;
import com.open.capacity.file.event.UploadEvent;
import com.open.capacity.file.service.FileServiceFactory;
import com.open.capacity.file.utils.FileUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.vavr.control.Try;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
 * REST endpoints of the file module.
 *
 * <p>Upload and download endpoints put the request into servlet async mode and
 * publish an event through {@link DisruptorTemplate}; they return {@code void}
 * and do not write the response themselves. The remaining endpoints (merge,
 * query, delete, upload-error) call the file service synchronously.
 */
@RestController
@Api(tags = "文件模块api")
@Slf4j
@RequestMapping("/file")
public class FileController {

    /** Timeout applied to every async context started by this controller: 900000 ms (15 minutes). */
    private static final long ASYNC_TIMEOUT_MILLIS = 900000L;

    @Resource
    private FileServiceFactory fileServiceFactory;

    @Autowired
    private DisruptorTemplate disruptorTemplate;

    /**
     * Single file upload.
     *
     * @param file    the uploaded file (request part {@code file})
     * @param request current request, switched to async mode
     */
    @ApiOperation("文件上传")
    @PostMapping(value = "/files-anon")
    @SneakyThrows
    public void filesAnon(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        Assert.isTrue(file != null, "No files");
        UploadContext context = openAsyncContext(request);
        List<MultipartFile> files = wrapSingleFile(file);
        UploadEvent event = UploadEvent.builder()
                .tenant(TenantContextHolder.getTenant())
                .fileType(FileType.S3)
                .commandType(CommandType.UPLOAD)
                .files(files)
                .build();
        disruptorTemplate.publish(CommandType.UPLOAD, event, context);
    }

    /**
     * Batch file upload.
     *
     * @param files   the uploaded files (request part {@code files})
     * @param request current request, switched to async mode
     */
    @ApiOperation("批量上传文件上传")
    @PostMapping(value = "/upload", consumes = "multipart/form-data", produces = { "application/json" })
    @SneakyThrows
    public void filesAnon(@RequestParam("files") List<MultipartFile> files, HttpServletRequest request) {
        Assert.isTrue(files != null, "No files");
        UploadContext context = openAsyncContext(request);
        List<MultipartFile> fileItems = files.stream()
                .map(file -> FileUtil.createFileItem(file))
                .collect(Collectors.toList());
        UploadEvent event = UploadEvent.builder()
                .tenant(TenantContextHolder.getTenant())
                .fileType(FileType.S3)
                .commandType(CommandType.BATCH_UPLOAD)
                .files(fileItems)
                .build();
        disruptorTemplate.publish(CommandType.BATCH_UPLOAD, event, context);
    }

    /**
     * Uploads one chunk of a large file.
     *
     * @param guid    chunk group id
     * @param chunk   current chunk
     * @param file    chunk content
     * @param chunks  total number of chunks
     * @param request current request, switched to async mode
     */
    @PostMapping(value = "/files-anon/bigFile")
    public void bigFile(String guid, Integer chunk, MultipartFile file, Integer chunks, HttpServletRequest request) {
        Assert.isTrue(file != null, "No files");
        UploadContext context = openAsyncContext(request);
        List<MultipartFile> files = wrapSingleFile(file);
        UploadEvent event = UploadEvent.builder()
                .tenant(TenantContextHolder.getTenant())
                .fileType(FileType.S3)
                .commandType(CommandType.PART_UPLOAD)
                .files(files)
                .chunk(chunk)
                .chunks(chunks)
                .guid(guid)
                .build();
        disruptorTemplate.publish(CommandType.PART_UPLOAD, event, context);
    }

    /**
     * Merges previously uploaded chunks into one file.
     *
     * @param mergeFileDTO supplies the chunk group id and the target file name
     * @return success envelope wrapping the merge result, or a failure envelope if anything throws
     */
    @RequestMapping(value = "/files-anon/merge", method = RequestMethod.POST)
    public ResponseEntity mergeFile(@RequestBody MergeFileDTO mergeFileDTO) {
        try {
            // Date-based path segment, e.g. "2024/01/31/".
            String filepath = new SimpleDateFormat("yyyy/MM/dd/").format(new Date());
            return ResponseEntity.succeed(fileServiceFactory.getService(FileType.S3).merge(mergeFileDTO.getGuid(),
                    mergeFileDTO.getFileName(), filepath), "操作成功");
        } catch (Exception ex) {
            // Previously swallowed silently; log so the failure can be diagnosed.
            log.error("file-merge-error", ex);
            return ResponseEntity.failed("操作失败");
        }
    }

    /**
     * File download.
     *
     * @param id      id of the file to download
     * @param request current request, switched to async mode
     */
    @GetMapping("/download")
    public void downloadFile(@RequestParam String id, HttpServletRequest request) {
        Assert.isTrue(id != null, "No files");
        UploadContext context = openAsyncContext(request);
        DownloadEvent event = DownloadEvent.builder()
                .tenant(TenantContextHolder.getTenant())
                .fileType(FileType.S3)
                .commandType(CommandType.DOWNLOAD)
                .fileId(id)
                .build();
        disruptorTemplate.publish(CommandType.DOWNLOAD, event, context);
    }

    /**
     * File query.
     *
     * @param params query parameters taken from the request
     * @return page of matching file records
     */
    @GetMapping("/files")
    public PageResult<FileInfo> findFiles(@RequestParam Map<String, Object> params) {
        return fileServiceFactory.getService(FileType.S3).findList(params);
    }

    /**
     * File deletion.
     *
     * @param id id of the file to delete
     * @return success envelope
     * @throws BusinessException if the underlying delete call fails
     */
    @DeleteMapping("/{id}")
    public ResponseEntity delete(@PathVariable Long id) {
        Try.of(() -> fileServiceFactory.getService(FileType.S3).delete(id))
                .onFailure(ex -> log.error("file-delete-error", ex))
                .getOrElseThrow(item -> new BusinessException("操作失败"));
        return ResponseEntity.succeed("操作成功");
    }

    /**
     * Reports a failed upload.
     *
     * @param mergeFileDTO supplies the chunk group id and the file name
     * @return success envelope, or a failure envelope if anything throws
     */
    @RequestMapping(value = "/files-anon/uploadError", method = RequestMethod.POST)
    public ResponseEntity uploadError(@RequestBody MergeFileDTO mergeFileDTO) {
        try {
            // Uses the default FileService.
            fileServiceFactory.getService(FileType.S3).uploadError(mergeFileDTO.getGuid(), mergeFileDTO.getFileName(),
                    "");
            return ResponseEntity.succeed("操作成功");
        } catch (Exception ex) {
            // Previously swallowed silently; log so the failure can be diagnosed.
            log.error("file-upload-error", ex);
            return ResponseEntity.failed("操作失败");
        }
    }

    /** Switches the request to async mode, applies the timeout and wraps the async context in an UploadContext. */
    private UploadContext openAsyncContext(HttpServletRequest request) {
        UploadContext context = new UploadContext();
        javax.servlet.AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(ASYNC_TIMEOUT_MILLIS);
        context.setAsyncContext(asyncContext);
        return context;
    }

    /** Converts one uploaded file via FileUtil.createFileItem and returns it in a new mutable list. */
    private List<MultipartFile> wrapSingleFile(MultipartFile file) {
        List<MultipartFile> files = Lists.newArrayList();
        files.add(FileUtil.createFileItem(file));
        return files;
    }
}
3.核心事件驱动处理类
1.单文件上传
package com.open.capacity.file.event.lisener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.open.capacity.common.context.TenantContextHolder;
import com.open.capacity.common.disruptor.annocation.Channel;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.common.disruptor.listener.EventListener;
import com.open.capacity.common.disruptor.thread.ExecutorService;
import com.open.capacity.common.exception.BusinessException;
import com.open.capacity.file.constant.CommandType;
import com.open.capacity.file.context.UploadContext;
import com.open.capacity.file.entity.FileInfo;
import com.open.capacity.file.event.UploadEvent;
import com.open.capacity.file.service.FileServiceFactory;
import cn.hutool.core.io.IoUtil;
import lombok.SneakyThrows;
/**
 * Listener registered on the {@code CommandType.UPLOAD} channel.
 *
 * <p>Uploads the first file of the event and writes the resulting
 * {@link FileInfo} to the async response as JSON.
 */
@Service
@Channel(CommandType.UPLOAD)
public class UploadEventLisener extends EventListener<UploadEvent, UploadContext> {

    @Resource
    private FileServiceFactory fileServiceFactory;

    @Resource
    private ObjectMapper objectMapper;

    /**
     * Accepts single-file upload events only.
     *
     * @param event the incoming event; cast to {@link UploadEvent}
     * @return {@code true} when the command type equals {@code CommandType.UPLOAD}
     */
    @Override
    public boolean accept(BaseEvent event) {
        final String commandType = ((UploadEvent) event).getCommandType();
        return CommandType.UPLOAD.equals(commandType);
    }

    /**
     * Submits the upload to {@code executorService} (not declared in this class;
     * presumably inherited from {@link EventListener}).
     *
     * @param event        upload request data
     * @param eventContext holder of the servlet async context to answer on
     */
    @Override
    @SneakyThrows
    public void onEvent(UploadEvent event, UploadContext eventContext) {
        executorService.execute(CommandType.UPLOAD, () -> {
            final ServletResponse servletResponse = eventContext.getAsyncContext().getResponse();
            servletResponse.setCharacterEncoding("UTF-8");
            servletResponse.setContentType("application/json;charset=UTF-8");
            try (ServletOutputStream stream = servletResponse.getOutputStream()) {
                // Make the tenant of the original request visible on this thread.
                TenantContextHolder.setTenant(event.getTenant());
                final MultipartFile firstFile = event.getFiles().get(0);
                final FileInfo uploaded = fileServiceFactory.getService(event.getFileType()).upload(firstFile);
                final byte[] payload = objectMapper.writeValueAsString(uploaded).getBytes(StandardCharsets.UTF_8);
                IoUtil.write(stream, false, payload);
            } catch (IOException ioFailure) {
                throw new BusinessException(ioFailure.getMessage());
            } finally {
                // Always end the async request, whether or not the upload succeeded.
                eventContext.getAsyncContext().complete();
            }
        });
    }
}
2.批量文件上传
package com.open.capacity.file.event.lisener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.annotation.Resource;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import com.beust.jcommander.internal.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.open.capacity.common.context.TenantContextHolder;
import com.open.capacity.common.disruptor.annocation.Channel;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.common.disruptor.listener.EventListener;
import com.open.capacity.common.disruptor.thread.ExecutorService;
import com.open.capacity.common.exception.BusinessException;
import com.open.capacity.file.constant.CommandType;
import com.open.capacity.file.context.UploadContext;
import com.open.capacity.file.entity.FileInfo;
import com.open.capacity.file.event.UploadEvent;
import com.open.capacity.file.service.FileServiceFactory;
import lombok.SneakyThrows;
/**
 * Listener registered on the {@code CommandType.BATCH_UPLOAD} channel.
 *
 * <p>Uploads every file of the event in order and writes the list of
 * resulting {@link FileInfo} objects to the async response as JSON.
 */
@Service
@Channel(CommandType.BATCH_UPLOAD)
public class BatchUploadEventLisener extends EventListener<UploadEvent, UploadContext> {

    @Resource
    private FileServiceFactory fileServiceFactory;

    @Resource
    private ObjectMapper objectMapper;

    /**
     * Accepts batch upload events only.
     *
     * @param event the incoming event; cast to {@link UploadEvent}
     * @return {@code true} when the command type equals {@code CommandType.BATCH_UPLOAD}
     */
    @Override
    public boolean accept(BaseEvent event) {
        final String commandType = ((UploadEvent) event).getCommandType();
        return CommandType.BATCH_UPLOAD.equals(commandType);
    }

    /**
     * Submits the batch upload to {@code executorService} (not declared in this
     * class; presumably inherited from {@link EventListener}).
     *
     * @param event        upload request data
     * @param eventContext holder of the servlet async context to answer on
     */
    @Override
    @SneakyThrows
    public void onEvent(UploadEvent event, UploadContext eventContext) {
        executorService.execute(CommandType.BATCH_UPLOAD, () -> {
            final ServletResponse servletResponse = eventContext.getAsyncContext().getResponse();
            servletResponse.setCharacterEncoding("UTF-8");
            servletResponse.setContentType("application/json;charset=UTF-8");
            try (ServletOutputStream stream = servletResponse.getOutputStream()) {
                // Make the tenant of the original request visible on this thread.
                TenantContextHolder.setTenant(event.getTenant());
                final List<FileInfo> uploaded = Lists.newArrayList();
                for (MultipartFile current : event.getFiles()) {
                    uploaded.add(fileServiceFactory.getService(event.getFileType()).upload(current));
                }
                final byte[] payload = objectMapper.writeValueAsString(uploaded).getBytes(StandardCharsets.UTF_8);
                stream.write(payload);
                stream.flush();
            } catch (IOException ioFailure) {
                throw new BusinessException(ioFailure.getMessage());
            } finally {
                // Always end the async request, whether or not the uploads succeeded.
                eventContext.getAsyncContext().complete();
            }
        });
    }
}
3.s3协议分片上传
package com.open.capacity.file.event.lisener;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import javax.annotation.Resource;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.open.capacity.common.context.TenantContextHolder;
import com.open.capacity.common.disruptor.annocation.Channel;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.common.disruptor.listener.EventListener;
import com.open.capacity.common.disruptor.thread.ExecutorService;
import com.open.capacity.common.dto.ResponseEntity;
import com.open.capacity.common.exception.BusinessException;
import com.open.capacity.file.constant.CommandType;
import com.open.capacity.file.context.UploadContext;
import com.open.capacity.file.entity.FileInfo;
import com.open.capacity.file.event.UploadEvent;
import com.open.capacity.file.service.FileServiceFactory;
import cn.hutool.core.io.IoUtil;
import lombok.Cleanup;
import lombok.SneakyThrows;
/**
 * Listener registered on the {@code CommandType.PART_UPLOAD} channel.
 *
 * <p>Stores one chunk of a chunked upload and answers the async request with
 * a JSON success or failure envelope.
 */
@Service
@Channel(CommandType.PART_UPLOAD)
public class PartUploadEventLisener extends EventListener<UploadEvent, UploadContext> {

    @Resource
    private FileServiceFactory fileServiceFactory;

    @Resource
    private ObjectMapper objectMapper;

    /**
     * Accepts chunk upload events only.
     *
     * @param event the incoming event; cast to {@link UploadEvent}
     * @return {@code true} when the command type equals {@code CommandType.PART_UPLOAD}
     */
    @Override
    public boolean accept(BaseEvent event) {
        UploadEvent uploadEvent = (UploadEvent) event;
        return CommandType.PART_UPLOAD.equals(uploadEvent.getCommandType());
    }

    /**
     * Serializes both response payloads, then submits the chunk upload to
     * {@code executorService} (not declared in this class; presumably inherited
     * from {@link EventListener}).
     *
     * @param event        chunk data: file, guid, current chunk and total chunks
     * @param eventContext holder of the servlet async context to answer on
     */
    @Override
    @SneakyThrows
    public void onEvent(UploadEvent event, UploadContext eventContext) {
        String ok = objectMapper.writeValueAsString(ResponseEntity.succeed("操作成功"));
        // The failure payload must be a failure envelope; it was previously built with succeed(...).
        String ko = objectMapper.writeValueAsString(ResponseEntity.failed("操作失败"));
        executorService.execute(CommandType.PART_UPLOAD, () -> {
            ServletResponse response = eventContext.getAsyncContext().getResponse();
            response.setCharacterEncoding("UTF-8");
            response.setContentType("application/json;charset=UTF-8");
            ServletOutputStream out = null;
            try {
                out = response.getOutputStream();
                // Make the tenant of the original request visible on this thread.
                TenantContextHolder.setTenant(event.getTenant());
                MultipartFile file = event.getFiles().get(0);
                // Date-based path segment, e.g. "2024/01/31/".
                String filepath = new SimpleDateFormat("yyyy/MM/dd/").format(new Date());
                fileServiceFactory.getService(event.getFileType()).chunk(event.getGuid(), event.getChunk(), file,
                        event.getChunks(), filepath);
                IoUtil.write(out, false, ok.getBytes(StandardCharsets.UTF_8));
            } catch (Exception e1) {
                // out is still null when getOutputStream() itself failed; nothing can be written then.
                if (out != null) {
                    IoUtil.write(out, false, ko.getBytes(StandardCharsets.UTF_8));
                }
            } finally {
                IoUtil.close(out);
                // Always end the async request, whether or not the chunk was stored.
                eventContext.getAsyncContext().complete();
            }
        });
    }
}
4.文件下载
package com.open.capacity.file.event.lisener;
import java.io.IOException;
import javax.annotation.Resource;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.jboss.marshalling.ByteInputStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.util.UriUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.open.capacity.common.context.TenantContextHolder;
import com.open.capacity.common.disruptor.annocation.Channel;
import com.open.capacity.common.disruptor.event.BaseEvent;
import com.open.capacity.common.disruptor.listener.EventListener;
import com.open.capacity.common.disruptor.thread.ExecutorService;
import com.open.capacity.common.exception.BusinessException;
import com.open.capacity.file.constant.CommandType;
import com.open.capacity.file.constant.FileType;
import com.open.capacity.file.context.UploadContext;
import com.open.capacity.file.entity.DownloadDto;
import com.open.capacity.file.event.DownloadEvent;
import com.open.capacity.file.service.FileServiceFactory;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.util.URLUtil;
import lombok.SneakyThrows;
/**
 * Listener registered on the {@code CommandType.DOWNLOAD} channel.
 *
 * <p>Loads the requested file through the S3 file service and streams its
 * bytes to the async response as an attachment.
 */
@Service
@Channel(CommandType.DOWNLOAD)
public class DownloadEventLisener extends EventListener<DownloadEvent, UploadContext> {

    @Resource
    private FileServiceFactory fileServiceFactory;

    @Resource
    private ObjectMapper objectMapper;

    /**
     * Accepts download events only.
     *
     * @param event the incoming event; cast to {@link DownloadEvent}
     * @return {@code true} when the command type equals {@code CommandType.DOWNLOAD}
     */
    @Override
    public boolean accept(BaseEvent event) {
        DownloadEvent downloadEvent = (DownloadEvent) event;
        return CommandType.DOWNLOAD.equals(downloadEvent.getCommandType());
    }

    /**
     * Submits the download to {@code executorService} (not declared in this
     * class; presumably inherited from {@link EventListener}).
     *
     * @param event        download request data (tenant and file id)
     * @param eventContext holder of the servlet async context to answer on
     */
    @Override
    @SneakyThrows
    public void onEvent(DownloadEvent event, UploadContext eventContext) {
        executorService.execute(CommandType.DOWNLOAD, () -> {
            HttpServletResponse response = (HttpServletResponse) eventContext.getAsyncContext().getResponse();
            response.setCharacterEncoding("UTF-8");
            try (ServletOutputStream out = response.getOutputStream()) {
                // Make the tenant of the original request visible on this thread.
                TenantContextHolder.setTenant(event.getTenant());
                DownloadDto downloadInfo = fileServiceFactory.getService(FileType.S3).download(event.getFileId());
                byte[] content = downloadInfo.getBytes();
                response.setHeader(HttpHeaders.CONTENT_DISPOSITION,
                        "attachment;fileName=" + URLUtil.encode(downloadInfo.getFileName()));
                // Raw file bytes: octet-stream, not multipart/form-data (which is a request body format).
                response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
                // Content-Length is listed as an exposed header below, so actually send it.
                response.setContentLength(content.length);
                response.setHeader(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, "Content-Disposition,Content-Length");
                IoUtil.write(out, false, content);
                out.flush();
            } catch (IOException e) {
                throw new BusinessException(e.getMessage());
            } finally {
                // Always end the async request, whether or not the download succeeded.
                eventContext.getAsyncContext().complete();
            }
        });
    }
}
项目地址:
https://github.com/owenwangwen/open-capacity-platform
标签:异步,capacity,中心,文件,com,file,import,open,event From: https://blog.51cto.com/u_13005375/5984275