1. 数据库事务

1.1 数据库事务实现原理

Spring提供了灵活方便的事务管理功能,但这些功能都是基于底层数据库本身的事务处理机制工作的。要想深入了解Spring的事务管理和配置,有必要先学习数据库事务的基础知识。有需要可以阅读以下文章
Intro to 事务
Intro to InnoDB事务
InnoDB事务-原子性的实现, undo log
InnoDB事务-隔离性的实现, MVCC & 锁
InnoDB事务-持久性的实现, binglog & redo log&undo log

1.2 JDBC 事务

在介绍Spring 事务实现原理之前,先来看一下使用JDBC如何实现和数据库交互的事务功能。这可以帮助理解Spring事务的工作原理和优势。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// JDBC事务代码
public class JDBCTransactionExample {
public static void main(String[] args) {
String url = "jdbc:mysql://localhost:3306/mydatabase";
String user = "username";
String password = "password";

Connection conn = null;
Statement stmt = null;
try {
// 1. 获取数据库连接
conn = DriverManager.getConnection(url, user, password);
// 2. 关闭自动提交模式
conn.setAutoCommit(false);
// 设置事务隔离级别
conn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
// 3. 创建Statement对象
stmt = conn.createStatement();
// 4. 执行SQL操作
stmt.executeUpdate("INSERT INTO accounts (name, balance) VALUES ('Alice', 1000)");
stmt.executeUpdate("INSERT INTO accounts (name, balance) VALUES ('Bob', 1000)");

// 5. 提交事务
conn.commit();
System.out.println("Transaction committed successfully.");

} catch (SQLException e) {
// 6. 发生异常时回滚事务
if (conn != null) {
try {
conn.rollback();
System.out.println("Transaction rolled back.");
} catch (SQLException ex) {
ex.printStackTrace();
}
}
e.printStackTrace();
} finally {
// 7. 关闭资源
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}

通过了解JDBC事务处理的基本步骤,可以更好地理解Spring提供的事务管理功能如何简化和增强这些操作。

Spring事务管理通过编程式事务 和 声明式事务管理,极大地简化了事务管理的复杂性,并提供了更强大的功能,例如事务传播行为、隔离级别设置等。

2. Spring 事务抽象层

2.1 TransactionDefinition

TransactionDefinition 是一个接口,定义了事务的相关属性,如隔离级别、传播行为、超时时间和只读标志等。这些属性可以通过 XML 配置或注解进行声明配置。

  • 隔离级别(Isolation Level):定义事务在多个事务同时访问数据库时的隔离程度。
  • 传播行为(Propagation Behavior):定义方法如何参与现有事务。
  • 超时时间(Timeout):定义事务必须在多长时间内完成。
  • 只读(Read-Only):标志事务是否只执行读操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public interface TransactionDefinition {  
    // 事务传播行为
    int PROPAGATION_REQUIRED = 0;
    int PROPAGATION_SUPPORTS = 1;
    int PROPAGATION_MANDATORY = 2;
    int PROPAGATION_REQUIRES_NEW = 3;
    int PROPAGATION_NOT_SUPPORTED = 4;
    int PROPAGATION_NEVER = 5;
    int PROPAGATION_NESTED = 6;

    // 事务隔离级别
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = 1;
    int ISOLATION_READ_COMMITTED = 2;
    int ISOLATION_REPEATABLE_READ = 4;
    int ISOLATION_SERIALIZABLE = 8;

    int TIMEOUT_DEFAULT = -1;

    default int getPropagationBehavior() {
    return PROPAGATION_REQUIRED;
    }

    default int getIsolationLevel() {
    return ISOLATION_DEFAULT;
    }

    default int getTimeout() {
    return TIMEOUT_DEFAULT;
    }

    default boolean isReadOnly() {
    return false;
    }

    @Nullable
    default String getName() {
    return null;
    }

    static TransactionDefinition withDefaults() {
    return StaticTransactionDefinition.INSTANCE;
    }
    }

2.2 TransactionStatus

TransactionStatus 是一个接口,表示事务的当前状态。它包含了事务的一些控制信息,如是否新建事务、是否只读、是否已完成等。

  • isNewTransaction():检查是否是一个新的事务
  • setRollbackOnly():将当前事务标记为仅回滚
  • isRollbackOnly():检查当前事务是否被标记为仅回滚
  • isCompleted():检查事务是否已完成(提交或回滚)
1
2
3
4
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
boolean hasSavepoint();
void flush();
}

2.3 PlatformTransactionManager

PlatformTransactionManager 定义了一组用于事务管理的标准方法,使得不同的事务管理器实现可以通过这个统一的接口进行交互。

PlatformTransactionManager 定义了三种基本的事务操作:

  1. getTransaction:获取一个事务
  2. commit:提交事务
  3. rollback:回滚事务
1
2
3
4
5
6
7
8
9
10
11
12
public interface PlatformTransactionManager extends TransactionManager {  

TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;

void commit(TransactionStatus status) throws TransactionException;

void rollback(TransactionStatus status) throws TransactionException;
}

public interface TransactionManager {
}

Spring 提供了多种 PlatformTransactionManager 的实现,适用于不同的持久化技术。无论使用的是哪种持久化技术,开发人员都可以通过统一的事务管理接口来处理事务。

不同持久化技术对应的事务管理器实现类

2.3.1 DataSourceTransactionManager

DataSourceTransactionManager 是针对 MyBatis 的事务管理器实现。它通过 SqlSessionFactory 与 MyBatis 会话交互,实现事务管理。

2.4 Spring 编程式事务-TransactionTemplate

Spring为编程式事务管理提供了模板类org.springframework.transaction.support.TransactionTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class TransactionTemplate extends DefaultTransactionDefinition
implements TransactionOperations, InitializingBean {
@Nullable
private PlatformTransactionManager transactionManager;
public TransactionTemplate(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public TransactionTemplate(PlatformTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
super(transactionDefinition);
this.transactionManager = transactionManager;
}
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

@Nullable
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
}

@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}
else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status);
}
catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
// Transactional code threw unexpected exception -> rollback
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
this.transactionManager.commit(status);
return result;
}
}

private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
}
}

TransactionTemplate 需要设置抽象层的PlatformTransactionManager TransactionDefinition
executeTransactionTemplate 中模版方法, 它是完成在事务中执行目标方法的核心逻辑

  1. 开启事务
  2. 在事务中执行目标方法
  3. 根据目标方法执行结果处理事务commit or rollback

