【Spring】事务的执行原理(三)-Spring专区论坛-技术-SpringForAll社区

【Spring】事务的执行原理(三)

事务的回滚

  1. 如果获取事务属性不为空,并且抛出的异常是RuntimeException或者Error类型,调用事务管理器中的rollback方法进行回滚
  2. 如果事务属性为空或者抛出的异常不是RuntimeException,也不是Error,将继续提交事务
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    /**
     * 处理抛出异常下的事务
     */
    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        // 判空
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                        "] after exception: " + ex);
            }
            // 如果事务属性不为空并且异常是是RuntimeException或者Error
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    // 获取事务管理器,调用rollback方法进行回滚
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                }
            }
            else {
                // 如果事务属性为空或者异常不是RuntimeException或者Error,继续提交事务
                try {
                    // 提交
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                }
                catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                }
                catch (RuntimeException | Error ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                }
            }
        }
    }
}

// DefaultTransactionAttribute中实现了rollbackOn方法
public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {
    /**
     * 判断是否是RuntimeException或者Error
     */
    @Override
    public boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }
}

rollback方法在AbstractPlatformTransactionManager中实现,主要分为以下三种情况:

  1. 判断事务是否设置了保存点,如果设置了将事务回滚到保存点
  2. 如果是一个独立的新事务,直接回滚即可
  3. 如果既没有设置保存点,也不是一个新事务,说明可能处于嵌套事务中,此时只设置回滚状态rollbackOnly为true,当它的外围事务进行提交时,如果发现回滚状态为true,则不提交

以上步骤执行完毕,调用cleanupAfterCompletion方法进行资源的清理已经挂起事务的恢复。

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    /* 
     * 回滚
     */
    @Override
    public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
        // 转为DefaultTransactionStatus
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        // 处理回滚
        processRollback(defStatus, false);
    }

    /**
     * 处理回滚
     */
    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                // 回滚之前的触发器
                triggerBeforeCompletion(status);
                // 是否有保存点
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    // 回滚至保存点
                    status.rollbackToHeldSavepoint();
                }
                else if (status.isNewTransaction()) { // 如果是一个独立的新事务
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    // 直接回滚
                    doRollback(status);
                }
                else {
                    // 如果处于一个嵌套的事务汇总
                    if (status.hasTransaction()) {
                        // 如果本地的回滚状态置为true 或者 事务失败进行全局回滚
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            // 设置事务rollbackOnly状态为true
                            doSetRollbackOnly(stat);
                        }
                        else {
                            // 打印日志,意思是由事务的组织者决定是否回滚
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        // 打印DEBUG日志,应该回滚但是没有获取到事务
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
            // 回滚之后的触发器
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            // 清除相关资源并恢复挂起的事务
            cleanupAfterCompletion(status);
        }
    }

    /**
     * 当一个事务失败以后,是否全局的标进行事务回滚
     */
    public final boolean isGlobalRollbackOnParticipationFailure() {
        return this.globalRollbackOnParticipationFailure;
    }
}

回滚处理

rollbackToHeldSavepoint回滚至保存点

rollbackToHeldSavepoint方法在AbstractTransactionStatus中实现,它调用了getSavepointManager方法获取保存点管理器,调用SavepointManager的rollbackToSavepoint方法进行回滚的:

public abstract class AbstractTransactionStatus implements TransactionStatus {

    public void rollbackToHeldSavepoint() throws TransactionException {
        Object savepoint = getSavepoint();
        if (savepoint == null) {
            throw new TransactionUsageException(
                    "Cannot roll back to savepoint - no savepoint associated with current transaction");
        }
        // getSavepointManager方法在DefaultTransactionStatus中实现
        getSavepointManager().rollbackToSavepoint(savepoint);
        getSavepointManager().releaseSavepoint(savepoint);
        setSavepoint(null);
    }
}

SavepointManager是一个接口,它的继承关系如下:

img

DefaultTransactionStatus中获取SavepointManager的方法:

  1. 获取transaction对象,前面的知识可知这里是一个DataSourceTransactionObject

  2. 由继承关系可知DataSourceTransactionObject也是SavepointManager子类,所以将DataSourceTransactionObject转为SavepointManager返回

    public class DefaultTransactionStatus extends AbstractTransactionStatus {
        @Override
        protected SavepointManager getSavepointManager() {
            // 前面的知识可知这里是一个DataSourceTransactionObject
            Object transaction = this.transaction;
            if (!(transaction instanceof SavepointManager)) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction object [" + this.transaction + "] does not support savepoints");
            }
            // 将DataSourceTransactionObject转为SavepointManager
            return (SavepointManager) transaction;
        }
    }
    

