【Spring】事务的执行原理(二)-Spring专区论坛-技术-SpringForAll社区

【Spring】事务的执行原理(二)

前置知识

事务的执行步骤如下:

  1. 获取事务管理器
  2. 创建事务
  3. 执行目标方法
  4. 捕捉异常,如果出现异常进行回滚
  5. 提交事务
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                             final InvocationCallback invocation) throws Throwable {
        // 获取TransactionAttributeSource
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 1. 获取事务管理器
        final TransactionManager tm = determineTransactionManager(txAttr);
        // 省略部分代码
        // ...
        PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 2. 创建事务
            TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // 3. 执行方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // 4. 回滚事务
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // 清除事务
                cleanupTransactionInfo(txInfo);
            }
            // 省略代码
            // ...

            // 5. 提交事务
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        else {
            // 省略代码
            // ...
        }
    }
}

接下来,详细看一下事务的创建过程。

TransactionManager

事务管理器是Srping对事务进行管理的核心,PlatformTransactionManager里面定义了获取事务、提交事务、回滚事务的接口,不同的数据源可以有自己的实现,比如常见的JDBC数据源事务管理器DataSourceTransactionManager以及分布式事务管理器JtaTransactionManager:

img

PlatformTransactionManager

public interface PlatformTransactionManager extends TransactionManager {
    /**
     * 获取事务状态
     */
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;

    /**
     * 提交事务
     */
    void commit(TransactionStatus status) throws TransactionException;

    /**
     * 回滚
     */
    void rollback(TransactionStatus status) throws TransactionException;
}

TransactionStatus

img

PlatformTransactionManager的getTransaction返回的是TransactionStatus,TransactionStatus是一个接口,主要的实现在DefaultTransactionStatus中:

/**
 * TransactionStatus可以获取事务的状态,也可以在发生异常时用于回滚
 */
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    /**
     * 返回事务是否设置了保存点
     */
    boolean hasSavepoint();

    /**
     * flush所有的session
     */
    @Override
    void flush();

}

/**
 * DefaultTransactionStatus
 */
public class DefaultTransactionStatus extends AbstractTransactionStatus {
    // 事务
    @Nullable
    private final Object transaction;

    // 是否新事务
    private final boolean newTransaction;

    private final boolean newSynchronization;

    // 是否只读
    private final boolean readOnly;

    private final boolean debug;

    // 挂起的事务
    @Nullable
    private final Object suspendedResources;

    /**
     * 构造函数
     */
    public DefaultTransactionStatus(
            @Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
            boolean readOnly, boolean debug, @Nullable Object suspendedResources) {
        this.transaction = transaction;
        this.newTransaction = newTransaction;
        this.newSynchronization = newSynchronization;
        this.readOnly = readOnly;
        this.debug = debug;
        this.suspendedResources = suspendedResources;
    }
}

TransactionInfo

事务的创建方法createTransactionIfNecessary返回的是TransactionInfo对象,后续回滚事务、提交事务等操作,传入的都是TransactionInfo这个对象,它是TransactionAspectSupport的内部类,对事务管理器transactionManager、事务属性transactionAttribute、事务的状态transactionStatus等事务的相关信息进行了封装:

/**
 * TransactionAspectSupport的内部类TransactionInfo
 */
protected static final class TransactionInfo {
    // 事务管理器
    @Nullable
    private final PlatformTransactionManager transactionManager;

    // 事务属性
    @Nullable
    private final TransactionAttribute transactionAttribute;

    private final String joinpointIdentification;

    // 事务状态
    @Nullable
    private TransactionStatus transactionStatus;

    // 旧的事务信息
    @Nullable
    private TransactionInfo oldTransactionInfo;

    private void bindToThread() {
        // 从Holder中获取当前线程绑定的事务信息
        this.oldTransactionInfo = transactionInfoHolder.get();
        // 更新当前线程对应的事务信息
        transactionInfoHolder.set(this);
    }
}

事务的创建