execute 方法接受一个 TransactionCallback 作为参数,并在事务上下文中执行该回调。

TransactionCallback 是一个泛型函数式接口,用于定义在事务上下文中执行的目标方法。它有一个方法 doInTransaction,该方法包含具体的事务逻辑。

1
2
3
4
5
6
@FunctionalInterface  
public interface TransactionCallback<T> {

@Nullable
T doInTransaction(TransactionStatus status);
}

在实际使用时,可以使用匿名内部类或 Java 8 引入的 lambda 表达式进行传参。
示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class TransactionTemplateExample {

private final TransactionTemplate transactionTemplate;
private final MessageService messageService;

public TransactionTemplateExample(TransactionTemplate transactionTemplate, MessageService messageService) {
this.transactionTemplate = transactionTemplate;
this.messageService = messageService;
}
public void performTransaction() {
transactionTemplate.execute(new TransactionCallback<Void>() {
@Override
public Void doInTransaction(TransactionStatus status) {
try {
Message message = new Message();
message.setMessage("test message");
messageService.insertMessage(message);
messageService.insertMessage(message);
// 如果需要,可以在这里添加更多数据库操作

} catch (Exception e) {
status.setRollbackOnly();
throw e;
}
return null;
}
});
}
}

也可以使用lambda 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
transactionTemplate.execute(status -> {
try {
Message message = new Message();
message.setMessage("test message");
messageService.insertMessage(message);
messageService.insertMessage(message);
// 如果需要,可以在这里添加更多数据库操作

} catch (Exception e) {
status.setRollbackOnly();
throw e;
}
return null;
});

2.5 声明式事务

可以看到,编程式事务实现方式中,事务管理代码对业务代码有明显入侵,
Spring支持通过声明式事务,使业务代码和事务管理代码完全解耦。

Spring 的声明式事务管理通过 Spring AOP 实现,使得业务代码和事务管理代码完全解耦。通过使用 @Transactional 注解和 AOP 代理机制,Spring 在方法调用前后自动织入事务管理逻辑,包括获取线程绑定资源、开始事务、提交/回滚事务、进行异常转换和处理等工作。这种方式极大地简化了事务管理,使代码更简洁、易读和易维护。

关于Spring AOP 的实现原理,可以阅读以下内容了解
Java 动态代理
Spring AOP 实践
Spring AOP XML配置方式原理详解
Spring AOP 注解方式原理详解

声明式事务有3种

  1. TransactionProxyFactoryBean XML 配置
  2. aop:config XML 配置
  3. @Transactional 注解配置

从循序渐进的学习角度来看,了解TransactionProxyFactoryBean有助于更直观地理解Spring声明式事务的内部工作原理。

总是来说,不论是哪种声明式 方式,都可以从以下3个角度考虑其实现原理

  1. 事务Advisor 是如何生成的(Advisor 包含 pointcut 和 advice 2个概念)
  2. 基于事务的动态代理对象是如何生成的
  3. 基于事务的动态代理对象是如何执行目标方法的

3. 声明式事务-TransactionProxyFactoryBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
  public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean
implements BeanFactoryAware {
// 事务拦截器,用于拦截方法调用
private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();

// Pointcut 定义了一个切入点,它决定了哪些方法调用应该被拦截,并应用事务管理。
@Nullable
private Pointcut pointcut;

// 设置事务管理器,
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionInterceptor.setTransactionManager(transactionManager);
}

// 设置事务属性。事务属性通过方法名和事务配置(如传播行为、隔离级别等)进行映射。
// 这个方法接受一个 Properties 对象,其中键是方法名,值是事务属性描述符。
public void setTransactionAttributes(Properties transactionAttributes) {
this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
}


public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
}

public void setPointcut(Pointcut pointcut) {
this.pointcut = pointcut;
}

//设置 Bean 工厂。
// 如果在运行时没有显式设置事务管理器,这个方法会从 Bean 工厂中获取一个类型为 PlatformTransactionManager 的 bean 作为默认的事务管理器。
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.transactionInterceptor.setBeanFactory(beanFactory);
}


// 创建Advisor 将事务拦截器和切入点(如果有的话)组合起来。
// DefaultPointcutAdvisor 是一个将切入点和拦截器组合在一起的 Advisor。如果没有设置切入点,使用 TransactionAttributeSourceAdvisor,它会自动应用事务属性源。
@Override
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();
if (this.pointcut != null) {
return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
}
else {
// Rely on default pointcut.
return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
}

@Override
protected void postProcessProxyFactory(ProxyFactory proxyFactory) {
proxyFactory.addInterface(TransactionalProxy.class);
}
}

下面将分析TransactionProxyFactoryBean 的继承实现图谱和重要变量

3.1 实现FactoryBean 接口

TransactionProxyFactoryBean是一个FactoryBean , 这意味着当调用它的getObject 方法时,可以获取到它创建的对象,在这里这个对象指的就是==基于事务的动态代理对象==

3.2 实现InitializingBean 接口

InitializingBean是 Spring 启动过程中的一个拓展点,用于bean实例化 的init 阶段,使得bean可以在属性注入完成后但在使用之前执行一些自定义的初始化工作。

1
2
3
public interface InitializingBean {  
void afterPropertiesSet() throws Exception;
}

关于Spring bean init 阶段的拓展点,更多内容可以点击这里阅读: Spring bean 实例化过程-initializebean

看下TransactionProxyFactoryBean 重写的InitializingBean.afterPropertiesSet 方法,简化了代码,只保留重点部分,可以看到主要分成3个步骤

  1. 准备代理工厂
  2. 创建Advisor
  3. 生成代理对象
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public abstract class AbstractSingletonProxyFactoryBean extends ProxyConfig  
    implements FactoryBean<Object>, BeanClassLoaderAware, InitializingBean {
    @Override
    public void afterPropertiesSet() {

    ProxyFactory proxyFactory = new ProxyFactory();
    proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));

    this.proxy = proxyFactory.getProxy(this.proxyClassLoader);
    }

    }
    也就是说,TransactionProxyFactoryBean 在自身实例化的init 阶段就把需要的生成的代理对象对象先生成了。

Spring AOP XML配置方式原理详解 一文中具体讲过ProxyFactoryBean 如何在重写的FactoryBean.getObject方法中生成代理对象。

