Spring事务管理机制深度解析:从JDBC基础到Spring高级实现

发布于:2025-08-29 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、事务的本质?

什么是事务?

数据库事务(Database Transaction) ,是指作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行。

  事务处理可以确保除非事务性单元内的所有操作都成功完成,否则不会永久更新面向数据的资源。通过将一组相关操作组合为一个要么全部成功要么全部失败的单元,可以简化错误恢复并使应用程序更加可靠。

  一个逻辑工作单元要成为事务,必须满足所谓的 ACID(原子性、一致性、隔离性和持久性)属性。事务是数据库运行中的逻辑工作单位,由DBMS中的事务管理子系统负责事务的处理。

JDBC中是怎么处理事务的?

JDBC(Java Database Connectivity)提供了对数据库事务的支持,主要通过Connection对象来控制事务。

核心事务控制方法

JDBC通过Connection接口提供了以下关键事务控制方法:setAutoCommit(boolean autoCommit)- 设置自动提交模式

commit()- 提交事务rollback()- 回滚事务

        public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        try {
            // 注册 JDBC 驱动
            // Class.forName("com.mysql.cj.jdbc.Driver");
            // 打开连接
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC", "root", "123456");
            // 执行查询
            stmt = conn.createStatement();
            conn.setAutoCommit(false); // 关闭自动提交
            // 添加用户信息
            String sql = "INSERT INTO T_USER(id,user_name)values(1,'管理员')";
            stmt.executeUpdate(sql);
            // 添加日志问题
            sql = "INSET INTO t_log(id,log)values(1,'添加了用户:管理员')";
            stmt.executeUpdate(sql);
            conn.commit(); // 上面两个操作都没有问题就提交
        } catch (Exception e) {
            e.printStackTrace();
            // 出现问题就回滚
            try {
                conn.rollback();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        } finally {
            try {
                if (stmt != null) stmt.close();
            } catch (SQLException se2) {
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }

关键操作:关闭自动提交,都成功就commit,有一个失败就rollback

在Spring中的事务管理?

我们在Service中是可能调用多个Dao的方法来操作数据库中的数据的,我们要做的就是要保证UserService中的 addUser()方法中的相关操作满足事务的要求。在Spring中支持两种事务的使用方式

一种是基于XML的文件配置方式,一种是基于注解的方式。

在Spring的底层:PlatformTransactionManager

TransactionManager顶级接口,但是内容为空的

public interface TransactionManager {

}

PlatformTransactionManager:平台事务管理器

ReactiveTransactionManager:响应式编程的事务管理器关注的重点是PlatformTransactionManager

public interface PlatformTransactionManager extends TransactionManager {
    /**
     获取事务
    */
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                    throws TransactionException;
    
    /**
      提交数据
     */
    void commit(TransactionStatus status) throws TransactionException;
    
    /**
        回滚数据
     */
    void rollback(TransactionStatus status) throws TransactionException;
    
}

PlatformTransactionManager也是个接口,在他下面的实现有两个比较重要实现

JtaTransactionManager:支持分布式事务【本身服务中的多数据源】

DataSourceTransactionManager:数据源事务管理器。在但数据源中的事务管理,这个是我们分析的重点。

在DataSourceTransactionManager中提供了与事务相关的操作方法。

事务的定义

在上面的 PlatformTransactoinManager中看到了 TransactionDefinition 这个对象,通过字面含义是 事务定义

TransactionDefinition中定义了事务的 传播属性隔离级别

DefaultTransactionDefinition:是事务定义的默认实现

DefaultTransactionAttribute:扩展了TransactionAttribute中的属性的实现

@Transactional:该组件就会被解析加载为对应的 TransactionDefinition对象。

开启事务

 然后在 PlatformTransactionManager中获取事务的时候返回的是 TransactionStatus对象。

核心方法

getTransaction

/**
 * This implementation handles propagation behavior. Delegates to
 * {@code doGetTransaction}, {@code isExistingTransaction}
 * and {@code doBegin}.
 * @see #doGetTransaction
 * @see #isExistingTransaction
 * @see #doBegin
 */
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException {

        // Use defaults if no transaction definition given.
        // 如果没有事务定义信息则使用默认的事务管理器定义信息
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

        // 获取事务
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();

        // 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中的transactionActive属性不为空
        if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                // 当前线程已经存在事务
                return handleExistingTransaction(def, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        // 事务超时设置验证
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        // 如果当前线程不存在事务,但是PropagationBehavior却被声明为PROPAGATION_MANDATORY抛出异常
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                                "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED都需要新建事务
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                //没有当前事务的话,REQUIRED,REQUIRES_NEW,NESTED挂起的是空事务,然后创建一个新事务
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
                }
                try {
                        return startTransaction(def, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error ex) {
                        // 恢复挂起的事务
                        resume(null, suspendedResources);
                        throw ex;
                }
        }
        else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                // 创建一个空的事务
                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                                        "isolation level will effectively be ignored: " + def);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
}

doGetTransaction()

/**
 * 创建一个DataSourceTransactionObject当作事务,设置是否允许保存点,然后获取连接持有器ConnectionHolder
 * 里面会存放JDBC的连接,设置给DataSourceTransactionObject,当然第一次是空的
 *
 * @return
 */
@Override
protected Object doGetTransaction() {
        // 创建一个数据源事务对象
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        // 是否允许当前事务设置保持点
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        /**
         * TransactionSynchronizationManager 事务同步管理器对象(该类中都是局部线程变量)
         * 用来保存当前事务的信息,我们第一次从这里去线程变量中获取 事务连接持有器对象 通过数据源为key去获取
         * 由于第一次进来开始事务 我们的事务同步管理器中没有被存放.所以此时获取出来的conHolder为null
         */
        ConnectionHolder conHolder =
                        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        // 非新创建连接则写false
        txObject.setConnectionHolder(conHolder, false);
        // 返回事务对象
        return txObject;
}
/**
 * Create a TransactionStatus for an existing transaction.
 */
private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {

        /**
         * 判断当前的事务行为是不是PROPAGATION_NEVER的
         * 表示为不支持事务,但是当前又存在一个事务,所以抛出异常
         */
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                throw new IllegalTransactionStateException(
                                "Existing transaction found for transaction marked with propagation 'never'");
        }

        /**
         * 判断当前的事务属性不支持事务,PROPAGATION_NOT_SUPPORTED,所以需要先挂起已经存在的事务
         */
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                if (debugEnabled) {
                        logger.debug("Suspending current transaction");
                }
                // 挂起当前事务
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                // 创建一个新的非事务状态(保存了上一个存在事务状态的属性)
                return prepareTransactionStatus(
                                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }

        /**
         * 当前的事务属性状态是PROPAGATION_REQUIRES_NEW表示需要新开启一个事务状态
         */
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                if (debugEnabled) {
                        logger.debug("Suspending current transaction, creating new transaction with name [" +
                                        definition.getName() + "]");
                }
                // 挂起当前事务并返回挂起的资源持有器
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                        // 创建一个新的非事务状态(保存了上一个存在事务状态的属性)
                        return startTransaction(definition, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error beginEx) {
                        resumeAfterBeginException(transaction, suspendedResources, beginEx);
                        throw beginEx;
                }
        }

        // 嵌套事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                // 不允许就报异常
                if (!isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException(
                                        "Transaction manager does not allow nested transactions by default - " +
                                        "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                        logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                // 嵌套事务的处理
                if (useSavepointForNestedTransaction()) {
                        // Create savepoint within existing Spring-managed transaction,
                        // through the SavepointManager API implemented by TransactionStatus.
                        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                        // 如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始简历保存点
                        DefaultTransactionStatus status =
                                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                        // 为事务设置一个回退点
                        status.createAndHoldSavepoint();
                        return status;
                }
                else {
                        // Nested transaction through nested begin and commit/rollback calls.
                        // Usually only for JTA: Spring synchronization might get activated here
                        // in case of a pre-existing JTA transaction.
                        // 有些情况是不能使用保存点操作
                        return startTransaction(definition, transaction, debugEnabled, null);
                }
        }

        // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
        if (debugEnabled) {
                logger.debug("Participating in existing transaction");
        }
        if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                        Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                                (currentIsolationLevel != null ?
                                                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                                "(unknown)"));
                        }
                }
                if (!definition.isReadOnly()) {
                        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                definition + "] is not marked as read-only but existing transaction is");
                        }
                }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