DataSourceTransactionObject是DataSourceTransactionManager的内部类,它继承了JdbcTransactionObjectSupport,rollbackToSavepoint方法在JdbcTransactionObjectSupport中实现:

  1. 获取ConnectionHolder,ConnectionHolder持有数据库连接
  2. 调用底层的rollback方法将事务回滚至保存点
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    // 内部类DataSourceTransactionObject
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

    }
}

// JdbcTransactionObjectSupport
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
    /**
     * 回滚至保存点
     */
    @Override
    public void rollbackToSavepoint(Object savepoint) throws TransactionException {
        // 获取ConnectionHolder
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
            // 调用底层的rollback方法将事务回滚至保存点
            conHolder.getConnection().rollback((Savepoint) savepoint);
            conHolder.resetRollbackOnly();
        }
        catch (Throwable ex) {
            throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
        }
    }

    /**
     * 释放保存点
     */
    @Override
    public void releaseSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
            // 调用底层的方法释放保存点
            conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
        }
        catch (Throwable ex) {
            logger.debug("Could not explicitly release JDBC savepoint", ex);
        }
    }
}

doRollback事务回滚

回滚事务时先获取数据库连接,然后调用底层的rollback进行回滚:

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        // 获取数据源事务对象
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        // 获取数据库连接
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
            // 调用底层的回滚方法
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }
}

doSetRollbackOnly设置回滚状态

doSetRollbackOnly方法在DataSourceTransactionManager中实现:

  1. 将事务转为DataSourceTransactionObject对象,前面讲过DataSourceTransactionObject持有了数据库连接对象ConnectionHolder
  2. 将ConnectionHolder的rollbackOnly属性置为true,先标记事务的回滚状态,交由外围事务进行判断统一进行回滚
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        // 获取数据源事务对象
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
                    "] rollback-only");
        }
        // 设置回滚状态
        txObject.setRollbackOnly();
    }

    /**
     * 内部类DataSourceTransactionObject
     */
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

        // 省略其他方法

        // 设置回滚状态
        public void setRollbackOnly() {
            // 将ConnectionHolder的rollbackOnly属性置为true,在ConnectionHolder的父类ResourceHolderSupport中实现
            getConnectionHolder().setRollbackOnly();
        }
    }
}

// ConnectionHolder的父类ResourceHolderSupport
public abstract class ResourceHolderSupport implements ResourceHolder {

    private boolean rollbackOnly = false;

    /**
     * 标记事务回滚状态为true
     */
    public void setRollbackOnly() {
        this.rollbackOnly = true;
    }
}

资源清理