所以TransactionProxyFactoryBeanProxyFactoryBean 这两个都是用于生成代理对象的FactoryBean ,实际生成代理对象的时机是不一样的。

观察下ProxyFactoryBean 的继承实现图谱,可以看到ProxyFactoryBean 并没有实现InitializingBean , 所以无法像TransactionProxyFactoryBean 一样在自身实例化的init 阶段就把代理对象生产出来

3.2.1 createMainInterceptor- 创建事务Advisor

createMainInterceptor 用于创建事务管理中的环绕增强 TransactionInterceptor, 关于TransactionInterceptor见下面3.2 TransactionInterceptor - 环绕增强 部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public abstract class AbstractSingletonProxyFactoryBean extends ProxyConfig  
implements FactoryBean<Object>, BeanClassLoaderAware, InitializingBean {
protected abstract Object createMainInterceptor();
}


public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean
implements BeanFactoryAware {
@Override
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();
if (this.pointcut != null) {
return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
}
else {
// Rely on default pointcut.
return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
}
}

createMainInterceptorAbstractSingletonProxyFactoryBean中的一个抽象方法,TransactionProxyFactoryBean 实现了它。

在不指定切点表达式 pointcut 的时候, 会使用TransactionAttributeSourceAdvisor

TransactionAttributeSourceAdvisor

TransactionAttributeSourceAdvisor 管理TransactionInterceptor这个环绕增强。通过它可以获取到 pointcut 和Advice

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class TransactionAttributeSourceAdvisor extends AbstractPointcutAdvisor {

@Nullable
private TransactionInterceptor transactionInterceptor;

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return (transactionInterceptor != null ? transactionInterceptor.getTransactionAttributeSource() : null);
}
};

public TransactionAttributeSourceAdvisor() {
}

public TransactionAttributeSourceAdvisor(TransactionInterceptor interceptor) {
setTransactionInterceptor(interceptor);
}

public void setTransactionInterceptor(TransactionInterceptor interceptor) {
this.transactionInterceptor = interceptor;
}

public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}


@Override
public Advice getAdvice() {
Assert.state(this.transactionInterceptor != null, "No TransactionInterceptor set");
return this.transactionInterceptor;
}

@Override
public Pointcut getPointcut() {
return this.pointcut;
}
}

3.2.2 proxyFactory.getProxy -创建动态代理对象

1
2
3
public Object getProxy(@Nullable ClassLoader classLoader) {  
return createAopProxy().getProxy(classLoader);
}

又来到熟悉的动态代理创建的流程

  1. 判断代理类型
  2. 根据代理类型创建代理对象
    具体创建流程可以参考文章 Spring AOP XML配置方式原理详解

下面来介绍一下TransactionProxyFactoryBean中的变量

3.3 TransactionInterceptor - 环绕增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

public TransactionInterceptor() {
}

public TransactionInterceptor(TransactionManager ptm, TransactionAttributeSource tas) {
setTransactionManager(ptm);
setTransactionAttributeSource(tas);
}

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {

Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}

TransactionInterceptor 继承了org.aopalliance.intercept.MethodInterceptor

org.aopalliance.intercept.MethodInterceptor是一个环绕增强,表示在目标方法执行前后实施增强。符合事务要在方法执行前开启 和执行后的关闭或者异常处理的使用场景。

1
2
3
4
5
org.aopalliance.intercept
@FunctionalInterface
public interface MethodInterceptor extends Interceptor {
Object invoke(MethodInvocation invocation) throws Throwable;
}

TransactionInterceptor 重写的invoke 方法中, 通过调用父类TransactionAspectSupport中方法invokeWithinTransaction 完成在事务中执行目标方法这一功能。

invokeWithinTransaction 方法,用于在事务范围内调用目标方法,它处理了事务的开始、提交和回滚,并确保目标方法在事务上下文中执行。

3.4 TransactionAspectSupport

TransactionAspectSupport是 Spring事务管理的基类,包含了事务管理汇总大多数真正的逻辑,具体的事务管理类,TransactionInterceptor继承它并实现具体的事务管理逻辑。

3.4.1 invokeWithinTransaction

invokeWithinTransaction 方法是 TransactionAspectSupport 类的一个关键方法, 用于在事务范围内调用目标方法,并处理事务的开始、提交和回滚,确保目标方法在适当的事务上下文中执行。也是TransactionInterceptor 重写invoke 实现环绕增强时的重点逻辑。

参数说明

  • method:要调用的方法。
  • targetClass:目标类,可以为空。
  • invocation:一个回调接口,用于实际执行目标方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 通过事务属性源获取当前方法的事务属性。
// 如果没有事务属性,则方法是非事务性的。
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final TransactionManager tm = determineTransactionManager(txAttr);
// 处理响应式事务管理, 先不管
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// 处理标准事务管理, 这是本次重点关心的
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 如果事务属性为null或者事务管理器不支持回调
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 则执行标准的事务边界控制(即通过`getTransaction`和`commit/rollback`调用)。
//开始一个新的事务或者加入现有的事务。(如果有必要)
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// 执行目标方法,这是一个环绕通知,通常会导致目标对象的方法被调用。
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 如果方法抛出异常,调用`completeTransactionAfterThrowing`方法来处理事务回滚,并重新抛出异常。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// - 在finally块中调用`cleanupTransactionInfo`来清理事务信息。
cleanupTransactionInfo(txInfo);
}

if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}

commitTransactionAfterReturning(txInfo);
return retVal;
}

else {
Object result;
final ThrowableHolder throwableHolder = new ThrowableHolder();

// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}
}

既然是要在事务中执行方法的环绕增强,那么其逻辑显然易见包含3步

  1. 开启事务
  2. 在事务中执行目标方法
  3. 根据目标方法执行结果处理事务commit or rollback

该方法把事务的处理分成了3类

  1. 响应式事务
  2. 标准事务
  3. 回掉首选事务 CallbackPreferringPlatformTransactionManager

这里只分析标准事务的处理过程

3.4.2 invokeWithinTransaction-开启事务

3.4.2.1 createTransactionIfNecessary