事务的创建分为两大部分:

  1. 调用事务管理器的getTransaction获取事务状态,返回的是TransactionStatus类型的对象
  2. 预处理事务,进行事务相关信息的封装以及事务和线程的绑定,返回的是TransactionInfo对象
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    /**
     * 创建事务
     */
    @SuppressWarnings("serial")
    protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                           @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // 如果事务属性不为空,但是名称为空
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        // 事务状态
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                // 获取事务状态
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        // 预处理
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
}

获取事务状态

事务状态获取是调用PlatformTransactionManager事务管理器的getTransaction获取的,具体实现在AbstractPlatformTransactionManager中:

  1. 调用doGetTransaction方法获取事务,它是一个抽象方法,需要子类实现,数据源的不同具体的实现类也不同,接下来以常见的DataSourceTransactionManager为例,查看获取事务的具体实现逻辑

  2. 当前线程经存在事务,

    判断方式是通过当前线程是否持有数据库连接并且事务处于活跃状态

    • 如果存在事务,需要根据事务传播行为进行不同的处理
  3. 当前线程不存在事务

    • 如果事务的传播行为是PROPAGATION_MANDATORY,当前线程没有事务会抛出异常
    • 如果传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED,先挂起一个空事务,然后新建事务
    • 其他情况,调用prepareTransactionStatus创建TransactionStatus并返回
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    /**
     * 获取事务
     */
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        // 获取事务定义
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
        // 获取事务
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();
        // 当前的线程是否已经存在事务
        if (isExistingTransaction(transaction)) {
            // 如果已经存在事务,根据事务传播行为的设置进行不同的处理
            return handleExistingTransaction(def, transaction, debugEnabled);
        }
        // 当前线程不存在事务
        // 检查事务是否超时
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // 如果事务的传播行为是PROPAGATION_MANDATORY,表示当前线程没有事务会抛出异常,而走到这里说明当前没有事务,所以抛出异常
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }// 如果传播行为是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 因为当前线程不存在事务,所以先挂起一个空事务
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                // 开启新事务
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            // 其他情况,创建TransactionStatus
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }
}
doGetTransaction获取事务

DataSourceTransactionManager实现了doGetTransaction获取事务的方法:

  1. 创建DataSourceTransactionObject对象,它是DataSourceTransactionManager内部类,持有一个ConnectionHolder对象,里面记录了数据库的连接
  2. 调用obtainDataSource获取数据源
  3. 从ThreadLocal中获取当前线程对应的Map资源数据集合,根据第2步中获取到的数据源从Map中获取对应的ConnectionHolder,也就是说当前线程绑定了某个数据源的连接,从ThreadLocal获取到数据库连接之后,将数据库连接设置到DataSourceTransactionObject中
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    /**
     * 获取事务,返回的是DataSourceTransactionObject
     */
    @Override
    protected Object doGetTransaction() {
        // 创建DataSourceTransactionObject
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        // 设置是否允许保存点
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // 根据数据源获取当前线程绑定的ConnectionHolder对象,ConnectionHolder中存有数据库连接
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        // 设置ConnectionHolder对象
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

    /**
     * 获取数据源
     */
    protected DataSource obtainDataSource() {
        // 获取数据源
        DataSource dataSource = getDataSource();
        Assert.state(dataSource != null, "No DataSource set");
        return dataSource;
    }

    /**
     * 内部类,DataSourceTransactionObject,记录了数据源信息
     */
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

        private boolean newConnectionHolder;

        private boolean mustRestoreAutoCommit;

        // 设置连接信息,connectionHolder在父类JdbcTransactionObjectSupport中
        public void setConnectionHolder(@Nullable ConnectionHolder connectionHolder, boolean newConnectionHolder) {
            super.setConnectionHolder(connectionHolder);
            this.newConnectionHolder = newConnectionHolder;
        }
    }
}

