事务扩展机制 TransactionSynchronization
在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加 @Transactional
注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。
1. TransactionSynchronizationManager
操作多个方法 Spring 是如何来进行事务处理的呢?Spring 对于事务的管理都是基于 TransactionSynchronizationManager
,下面我们就来简单的分析一下这个类。
TransactionSynchronizationManager.java
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<String>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<Boolean>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<Integer>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<Boolean>("Actual transaction active");
这个对象里面通过 ThreadLocal 保存了线程需要状态以及资源对象
-
resources
:保存连接资源,因为一个方法里面可能包含两个事务(比如事务传播特性为:TransactionDefinition#PROPAGATION_REQUIRES_NEW
),所以就用 Map 来保存资源. -
synchronizations
在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加@Transactional
注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展。:线程同步器,这个就是对 Spring 事务的扩展,通过TransactionSynchronizationManager#registerSynchronization
来注册,我们稍后来分析这个对象在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加@Transactional
注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展 -
currentTransactionReadOnly
:用于保存当前事务是否只读 -
在进行数据库操作的时候,如果需要多个操作要么一起成功,要么一起失败那么就需要使用事务操作了。使用 Spring 框架只需要在方法上添加
@Transactional
注解这个方法就具有事务特性了。而且 Spring 也事务操作给开发者提供了很方便的扩展 -
currentTransactionName
:用于保存当前事务名称,默认为空 -
currentTransactionIsolationLevel
:用来保存当前事务的隔离级别 -
actualTransactionActive
:用于保存当前事务是否还是 Active 状态
下面我们来分析一下事务操作对于连接资源的处理,也就是事务处理中 TransactionSynchronizationManager
对于资源(resources 属性)的管理
关于 Spring 对于事务的处理– Spring Annotation Transaction:
2. Spring 事务处理
TransactionSynchronizationManager#bindResource
绑定连接资源到 TransactionSynchronizationManager
中的 resources 属性当中。下面是绑定时序图:
TransactionInterceptor
是 Spring 对于事务方法处理的代理入口,里面对 JDBC 事务的抽象:
// 获取连接
Connection conn = DataSource.getConnection();
// 设置自动提交 false
conn..setAutoCommit(false);
try {
// 业务操作
doSomething();
} catch (Exception e) {
// 回滚事务
conn.rollback();
}
// 提交事务
conn.commit();
TransactionAspectSupport#invokeWithinTransaction
TransactionAspectSupport#invokeWithinTransaction
是 Spring 对处理的处理。下面我们来大概分析一下它的处理过程:
上面的代码逻辑如下:
-
TransactionAttributeSource#getTransactionAttribute
获取事务相关的信息(TransactionAttribute
),以注解型事务为例,看方法获取类上有没有标注@Transactional
注解。 -
获取到 Spring 容器中配置的事务管理器 (
PlatformTransactionManager
),后面就是真正的事务处理 -
创建事务信息(
TransactionInfo
),里面包含事务管理器(PlatformTransactionManager
) 以及事务相关信息(TransactionAttribute
) -
后面就是 Spring 对于事务的抽象操作,包含
设置自动提交 false
、业务操作
、异常回滚事务
和正常就提交事务
我们回到正题, Spring 通过创建事务信息(TransactionInfo
),把数据库连接通过 TransactionSynchronizationManager#bindResource
绑定到 ThreadLocal
变量当中。然后标注到一个事务当中的其它数据库操作就可以通过TransactionSynchronizationManager#getResource
获取到这个连接。
数据库的事务是基于连接的,Spring 对于多个数据库操作的事务实现是基于 ThreadLocal。所以在事务操作当中不能使用多线程
3. Spring 事务的扩展 – TransactionSynchronization
在上面的 TransactionSynchronizationManager
类中我们知道,事务操作的时候它的当前线程还保存了 TransactionSynchronization
对象。而这个对象伴随着 Spring 对 事务处理的各个生命周期都会有相应的扩展。
TransactionSynchronization.java.
public interface TransactionSynchronization extends Flushable {
/** 事务提交状态 */
int STATUS_COMMITTED = 0;
/** 事务回滚状态 */
int STATUS_ROLLED_BACK = 1;
/**系统异常状态 */
int STATUS_UNKNOWN = 2;
void suspend();
void resume();
void flush();
// 事务提交之前
void beforeCommit(boolean readOnly);
// 事务成功或者事务回滚之前
void beforeCompletion();
// 事务成功提交之后
void afterCommit();
// 操作完成之后(包含事务成功或者事务回滚)
void afterCompletion(int status);
}
事务的事务扩展项目中的应用场景是当订单成功之后,发送一条消息到 MQ 当中去。由于事务是和数据库连接相绑定的,如果把发送消息和数据库操作放在一个事务里面。当发送消息时间过长时会占用数据库连接,所以就要把数据库操作与发送消息到 MQ 解耦开来。可以利用 TransactionSynchronization#afterCommit
的这个方法,当数据成功保存到数据库并且事务提交了就把消息发送到 MQ 里面。
@Transactional
public void finishOrder(Order order){
// 修改订单成功
updateOrderSuccess(order);
// 发送消息到 MQ
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
@Override
public void afterCommit() {
mqService.send(order);
}
});
}
当事务成功提交之后,就会把消息发送给 MQ,并且不会占用数据库连接资源。
4. Spring 事务扩展 – @TransactionalEventListener
在 Spring framework 4.2 之后还可以使用@TransactionalEventListener
处理数据库事务提交成功后再执行操作。这种方式比 TransactionSynchronization
更加优雅。它的使用方式如下:
@Transactional
public void finishOrder(Order order){
// 修改订单成功
updateOrderSuccess(order);
// 发布 Spring Event 事件
applicationEventPublisher.publishEvent(new MyAfterTransactionEvent(order));
}
@Slf4j
@Component
private static class MyTransactionListener {
@Autowired
private MqService mqService;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
private void onHelloEvent(MyAfterTransactionEvent event) {
Order order = event.getOrder();
mqService.send(order);
}
}
// 定一个事件,继承自ApplicationEvent
private static class MyAfterTransactionEvent extends ApplicationEvent {
private Order order;
public MyAfterTransactionEvent(Object source, Order order) {
super(source);
this.order = order;
}
public Order getOrder() {
return order;
}
}
它的实现原理是当 Spring Bean 的方法标注了通过 TransactionalEventListenerFactory#createApplicationListener
创建 ApplicationListenerMethodTransactionalAdapter
然后在事件回调当中创建 TransactionSynchronization
的实现类TransactionSynchronizationEventAdapter
。并且通过 TransactionSynchronizationManager.registerSynchronization
把 TransactionSynchronizationEventAdapter
注册到当前线程当中。
TransactionSynchronizationEventAdapter
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {
private final ApplicationListenerMethodAdapter listener;
private final ApplicationEvent event;
private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
@Override
public void beforeCommit(boolean readOnly) {
if (this.phase == TransactionPhase.BEFORE_COMMIT) {
processEvent();
}
}
@Override
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
protected void processEvent() {
this.listener.processEvent(this.event);
}
}
上面就是使用@TransactionalEventListener
处理数据库事务提交成功后再执行操作的原理。