createTransactionIfNecessary 负责在需要时创建一个新的事务或加入现有事务,并返回相应的事务状态(TransactionStatus

  • tm:事务管理器 (PlatformTransactionManager),用于实际管理事务。
  • txAttr:事务属性 (TransactionAttribute),定义事务的传播行为、隔离级别、超时时间等。
  • joinpointIdentification:连接点标识,用于标识当前方法调用。 在Spring AOP 实践 讲解过连接点 joinpoint
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 事务管理器 (`PlatformTransactionManager`),用于实际管理事务。
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
    @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // 如果事务属性存在但没有指定名称,则创建一个新的 `DelegatingTransactionAttribute`,并覆盖其 `getName` 方法,使其返回 `joinpointIdentification` 作为事务名称。这确保了每个事务都有一个唯一的标识符,便于调试和日志记录。
    if (txAttr != null && txAttr.getName() == null) {
    txAttr = new DelegatingTransactionAttribute(txAttr) {
    @Override
    public String getName() {
    return joinpointIdentification;
    }
    };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
    if (tm != null) {
    //创建一个新事务or加入到现有事务中,取决于事务属性的配置
    status = tm.getTransaction(txAttr);
    }
    else {
    if (logger.isDebugEnabled()) {
    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    "] because no transaction manager has been configured");
    }
    }
    }
    // 保存与当前事务相关的所有信息,并返回 `TransactionInfo` 对象。
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

  1. 设置事务名称
    • 检查事务属性 txAttr 是否为空,并且事务属性中是否指定了事务名称。
    • 如果事务属性存在但没有指定名称,则创建一个新的 DelegatingTransactionAttribute,并覆盖其 getName 方法,使其返回 joinpointIdentification 作为事务名称。这确保了每个事务都有一个唯一的标识符,便于调试和日志记录。
  2. 获取事务状态TransactionStatus
    如果事务属性 txAttr 不为空:
    • 检查是否提供了事务管理器 tm
    • 如果事务管理器存在,调用 tm.getTransaction(txAttr) 来获取事务状态(TransactionStatus)。这一步可能会创建一个新事务,或者加入到现有事务中,具体取决于事务属性的配置。
    • 如果没有配置事务管理器,并且日志级别为调试,则记录一条调试信息,表示跳过事务处理。
  3. 准备事务信息 TransactionInfo
    • 调用 prepareTransactionInfo 方法来准备并返回 TransactionInfo 对象。
    • TransactionInfo 是一个内部类,用于保存与当前事务相关的所有信息,包括tm-事务管理器、txAttr-事务属性、joinpointIdentification-连接点标识、status-事务状态
getTransaction-获取当前事务

getTransaction(TransactionAttribute txAttr) 方法是由PlatformTransactionManager接口定义的一个关键方法,用于根据定义好的事务传播行为来判断获取或创建一个事务。

AbstractPlatformTransactionManager 实现了PlatformTransactionManager 并重写了getTransaction方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
}


public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {

// Use defaults if no transaction definition given.
// 如果传入的事务定义为 `null`,则使用默认的事务定义
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取当前事务对象。
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 检查给定的事务对象是否表示一个已经存在的事务
// 在事务管理中,确定是否存在现有事务对于决定如何处理新事务至关重要。例如,根据传播行为,新事务可能需要挂起现有事务或者参与现有事务。
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 如果存在当前事务,则根据传播行为处理现有事务,调用 `handleExistingTransaction(def, transaction, debugEnabled)` 方法,并返回相应的事务状态。
return handleExistingTransaction(def, transaction, debugEnabled);
}

// Check definition settings for new transaction.
// 检查事务定义中的超时时间
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
// 根据事务定义中的传播行为(`PropagationBehavior`)决定如何处理新事务。
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
}

这个方法的主要功能是根据事务定义(TransactionDefinition)来获取或创建一个事务,并返回相应的事务状态(TransactionStatus

  • definition:事务定义(TransactionDefinition),描述事务的传播行为、隔离级别、超时时间等属性。可以为空,如果为空则使用默认的事务定义。
  • 返回一个 TransactionStatus 对象,表示当前事务的状态。

doGetTransaction 方法在 Spring 事务管理中用于获取当前的事务对象。
它返回一个包含当前事务状态信息的对象,后续方法(如 getTransaction)会根据这个对象决定是否需要创建新的事务或加入现有事务。通过这种设计,Spring 提供了灵活且可扩展的事务管理机制,确保事务的正确性和一致性。

1
2
3
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected abstract Object doGetTransaction() throws TransactionException;
}
1
2
3
4
5
6
7
8
9
10
11
12
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager  
implements ResourceTransactionManager, InitializingBean {
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
}

TransactionSynchronizationManager 是一个用来管理与当前线程关联的资源(如连接、事务状态等)的工具类。它使用线程局部变量(ThreadLocal)来存储这些资源,从而确保每个线程都有自己的独立资源副本。

事务传播行为处理

Spring 的事务传播行为定义在 TransactionDefinition.Propagation 枚举中,包括以下几种常见的类型:

  • REQUIRED:如果当前没有事务,被嵌套调用的方法会创建一个新的事务;如果已经存在一个事务,嵌套调用的方法将加入到现有事务中执行。
  • REQUIRES_NEW:被嵌套调用的方法每次都创建一个新的事务;如果当前已经存在一个事务,则暂停当前事务,待新事务完成后再恢复。
  • NESTED:如果当前没有事务,则被嵌套调用的方法创建一个新的事务;如果已经存在一个事务,则在现有事务中创建一个嵌套事务。
  • MANDATORY:被嵌套调用的方法必须在现有事务中运行,如果当前没有事务,则抛出异常。
  • SUPPORTS:如果当前有事务,被嵌套调用的方法则在当前事务中运行;如果当前没有事务,被嵌套调用的方法则以非事务方式运行。
  • NOT_SUPPORTED:被嵌套调用的方法以非事务方式运行,如果当前有事务,则暂停当前事务。
  • NEVER:被嵌套调用的方法 以非事务方式运行,如果当前有事务,则抛出异常。