// JdbcTransactionObjectSupport
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
    // ConnectionHolder,记录了数据库连接
    @Nullable
    private ConnectionHolder connectionHolder;
}

// ConnectionHolder
public class ConnectionHolder extends ResourceHolderSupport {
    // 数据库连接
    @Nullable
    private Connection currentConnection;
}

TransactionSynchronizationManager

TransactionSynchronizationManager中保存了线程绑定的各种信息:

public abstract class TransactionSynchronizationManager {
    // 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");
    // 保存了线程绑定的事务同步信息TransactionSynchronization
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    // 保存了线程绑定的事务名称
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");
    // 保存了线程绑定的事务只读状态
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    // 保存了线程绑定的事务隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");
    // 保存了线程绑定的事务活跃状态
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");

    /**
     * 根据key(数据源dataSource)获取当前线程绑定的资源,也就是ConnectionHoler
     */
    @Nullable
    public static Object getResource(Object key) {
        // 构建KEY
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        // 获取资源
        Object value = doGetResource(actualKey);
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }

    /**
     * 根据KEY获取资源
     */
    @Nullable
    private static Object doGetResource(Object actualKey) {
        // 获取当前线程对应的数据,其中KEY为根据数据源构建出的KEY,value为ConnectionHoler
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        // 根据KEY获取数据源
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }
}
isExistingTransaction是否存在事务

isExistingTransaction同样在DataSourceTransactionManager中实现,具体是是通过DataSourceTransactionObject是否持有数据库连接并且事务处于活跃状态来判断是否存在事务的:

public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    /**
     * 是否存在事务
     */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        // 转换为DataSourceTransactionObject
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 通过判断DataSourceTransactionObject是否持有ConnectionHolder并且事务处于活跃状态
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    }
}
handleExistingTransaction当程已存在事务情况下的处理

在当前线程已存在事务的情况下:

  1. 如果事务传播行为设是PROPAGATION_NEVER,表示以非事务的方式执行,如果当前存在事务,将抛出异常
  2. 如果事务的传播行为是PROPAGATION_NOT_SUPPORTED,表示以非事务的方式执行,如果当前存在事务,则挂起当前的事务,不使用事务
  3. 如果事务的传播行为是PROPAGATION_REQUIRES_NEW,需要挂起当前事务,创建一个自己的事务
  4. 如果事务的传播行为是PROPAGATION_NESTED,判断是否使用保存点
    • 如果是使用嵌套事务
    • 如果不是,开启一个新事务
  5. 非以上四种情况,使用当前的事务,调用prepareTransactionStatus方法创建TransactionStatus
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    /**
     * 获取事务
     */
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        /**
         * 处理存在的事务,主要是对事务传播行为的判断和处理
         */
        private TransactionStatus handleExistingTransaction (
                TransactionDefinition definition, Object transaction,boolean debugEnabled)
            throws TransactionException {
            // 如果事务传播行为设置的是PROPAGATION_NEVER,表示以非事务的方式执行,所以当前存在事务将抛出异常
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");
            }
            // 如果事务的传播行为是PROPAGATION_NOT_SUPPORTED,表示以非事务的方式执行,如果当前存在事务,则挂起当前事务,不使用事务
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                // 挂起当前事务
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                // 创建TransactionStatus,第二个参数传入的是事务,可以看到这里设置的为NULL,不使用事务
                return prepareTransactionStatus(
                        definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            }
            // 如果事务的传播行为是PROPAGATION_REQUIRES_NEW,需要挂起当前事务创建新事务
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                // 挂起当前事务
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                    // 新建事务
                    return startTransaction(definition, transaction, debugEnabled, suspendedResources);
                } catch (RuntimeException | Error beginEx) {
                    resumeAfterBeginException(transaction, suspendedResources, beginEx);
                    throw beginEx;
                }
            }
            // 如果是PROPAGATION_NESTED
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                // 是否允许嵌套事务
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                                    "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                // 是否使用保存点
                if (useSavepointForNestedTransaction()) {
                    // 创建DefaultTransactionStatus,开启嵌套事务
                    DefaultTransactionStatus status =
                            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                    // 创建保存点
                    status.createAndHoldSavepoint();
                    // 返回事务状态
                    return status;
                } else {
                    // 新建事务
                    return startTransaction(definition, transaction, debugEnabled, null);
                }
            }

            // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
            if (debugEnabled) {
                logger.debug("Participating in existing transaction");
            }
            // 校验存在事务的合法性
            if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                (currentIsolationLevel != null ?
                                        isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                        "(unknown)"));
                    }
                }
                if (!definition.isReadOnly()) {
                    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] is not marked as read-only but existing transaction is");
                    }
                }
            }
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 创建TransactionStatus
            return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
        }
    }
}
prepareTransactionStatus创建TransactionStatus