startTransaction()

/**
 * Start a new transaction.
 */
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

        // 是否需要新同步
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        // 创建新的事务
        DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // 开启事务和连接
        doBegin(transaction, definition);
        // 新同步事务的设置,针对于当前线程的设置
        prepareSynchronization(status, definition);
        return status;
}

doBegin方法开启和连接事务

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
        // 强制转化事务对象
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
                // 判断事务对象没有数据库连接持有器
                if (!txObject.hasConnectionHolder() ||
                                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                        // 通过数据源获取一个数据库连接对象
                        Connection newCon = obtainDataSource().getConnection();
                        if (logger.isDebugEnabled()) {
                                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                        }
                        // 把我们的数据库连接包装成一个ConnectionHolder对象 然后设置到我们的txObject对象中去
                        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }

                // 标记当前的连接是一个同步事务
                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();

                // 为当前的事务设置隔离级别
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                // 设置先前隔离级别
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                // 设置是否只读
                txObject.setReadOnly(definition.isReadOnly());

                // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
                // so we don't want to do it unnecessarily (for example if we've explicitly
                // configured the connection pool to set it already).
                // 关闭自动提交
                if (con.getAutoCommit()) {
                        //设置需要恢复自动提交
                        txObject.setMustRestoreAutoCommit(true);
                        if (logger.isDebugEnabled()) {
                                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                        }
                        // 关闭自动提交
                        con.setAutoCommit(false);
                }

                // 判断事务是否需要设置为只读事务
                prepareTransactionalConnection(con, definition);
                // 标记激活事务
                txObject.getConnectionHolder().setTransactionActive(true);

                // 设置事务超时时间
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }

                // Bind the connection holder to the thread.
                // 绑定我们的数据源和连接到我们的同步管理器上,把数据源作为key,数据库连接作为value 设置到线程变量中
                if (txObject.isNewConnectionHolder()) {
                        // 将当前获取到的连接绑定到当前线程
                        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
        }

        catch (Throwable ex) {
                if (txObject.isNewConnectionHolder()) {
                        // 释放数据库连接
                        DataSourceUtils.releaseConnection(con, obtainDataSource());
                        txObject.setConnectionHolder(null, false);
                }
                throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
}