getTransaction 中,事务传播行为 要分成2种情况处理

  1. 没有当前事务,根据事物传播行为如何处理当前方法的调用
  2. 存在当前事务,根据事物传播行为如何处理当前方法的调用, handleExistingTransaction
    没有当前事务
    省略getTransaction中其他逻辑,只看和事务传播行为相关的代码。
    在没有当前事务时, 当前方法的调用按照以下逻辑处理
    MANDATORY:被嵌套调用的方法必须在现有事务中运行,当前没有事务,则抛出异常。
    REQUIREDREQUIRES_NEWNESTED:当前没有事务,创建一个新事务
    SUPPORTSNOT_SUPPORTEDNEVER:当前没有事务,以非事务方式运行当前方法。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
     public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {
    Object transaction = doGetTransaction();

    // 检查给定的事务对象是否表示一个已经存在的事务
    // 在事务管理中,确定是否存在现有事务对于决定如何处理新事务至关重要。例如,根据传播行为,新事务可能需要挂起现有事务或者参与现有事务。
    if (isExistingTransaction(transaction)) {
    //存在当前事务,则根据传播行为处理现有事务,调用 `handleExistingTransaction(def, transaction, debugEnabled)` 方法,并返回相应的事务状态。
    return handleExistingTransaction(def, transaction, debugEnabled);
    }
    //不存在当前事务,根据事务定义中的传播行为(`PropagationBehavior`)决定如何处理新事务。
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
    "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    SuspendedResourcesHolder suspendedResources = suspend(null);
    if (debugEnabled) {
    logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    }
    try {
    return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
    resume(null, suspendedResources);
    throw ex;
    }
    }
    else {
    // Create "empty" transaction: no actual transaction, but potentially synchronization.
    if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    "isolation level will effectively be ignored: " + def);
    }
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
    }
    }
    存在当前事务
    handleExistingTransaction整体逻辑比较容易理解,也是按照各种传播行为定义进行相应处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    private TransactionStatus handleExistingTransaction(
    TransactionDefinition definition, Object transaction, boolean debugEnabled)
    throws TransactionException {
    // 该方法要以非事务方式运行,如果当前有事务,则抛出异常。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
    "Existing transaction found for transaction marked with propagation 'never'");
    }
    // 该方法要以非事务方式运行,如果当前有事务,则暂停当前事务。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    if (debugEnabled) {
    logger.debug("Suspending current transaction");
    }
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(
    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    // 该方法调用需要创建一个新的事务;如果当前已经存在一个事务,则暂停当前事务,待新事务完成后再恢复。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    if (debugEnabled) {
    logger.debug("Suspending current transaction, creating new transaction with name [" +
    definition.getName() + "]");
    }
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
    return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error beginEx) {
    resumeAfterBeginException(transaction, suspendedResources, beginEx);
    throw beginEx;
    }
    }
    // 如果当前没有事务,则该方法创建一个新的事务;
    // 如果已经存在一个事务,则在现有事务中创建一个嵌套事务。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    if (!isNestedTransactionAllowed()) {
    throw new NestedTransactionNotSupportedException(
    "Transaction manager does not allow nested transactions by default - " +
    "specify 'nestedTransactionAllowed' property with value 'true'");
    }
    if (debugEnabled) {
    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
    }
    if (useSavepointForNestedTransaction()) {
    // Create savepoint within existing Spring-managed transaction,
    // through the SavepointManager API implemented by TransactionStatus.
    // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    DefaultTransactionStatus status =
    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    status.createAndHoldSavepoint();
    return status;
    }
    else {
    // Nested transaction through nested begin and commit/rollback calls.
    // Usually only for JTA: Spring synchronization might get activated here
    // in case of a pre-existing JTA transaction.
    return startTransaction(definition, transaction, debugEnabled, null);
    }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // 在已经有事务的前提下PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED 都需要新建一个事务,
    if (debugEnabled) {
    logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
    // 判断将要新建的事务和已存在的事务其隔离等级是否匹配
    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    //如果当前事务的隔离级别为 `null` 或者与新的事务定义中的隔离级别不一致,则抛出 `IllegalTransactionStateException` 异常。这表示新的事务定义中的隔离级别与现有事务不兼容。
    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    Constants isoConstants = DefaultTransactionDefinition.constants;
    throw new IllegalTransactionStateException("Participating transaction with definition [" +
    definition + "] specifies isolation level which is incompatible with existing transaction: " +
    (currentIsolationLevel != null ?
    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
    "(unknown)"));
    }
    }
    // 如果当前事务是只读的,而新的事务定义不是只读的,则2个事务不兼容,需要抛出异常
    if (!definition.isReadOnly()) {
    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    throw new IllegalTransactionStateException("Participating transaction with definition [" +
    definition + "] is not marked as read-only but existing transaction is");
    }
    }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
    }

这段代码的主要作用是验证参与的事务定义(TransactionDefinition)是否与现有事务的属性兼容。它具体执行以下验证:

  1. 隔离级别验证:确保新的事务定义中的隔离级别与现有事务的隔离级别一致。如果不一致,则抛出异常。
  2. 只读属性验证:确保新的事务定义中的只读属性与现有事务的只读属性一致。如果不一致,则抛出异常。

通过这种验证机制,Spring 能够确保在事务传播过程中,新的事务定义不会破坏现有事务的设置,从而保证事务管理的正确性和一致性。

3.4.2.1 开始事务: startTransaction->doBegin

不论是否存在当前事务, 都要有一个事务启动的步骤

1
2
3
4
5
6
7
8
9
10
11
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}

重点逻辑在doBegin 中,还是看我们更常用到的DataSourceTransactionManager 如何实现的该方法,可以看到其整体流程和JDBC 事务中的流程保持一致,只是DataSourceTransactionManager 中多了更多的包装层。

  1. 获取数据库连接
  2. 设置自动提交、隔离级别、只读属性
  3. 执行目标方法
  4. 结束事务:commit or rollback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager  
implements ResourceTransactionManager, InitializingBean {
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 1. 获取数据库连接
con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 设置事务隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 设置只读属性
txObject.setReadOnly(definition.isReadOnly());
// 设置自动提交属性
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 将与当前线程的事务相关信息保存到TransactionSynchronizationManager工具类中,方便后续使用
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
}

3.4.2 invokeWithinTransaction 执行目标方法-InvocationCallback

invokeWithinTransaction在对当前的方法有了合适的事务后,就可以通过InvocationCallback.proceedWithInvocation执行对目标方法的调用了。

InvocationCallbackinvokeWithinTransaction的第三个参数。

1
2
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation)

InvocationCallback 是一个位于TransactionAspectSupport内部的函数式接口,可以通过匿名内部类或者lambda表达式实现该接口。

1
2
3
4
5
6
7
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@FunctionalInterface
protected interface InvocationCallback {
@Nullable
Object proceedWithInvocation() throws Throwable;
}
}

在调用TransactionInterceptor.invoke方法中调用invokeWithinTransaction 时,InvocationCallback 传入了一个lambda 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 执行目标方法
retVal = invocation.proceedWithInvocation();
}
}