prepareTransactionStatus用于创建TransactionStatus,具体创建的是DefaultTransactionStatus类型的:

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    /**
     * 创建TransactionStatus
     *
     * @param definition         事务定义
     * @param transaction        事务
     * @param newTransaction     是否新事务
     * @param newSynchronization 是否需要同步线程绑定的信息
     * @param debug
     * @param suspendedResources 挂起的事务
     * @return
     */
    protected final DefaultTransactionStatus prepareTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
        // 创建DefaultTransactionStatus
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
        // 初始化事务的同步信息
        prepareSynchronization(status, definition);
        return status;
    }

    /**
     * 初始化事务的同步信息
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            // 设置当前线程绑定的线程活跃状态
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            // 设置绑定的事务隔离级别
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            // 设置绑定的事务名称
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }
}
suspend挂起事务

挂起事务其实是将当前事务的相关设置清除,并解绑当前线程对应的数据库连接:

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    /**
     * 挂起事务
     */
    @Nullable
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
        // 判断活跃状态
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
            try {
                Object suspendedResources = null;
                if (transaction != null) {
                    // 挂起事务
                    suspendedResources = doSuspend(transaction);
                }
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                // 清除当前事务的各种信息
                TransactionSynchronizationManager.setCurrentTransactionName(null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                return new SuspendedResourcesHolder(
                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            }
            catch (RuntimeException | Error ex) {
                // doSuspend failed - original transaction is still active...
                doResumeSynchronization(suspendedSynchronizations);
                throw ex;
            }
        }
        else if (transaction != null) {
            // Transaction active but no synchronization active.
            Object suspendedResources = doSuspend(transaction);
            return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
            // Neither transaction nor synchronization active.
            return null;
        }
    }
}

// DataSourceTransactionManager挂起事务
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    @Override
    protected Object doSuspend(Object transaction) {
        // 转为DataSourceTransactionObject
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 将绑定的数据库连接置为null
        txObject.setConnectionHolder(null);
        // 解绑当前线程的对应的数据源连接
        return TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }
}

// 解绑资源
public abstract class TransactionSynchronizationManager {

    // 保存了线程绑定的数据库资源信息,Map中Key为数据源构建的KEY,value为对应的ConnectionHolder
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    /**
     * 解绑当前线程对应的资源
     */
    public static Object unbindResource(Object key) throws IllegalStateException {
        // 根据对象获取KEY
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        // 解绑当前线程绑定的资源
        Object value = doUnbindResource(actualKey);
        if (value == null) {
            throw new IllegalStateException(
                    "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        return value;
    }

    /**
     * 解绑资源
     */
    @Nullable
    private static Object doUnbindResource(Object actualKey) {
        // 获取当前线程对应的数据
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        // 从Map中移除资源
        Object value = map.remove(actualKey);
        if (map.isEmpty()) {
            resources.remove();
        }
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            value = null;
        }
        if (value != null && logger.isTraceEnabled()) {
            logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
                    Thread.currentThread().getName() + "]");
        }
        return value;
    }
}
startTransaction新建事务