在事务回滚之后,需要清理相关的资源以及恢复被挂起的事务:

  1. 如果事务的newSynchronization状态为true,

    清除当前线程绑定的事务相关信息

    • 在TransactionSynchronizationManager的clear方法中实现,清理了当前线程绑定的事务名称、事务隔离级别等信息
  2. 如果是一个新事务,清除当前线程与数据库连接的绑定关系,在DataSourceTransactionManager的doCleanupAfterCompletion方法中实现

  3. 如果挂起的事务不为空,恢复挂起的事务

    • 获取数据源,恢复数据源与挂起事务的绑定关系

    • 恢复挂起事务与当前线程的同步信息

      public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
      
          /**
           * 回滚之后的清除操作
           * @see #doCleanupAfterCompletion
           */
          private void cleanupAfterCompletion(DefaultTransactionStatus status) {
              status.setCompleted();
              if (status.isNewSynchronization()) {
                  // 清除当前线程绑定的信息
                  TransactionSynchronizationManager.clear();
              }
              // 如果是一个新事务
              if (status.isNewTransaction()) {
                  // 清除当前线程与数据库连接的绑定关系
                  doCleanupAfterCompletion(status.getTransaction());
              }
              // 如果挂起的事务不为空
              if (status.getSuspendedResources() != null) {
                  if (status.isDebug()) {
                      logger.debug("Resuming suspended transaction after completion of inner transaction");
                  }
                  Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
                  // 恢复挂起的事务
                  resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
              }
          }
      
          /**
           * 恢复挂起的事务
           * @see #doResume
           * @see #suspend
           */
          protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
                  throws TransactionException {
      
              if (resourcesHolder != null) {
                  // 获取挂起的事务
                  Object suspendedResources = resourcesHolder.suspendedResources;
                  if (suspendedResources != null) {
                      // 获取数据源,并与挂起的事务进行绑定
                      doResume(transaction, suspendedResources);
                  }
                  // 挂起事务的同步信息
                  List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
                  if (suspendedSynchronizations != null) {
                      // 恢复事务与线程的同步信息
                      TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                      TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                      TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                      doResumeSynchronization(suspendedSynchronizations);
                  }
              }
          }
      }
      
      // DataSourceTransactionManager
      public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
              implements ResourceTransactionManager, InitializingBean {
          // 主要是清除当前线程与数据库连接的绑定关系
          protected void doCleanupAfterCompletion(Object transaction) {
              DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
      
              // Remove the connection holder from the thread, if exposed.
              if (txObject.isNewConnectionHolder()) {
                  TransactionSynchronizationManager.unbindResource(obtainDataSource());
              }
      
              // 获取数据库连接
              Connection con = txObject.getConnectionHolder().getConnection();
              try {
                  if (txObject.isMustRestoreAutoCommit()) {
                      // 自动提交置为true
                      con.setAutoCommit(true);
                  }
                  // 重置数据库连接的相关设置
                  DataSourceUtils.resetConnectionAfterTransaction(
                          con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
              }
              catch (Throwable ex) {
                  logger.debug("Could not reset JDBC Connection after transaction", ex);
              }
      
              if (txObject.isNewConnectionHolder()) {
                  if (logger.isDebugEnabled()) {
                      logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
                  }
                  // 释放连接
                  DataSourceUtils.releaseConnection(con, this.dataSource);
              }
              // 清除当前线程与数据库连接的绑定关系
              txObject.getConnectionHolder().clear();
          }
      
          @Override
          protected void doResume(@Nullable Object transaction, Object suspendedResources) {
              // 获取数据源,并与挂起的事务进行绑定
              TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
          }
      }
      
      // TransactionSynchronizationManager
      public abstract class TransactionSynchronizationManager {
      
          // 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
          private static final ThreadLocal<Map<Object, Object>> resources =
                  new NamedThreadLocal<>("Transactional resources");
          // 保存了线程绑定的事务同步信息TransactionSynchronization
          private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
                  new NamedThreadLocal<>("Transaction synchronizations");
      
          // 保存了线程绑定的事务名称
          private static final ThreadLocal<String> currentTransactionName =
                  new NamedThreadLocal<>("Current transaction name");
          // 保存了线程绑定的事务只读状态
          private static final ThreadLocal<Boolean> currentTransactionReadOnly =
                  new NamedThreadLocal<>("Current transaction read-only status");
      
          // 保存了线程绑定的事务隔离级别
          private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
                  new NamedThreadLocal<>("Current transaction isolation level");
          // 保存了线程绑定的事务活跃状态
          private static final ThreadLocal<Boolean> actualTransactionActive =
                  new NamedThreadLocal<>("Actual transaction active");
      
          /**
           * 清理事务与当前线程的各种同步状态
           */
          public static void clear() {
              // 清除当前线程绑定的事务同步信息TransactionSynchronization
              synchronizations.remove();
              // 清除当前线程绑定的事务名称
              currentTransactionName.remove();
              // 清除线程绑定的事务只读状态
              currentTransactionReadOnly.remove();
              // 清除线程绑定的事务隔离级别
              currentTransactionIsolationLevel.remove();
              // 清除线程绑定的事务活跃状态
              actualTransactionActive.remove();
          }
      }
      

总结

img

原文地址:https://shan-ml.github.io/2022/04/02/%E3%80%90Srping%E3%80%91%E4%BA%8B%E5%8A%A1%E7%9A%84%E6%89%A7%E8%A1%8C%E5%8E%9F%E7%90%86%EF%BC%88%E4%B8%80%EF%BC%89/

请登录后发表评论

    没有回复内容