1. 项目背景:
客户插入Excel文件,Ececel文件中包含大量的数据行和数据列,单线程按行读取,耗时大约半小时,体验感不好。
思路:先将excel文件按行读取,存入List,然后按照100均分,n=list.szie()/100 + 1; n就是要开启的线程总数。(实际使用的时候,数据库连接池的数量有限制,n的大小要结合数据库连接池的数量做合理设置,因为没一个线程都要获取数据库连接操作数据库)
2. 主线程代码
CountDownLatch以及AtomicBoolean请网上自行查看。
public UploadFileResultVo readExcel(MultipartFile file, String userName, String uploadId, String dataFileId) throws Exception {
UploadFileResultVo result = new UploadFileResultVo();
if(excelUtils.isValidNewProductExcel(file)){
Workbook workBook = excelUtils.getWorkBook(file);
log.info("uploadId:{} for file:{}",uploadId,file.getOriginalFilename());
Sheet sheet = workBook.getSheetAt(0);
int firstRowNum = sheet.getFirstRowNum();
int lastRowNum = sheet.getLastRowNum();
// 获取厂家信息中所有的行
List<Row> factoryRows = new ArrayList<>();
for(int i = firstRowNum + 1; i <= lastRowNum; i++ ){
Row row = sheet.getRow(i);
factoryRows.add(row);
}
log.info("factoryRows:{}", factoryRows.size());
// 用户上传的内容为空
if(factoryRows.isEmpty()){
log.info("the new product data file is empty");
String filePath = fileUtils.ossUploadSignalFile(file, FileTypeEnum.NEW_PRODUCT_FILE.getCode());
TFileInfoDo TFileInfoDo = new TFileInfoDo();
TFileInfoDo.setFilePath(filePath);
TFileInfoDo.setUploadId(uploadId);
TFileInfoDo.setCreateBy(userName);
TFileInfoDo.setDataFileId(dataFileId);
TFileInfoDo.setFileType(FileTypeEnum.NEW_PRODUCT_DATA_FILE.getCode());
TFileInfoMapper.insertTFileInfo(TFileInfoDo);
Long fileId = TFileInfoDo.getId();
FileBackBo fileBackBo = new FileBackBo();
int lastIndex = filePath.split("/").length - 1;
String fileName = filePath.split("/")[lastIndex];
fileBackBo.setFileId(String.valueOf(fileId));
fileBackBo.setName(fileName);
List<FileBackBo> list = new ArrayList<>();
list.add(fileBackBo);
result.setUploadId(uploadId);
result.setDataFileId(dataFileId);
result.setFiles(list);
return result;
}
// 厂家信息中每100行放入一个list,开启一个线程分析数据
List<List<Row>> resultList = averageAssign(factoryRows, 100);
log.info("size:{}", resultList.size());
// 获取最后合计行
// Row lastRow = sheet.getRow(lastRowNum);
// 子线程个数(包含最后一行数据分析线程)
// CountDownLatch sonThreadNumber = new CountDownLatch(resultList.size() + 1);
CountDownLatch sonThreadNumber = new CountDownLatch(resultList.size());
log.info("sonThreadNumber:{}", sonThreadNumber.getCount());
// 子线程回滚标志
AtomicBoolean rollBack = new AtomicBoolean(false);
// 开启分析工厂数据子线程,此处不需要用线程池
for(int threadNumber = 0; threadNumber < resultList.size(); threadNumber++){
GetRowContent getRowContent = new GetRowContent(resultList.get(threadNumber), uploadId, dataFileId, sonThreadNumber, rollBack, "factoryDataAnalysisThread" + threadNumber, userName);
Thread thread = new Thread(getRowContent);
thread.start();
}
// 分析最后一行数据子线程
// GetLastRowContent getLastRowContent = new GetLastRowContent(lastRow, uploadId, sonThreadNumber, rollBack, "lastRowDataAnalysisThread", userName);
// Thread thread = new Thread(getLastRowContent);
// thread.start();
// 主线程等待所有子线程数据分析完毕
sonThreadNumber.await();
if(rollBack.get()){
log.info("子线程执行有异常");
throw new RuntimeException("数据解析出错,请检查录入文件内容格式是否按照模板录入");
}
else {
log .info("所有子线程执行成功");
//文件存放
String filePath = fileUtils.ossUploadSignalFile(file, FileTypeEnum.NEW_PRODUCT_FILE.getCode());
TFileInfoDo TFileInfoDo = new TFileInfoDo();
TFileInfoDo.setFilePath(filePath);
TFileInfoDo.setUploadId(uploadId);
TFileInfoDo.setCreateBy(userName);
TFileInfoDo.setDataFileId(dataFileId);
TFileInfoDo.setFileType(FileTypeEnum.NEW_PRODUCT_DATA_FILE.getCode());
TFileInfoMapper.insertTFileInfo(TFileInfoDo);
Long fileId = TFileInfoDo.getId();
FileBackBo fileBackBo = new FileBackBo();
int lastIndex = filePath.split("/").length - 1;
String fileName = filePath.split("/")[lastIndex];
fileBackBo.setFileId(String.valueOf(fileId));
fileBackBo.setName(fileName);
List<FileBackBo> list = new ArrayList<>();
list.add(fileBackBo);
result.setUploadId(uploadId);
result.setDataFileId(dataFileId);
result.setFiles(list);
}
}
return result;
}
3. 子线程
子线程继承Runnable接口,这种实现不能将参数传递给run()方法。可以在子线程类的构造方法中将CountDownLatch、AtomicBoolean以及其它参数作为属性变量注入,这样在run()方法中直接调用属性变量。
run()方法
public class GetRowContent implements Runnable{
private final List<Row> resultList;
private final String uploadId;
private final String dataFileId;
private final CountDownLatch sonCountDownLatch;
private final AtomicBoolean rollBack;
private final String threadName;
private final String userName;
public GetRowContent(List<Row> resultList, String uploadId, String dataFileId, CountDownLatch sonCountDownLatch,AtomicBoolean rollBack, String threadName, String userName){
this.resultList = resultList;
this.uploadId = uploadId;
this.dataFileId = dataFileId;
this.sonCountDownLatch = sonCountDownLatch;
this.rollBack = rollBack;
this.threadName = threadName;
this.userName = userName;
}
@Override
public void run() {
// 其它线程已经报错,退出
if(rollBack.get()){
sonCountDownLatch.countDown();
return;
}
// 设置一个事务
PlatformTransactionManager transactionManager = (PlatformTransactionManager) SpringContextUtils.getBean("transactionManager");
GetRowContentService getRowContentService = (GetRowContentService) SpringContextUtils.getBean("getRowContentServiceBean");
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
// 事物隔离级别,开启新事务,这样会比较安全些
TransactionStatus status = transactionManager.getTransaction(def);
try{
getRowContentService.analysisData(resultList,uploadId, dataFileId, userName);
} catch(Exception e){
log.info("线程:{},执行异常",threadName);
e.printStackTrace();
rollBack.set(true);
}
finally{
sonCountDownLatch.countDown();
}
try{
sonCountDownLatch.await();
if(rollBack.get()){
log.info("线程:{},有子线程执行失败,进入回滚", threadName);
transactionManager.rollback(status);
}
else {
log .info("线程:{},执行成功,提交数据", threadName);
transactionManager.commit(status);
}
} catch(Exception e){
log.info("线程:{},发令枪异常,进入回滚", threadName);
transactionManager.rollback(status);
}
}
}
标签:uploadId,JAVA,String,Excel,resultList,线程,TFileInfoDo,new,多线程 From: https://www.cnblogs.com/andy1234/p/17838840.html