目录
工作总结之线程池和原子类篇
前言
起因是,项目里面的同步代码的执行时间过久(20小时左右),于是,往多线程方面去思考,最终采用线程池和原子类来解决。
解决思路
定义一个页码的公共变量(原子类),根据总页数来开启相应的线程数量(1页1个线程,1到5页5个线程,5到10页10个线程)来同步,一个线程负责一页(2000条),执行完后作为原子类的页码+1,如果还小于总页数,则继续执行下一页,直到同步完所有数据。
具体实现代码
线程池工具类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPoolUtil {
private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolUtil.class);
private static ThreadPoolExecutor pool = null;
static {
pool = new ThreadPoolExecutor(10, 10, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
public static void destroy() {
pool.shutdown();
}
public static void execute(Runnable r) {
pool.execute(r);
}
public static void awaitTermination() throws InterruptedException {
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
}
private static class CustomThreadFactory implements ThreadFactory {
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = "线程-" + count.incrementAndGet();
t.setName(threadName);
return t;
}
}
private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.error("任务执行失败 {}, 线程池已满 {}", r.toString(), executor.toString());
}
}
}
业务逻辑的实现
/**
* 手动同步市前置库好差评办件评价数据
* @return
*/
@RequestMapping(value = "/manualSync")
@NeedSpy(name = "手动同步市前置库好差评办件评价数据",description = "手动同步市前置库好差评办件评价数据")
public GenericResult manualSync(String beginTime,String endTime,String curAreaCode,String areaCode1,Boolean isVillage){
long startTime = System.currentTimeMillis();
try{
log.info("-------------好差评手动同步省网数据过程开始--------------");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("好差评同步省网数据时间:{}至{}",beginTime,endTime);
// 如果包含村社区
String deptNo="";
if (isVillage != null && isVillage) {
List<QueuingArea> areaList = new ArrayList<>();
String sql = "";
if (StringUtils.isEmpty(curAreaCode)){
return FailureResult.create("选择村社区后区域选择不可为空!");
}
String[] split = curAreaCode.split(",");
for (String projectAreaId : split) {
QueuingArea queuingArea = queuingManageService.findQueuingAreaByDeptCode(projectAreaId);
if(queuingArea != null){
String areaBureauCode = queuingArea.getBureauCode();
Map<String, Object> areaParams = Maps.newHashMap();
areaParams.put("bureauCode",areaBureauCode);
List<QueuingArea> queuingAreaList = queuingManageService.listQueuingAreas(areaParams);
if(queuingAreaList.size()>0){
areaList.addAll(queuingAreaList);
}
areaParams.clear();
areaParams.put("parentBurCode",areaBureauCode);
List<QueuingArea> queuingAreas = queuingManageService.listQueuingAreas(areaParams);
if(queuingAreas.size()>0){
areaList.addAll(queuingAreas);
}
}
}
for (QueuingArea area : areaList) {
sql += "'" + area.getDeptCode() + "'" + ",";
}
deptNo +=sql;
if (deptNo.length()>2){
deptNo=deptNo.substring(1,deptNo.length()-2);
}
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("beginTime",beginTime);
jsonObject.put("endTime",endTime);
if (StringUtils.isNotEmpty(deptNo)){
jsonObject.put("curAreaCode",deptNo);
}else {
jsonObject.put("curAreaCode",curAreaCode.replace(",","','"));
}
jsonObject.put("areaCode",areaCode1);
AtomicInteger pageNumAi = new AtomicInteger(1);
AtomicInteger totalNewCountAi = new AtomicInteger(0);
AtomicInteger totalUpdateCountAi = new AtomicInteger(0);
AtomicInteger totalFailCountAi = new AtomicInteger(0);
// Integer pageNum = 1;
Integer pageSize = 2000;
//1、2月份,7w多条
Integer totalNum = goodBadCommentService.count(jsonObject);
// Integer totalNewCount = 0;
// Integer totalUpdateCount = 0;
if (totalNum > 0){
//1、2月份,36页
Integer pages = totalNum / pageSize + 1;
Integer executeThreadNum = 1;
if(pages < 5){
executeThreadNum = 1;
}else if(pages >= 5 && pages < 10){
executeThreadNum = 5;
}else {
executeThreadNum = 10;
}
for (int i = 1; i <= executeThreadNum; i++) {
int p = pageNumAi.get();
CustomThreadPoolUtil.execute(new Runnable() {
@Override
public void run() {
Integer pageNumLocal = p;
//jsonObject应该拷一份副本
String jsonStr = JSONObject.toJSONString(jsonObject);
JSONObject jsonObjectLocal = JSONObject.parseObject(jsonStr);
for(; pageNumLocal <= pages; pageNumLocal = pageNumAi.incrementAndGet()){
jsonObjectLocal.put("pageNum",pageNumLocal);
jsonObjectLocal.put("pageSize",pageSize);
List<GoodBadCommentEntityVo> infos = goodBadCommentService.getInfo(jsonObjectLocal);
log.info("前置机第{}页",pageNumLocal);
ArrayList<String> fileList = new ArrayList<String>();
for (GoodBadCommentEntityVo info : infos) {
fileList.add(info.toString());
}
SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
String fileName = null;
try {
fileName = "evaluate2" + format.format(format.parse(beginTime)) + "_" + pageNumLocal + "页";
} catch (ParseException e) {
e.printStackTrace();
}
FilesToWrite.createScreenTxt(fileList,fileName);
int newCount = 0;
int updateCount = 0;
for(GoodBadCommentEntityVo vo : infos){
Map<String,Object> params = new HashMap<>();
String proProjectNo = vo.getProProjectNo();
// 截取符合一体化平台业务系统的办件编号
proProjectNo = proProjectNo.substring(proProjectNo.length() - 12);
params.put("like(proProjectNo)","%"+proProjectNo);
params.put("assessTime",sdf.format(vo.getAssessTime()));
List<NewEvaluateInfo> newEvaluateInfoList = newEvaluateInfoService.queryForList(params);
NewEvaluateInfo newEvaluateInfo = null;
if(newEvaluateInfoList.size()>0){
newEvaluateInfo = newEvaluateInfoList.get(0);
updateCount++;
}else{
newEvaluateInfo = new NewEvaluateInfo();
newCount++;
}
BeanUtils.copyProperties(vo,newEvaluateInfo);
/*=========判断功能区=========*/
String areaCode = vo.getAreaCode();
String deptCode = vo.getDeptCode();
if(null != areaCode && null != deptCode){
if(!Objects.equals(areaCode, "320507000000")){
String queuingAreaCode = CacheOrg.queuingArea.get(areaCode);
if(null != queuingAreaCode){
newEvaluateInfo.setAreaCode(queuingAreaCode);
}else {
newEvaluateInfo.setAreaCode("320507SPJ");
}
}else{
String areaCodeInfo = getByDeptCode(deptCode);
newEvaluateInfo.setAreaCode(areaCodeInfo);
}
}else {
newEvaluateInfo.setAreaCode("320507SPJ");
}
newEvaluateInfo.setProProjectNo(proProjectNo);
try {
newEvaluateInfo.setAcceptDate(sdf.format(vo.getAcceptDate()));
newEvaluateInfo.setAssessTime(sdf.format(vo.getAssessTime()));//用户评价时间
}catch (Exception e){
totalFailCountAi.incrementAndGet();
log.info("{}第{}页失败条目(情况1):{}/2000",Thread.currentThread().getName(),pageNumLocal,newCount + updateCount);
log.info("失败条目的proProjectNo:{}",vo.getProProjectNo());
continue;
}
newEvaluateInfo.setAssessNumber(vo.getAssessNumber());
newEvaluateInfo.setPromisetime(vo.getPromiseTime());
newEvaluateInfo.setEvaluateType("1");//办件
// DeclareInfo declareInfo = registerBusinessService.findRegisterByDeclareSN(newEvaluateInfo.getProProjectNo());
// if(declareInfo!=null){
// newEvaluateInfo.setApsUserName(declareInfo.getApplicantName());//用户姓名
// newEvaluateInfo.setApsUserPhone(declareInfo.getApplicantMobile());//用户手机号
// }
newEvaluateInfo.setApsUserName(vo.getUserName());//用户姓名
newEvaluateInfo.setApsUserPhone(vo.getMobile());//用户手机号
// 整改反馈
newEvaluateInfo.setRectificationTime(null == vo.getRectificationTime()?"":sdf.format(vo.getRectificationTime()));
try {
newEvaluateInfoService.updateNewEvaluateInfo(newEvaluateInfo);
}catch (Exception e){
totalFailCountAi.incrementAndGet();
log.info("{}第{}页失败条目(情况2):{}/2000",Thread.currentThread().getName(),pageNumLocal,newCount + updateCount);
log.info("失败条目的proProjectNo:{}",vo.getProProjectNo());
continue;
}
log.info("{}第{}页同步中:{}/2000",Thread.currentThread().getName(),pageNumLocal,newCount + updateCount);
}
totalNewCountAi.addAndGet(newCount);
totalUpdateCountAi.addAndGet(updateCount);
log.info("{}同步进度{}/{}页,其中包括<新增>{}条、<更新>{}条,totalNewCountAi的值{},totalUpdateCountAi的值{}",
Thread.currentThread().getName(),pageNumLocal,pages,newCount,updateCount,totalNewCountAi.get(),totalUpdateCountAi.get());
}
}
});
if(p < executeThreadNum){
pageNumAi.incrementAndGet();
}
}
CustomThreadPoolUtil.destroy();
try {//等待直到所有任务完成
CustomThreadPoolUtil.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.info("同步省网好差评数据量:{}条,其中包括<新增>{}条、<更新>{}条、<失败>{}条", totalNum,totalNewCountAi.get(),totalUpdateCountAi.get(),totalFailCountAi.get());
log.info("-------------好差评手动同步省网数据过程结束--------------");
double duration = (double) (System.currentTimeMillis() - startTime)/1000;
log.info("同步共耗时{}",duration);
return SuccessResult.create().setData("同步省网好差评数据量:"+totalNum+"条,其中包括<新增>"+totalNewCountAi.get()+"条、<更新>"+totalUpdateCountAi.get()+
"条、<失败>"+totalFailCountAi.get()+"条");
}catch (Exception ex){
ex.printStackTrace();
log.error("-------------好差评手动同步省网数据过程异常--------------");
double duration = (double)(System.currentTimeMillis() - startTime)/1000;
log.info("同步共耗时{}",duration);
return FailureResult.create(ex.getMessage());
}
}
期间遇到的问题
- 第11页被漏掉
注意这行代码:
if(p < executeThreadNum){ pageNumAi.incrementAndGet(); }
作为公共原子类的页码在启动最后一个线程(假设一共启动10个线程)时不能再做+1操作,否则下一页(第11页)会漏掉。 sdf.format
(将日期格式化为字符串)和newEvaluateInfoService.updateNewEvaluateInfo(newEvaluateInfo);
(如果存在执行更新操作,不存在则执行新增操作)在执行过程中报错,导致执行的线程挂掉没有后续
只需要将它俩try catch
掉就行了,最终调通代码7w条数据,从原来的20小时优化为3小时跑完。
有待优化的地方
- 线程池工具类
static {
pool = new ThreadPoolExecutor(10, 10, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
}
因为在业务代码里面写的是:
CustomThreadPoolUtil.destroy();
后续想再使用这个工具类的线程池就失败了,我的思路是将这个静态代码块改成一个generateThreadPool的静态方法,每次调用为pool赋上新的线程池,就可重复使用这个工具类了。
2. 后续有同事告诉我sdf.format
方法,即SimpleDateFormat类在多线程下容易异常的问题
有现象可以证明,我调试的时候,第一次同步完,在这里报错的有9条,但是第二次同步完,这里报错为0条,当时百思不得其解,被他这么一说就明白了,虽然我现在是try catch
掉的,显然不是最好的方法
3. 线程池的队列问题new ArrayBlockingQueue<Runnable>(10)
这个队列在我调试的过程中,当时我的线程参数是这样的new ThreadPoolExecutor(1, 10, 300, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
,核心线程只有一个,当我想要开启10个线程运行的时候,结果只有线程-1运行,原因是使用该队列的策略是:超过核心线程数量的任务会被放到队列中,然后应该是等着交给核心线程(我这里只有1个)去执行(我不太确定,回头得去确认一下,根据运行结果的事实,推测是这样的,应该跟线程池的一般运行顺序有关系),所以我后来给把核心线程数调成了10.