doBegin方法中核心的关闭了自动提交

同时把连接绑定到本地线程中bindResource方法

Spring事务源码串联

编程式事务、AOP事务


@Autowired
private UserDao userDao;

@Autowired
private PlatformTransactionManager txManager;

@Autowired
private LogService logService;

@Transactional
public void insertUser(User u) {

        // 1、创建事务定义
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();

        // 2、根据定义开启事务
        TransactionStatus status = txManager.getTransaction(definition);

        try {
                this.userDao.insert(u);
                Log log = new Log(System.currentTimeMillis() + "", System.currentTimeMillis() + "-" + u.getUserName());
                // this.doAddUser(u);
                this.logService.insertLog(log);
                // 3、提交事务
                txManager.commit(status);
        } catch (Exception e) {
                // 4、异常了,回滚事务
                txManager.rollback(status);
                throw e;
        }
}

在Service中通过事务处理的代码实现了事务管理,同时结合AOP的内容,可以把事务的代码抽取出来,然后我们来看看Spring中这块是如何处理的。

通过Debug的方式看到处理的关键流程 TransactionInterceptor 就是事务处理的 advice


@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

进入到invokeWithinTransaction方法中

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        // 获取我们的事务属性源对象
        TransactionAttributeSource tas = getTransactionAttributeSource();
        // 通过事务属性源对象获取到当前方法的事务属性信息
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 获取我们配置的事务管理器对象
        final TransactionManager tm = determineTransactionManager(txAttr);

        if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
                ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                        if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                                throw new TransactionUsageException(
                                                "Unsupported annotated transaction on suspending function detected: " + method +
                                                ". Use TransactionalOperator.transactional extensions instead.");
                        }
                        ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
                        if (adapter == null) {
                                throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                                                method.getReturnType());
                        }
                        return new ReactiveTransactionSupport(adapter);
                });
                return txSupport.invokeWithinTransaction(
                                method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
        }

        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        // 获取连接点的唯一标识  类名+方法名
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // 声明式事务处理
        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
                // Standard transaction demarcation with getTransaction and commit/rollback calls.
                // 创建TransactionInfo
                TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

                Object retVal;
                try {
                        // This is an around advice: Invoke the next interceptor in the chain.
                        // This will normally result in a target object being invoked.
                        // 执行被增强方法,调用具体的处理逻辑
                        retVal = invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                        // target invocation exception
                        // 异常回滚
                        completeTransactionAfterThrowing(txInfo, ex);
                        throw ex;
                }
                finally {
                        //清除事务信息,恢复线程私有的老的事务信息
                        cleanupTransactionInfo(txInfo);
                }

                if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                        // Set rollback-only in case of Vavr failure matching our rollback rules...
                        TransactionStatus status = txInfo.getTransactionStatus();

                        if (status != null && txAttr != null) {
                                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                        }
                }

                //成功后提交,会进行资源储量,连接释放,恢复挂起事务等操作
                commitTransactionAfterReturning(txInfo);
                return retVal;
        }

        else {
                // 编程式事务处理
                Object result;
                final ThrowableHolder throwableHolder = new ThrowableHolder();

                // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
                try {
                        result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                                TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                                try {
                                        Object retVal = invocation.proceedWithInvocation();
                                        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                                                // Set rollback-only in case of Vavr failure matching our rollback rules...
                                                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                                        }
                                        return retVal;
                                }
                                catch (Throwable ex) {
                                        if (txAttr.rollbackOn(ex)) {
                                                // A RuntimeException: will lead to a rollback.
                                                if (ex instanceof RuntimeException) {
                                                        throw (RuntimeException) ex;
                                                }
                                                else {
                                                        throw new ThrowableHolderException(ex);
                                                }
                                        }
                                        else {
                                                // A normal return value: will lead to a commit.
                                                throwableHolder.throwable = ex;
                                                return null;
                                        }
                                }
                                finally {
                                        cleanupTransactionInfo(txInfo);
                                }
                        });
                }
                catch (ThrowableHolderException ex) {
                        throw ex.getCause();
                }
                catch (TransactionSystemException ex2) {
                        if (throwableHolder.throwable != null) {
                                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                                ex2.initApplicationException(throwableHolder.throwable);
                        }
                        throw ex2;
                }
                catch (Throwable ex2) {
                        if (throwableHolder.throwable != null) {
                                logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                        }
                        throw ex2;
                }

                // Check result state: It might indicate a Throwable to rethrow.
                if (throwableHolder.throwable != null) {
                        throw throwableHolder.throwable;
                }
                return result;
        }
}

然后进入到createTransactionIfNecessary方法中

进入 getTransaction

核心的是doBegin方法。完成 自动提交的关闭和 本地线程 对象的存储


TransactionInterceptor

TransactionInterceptor是如何注入到容器中的?

首先来看看事务的开启@EnableTransactionManagement

可以看到拦截器关联到了Advisor中


网站公告

今日签到

点亮在社区的每一天
去签到