说明在invokeWithinTransaction中执行目标方式时, 实际上调用的是MethodInvocation.proceed方法。

ReflectiveMethodInvocation 实现了MethodInvocation, 所以这里在调用ReflectiveMethodInvocation.proceed

ReflectiveMethodInvocation.proceed 及其子类CglibMethodInvocation.proceed 可以通过递归调用完成Advice 增强逻辑的执行和目标方法的执行。

3.4.3 invokeWithinTransaction-事务结束处理

  1. 如果目标方法正常完成执行,那就要执行commit 操作, commitTransactionAfterReturning
  2. 如果目标方法执行出现异常,那就要执行回滚操作

3.5 再看事务传播行为

在方法嵌套调用的情况下,Spring 的事务传播行为(Transaction Propagation Behaviors)决定了嵌套方法是新建事务、加入现有事务、开始非事务执行,还是创建嵌套事务。在方法正常执行结束时,不同传播行为的区别不大,但在业务操作失败/部分失败并需要回滚时,这些传播行为的差异变得至关重要。

  1. PROPAGATION_REQUIRED
  • 描述:如果当前没有事务,则创建一个新事务;如果已经存在一个事务,则加入该事务。
  • 正常执行:方法正常执行结束,事务提交。
  • 失败回滚:无论是外层方法还是内层方法抛出异常,整个事务都会回滚,因为它们属于同一个事务边界。
  1. PROPAGATION_REQUIRES_NEW
  • 描述:每次都创建一个新的事务。如果已经存在一个事务,则暂停当前事务,创建一个新事务。
  • 正常执行:每个方法都有自己独立的事务,方法执行结束,各自的事务提交。
  • 失败回滚:如果内层方法抛出异常,只有内层方法的事务回滚,外层事务不会受到影响。如果外层方法抛出异常,外层事务回滚,内层事务不受影响(已提交)。
  1. PROPAGATION_NESTED
  • 描述:如果当前没有事务,则创建一个新的事务;如果已经存在一个事务,则在当前事务中创建一个嵌套事务。
  • 正常执行:嵌套方法成功执行,其嵌套事务提交,主事务继续。
  • 失败回滚:如果内层方法抛出异常,内层事务回滚到保存点,外层事务可以决定是否继续执行或回滚到整个事务。如果外层方法抛出异常,整个事务(包括嵌套事务)都会回滚。
  1. PROPAGATION_SUPPORTS
  • 描述:如果当前有事务,则在事务中运行;如果当前没有事务,则以非事务方式运行。
  • 正常执行:在事务中执行则提交事务;非事务执行则正常完成。
  • 失败回滚:在事务中执行时,如果抛出异常,事务回滚;非事务执行时,抛出异常不会回滚。
  1. PROPAGATION_NOT_SUPPORTED
  • 描述:总是以非事务方式执行,如果当前有事务,则暂停当前事务。
  • 正常执行:方法总是非事务方式执行,正常完成。
  • 失败回滚:由于没有事务,即使抛出异常,也不会触发回滚。
  1. PROPAGATION_NEVER
  • 描述:以非事务方式执行,如果当前有事务,则抛出异常。
  • 正常执行:在没有事务的情况下正常执行。
  • 失败回滚:如果在事务中调用,则直接抛出异常,操作不会执行。
  1. PROPAGATION_MANDATORY
  • 描述:必须在事务中执行,如果当前没有事务,则抛出异常。
  • 正常执行:必须在事务中执行,正常完成。
  • 失败回滚:如果在非事务情况下调用,则抛出异常,操作不会执行;在事务中调用时,抛出异常回滚事务。

3.6 示例代码

有了以上内容了解,下面我们将基于以下代码,并跟随debug 信息,来具体看下基于事务的动态代理对象的生成和目标方法执行过程
applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!-- 定义业务类的 Bean -->
<bean id="messageService" class="com.example.codingInAction.service.MessageService"/>


<!-- 配置数据源 -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/trust_message"/>
<property name="username" value="root"/>
<property name="password" value="XXXXXX"/>
</bean>


<!-- 配置事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>

<!-- 配置事务代理 -->
<bean id="messageServiceProxy" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
<property name="target" ref="messageService"/>
<property name="transactionAttributes">
<props>
<prop key="insertMessage">PROPAGATION_REQUIRED</prop>
<prop key="findByMessageKey">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MessageService {
public int insertMessage(Message message){
// 模拟数据库操作
System.out.println("insertMessage 执行");
return 1;
}

public Message findByMessageKey(Map<String, Object> params){
// 模拟数据库操作
System.out.println("findByMessageKey 执行");
return new Message();
}
}

public class CodingInActionApplication {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
MessageService messageService = (MessageService) context.getBean("messageServiceProxy");
Message message = new Message();
message.setMessage("test message");
messageService.insertMessage(message);
}
}

3.7 基于事务的动态代理对象

Spring AOP XML配置方式原理详解 一文讲解过使用ProxyFactoryBean 配置代理对象的过程框架基本一致, 以下会省略大部分内容,只重点强调不一致的细节

getBean(“&messageServiceProxy”)

1
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

getBean(“&messageServiceProxy”) 意味着是在以上代码的Spring 容器启动过程中实例化了messageServiceProxy 这个TransactionProxyFactoryBean本身。

3.7.1 TransactionProxyFactoryBean init 阶段 -InitializingBean

messageServiceProxy本身实例化的过程中,会在init 阶段就生成代理对象

3.7.2 获取代理对象-getBean(“messageServiceProxy”)

1
MessageService messageService = (MessageService) context.getBean("messageServiceProxy");

直接跳过各种细节来到FactoryBean.getObject,可以看到由于在afterPropertiesSet中已经生成了代理对象,所以在getObject逻辑中是直接返回已经生成的代理对象的, 不像ProxyFactoryBean 是在getObject逻辑中才生成代理对象

1
2
3
4
5
6
7
8
9
10
public abstract class AbstractSingletonProxyFactoryBean extends ProxyConfig  
implements FactoryBean<Object>, BeanClassLoaderAware, InitializingBean {
@Override
public Object getObject() {
if (this.proxy == null) {
throw new FactoryBeanNotInitializedException();
}
return this.proxy;
}
}

3.8 createMainInterceptor -事务Advisor

还是在afterPropertiesSet 方法中,会创建事务Advisor。在不指定切点表达式 pointcut 的时候, 会使用TransactionAttributeSourceAdvisor