startTransaction用于开启一个新事务,在AbstractPlatformTransactionManager中实现:

  1. 创建TransactionStatus,可以看到在构造函数中传入了事务定义、当前事务、是否新建事务(置为了true)、是否同步、是否debug、挂起的事务这些参数
  2. 调用doBegin方法设置事务的一些信息
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    /**
     * 开启一个新事务
     */
    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                                               boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        // 创建新事务DefaultTransactionStatus,第三个参数表示是否新建事务,这里置为了true, suspendedResources记录了挂起的事务
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // 设置事务的相关信息
        doBegin(transaction, definition);
        prepareSynchronization(status, definition);
        return status;
    }
}

doBegin方法在DataSourceTransactionManager中实现:

  1. 判断当前事务是否持有数据库连接获取事务与数据库的同步状态为true,从数据源中新建一个数据库连接,并与当前事务绑定
  2. 如果开启了自动提交,将自动提交置为false
  3. 如果是新创建的连接,将数据库连接ConnectionHolder绑定到当前线程
// DataSourceTransactionManager实现了doBegin
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        // 转为DataSourceTransactionObject
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            // 如果未持有连接信息或者事务与数据库的同步状态为true
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                // 从数据源获取一个新的连接
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                // 将连接信息设置到当前的事务中
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            // 将事务与数据库连接的同步状态置为true
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            // 获取数据库连接
            con = txObject.getConnectionHolder().getConnection();
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
            // 是否是自动提交
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                // 自动提交置为false
                con.setAutoCommit(false);
            }
            // 处理连接
            prepareTransactionalConnection(con, definition);
            // 事务置为活跃状态
            txObject.getConnectionHolder().setTransactionActive(true);
            // 超时时间获取
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // 是否是新建的连接
            if (txObject.isNewConnectionHolder()) {
                // 将数据库连接ConnectionHolder绑定到当前线程
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }
        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }
}

事务预处理

  1. 创建TransactionInfo事务信息,它是TransactionAspectSupport的内部类,将事务管理器transactionManager、事务属性transactionAttribute、事务的状态transactionStatus等事务相关的信息进行了封装,并实现了将事务绑定到当前线程的方法
  2. 调用TransactionInfo的bindToThread方法将事务与当前线程绑定,是通过ThreadLocal实现的
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    /**
     * 准备事务
     */
    protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                                                     @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                                                     @Nullable TransactionStatus status) {
        // 创建事务信息
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            // 设置事务状态
            txInfo.newTransactionStatus(status);
        }
        else {
            if (logger.isTraceEnabled()) {
                logger.trace("No need to create transaction for [" + joinpointIdentification +
                        "]: This method is not transactional.");
            }
        }

        // 将事务绑定到当前线程
        txInfo.bindToThread();
        return txInfo;
    }

    /**
     * 使用ThreadLocal记录线程绑定的事务
     */
    private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
            new NamedThreadLocal<>("Current aspect-driven transaction");

    /**
     * 内部类TransactionInfo
     */
    protected static final class TransactionInfo {

        @Nullable
        private final PlatformTransactionManager transactionManager;

        @Nullable
        private final TransactionAttribute transactionAttribute;

        private final String joinpointIdentification;

        @Nullable
        private TransactionStatus transactionStatus;

        @Nullable
        private TransactionInfo oldTransactionInfo;

        private void bindToThread() {
            // 从Holder中获取当前线程绑定的事务信息
            this.oldTransactionInfo = transactionInfoHolder.get();
            // 更新当前线程对应的事务信息
            transactionInfoHolder.set(this);
        }
    }
}

总结

img

参考

【猫吻鱼】Spring源码分析:全集整理

原文地址:https://shan-ml.github.io/2022/04/02/%E3%80%90Srping%E3%80%91%E4%BA%8B%E5%8A%A1%E7%9A%84%E6%89%A7%E8%A1%8C%E5%8E%9F%E7%90%86%EF%BC%88%E4%B8%80%EF%BC%89/

请登录后发表评论

    没有回复内容