import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
/**
 * MyBatis batch-operation utility.
 *
 * <p>Applies a caller-supplied mapper operation to every element of a list, either
 * in the calling thread or split into partitions that are handed to an executor.
 */
@Slf4j
@Component
public class MybatisBatchUtil {
/** Lists of at most this many elements are processed synchronously; larger lists are split into partitions of this size. */
private static final int BATCH_SIZE = 2000;
/** Factory used to open the MyBatis sessions from which mappers are obtained. */
@Autowired
private SqlSessionFactory sqlSessionFactory;
/** Transaction manager used to begin, commit and roll back the batch transactions programmatically. */
@Resource
PlatformTransactionManager transactionManager;
/** Executor that runs the per-partition tasks of the parallel batch. */
@Autowired
private Executor taskExecutor;
/**
 * Applies {@code function} to every element of {@code dataList}, choosing the
 * execution strategy by list size.
 *
 * <p>Lists with at most {@code BATCH_SIZE} elements are handled by
 * {@link #batchSync}; larger lists are handled by {@link #batchParallel}.
 * A {@code null} or empty list is a no-op, so no session or transaction is
 * opened when there is nothing to write.
 *
 * @param dataList    elements to process; may be {@code null} or empty
 * @param mapperClass MyBatis mapper interface passed to {@code function}
 * @param function    operation invoked once per element with the element and a mapper instance
 * @throws ApiException if the delegated batch fails
 */
public <T, U, R> void batch(List<T> dataList, Class<U> mapperClass, BiFunction<T, U, R> function) throws ApiException {
    if (CollectionUtils.isEmpty(dataList)) {
        return;
    }
    if (dataList.size() <= BATCH_SIZE) {
        batchSync(dataList, mapperClass, function);
    } else {
        batchParallel(dataList, mapperClass, function);
    }
}
/**
 * Applies {@code function} to every element of {@code dataList} in the calling
 * thread, inside one new transaction ({@code PROPAGATION_REQUIRES_NEW}).
 *
 * <p>All elements are committed together; if any call, the final flush, or
 * opening/closing the session fails, the transaction is rolled back, the cause
 * is logged and an {@link ApiException} is thrown.
 *
 * <p>NOTE(review): this relies on {@code sqlSessionFactory} being configured with
 * Spring-managed transactions, so that the session's connection is the one
 * controlled by {@code transactionManager} — confirm against the MyBatis
 * configuration.
 *
 * @param dataList    elements to process; must not be {@code null}
 * @param mapperClass MyBatis mapper interface passed to {@code function}
 * @param function    operation invoked once per element with the element and a mapper instance
 * @throws ApiException if processing fails and the transaction was rolled back
 */
public <T, U, R> void batchSync(List<T> dataList, Class<U> mapperClass, BiFunction<T, U, R> function) throws ApiException {
    DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
    definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    TransactionStatus transaction = transactionManager.getTransaction(definition);
    // Request the BATCH executor for this session only. Changing the default executor type on
    // the shared Configuration would affect every other session and not the one already opened.
    try (SqlSession batchSqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
        U mapper = batchSqlSession.getMapper(mapperClass);
        for (T item : dataList) {
            function.apply(item, mapper);
        }
        // A BATCH session only queues statements; they must be sent before the transaction commits.
        batchSqlSession.flushStatements();
    } catch (Exception e) {
        // Only the message-only ApiException constructor is visible here, so keep the cause in the log.
        log.error("批量操作异常", e);
        transactionManager.rollback(transaction);
        throw new ApiException("系统异常请联系管理员");
    }
    // The session is already closed at this point; the transaction manager owns the commit.
    transactionManager.commit(transaction);
}
/** No commit/rollback decision has been taken yet for a parallel batch. */
private static final int OUTCOME_UNDECIDED = 0;
/** Every partition was prepared successfully; all workers commit. */
private static final int OUTCOME_COMMIT = 1;
/** At least one partition failed or a wait timed out; all workers roll back. */
private static final int OUTCOME_ROLLBACK = 2;

/**
 * Splits {@code dataList} into partitions of {@code BATCH_SIZE} elements, processes
 * each partition on {@code taskExecutor} in its own transaction, and commits the
 * partitions only if every one of them was processed successfully.
 *
 * <p>Protocol:
 * <ol>
 *   <li><b>prepare</b> — each worker opens its own transaction and its own BATCH
 *       session, applies {@code function} to its partition and flushes;</li>
 *   <li><b>decide</b> — a single outcome is fixed exactly once (compare-and-set):
 *       commit if all partitions were prepared within 30 seconds, rollback on the
 *       first failure, timeout, interruption or task-submission error;</li>
 *   <li><b>complete</b> — each worker commits or rolls back according to that
 *       outcome; on the commit path the caller waits (up to another 30 seconds)
 *       for the workers to finish.</li>
 * </ol>
 *
 * <p>Limitations: the per-partition commits are independent, so a commit that
 * fails after others succeeded cannot be undone (this is best effort, not a
 * distributed transaction). All partitions must be able to run at the same time;
 * if the executor has fewer free threads than partitions, the prepare phase times
 * out and everything is rolled back.
 *
 * <p>NOTE(review): as in the rest of this class, this assumes
 * {@code sqlSessionFactory} uses Spring-managed transactions so that each worker's
 * session joins the transaction opened on that worker thread — confirm.
 *
 * @param dataList    elements to process; must not be {@code null}
 * @param mapperClass MyBatis mapper interface passed to {@code function}
 * @param function    operation invoked once per element with the element and a mapper instance
 * @throws ApiException if the batch was rolled back, a commit failed, or completion
 *                      of the commits could not be confirmed in time
 */
public <T, U, R> void batchParallel(List<T> dataList, Class<U> mapperClass, BiFunction<T, U, R> function) throws ApiException {
    final long waitMillis = 30000L;
    List<List<T>> partitions = ListUtils.partition(dataList, BATCH_SIZE);
    CountDownLatch prepared = new CountDownLatch(partitions.size());
    CountDownLatch decided = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(partitions.size());
    AtomicInteger outcome = new AtomicInteger(OUTCOME_UNDECIDED);
    AtomicBoolean completionFailed = new AtomicBoolean(false);

    try {
        for (List<T> chunk : partitions) {
            taskExecutor.execute(() -> {
                try {
                    TransactionStatus transaction = null;
                    try {
                        // A task that starts after the batch was aborted must not do any work.
                        if (outcome.get() == OUTCOME_UNDECIDED) {
                            transaction = prepareChunk(chunk, mapperClass, function);
                        }
                        if (transaction == null) {
                            // Abort early so that workers already waiting can roll back right away.
                            outcome.compareAndSet(OUTCOME_UNDECIDED, OUTCOME_ROLLBACK);
                            decided.countDown();
                        }
                    } finally {
                        prepared.countDown();
                    }
                    // Workers wait longer than the caller so the caller's decision normally arrives first.
                    if (transaction != null && !completeChunk(transaction, outcome, decided, 2 * waitMillis)) {
                        completionFailed.set(true);
                    }
                } finally {
                    finished.countDown();
                }
            });
        }
        if (prepared.await(waitMillis, TimeUnit.MILLISECONDS)) {
            // No effect if a worker has already forced a rollback.
            outcome.compareAndSet(OUTCOME_UNDECIDED, OUTCOME_COMMIT);
        } else {
            log.error("异步批量任务等待超时");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("导入时候线程阻塞异常", e);
    } catch (RuntimeException e) {
        // For example the executor rejected a task; already submitted workers will roll back.
        log.error("异步批量任务提交异常", e);
    } finally {
        // Whatever happened above, publish exactly one decision so no worker waits forever.
        outcome.compareAndSet(OUTCOME_UNDECIDED, OUTCOME_ROLLBACK);
        decided.countDown();
    }

    boolean committed = false;
    if (outcome.get() == OUTCOME_COMMIT) {
        try {
            committed = finished.await(waitMillis, TimeUnit.MILLISECONDS) && !completionFailed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("导入时候线程阻塞异常", e);
        }
    }
    if (!committed) {
        throw new ApiException("系统异常请联系管理员");
    }
}

/**
 * Prepare phase of one partition: opens a new transaction and a dedicated BATCH
 * session on the current thread, applies {@code function} to every element and
 * flushes the queued statements.
 *
 * @return the still-open transaction on success, or {@code null} if anything
 *         failed (the failure is logged and the transaction, if started, rolled back)
 */
private <T, U, R> TransactionStatus prepareChunk(List<T> chunk, Class<U> mapperClass, BiFunction<T, U, R> function) {
    TransactionStatus transaction = null;
    try {
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        transaction = transactionManager.getTransaction(definition);
        // SqlSession instances are not thread-safe, so every worker uses its own session and mapper.
        try (SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH)) {
            U mapper = session.getMapper(mapperClass);
            for (T item : chunk) {
                function.apply(item, mapper);
            }
            session.flushStatements();
        }
        return transaction;
    } catch (Exception e) {
        log.error("异步批量插入异常", e);
        if (transaction != null && !transaction.isCompleted()) {
            try {
                transactionManager.rollback(transaction);
            } catch (RuntimeException rollbackError) {
                log.error("异步批量插入异常", rollbackError);
            }
        }
        return null;
    }
}

/**
 * Complete phase of one partition: waits for the shared decision and then commits
 * or rolls back {@code transaction} accordingly. If the wait times out or the
 * thread is interrupted before a decision exists, this worker forces the shared
 * outcome to rollback.
 *
 * @return {@code true} if the commit or rollback completed without an exception
 */
private boolean completeChunk(TransactionStatus transaction, AtomicInteger outcome, CountDownLatch decided, long maxWaitMillis) {
    try {
        if (!decided.await(maxWaitMillis, TimeUnit.MILLISECONDS)) {
            log.error("异步批量任务等待超时");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("导入时候线程阻塞异常", e);
    }
    // No-op when a decision already exists; otherwise this worker decides on rollback for everyone.
    outcome.compareAndSet(OUTCOME_UNDECIDED, OUTCOME_ROLLBACK);
    decided.countDown();
    try {
        if (outcome.get() == OUTCOME_COMMIT) {
            transactionManager.commit(transaction);
        } else {
            transactionManager.rollback(transaction);
        }
        return true;
    } catch (RuntimeException e) {
        log.error("异步批量插入异常", e);
        return false;
    }
}
// Tags: rollback, java, import, util, data table, threadTransaction, org, multithreading, transactionManager
// Source: https://www.cnblogs.com/MC-Bonnie/p/18189368