TransactionInterceptor 本身也是在Spring 容器启动过程中,并且按照XML 文件中的信息进行实例化,相当于把编程式事务实现中通过代码设置的属性,全部在XML 文件中通过配置实现了。

从以上代码可以看出, 从TransactionAttributeSourceAdvisor可以获取到需要的切点 Pointcut (这里pointcut 就是前面填充的TransactionAttributeSource 属性)和Advice 增强 , 切点可以过滤出需要执行增强的类及方法, 筛选出目标方法后应用增强完成事务自动管理的功能。

以上步骤执行完毕后,可以看下实例化完成的TransactionProxyFactoryBean 包含的内容

  1. proxy: 在init 阶段提前生成好的动态代理对象, 这是一个CGLIB 动态对象
  2. transactionInterceptor, 包含了在XML文件中指定的transactionManager、transactionAttributes信息

3.9 基于事务的动态代理对象执行目标方法

1
messageService.insertMessage(message)

最终获取到的是一个CGLIB 动态代理对象, 执行方法时, 直接来到DynamicAdvisedInterceptor.intercept方法

3.9.1 DynamicAdvisedInterceptor.intercept

3.9.2 getInterceptorsAndDynamicInterceptionAdvice

获取档案方法可用的interceptor.

advisor 就是前面创建的TransactionAttributeSourceAdvisor实例, 经过ClassFilter.matchesMethodMatcher.matches处理后后, 最终返回了可以用在当前bean 上interceptor

3.9.3 ReflectiveMethodInvocation.proceed

ReflectiveMethodInvocation.proceed 是CGLIB 动态代理处理目标方法调用 核心逻辑,它根据currentInterceptorIndex来判断运行以下逻辑

  1. 执行目标方法
  2. 执行目标方法上的interceptor
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    public class ReflectiveMethodInvocation implements ProxyMethodInvocation, Cloneable {
    private int currentInterceptorIndex = -1;
    @Override
    @Nullable
    public Object proceed() throws Throwable {
    // We start with an index of -1 and increment early.
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
    return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
    this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
    InterceptorAndDynamicMethodMatcher dm =
    (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
    Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
    if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
    return dm.interceptor.invoke(this);
    }
    else {
    return proceed();
    }
    }
    else {
    return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
    }
    }

CglibAopProxy中有一个内部类CglibMethodInvocation继承了`ReflectiveMethodInvocation 并重写了proceed方法

1
2
3
4
5
6
7
8
class CglibAopProxy implements AopProxy, Serializable {
private static class CglibMethodInvocation extends ReflectiveMethodInvocation {
@Override
@Nullable
public Object proceed() throws Throwable {
return super.proceed();
}
}

所以当DynamicAdvisedInterceptor.intercept中执行到new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed(),会首先来到父类 ReflectiveMethodInvocation.proceed来到递归处理Advisor 和目标方法的核心逻辑

前面已经说了,TransactionInterceptor 继承了org.aopalliance.intercept.MethodInterceptor, 所以逻辑会走到这里调用invoke 方法。

来到TransactionInterceptor 重写的invoke 方法, 进入前面重点讲过的invokeWithTransaction

这里invokeWithTransaction 的第4个参数是InvocationCallback, 这里传入了 传入了一个labada 表达式, 说明在调用InvocationCallback.proceedWithInvocation 的实现逻辑是调用MethodInvocation.proceed 方法

所以在invokeWithTransaction 方法中获取到事务信息后,执行InvocationCallback.proceedWithInvocation 实际还是在继续执行ReflectiveMethodInvocation.proceed

再次进入ReflectiveMethodInvocation.proceed在此代码示例中, Advisor 只有一个,且已经访问过,所以这里开始执行目标方法的调用

执行完目标方法后,一层层出栈,又回到invokeWithTransaction方法,此时开始执行事务结束的逻辑, 此次调用方法正常执行完毕,所以这里走正常commit 关闭逻辑即可。

4. 声明式事务 aop:config

TransactionProxyFactoryBean 需要手动配置和获取代理对象,尤其是在需要为多个 bean 配置事务时,每个 bean 都需要手动配置,这在实际应用中会变得繁琐且易出错。

aop:config@Transactional 注解通过 AbstractAutoProxyCreator这个BeanPostProcessor 可以在实例化的init 阶段自动生成代理对象。

4.1 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 <!-- 定义业务类的 Bean -->
<bean id="messageService" class="com.example.codingInAction.service.MessageService"/>


<!-- 配置数据源 -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/trust_message"/>
<property name="username" value="root"/>
<property name="password" value="24048@Ms"/>
</bean>


<!-- 配置事务管理器 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>

<!-- 配置事务拦截器 -->
<bean id="transactionInterceptor" class="org.springframework.transaction.interceptor.TransactionInterceptor">
<property name="transactionManager" ref="transactionManager"/>
<property name="transactionAttributes">
<props>
<prop key="insertMessage">PROPAGATION_REQUIRED</prop>
<prop key="findByMessageKey">PROPAGATION_REQUIRED</prop>
</props>
</property>
</bean>

<!-- 配置事务增强 -->
<aop:config>
<aop:pointcut id="serviceMethods" expression="execution(* com.example.codingInAction.service.*.*(..))"/>
<aop:advisor advice-ref="transactionInterceptor" pointcut-ref="serviceMethods"/>
</aop:config>
1
2
3
4
5
6
7
8
9
public class CodingInActionApplication {   
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
MessageService messageService = (MessageService) context.getBean("messageService");
Message message = new Message();
message.setMessage("test message");
messageService.insertMessage(message);
}
}

4.2 生成基于事务的动态代理对象AspectJAwareAdvisorAutoProxyCreator

Spring AOP 注解方式原理详解-## AbstractAutoProxyCreator 中讲过,AbstractAutoProxyCreator 是一个抽象类,它有多个具体实现。

在使用aop:config 方式,实现事务功能时,使用的是AspectJAwareAdvisorAutoProxyCreator

AspectJAwareAdvisorAutoProxyCreatorAnnotationAwareAspectJAutoProxyCreator 类似,但主要用于处理通过 XML 配置文件或其他非注解方式配置的 AspectJ 切面。

来看Spring 容器启动过程, 当进行业务bean 的初始化之前,已经有一些bean 提前初始化完成存放在singletonObjects 这个一级缓存中了

从beanName 可以看出,messageService、dataSource、transactionManager、transactionInterceptor、serviceMethods 均是在XML 出现的bean 定义。

org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisororg.springframework.aop.config.internalAutoProxyCreator 比较特殊

4.2.1 internalAutoProxyCreator

org.springframework.aop.config.internalAutoProxyCreator 不是一个实际的类,而是一个特殊的 bean 名称。Spring 使用这个名称在内部标识和注册用于自动代理创建的组件。

这个 bean 名称指向的是一个具体的 AbstractAutoProxyCreator实现类,在使用aop:config 配置基于事务的代理对象时,指的是AspectJAwareAdvisorAutoProxyCreator, 通过上面的debug 信息也可以确认这一点。

4.2.2 BeanPostProcessor.postProcessAfterInitialization

AspectJAwareAdvisorAutoProxyCreator直接在Spring 容器启动过程中,通过BeanPostProcessor.postProcessAfterInitialization在init阶段介入为MessageService 生成基于事务的动态代理对象

4.3 事务Advisor-DefaultBeanFactoryPointcutAdvisor

从Spring 启动完成后的singletonObjects 中可以看出aop:config 这个标签配置对应DefaultBeanFactoryPointcutAdvisor

DefaultBeanFactoryPointcutAdvisor 的主要作用是将一个切点(Pointcut)和一个切面(Advice)结合起来,形成一个完整的 AOP 配置。根据Spring 容器启动完成的数据看DefaultBeanFactoryPointcutAdvisor 就对应aop:config 配置,里面包含了定义的pointcut 和Advice.

Advisor 实例化完成后,在生成代理对象的wrapIfNecessary中就可以获取到能应用到当前类的Advisor 了。

通过getAdvicesAndAdvisorsForBean 获取在当前类上可以的Advisor , 从debug 信息上可以看到specificInterceptors

4.4 基于事务的动态代理对象执行目标方法

前面生成的代理对象是CGLIB 动态代理对象, 执行目标方法时,直接来到DynamicAdvisedInterceptor.intercept方法 。通过ReflectiveMethodInvocation.proceed 方法进入递归执行Advisor和目标方法 的逻辑。
和3.9.3 中内容完成一致,此处略过

5. 声明式事务-@Transactional注解

@Transactional 注解

  • 类级别和方法级别:可以在类上或方法上使用 @Transactional 注解。如果在类上标注,则该类的所有方法都将受事务管理。
  • 事务传播行为:通过 propagation 属性定义事务的传播行为,如 REQUIREDREQUIRES_NEWMANDATORY 等。
  • 隔离级别:通过 isolation 属性定义事务的隔离级别,如 READ_COMMITTEDREPEATABLE_READSERIALIZABLE 等。
  • 超时和只读属性:可以通过 timeoutreadOnly 属性设置事务的超时时间和只读特性。
  • 回滚规则:通过 rollbackFornoRollbackFor 属性指定哪些异常会导致事务回滚。

5.1 示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Transactional
@Service
public class MessageService {
public int insertMessage(Message message){
System.out.println("insertMessage 执行");
return 1;
}
public Message findByMessageKey(Map<String, Object> params){
System.out.println("findByMessageKey 执行");
return new Message();
}
}


@SpringBootApplication
public class CodingInActionApplication {
public static void main(String[] args) {
SpringApplication.run(CodingInActionApplication.class, args);
}
}

5.2 生成基于事务的动态代理对象AnnotationAwareAspectJAutoProxyCreator

使用@Transactional注解时,基于事务的代理对象时通过AnnotationAwareAspectJAutoProxyCreator这个BeanPostProcessor在init 阶段生成的

5.3 BeanFactoryTransactionAttributeSourceAdvisor`

