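Spring Data JPA Series
2. Spring Data JPA Criteria Queries and Partial-Field Queries
3. Spring Data JPA Batch Inserts and Batch Updates: Are You Really Doing Them Right?
4. Spring Data JPA One-to-One, LazyInitializationException, One-to-Many, and Many-to-Many Operations
5. Spring Data JPA Custom Id Generation Strategies, Composite Primary Keys, and Auditing
6. [Source Code] Spring Data JPA Internals: Automatic Injection of Repositories (Part 1)
7. [Source Code] Spring Data JPA Internals: Automatic Injection of Repositories (Part 2)
8. [Source Code] Spring Data JPA Internals: Repository Execution Flow and the SimpleJpaRepository Source
9. [Source Code] Spring Data JPA Internals: How Repository Derived Query Methods Are Executed (Part 1)
10. [Source Code] Spring Data JPA Internals: How Repository Derived Query Methods Are Executed (Part 2)
11. [Source Code] Spring Data JPA Internals: How Repository Methods Annotated with @Query Are Executed
13. [Source Code] Spring Data JPA Internals: How Transactions Are Registered
14. [Source Code] Spring Data JPA Internals: How Transactions Are Executed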
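Preface
The previous two posts, "[Source Code] How Spring Boot Registers Transactions" (CSDN Blog) and "[Source Code] Spring Data JPA Internals: How Transactions Are Registered" (CSDN Blog), analyzed at the source level how transactions are registered for ordinary beans in the Spring container and for JPA Repository classes, respectively. In both cases a method is annotated with @Transactional, a proxy class is generated, and a TransactionInterceptor is added to the proxy, which is how transaction management is implemented.
For reasons of length, those two posts only covered how the TransactionInterceptor is added. This post continues at the source level and analyzes how the TransactionInterceptor executes.
ReflectiveMethodInvocation Recap
When a method is called on a Spring dynamic proxy, ReflectiveMethodInvocation.proceed() runs first and walks through all the interceptors. After the last interceptor in the chain has been invoked, the corresponding method of the proxy's target object, i.e. the original method, is executed. For details, see the section on dynamic proxy method interception in:
"[Source Code] Spring Data JPA Internals: Repository Execution Flow and the SimpleJpaRepository Source" (CSDN Blog)
For a proxy whose methods are annotated with @Transactional, the interceptors' invoke() methods run before the method itself, and these interceptors include the TransactionInterceptor.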
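The chain traversal described above can be illustrated with a small, self-contained sketch. It does not use Spring; SketchInterceptor, SketchInvocation and InterceptorChainSketch are hypothetical stand-ins invented for this example, and the single interceptor only records "begin", "commit" or "rollback" the way a transactional interceptor conceptually would.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for an AOP method interceptor (not the Spring interface).
interface SketchInterceptor {
    Object invoke(SketchInvocation invocation);
}

// Hypothetical stand-in for ReflectiveMethodInvocation.
class SketchInvocation {
    private final List<SketchInterceptor> interceptors;
    private final Supplier<Object> target;
    private int index = -1;

    SketchInvocation(List<SketchInterceptor> interceptors, Supplier<Object> target) {
        this.interceptors = interceptors;
        this.target = target;
    }

    // Advance to the next interceptor, or call the target once the chain is exhausted.
    Object proceed() {
        if (index == interceptors.size() - 1) {
            return target.get();
        }
        return interceptors.get(++index).invoke(this);
    }
}

class InterceptorChainSketch {
    // Runs one invocation through a single transaction-like interceptor and returns the event log.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchInterceptor tx = invocation -> {
            log.add("begin");
            try {
                Object result = invocation.proceed();
                log.add("commit");
                return result;
            } catch (RuntimeException ex) {
                log.add("rollback");
                throw ex;
            }
        };
        SketchInvocation call = new SketchInvocation(Collections.singletonList(tx), () -> {
            log.add("target");
            return "done";
        });
        call.proceed();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [begin, target, commit]
    }
}
```

The interceptor wraps the rest of the chain, so its "before" logic runs ahead of the target method and its "after" logic runs once the target returns or throws.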
TransactionInterceptor
The invoke() method of TransactionInterceptor is as follows:
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Resolve the target class of the proxied object (null if there is no target)
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Invoke the method within a transaction
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
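In invoke(), the target class of the proxied object is resolved first, and then the parent class's invokeWithinTransaction() method is called to perform the transaction handling.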
TransactionAspectSupport
The core code of TransactionAspectSupport is as follows:
package org.springframework.transaction.interceptor;
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
@Nullable
protected static TransactionInfo currentTransactionInfo() throws NoTransactionException {
return transactionInfoHolder.get();
}
/**
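* Static accessor: application code can call this to obtain the current TransactionStatus and change it, e.g. to trigger a manual rollback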
*/
public static TransactionStatus currentTransactionStatus() throws NoTransactionException {
TransactionInfo info = currentTransactionInfo();
if (info == null || info.transactionStatus == null) {
throw new NoTransactionException("No transaction aspect-managed TransactionStatus in scope");
}
return info.transactionStatus;
}
protected final Log logger = LogFactory.getLog(getClass());
@Nullable
private String transactionManagerBeanName;
@Nullable
private PlatformTransactionManager transactionManager;
// Typically an AnnotationTransactionAttributeSource
@Nullable
private TransactionAttributeSource transactionAttributeSource;
@Nullable
private BeanFactory beanFactory;
// Other members omitted
/**
* Invoke the method within a transaction
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// Resolve the transaction attribute parsed from the @Transactional annotation
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// Resolve the transaction manager: a JpaTransactionManager with JPA; with plain JDBC, typically a DataSourceTransactionManager or its subclass JdbcTransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// Build the qualified method name. Format: ClassName.methodName
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// Proceed with the interceptor chain, which eventually invokes the original method
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
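// Complete the transaction after an exception: roll back only if the rollback rules match; otherwise commit even though an exception was thrown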
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// Reset the thread-local holder to the previous TransactionInfo, removing the current one
cleanupTransactionInfo(txInfo);
}
// Commit the transaction
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
protected void clearTransactionManagerCache() {
this.transactionManagerCache.clear();
this.beanFactory = null;
}
/**
* Determine the specific transaction manager to use for the given transaction
*/
@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}
// Qualifier value associated with this transaction attribute
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
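// For Spring Data JPA repositories, transactionManagerBeanName defaults to transactionManager
// With JPA this resolves to a JpaTransactionManager; with plain JDBC, typically a DataSourceTransactionManager or its subclass JdbcTransactionManager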
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
/**
* Look up a PlatformTransactionManager bean matching the given qualifier in the Spring IoC container and cache it
*/
private PlatformTransactionManager determineQualifiedTransactionManager(BeanFactory beanFactory, String qualifier) {
PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier);
if (txManager == null) {
// Look up a PlatformTransactionManager bean matching the qualifier in the Spring IoC container
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
beanFactory, PlatformTransactionManager.class, qualifier);
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}
/**
* Build the qualified method name. Format: ClassName.methodName
*/
private String methodIdentification(Method method, @Nullable Class<?> targetClass,
@Nullable TransactionAttribute txAttr) {
String methodIdentification = methodIdentification(method, targetClass);
if (methodIdentification == null) {
if (txAttr instanceof DefaultTransactionAttribute) {
methodIdentification = ((DefaultTransactionAttribute) txAttr).getDescriptor();
}
if (methodIdentification == null) {
methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
}
}
return methodIdentification;
}
@Nullable
protected String methodIdentification(Method method, @Nullable Class<?> targetClass) {
return null;
}
/**
* Create a transaction if necessary, based on the given TransactionAttribute
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// Prepare the TransactionInfo and return it
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
/**
* Prepare the TransactionInfo
*/
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// Create the TransactionInfo
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.newTransactionStatus(status);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// Bind the TransactionInfo to the current thread via the thread-local holder
txInfo.bindToThread();
return txInfo;
}
/**
* Commit the transaction
*/
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
/**
* Complete the transaction after an exception: roll back or commit, depending on the rollback rules
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// If the exception matches the rollback rules
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// Roll back the transaction
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
/**
* Holder class for transaction-related information
*/
protected final class TransactionInfo {
@Nullable
private final PlatformTransactionManager transactionManager;
@Nullable
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
public TransactionInfo(@Nullable PlatformTransactionManager transactionManager,
@Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
this.transactionManager = transactionManager;
this.transactionAttribute = transactionAttribute;
this.joinpointIdentification = joinpointIdentification;
}
public PlatformTransactionManager getTransactionManager() {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
return this.transactionManager;
}
@Nullable
public TransactionAttribute getTransactionAttribute() {
return this.transactionAttribute;
}
public String getJoinpointIdentification() {
return this.joinpointIdentification;
}
public void newTransactionStatus(@Nullable TransactionStatus status) {
this.transactionStatus = status;
}
@Nullable
public TransactionStatus getTransactionStatus() {
return this.transactionStatus;
}
public boolean hasTransaction() {
return (this.transactionStatus != null);
}
/**
* Store this TransactionInfo in the thread-local holder, remembering the previous one
*/
private void bindToThread() {
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
/**
* Reset the thread-local holder to the previous TransactionInfo, removing the current one
*/
private void restoreThreadLocalStatus() {
transactionInfoHolder.set(this.oldTransactionInfo);
}
@Override
public String toString() {
return (this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction");
}
}
@FunctionalInterface
protected interface InvocationCallback {
@Nullable
Object proceedWithInvocation() throws Throwable;
}
}
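4.1 invokeWithinTransaction()
The invokeWithinTransaction() method mainly performs the following steps:
4.1.1) Call getTransactionAttributeSource() to obtain the TransactionAttributeSource;
For a Repository, the TransactionAttributeSource is RepositoryAnnotationTransactionAttributeSource, a subclass of AnnotationTransactionAttributeSource. For Controller, Service, and similar classes, it is an AnnotationTransactionAttributeSource.
4.1.2) Call the TransactionAttributeSource's getTransactionAttribute() method to obtain the TransactionAttribute;
Inside getTransactionAttribute(), the transaction annotation parsers read the @Transactional annotation on the method, from either the org.springframework.transaction.annotation package or the javax.transaction package, and return a TransactionAttribute. If the method has no @Transactional annotation, null is returned. For details, see: "[Source Code] How Spring Boot Registers Transactions" (CSDN Blog)
4.1.3) Call determineTransactionManager(), which looks up in the Spring IoC container the bean identified by the qualifier or, failing that, a bean of type PlatformTransactionManager. For JPA, this returns a JpaTransactionManager by default;
The qualifier is the value attribute of the @Transactional annotation. When a project has multiple data sources, and therefore multiple transaction managers, the transaction manager to use must be specified explicitly.
4.1.4) Call methodIdentification() to build the qualified method name. Default format: ClassName.methodName;
4.1.5) Because JpaTransactionManager is not a CallbackPreferringPlatformTransactionManager, the code takes the first if branch and calls createTransactionIfNecessary(), which creates the transaction information based on the given TransactionAttribute;
4.1.6) Call invocation.proceedWithInvocation(), which runs ReflectiveMethodInvocation.proceed() until the original method has been executed, and obtain the return value;
4.1.7) If 4.1.6) throws, call completeTransactionAfterThrowing() to complete the transaction. The transaction is rolled back only if the rollback rules match; otherwise it is still committed even though an exception was thrown. The exception is then rethrown and the method ends;
4.1.8) Call cleanupTransactionInfo() to reset the thread-local holder to the previous TransactionInfo. This runs in the finally block, so it executes even if 4.1.6) throws;
4.1.9) If 4.1.6) completes normally, call commitTransactionAfterReturning() to commit the transaction;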
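The ordering of these steps, in particular that cleanupTransactionInfo() runs before the commit on the success path, can be traced with a minimal sketch. It is plain Java with no Spring dependency; InvokeWithinTransactionSketch and its trace() helper are hypothetical names, and each step is only recorded as a string instead of touching a real transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical sketch of the try/catch/finally shape of invokeWithinTransaction().
class InvokeWithinTransactionSketch {
    final List<String> steps = new ArrayList<>();

    Object invoke(Callable<Object> original) throws Exception {
        steps.add("createTransactionIfNecessary");
        Object retVal;
        try {
            steps.add("proceedWithInvocation");
            retVal = original.call();
        } catch (Exception ex) {
            // Rollback or commit is decided here, then the exception is rethrown.
            steps.add("completeTransactionAfterThrowing");
            throw ex;
        } finally {
            // Always runs, on both the success and the failure path.
            steps.add("cleanupTransactionInfo");
        }
        steps.add("commitTransactionAfterReturning");
        return retVal;
    }

    // Runs one invocation and returns the recorded steps; the rethrown exception is swallowed here.
    static List<String> trace(boolean fail) {
        InvokeWithinTransactionSketch sketch = new InvokeWithinTransactionSketch();
        try {
            sketch.invoke(() -> {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                return "ok";
            });
        } catch (Exception expected) {
            // The caller of the real method would see this exception.
        }
        return sketch.steps;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

On the failure path the commit step is never reached, because the rethrown exception leaves the method right after the finally block.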
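4.2 createTransactionIfNecessary()
The createTransactionIfNecessary() method performs the following steps:
4.2.1) If no transaction name is specified, use the method identification as the transaction name;
4.2.2) Call PlatformTransactionManager.getTransaction(), which returns a TransactionStatus according to the transaction's propagation behavior;
4.2.3) Call prepareTransactionInfo(), which creates a TransactionInfo and binds it to the current thread through a thread-local variable;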
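The thread-local binding in prepareTransactionInfo() works like a small stack: each TransactionInfo remembers the one that was bound before it and puts it back on cleanup. The following is a minimal sketch of that idea only; TxInfoSketch and currentName() are hypothetical names, not Spring classes.

```java
// Hypothetical sketch of how TransactionInfo saves and restores the thread-local holder.
class TxInfoSketch {
    private static final ThreadLocal<TxInfoSketch> HOLDER = new ThreadLocal<>();

    final String name;
    private TxInfoSketch old;

    TxInfoSketch(String name) {
        this.name = name;
    }

    // Remember whatever was bound before, then bind this instance.
    void bindToThread() {
        this.old = HOLDER.get();
        HOLDER.set(this);
    }

    // Put the previously bound instance (possibly null) back.
    void restoreThreadLocalStatus() {
        HOLDER.set(this.old);
    }

    // Name of the currently bound instance, or null if nothing is bound.
    static String currentName() {
        TxInfoSketch current = HOLDER.get();
        return current == null ? null : current.name;
    }

    public static void main(String[] args) {
        TxInfoSketch outer = new TxInfoSketch("outer");
        outer.bindToThread();
        TxInfoSketch inner = new TxInfoSketch("inner");
        inner.bindToThread();
        System.out.println(currentName()); // prints inner
        inner.restoreThreadLocalStatus();
        System.out.println(currentName()); // prints outer
        outer.restoreThreadLocalStatus();
        System.out.println(currentName()); // prints null
    }
}
```

This is why a transactional method that calls another transactional method sees its own TransactionInfo again once the inner call returns.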
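4.3 completeTransactionAfterThrowing()
The completeTransactionAfterThrowing() method performs the following steps:
4.3.1) Call TransactionInfo.transactionAttribute.rollbackOn(ex) to check whether the exception matches the rollback rules. rollback() is called only when the rules match;
The default rollback rule is implemented in DefaultTransactionAttribute: the exception must be a RuntimeException or an Error to trigger a rollback.
The rollback rules can also be configured through the rollbackFor, rollbackForClassName, noRollbackFor, and noRollbackForClassName attributes of the @Transactional annotation.
4.3.2) If the rollback rules do not match, commit() is still called and the transaction is committed;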
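The default rule stated above can be written down in a few lines. The sketch below is a deliberate simplification: RollbackRuleSketch is a hypothetical class, the second overload only imitates a single rollbackFor entry, and the real rule-based matching in Spring (which also handles noRollbackFor and class-name rules) is not reproduced here.

```java
import java.io.IOException;

// Hypothetical, simplified sketch of the rollback decision.
class RollbackRuleSketch {

    // Default rule as described above: roll back on RuntimeException or Error only.
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // Simplified stand-in for a single rollbackFor entry: a match forces a rollback,
    // otherwise the default rule applies.
    static boolean rollbackOn(Throwable ex, Class<? extends Throwable> rollbackFor) {
        return rollbackFor.isInstance(ex) || rollbackOn(ex);
    }

    public static void main(String[] args) {
        System.out.println(rollbackOn(new IllegalStateException()));        // prints true
        System.out.println(rollbackOn(new IOException()));                  // prints false
        System.out.println(rollbackOn(new IOException(), Exception.class)); // prints true
    }
}
```

The middle case is the one that surprises people: a checked exception such as IOException leaves the default rule unmatched, so the transaction is committed unless rollbackFor covers it.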
AbstractPlatformTransactionManager
The core code of AbstractPlatformTransactionManager is as follows:
package org.springframework.transaction.support;
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/**
* Return a TransactionStatus; this implementation handles the propagation behavior.
*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// Abstract method; e.g. JpaTransactionManager.doGetTransaction() creates a JpaTransactionObject
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// If a transaction already exists, handle it according to the propagation behavior and return
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// Validate the timeout of the transaction definition; the default is -1
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// If the propagation behavior is PROPAGATION_MANDATORY, throw an exception
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// Suspend; there is no existing transaction, so suspendedResources is normally null here
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
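// getTransactionSynchronization() is initially 0 (SYNCHRONIZATION_ALWAYS), so transaction synchronization is activated
// SYNCHRONIZATION_NEVER is 2 and means transaction synchronization is never activated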
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// Create a DefaultTransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Begin the transaction
doBegin(transaction, definition);
// Prepare synchronization: store the transaction-related information in thread-local variables
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Propagation behavior is PROPAGATION_SUPPORTS, PROPAGATION_NOT_SUPPORTED, or PROPAGATION_NEVER
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
prepareSynchronization(status, definition);
return status;
}
/**
* Create a DefaultTransactionStatus
*/
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// actualNewSynchronization is true for the first transaction on the thread, when no synchronization is active yet
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
/**
* Store the transaction-related information in thread-local variables
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// Transaction name; defaults to ClassName.methodName
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// The transactional code has marked this transaction as rollback-only
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// The global transaction is marked rollback-only, but the transactional code requested a commit
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// Process the commit
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
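// Prepare for commit, before the beforeCommit synchronization callbacks run. Empty by default and meant for extension; an exception thrown here still causes a rollback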
prepareForCommit(status);
// Invoke the beforeCommit() callbacks
triggerBeforeCommit(status);
// Invoke the beforeCompletion() callbacks, mainly used to release resources such as thread-local variables
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// A savepoint is held: release it
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// New transaction: commit it
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// Process the rollback
processRollback(defStatus, false);
}
/**
* Process the actual rollback
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// Invoke the beforeCompletion() callbacks to release resources
triggerBeforeCompletion(status);
// If a savepoint is held, roll back to it by calling Connection.rollback(savepoint)
// (see JdbcTransactionObjectSupport.rollbackToSavepoint())
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// Perform the rollback
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
private void doRollbackOnCommitException(DefaultTransactionStatus status, Throwable ex) throws TransactionException {
try {
if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback after commit exception", ex);
}
doRollback(status);
}
else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Marking existing transaction as rollback-only after commit exception", ex);
}
doSetRollbackOnly(status);
}
}
catch (RuntimeException | Error rbex) {
logger.error("Commit exception overridden by rollback exception", ex);
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw rbex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
protected abstract void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException;
// Other members omitted; the extension hooks among them (such as prepareForCommit()) are mostly empty methods
}
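The core method of this class is getTransaction(), which returns a TransactionStatus according to the transaction's propagation behavior. The propagation behavior can be set through the propagation attribute of @Transactional. For JPA, doBegin() is implemented in JpaTransactionManager.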
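For the case where no transaction exists yet, the three branches of getTransaction() reduce to a small decision table. The sketch below restates only that branch; PropagationSketch, its local Propagation enum, and the returned strings are hypothetical and do not correspond to Spring types or messages beyond the exception text quoted from the listing.

```java
// Hypothetical sketch of the propagation decision when no transaction exists yet.
class PropagationSketch {

    // Local enum mirroring the seven propagation behaviors by name.
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    static String withoutExistingTransaction(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // Requires an existing transaction, so this is an error.
                throw new IllegalStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // These three start a new transaction via doBegin().
                return "begin new transaction";
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: an "empty" transaction, no doBegin().
                return "run without transaction";
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            try {
                System.out.println(p + " -> " + withoutExistingTransaction(p));
            } catch (IllegalStateException ex) {
                System.out.println(p + " -> exception");
            }
        }
    }
}
```

When a transaction already exists, the decision is made in handleExistingTransaction() instead, which is not covered by this sketch.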
JpaTransactionManager
The core source code of JpaTransactionManager is as follows:
package org.springframework.orm.jpa;
public class JpaTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, BeanFactoryAware, InitializingBean {
@Nullable
private EntityManagerFactory entityManagerFactory;
@Nullable
private String persistenceUnitName;
private final Map<String, Object> jpaPropertyMap = new HashMap<>();
@Nullable
private DataSource dataSource;
private JpaDialect jpaDialect = new DefaultJpaDialect();
// Other members omitted
/**
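* Return the DataSource; when a third-party data source is used, that data source is returned, e.g. DruidDataSource
* @return the configured DataSource, or null if none is set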
*/
@Nullable
public DataSource getDataSource() {
return this.dataSource;
}
@Override
public void afterPropertiesSet() {
if (getEntityManagerFactory() == null) {
throw new IllegalArgumentException("'entityManagerFactory' or 'persistenceUnitName' is required");
}
if (getEntityManagerFactory() instanceof EntityManagerFactoryInfo) {
EntityManagerFactoryInfo emfInfo = (EntityManagerFactoryInfo) getEntityManagerFactory();
DataSource dataSource = emfInfo.getDataSource();
if (dataSource != null) {
setDataSource(dataSource);
}
JpaDialect jpaDialect = emfInfo.getJpaDialect();
if (jpaDialect != null) {
setJpaDialect(jpaDialect);
}
}
}
@Override
public Object getResourceFactory() {
return obtainEntityManagerFactory();
}
/**
* Create a JpaTransactionObject and set its EntityManagerHolder and ConnectionHolder
*/
@Override
protected Object doGetTransaction() {
// JPA transaction object representing an EntityManagerHolder; JpaTransactionManager uses it as its transaction object
JpaTransactionObject txObject = new JpaTransactionObject();
// Set whether savepoints are allowed
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// Get the EntityManagerHolder from the thread-local resources; it is bound under the EntityManagerFactory as key
EntityManagerHolder emHolder = (EntityManagerHolder)
TransactionSynchronizationManager.getResource(obtainEntityManagerFactory());
if (emHolder != null) {
if (logger.isDebugEnabled()) {
logger.debug("Found thread-bound EntityManager [" + emHolder.getEntityManager() +
"] for JPA transaction");
}
txObject.setEntityManagerHolder(emHolder, false);
}
// Look up the JDBC connection resource
if (getDataSource() != null) {
// Get the ConnectionHolder from the thread-local resources; it is bound under the DataSource as key
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(getDataSource());
txObject.setConnectionHolder(conHolder);
}
return txObject;
}
@Override
protected boolean isExistingTransaction(Object transaction) {
return ((JpaTransactionObject) transaction).hasTransaction();
}
/**
*
* @param transaction the JpaTransactionObject created by doGetTransaction()
* @param definition the TransactionDefinition, here the TransactionAttribute resolved from @Transactional
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
throw new IllegalTransactionStateException(
"Pre-bound JDBC Connection found! JpaTransactionManager does not support " +
"running within DataSourceTransactionManager if told to manage the DataSource itself. " +
"It is recommended to use a single JpaTransactionManager for all transactions " +
"on a single DataSource, no matter whether JPA or JDBC access.");
}
try {
if (!txObject.hasEntityManagerHolder() ||
txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {
// Create a new EntityManager; with Hibernate this is a SessionImpl
EntityManager newEm = createEntityManagerForTransaction();
if (logger.isDebugEnabled()) {
logger.debug("Opened new EntityManager [" + newEm + "] for JPA transaction");
}
txObject.setEntityManagerHolder(new EntityManagerHolder(newEm), true);
}
// Get the EntityManager (a SessionImpl with Hibernate)
EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
// Delegate to JpaDialect for actual transaction begin.
final int timeoutToUse = determineTimeout(definition);
// HibernateJpaDialect returns a SessionTransactionData object here
Object transactionData = getJpaDialect().beginTransaction(em,
new DelegatingTransactionDefinition(definition) {
@Override
public int getTimeout() {
return timeoutToUse;
}
});
txObject.setTransactionData(transactionData);
// Register transaction timeout.
if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getEntityManagerHolder().setTimeoutInSeconds(timeoutToUse);
}
// Register the JPA EntityManager's JDBC Connection for the DataSource, if set.
if (getDataSource() != null) {
// HibernateJpaDialect returns a HibernateConnectionHandle here
ConnectionHandle conHandle = getJpaDialect().getJdbcConnection(em, definition.isReadOnly());
if (conHandle != null) {
// Create a ConnectionHolder
ConnectionHolder conHolder = new ConnectionHolder(conHandle);
if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
conHolder.setTimeoutInSeconds(timeoutToUse);
}
if (logger.isDebugEnabled()) {
logger.debug("Exposing JPA transaction as JDBC [" + conHandle + "]");
}
// Bind the resource to the current thread via thread-local variables
TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
txObject.setConnectionHolder(conHolder);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Not exposing JPA transaction [" + em + "] as JDBC transaction because " +
"JpaDialect [" + getJpaDialect() + "] does not support JDBC Connection retrieval");
}
}
}
// Bind the entity manager holder to the thread.
// That is, store the EntityManagerHolder in a thread-local variable
if (txObject.isNewEntityManagerHolder()) {
TransactionSynchronizationManager.bindResource(
obtainEntityManagerFactory(), txObject.getEntityManagerHolder());
}
// Mark the holder as synchronized with the transaction
txObject.getEntityManagerHolder().setSynchronizedWithTransaction(true);
}
catch (TransactionException ex) {
closeEntityManagerAfterFailedBegin(txObject);
throw ex;
}
catch (Throwable ex) {
closeEntityManagerAfterFailedBegin(txObject);
throw new CannotCreateTransactionException("Could not open JPA EntityManager for transaction", ex);
}
}
/**
* Returns a SessionImpl object (when Hibernate is the JPA provider)
*/
protected EntityManager createEntityManagerForTransaction() {
EntityManagerFactory emf = obtainEntityManagerFactory();
if (emf instanceof EntityManagerFactoryInfo) {
// emf is a proxy object, so the call below first goes through
// AbstractEntityManagerFactoryBean.ManagedEntityManagerFactoryInvocationHandler.invoke()
// and then reaches AbstractEntityManagerFactoryBean.getNativeEntityManagerFactory()
emf = ((EntityManagerFactoryInfo) emf).getNativeEntityManagerFactory();
}
Map<String, Object> properties = getJpaPropertyMap();
// EntityManagerFactory.createEntityManager() -> SessionFactoryImpl.createEntityManager(), which returns a SessionImpl
return (!CollectionUtils.isEmpty(properties) ?
emf.createEntityManager(properties) : emf.createEntityManager());
}
@Override
protected void doCommit(DefaultTransactionStatus status) {
JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Committing JPA transaction on EntityManager [" +
txObject.getEntityManagerHolder().getEntityManager() + "]");
}
try {
// Get the EntityTransaction object of the current transaction
EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
// Commit
tx.commit();
}
catch (RollbackException ex) {
if (ex.getCause() instanceof RuntimeException) {
DataAccessException dae = getJpaDialect().translateExceptionIfPossible((RuntimeException) ex.getCause());
if (dae != null) {
throw dae;
}
}
throw new TransactionSystemException("Could not commit JPA transaction", ex);
}
catch (RuntimeException ex) {
// Assumably failed to flush changes to database.
throw DataAccessUtils.translateIfNecessary(ex, getJpaDialect());
}
}
/**
* Performs the rollback
*/
@Override
protected void doRollback(DefaultTransactionStatus status) {
JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Rolling back JPA transaction on EntityManager [" +
txObject.getEntityManagerHolder().getEntityManager() + "]");
}
try {
// Get the transaction object
EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
// Only if the current transaction is active
if (tx.isActive()) {
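// Roll back the current transaction, discarding any changes made in it.
// Queries opened in the transaction are closed on commit or rollback if not already closed.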
tx.rollback();
}
}
catch (PersistenceException ex) {
throw new TransactionSystemException("Could not roll back JPA transaction", ex);
}
finally {
if (!txObject.isNewEntityManagerHolder()) {
// Clear all pending operations in the EntityManager
txObject.getEntityManagerHolder().getEntityManager().clear();
}
}
}
/**
* JPA transaction object, representing an EntityManagerHolder. Used as the transaction object by JpaTransactionManager.
*/
private class JpaTransactionObject extends JdbcTransactionObjectSupport {
@Nullable
private EntityManagerHolder entityManagerHolder;
private boolean newEntityManagerHolder;
@Nullable
private Object transactionData;
public void setEntityManagerHolder(
@Nullable EntityManagerHolder entityManagerHolder, boolean newEntityManagerHolder) {
this.entityManagerHolder = entityManagerHolder;
this.newEntityManagerHolder = newEntityManagerHolder;
}
public EntityManagerHolder getEntityManagerHolder() {
Assert.state(this.entityManagerHolder != null, "No EntityManagerHolder available");
return this.entityManagerHolder;
}
public boolean hasEntityManagerHolder() {
return (this.entityManagerHolder != null);
}
public boolean isNewEntityManagerHolder() {
return this.newEntityManagerHolder;
}
public boolean hasTransaction() {
return (this.entityManagerHolder != null && this.entityManagerHolder.isTransactionActive());
}
public void setTransactionData(@Nullable Object transactionData) {
this.transactionData = transactionData;
getEntityManagerHolder().setTransactionActive(true);
if (transactionData instanceof SavepointManager) {
getEntityManagerHolder().setSavepointManager((SavepointManager) transactionData);
}
}
@Nullable
public Object getTransactionData() {
return this.transactionData;
}
public void setRollbackOnly() {
EntityTransaction tx = getEntityManagerHolder().getEntityManager().getTransaction();
if (tx.isActive()) {
tx.setRollbackOnly();
}
if (hasConnectionHolder()) {
getConnectionHolder().setRollbackOnly();
}
}
@Override
public boolean isRollbackOnly() {
EntityTransaction tx = getEntityManagerHolder().getEntityManager().getTransaction();
return tx.getRollbackOnly();
}
@Override
public void flush() {
try {
getEntityManagerHolder().getEntityManager().flush();
}
catch (RuntimeException ex) {
throw DataAccessUtils.translateIfNecessary(ex, getJpaDialect());
}
}
@Override
public Object createSavepoint() throws TransactionException {
if (getEntityManagerHolder().isRollbackOnly()) {
throw new CannotCreateTransactionException(
"Cannot create savepoint for transaction which is already marked as rollback-only");
}
return getSavepointManager().createSavepoint();
}
@Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
getSavepointManager().rollbackToSavepoint(savepoint);
getEntityManagerHolder().resetRollbackOnly();
}
@Override
public void releaseSavepoint(Object savepoint) throws TransactionException {
getSavepointManager().releaseSavepoint(savepoint);
}
private SavepointManager getSavepointManager() {
if (!isSavepointAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions");
}
SavepointManager savepointManager = getEntityManagerHolder().getSavepointManager();
if (savepointManager == null) {
throw new NestedTransactionNotSupportedException(
"JpaDialect does not support savepoints - check your JPA provider's capabilities");
}
return savepointManager;
}
}
}
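JpaTransactionManager is where the actual database operations happen: it begins the transaction through JpaDialect.beginTransaction(), and commits or rolls back through the EntityManager's EntityTransaction, which in turn acts on the underlying JDBC Connection.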
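In doBegin(), the ConnectionHolder and the EntityManagerHolder are bound to the current thread with TransactionSynchronizationManager.bindResource(), which is how code running later in the same thread finds the same transaction resources. The following is a minimal sketch of that thread-bound-resource idea in plain Java, without Spring. The class ThreadBoundResources and its methods are hypothetical names made up for illustration; they are not Spring's API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for thread-bound resource storage (not Spring's class). */
final class ThreadBoundResources {

    // One map per thread: key (for example a DataSource or EntityManagerFactory) -> holder
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private ThreadBoundResources() {
    }

    /** Binds a holder for the key to the current thread; refuses to overwrite an existing binding. */
    static void bind(Object key, Object holder) {
        Object previous = RESOURCES.get().putIfAbsent(key, holder);
        if (previous != null) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
    }

    /** Returns the holder bound to the current thread for the key, or null if there is none. */
    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    /** Removes and returns the holder bound to the current thread for the key. */
    static Object unbind(Object key) {
        Object holder = RESOURCES.get().remove(key);
        if (holder == null) {
            throw new IllegalStateException("No value bound for key [" + key + "]");
        }
        return holder;
    }

    public static void main(String[] args) throws InterruptedException {
        Object factoryKey = new Object(); // plays the role of the EntityManagerFactory
        bind(factoryKey, "holder-of-main-thread");

        // A different thread has its own map and does not see the binding.
        Object[] seenByOtherThread = new Object[1];
        Thread other = new Thread(() -> seenByOtherThread[0] = get(factoryKey));
        other.start();
        other.join();

        System.out.println(get(factoryKey));      // holder-of-main-thread
        System.out.println(seenByOtherThread[0]); // null
        unbind(factoryKey);
    }
}
```

Because the binding is per thread, a transaction started in one thread is not visible to work handed off to another thread, which is worth keeping in mind when mixing @Transactional methods with asynchronous execution.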
Summary
For reasons of length, this post stops here. To summarize:
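1) When a method of a bean is annotated with @Transactional, a proxy class is generated for the bean and a TransactionInterceptor is added to it;
2) When the method is called, TransactionInterceptor.invoke() runs and calls the parent class method TransactionAspectSupport.invokeWithinTransaction();
2.1) The @Transactional information of the original method is parsed and wrapped in a TransactionAttribute object;
2.2) The JpaTransactionManager object is obtained from the Spring container;
2.3) The transaction is begun;
2.4) Inside a try block, ReflectiveMethodInvocation.proceed() is called, which runs any remaining interceptors and then the original method, and the return value is obtained;
2.5) Exceptions are caught in the catch block. If one occurs, completeTransactionAfterThrowing() is called: if the exception matches the rollback rules, the transaction is rolled back; if it does not, the transaction is still committed. In both cases the exception is rethrown and the method ends;
2.6) If no exception occurs, the transaction is committed.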
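Steps 2.3 to 2.6 can be sketched in plain Java as follows. This is a simplified, hypothetical model and not Spring's source: TxFlowSketch, run() and the recorded event strings are invented for illustration. The rollbackOn() check assumes the default rule, under which only RuntimeException and Error trigger a rollback when no rollbackFor/noRollbackFor is configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical model of the begin / proceed / commit-or-rollback flow (not Spring's code). */
final class TxFlowSketch {

    private TxFlowSketch() {
    }

    /** Default rule assumed here: roll back on unchecked exceptions and errors only. */
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    /** Runs the "original method" inside a pretend transaction and records what happened. */
    static List<String> run(Callable<?> originalMethod) {
        List<String> events = new ArrayList<>();
        events.add("begin");                                    // step 2.3
        try {
            originalMethod.call();                              // step 2.4
        } catch (Throwable ex) {                                // step 2.5
            events.add(rollbackOn(ex) ? "rollback" : "commit");
            // The real interceptor rethrows here; the sketch records it instead.
            events.add("rethrow " + ex.getClass().getSimpleName());
            return events;
        }
        events.add("commit");                                   // step 2.6
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "ok"));
        System.out.println(run(() -> { throw new IllegalStateException("boom"); }));
        System.out.println(run(() -> { throw new java.io.IOException("disk"); }));
    }
}
```

Running main prints `[begin, commit]`, `[begin, rollback, rethrow IllegalStateException]` and `[begin, commit, rethrow IOException]`. The last line is the case that tends to surprise people: a checked exception does not match the default rollback rule, so the transaction is committed before the exception propagates.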
If you have your own thoughts or insights on this post, feel free to share and discuss them in the comments.
From: https://blog.csdn.net/JingAi_jia917/article/details/139526934