package com.talkweb.modou;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* @author : 苏文致
* @date Date : 2023年03月04日 13:46
* @Description:
*/
@Component
public class TransactionMultipartExecutor {
@Autowired
private DataSourceTransactionManager transactionManager;
private static final Logger log = LoggerFactory.getLogger(TransactionMultipartExecutor.class);
/**
* 多线程事务提交执行
*
* @param threadPool: 线程池
* @param datas: 数据
* @param batchSize :单批次执行的大小
* @param propagationBehavior: 见TransactionDefinition常量类
* @param consumer: 批量执行逻辑 <T> 单批次的数据
*/
public <T> boolean execute (ExecutorService threadPool,
List<T> datas,
int batchSize,
int propagationBehavior,
Consumer<List<T>> consumer){
int total = datas.size();
int batch = total % batchSize == 0 ? total / batchSize : total / batchSize + 1; //执行的批次
CountDownLatch startCountDown = new CountDownLatch(1); //开始执行线程任务
CountDownLatch subCountDown = new CountDownLatch(batch); //控制子线程业务执行(控制全局事务开关)
CountDownLatch mainCountDown = new CountDownLatch(batch); //子线程执行完成(控制主线程是否执行完成)
AtomicBoolean isError = new AtomicBoolean(false); //全局事务提交回滚开关
for (int i = 0; i < batch; i++) { //创建线程执行任务
List<T> signalBatchData = splitSignalData(datas, i, batchSize);
TransactionMultipartRunnable runnable = new TransactionMultipartRunnable(
transactionManager,
propagationBehavior,
startCountDown,
subCountDown,
mainCountDown,
isError) {
@Override
public void operation (){
consumer.accept(signalBatchData);
}
};
threadPool.submit(runnable);
}
startCountDown.countDown(); //开始执行
handleAwaitDownWithException(mainCountDown);//主线程执行
return isError.get();
}
/**
* 切分单次插入的数据
*/
public <T> List<T> splitSignalData (List<T> datas, int currentBatch, int batchSize){
int size = datas.size();
int left = currentBatch * batchSize;
int right = Math.min(left + batchSize, size);
return datas.subList(left, right);
}
public abstract static class TransactionMultipartRunnable implements Runnable {
private PlatformTransactionManager transactionManager;
private int propagationBehavior;
private CountDownLatch startCountDown;
private CountDownLatch subCountDown;
private CountDownLatch mainCountDown;
private AtomicBoolean isError;
protected TransactionMultipartRunnable (PlatformTransactionManager transactionManager, int propagationBehavior, CountDownLatch startCountDown, CountDownLatch subCountDown, CountDownLatch mainCountDown, AtomicBoolean isError){
this.transactionManager = transactionManager;
this.propagationBehavior = propagationBehavior;
this.startCountDown = startCountDown;
this.subCountDown = subCountDown;
this.mainCountDown = mainCountDown;
this.isError = isError;
}
public abstract void operation ();
@Override
public void run (){
handleAwaitDownWithException(startCountDown);
//传播属性
//TransactionDefinition.PROPAGATION_REQUIRES_NEW
DefaultTransactionDefinition def = new DefaultTransactionDefinition(propagationBehavior);
//事务的状态
TransactionStatus status = transactionManager.getTransaction(def);
try {
if (!isError.get()) {//如果已经存在异常的线程,直接条转
log.debug("执行业务逻辑...");
operation();
}
} catch (Exception e) {
isError.set(true);
}
subCountDown.countDown(); //子线程 --
handleAwaitDownWithException(subCountDown);
try {
if (isError.get()) {
transactionManager.rollback(status);
log.debug("回滚...");
} else {
transactionManager.commit(status);
log.debug("提交...");
}
} catch (TransactionException e) {
e.printStackTrace();
}
mainCountDown.countDown();
}
}
public static void handleAwaitDownWithException (CountDownLatch countDownLatch){
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试
TestService bean = run.getBean(TestService.class);
bean.test();
AppWidgetsFileService appWidgetsFileService = SpringUtils.getBean(AppWidgetsFileService.class);
//构造假数据
List<AppWidgetsFile> files = new ArrayList<>();
for (int i = 0; i < 3; i++) {
AppWidgetsFile appWidgetsFile = new AppWidgetsFile();
appWidgetsFile.setId(IdUtils.simpleUUID());
appWidgetsFile.setFileName("测试" + i);
files.add(appWidgetsFile);
}
AppWidgetsFile appWidgetsFile = new AppWidgetsFile();
appWidgetsFile.setId("38f3e3f73969575c6b838d2f059869e7"); //数据库主键 已经存在 (回滚)
appWidgetsFile.setFileName("测试");
files.add(appWidgetsFile);
//构造线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
9,
9,
30,
TimeUnit.MINUTES,
new LinkedBlockingDeque<>(50),
(r, e) -> {
log.debug("任务拒绝...");
throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
}
);
TransactionMultipartExecutor executor = SpringUtils.getBean(TransactionMultipartExecutor.class);
boolean execute = executor.execute(threadPool, files, 3, TransactionDefinition.PROPAGATION_REQUIRED,
(item) -> {
//TODO: 批量操作的执行
appWidgetsFileService.saveBatch(item);
});
if (!execute) log.debug("事务执行成功!");
else log.debug("事务执行失败!");
log.debug("主线程执行完成!");
标签:事务,int,spring,batchSize,new,提交,CountDownLatch,import,isError
From: https://www.cnblogs.com/swz123/p/17178238.html