5.3.1 internalTransactionAdvisor

org.springframework.transaction.config.internalTransactionAdvisor 并不是一个实际存在的类,而是一个特殊的 bean 名称。

在Spring 项目中, 当使用 @EnableTransactionManagement<tx:annotation-driven> 配置时,Spring 会自动注册并配置internalTransactionAdvisor 这个特殊Bean, 其对应的实例是BeanFactoryTransactionAttributeSourceAdvisor

在 Spring Boot 项目中,即使不显式使用 @EnableAutoConfiguration,Spring Boot 仍会自动注册和配置许多默认的组件和功能,包括事务管理。这是因为 Spring Boot 的自动配置机制默认包含在 @SpringBootApplication 注解中。

@SpringBootApplication 是一个组合注解,它包含了多个注解,其中一个关键注解就是 @EnableAutoConfiguration。因此,使用 @SpringBootApplication 时,Spring Boot 的自动配置机制会被自动启用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

@Nullable
private TransactionAttributeSource transactionAttributeSource;

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.transactionAttributeSource = transactionAttributeSource;
}
public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}

@Override
public Pointcut getPointcut() {
return this.pointcut;
}
}

public abstract class AbstractBeanFactoryPointcutAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
@Nullable
private String adviceBeanName;

@Nullable
private BeanFactory beanFactory;
}

BeanFactoryTransactionAttributeSourceAdvisor和前面讲过的TransactionProxyFactoryBean有相似之处,

  • TransactionAttributeSourceAdvisorBeanFactoryTransactionAttributeSourceAdvisor 都是用于应用事务管理的 AOP Advisor,但在实现细节和使用场景上有所不同。
  • TransactionAttributeSourceAdvisor 适用于XML 配置配置和使用场景,而 BeanFactoryTransactionAttributeSourceAdvisor 增加了BeanFactory 属性, 适合用于Spring 容器启动过程中使用
  • 两者都通过定义切点和应用事务增强,实现了 Spring 事务管理的核心功能。

通过getAdvicesAndAdvisorsForBean,可以看到在获取应用到当前类的Advisor 时, 通过getBean方法从Spring 容器获取Advisor 实例时, beanName使用的是org.springframework.transaction.config.internalTransactionAdvisor , 最终返回的实例是 BeanFactoryTransactionAttributeSourceAdvisor

5.4 基于事务的动态代理对象如何执行方法

和3.9.3 中内容完成一致,此处略过

6.在Spring中实现跨资源事务

前面已经说过,Spring 的事务机制只保证数据库操作的原子性,所以当需要数据库操作和其他中间件操作如kafka操作具有原子性的时候,就要用其他的方案来保证。
关于这种情况,可以点击阅读TrustMessage-基于2PC+MySQL+泛化调用实现的可靠消息中心