分布式事务

Posted by Shane on November 8, 2019

数据库事务

  • Atomic(原子性):事务必须是原子的工作单元
  • Consistent(一致性):事务完成时,必须使所有的数据都保持一致状态
  • Isolation(隔离性):并发事务所做的修改必须和其他事务所做的修改是隔离的
  • Duration(持久性):事务完成之后,对系统的影响是永久性的

MySQL 的事务处理过程

  1. 记录 redo 和 undo log 文件,确保日志在磁盘上的持久化
  2. 更新数据记录
  3. 提交事务,redo 写入 commit 记录

分布式事务产生背景

  • 数据库分库分表
  • SOA 化

在分布式系统中,每一个机器节点虽然都能明确的知道自己执行的事务是否成功还是失败,但是却无法知道其他分布式节点的事务执行情况。因此,当一个事务要跨越多个分布式节点的时候(比如下单流程,下单系统和库存系统可能分别部署在不同的分布式节点中),为了保证该事务可以满足 ACID,就要引入一个协调者(Coordinator)。其他的节点被称为参与者(Participant)。协调者负责调度参与者的行为,最终决定这些参与者是否要把事务进行提交。

X/Open DTP

  • X/Open Distributed Transaction Processing Reference Model 分布式事务处理参考模型
  • X/Open 是一个组织机构,定义出的一套分布式事务标准,定义了规范的 API 接口
  • 2PC:two-phase-commit,用来保证分布式事务的完整性
  • JTA:Java Transaction API,J2EE 遵循了 X/Open DTP 规范,设计并实现了 Java 的分布式事务编程接口规范
  • XA :是 X/Open DTP 定义的中间件与数据库之间的接口规范。XA 接口函数由数据库厂商提供

X/Open DTP 角色

  • AP(Application):应用程序
  • RM(Resources Manager):资源管理器,一般表示数据库,必须实现 XA 定义的接口
  • TM(Transaction Manager):事务管理器,负责协调和事务管理

X/Open DTP 角色

2PC

阶段一:提交事务请求(投票)

  1. TM 向所有的 AP 发送事务内容,询问是否可以执行事务的提交操作,并等待各个 AP 的响应
  2. 执行事务。各个 AP 节点执行事务操作,将 undo 和 redo 信息记录到事务日志中,尽量把提交过程中所消耗时间的操作和准备都提前完成后,确保后续事务提交的成功率
  3. 各个 AP 向 TM 反馈事务询问的响应。各个 AP 成功执行了事务操作,那么反馈给 TM yes 的 response;如果 AP 没有成功执行事务,就反馈 TM no 的 response

阶段二:执行事务提交

执行事务提交

2PC-执行事务提交

假设一个事务的提交过程总共需要 30s,其中 prepare 操作需要 28s(事务日志落地磁盘及各种 IO 操作),而真正 commit 只需要 2s。那么 commit 阶段发生错误的概率和 prepare 相比,2/28(<10%),只要第一阶段成功,那么 commit 阶段出现失败的概率就非常小,大大增加了分布式事务的成功概率

中断事务提交

2PC-中断事务提交

2PC 存在的问题

  1. 数据一致性问题
    • 阶段二,TM 给 AP1、AP2 和 AP3 发送 commit 请求后,TM 和 AP1 挂掉,TM 恢复后,AP2 和 AP3 返回 yes,提交成功,AP1 无论恢复与否,都造成了数据不一致,AP1 启动后不知道上一个 TM 的请求内容
  2. 同步阻塞
    • 阶段二,TM 给 AP 发送 commit/abort 之前挂掉,AP 等待 TM 的请求,造成阻塞状态,影响性能
  3. 二阶段无法解决的问题
    • 协调者在发出 commit 消息之后宕机,而唯一接受到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否已经被提交
  4. 单点故障
    • 由于协调者的重要性,一旦协调者发生故障,参与者会一直阻塞下去

虽然通过 2PC 方式的分布式事务能够达到预期的效果,但是在现实中很少会用到 2PC 方式提交的 XA 事务,原因如下:

  1. 互联网电商应用的快速发展,对事务和数据的绝对一致性要求没有传统企业应用那么高
  2. XA 事务的介入增加了 TM 中间件,使得系统复杂化
  3. XA 事务的性能不高,因为 TM 要等待 RM 回应,所以为了确保事务尽量成功提交,等待超时的时间通常比较长,比如 30s 到几分钟,如果 RM 出现故障或者响应比较慢,则整个事务的性能严重下降

3PC

阶段一:canCommit

阶段二:preCommit

阶段三:doCommit

改进点

  1. 增加了超时机制
  2. 第二阶段,如果协调者超时没有接收到参与者的反馈,则自动认为失败,发送 abort 命令
  3. 第三阶段,如果参与者超时没有接收到协调者的反馈,则自动认为成功,开始提交事务(基于概率)

3PC 的问题

相对于 2PC,3PC 主要解决单点故障问题,并减少阻塞。因为一旦参与者无法及时收到来自协调者的信息,则会默认执行 commit,而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,由于网络问题,协调者发送端的 abort 响应没有及时被参与者接收到,那么参与者在等待超时之后执行了 commit 操作,这样就和其他接收到 abort 命令并执行回滚的参与者之间存在数据不一致的情况。

XA/JTA

JTA 中涉及的角色

  • 开发人员:只需通过使用 UserTransaction 接口来进行操作分布式事务

  • TransactionManager 的实现提供者:实现 UserTransaction、TransactionManager、Transaction 接口,通过与 XAResource 接口交互来实现分布式事务,TransactionManager 的实现提供者如 JOTM(Java Open Transaction Manager)、Atomikos,他们的上述接口分别如下
    1. JOTM 的 UserTransaction 实现是 org.objectweb.jotm.Current
    2. JOTM 的 TransactionManager 实现仍然是 org.objectweb.jotm.Current
    3. JOTM 的 Transaction 实现是 org.objectweb.jotm.TransactionImpl
    4. Atomikos 的 UserTransaction 实现是 com.atomikos.icatch.jta.UserTransactionImp
    5. Atomikos 的 TransactionManager 实现是 com.atomikos.icatch.jta.UserTransactionManager
    6. Atomikos 的 Transaction 实现是 com.atomikos.icatch.jta.TransactionImp
  • XAResource 接口的实现者:需要由资源管理器者来实现,若资源管理器是数据库,则数据库需要提供XAResource 接口的实现。如对于 MySQL 来说,对应的实现为 com.mysql.jdbc.jdbc2.optional.MysqlXAConnection。这种形式下的数据库驱动就是支持分布式事务的数据库驱动

  • 知名的分布式事务管理器主要有 atomikos、bitronix、narayana。其中,仅 atomikos 支持 XA 和 TCC 两种机制;bitronix、narayana 则只支持 XA 机制。这三者都不提供对 dubbo 的开箱即用的支持,需要自行集成。
  • 目前对 dubbo 提供开箱即用支持的分布式事务管理器有:ByteJTA(基于 XA 机制)、ByteTCC(基于 TCC 机制)。

JTA 分布式事务的实现

Spring 集成 atomikos

  1. pom.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>javax.transaction</groupId>
        <artifactId>jta</artifactId>
    </dependency>
    <dependency>
        <groupId>javax</groupId>
        <artifactId>javaee-api</artifactId>
    </dependency>
    
  2. META-INF/spring/service-common.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.3.xsd" default-autowire="byName">
       
        <context:property-placeholder location="classpath:application.properties"/>
        <!--XADataSource-->
        <bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init"
              destroy-method="close" abstract="true">
            <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
            <property name="poolSize" value="10"/>
            <property name="minPoolSize" value="10"/>
            <property name="maxPoolSize" value="30"/>
            <property name="borrowConnectionTimeout" value="60"/>  <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回-->
            <property name="reapTimeout"
                      value="20"/> <!--最大获取数据时间,如果不设置这个值,Atomikos 使用默认的 5分钟,那么在处理大批量数据读取的时候,一旦超过 5分钟,就会抛出类似 Resultset is close 的错误.-->
            <property name="maxIdleTime" value="60"/>    <!--最大闲置时间,超过最小连接池连接的连接将将关闭-->
            <property name="maintenanceInterval" value="60"/>  <!--连接回收时间-->
            <property name="loginTimeout" value="60"/>     <!--Java 数据库连接池,最大可等待获取 datasouce 的时间-->
            <property name="logWriter" value="60"/>
            <property name="testQuery">
                <value>select 1</value>
            </property>
        </bean>
        <!--DataSource-->
        <bean id="orderDataSource" parent="abstractXADataSource">
            <property name="uniqueResourceName" value="orderDataSource"/>
            <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
            <property name="xaProperties">
                <props>
                    <prop key="user">root</prop>
                    <prop key="password">root</prop>
                    <prop key="URL">jdbc:mysql://192.168.11.129:3306/order</prop>
                </props>
            </property>
            <property name="poolSize" value="20"/>
        </bean>
       
        <!--atomiko transaction manager-->
        <bean id="atomiokosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init"
              destroy-method="close">
            <property name="forceShutdown" value="true"/>
        </bean>
        <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
            <property name="transactionTimeout" value="300"/>
        </bean>
       
        <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
            <property name="transactionManager" ref="atomiokosTransactionManager"/>
            <property name="userTransaction" ref="atomikosUserTransaction"/>
        </bean>
       
       
        <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
            <property name="transactionManager" ref="springTransactionManager"/>
        </bean>
       
        <bean id="orderJdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
            <constructor-arg ref="orderDataSource"/>
        </bean>
       
        <context:component-scan base-package="com.test.dubbo"/>
       
        <context:annotation-config/>
        <tx:annotation-driven transaction-manager="springTransactionManager"/>
    </beans>
    
  3. OrderServiceImpl.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.jta.JtaTransactionManager;
       
    import javax.transaction.SystemException;
    import javax.transaction.UserTransaction;
       
    /**
     * JTA 事务
     */
    @Service(value = "orderService")
    public class OrderServiceImpl implements IOrderServices {
       
        @Autowired
        JtaTransactionManager springTransactionManager;
       
        @Autowired
        OrderDao orderDao;
       
        @Autowired
        IUserService userService;
       
        public DoOrderResponse doOrder(DoOrderRequest request) {
            DoOrderResponse response = new DoOrderResponse();
            // 获取 UserTransaction
            UserTransaction userTransaction = springTransactionManager.getUserTransaction();
            // order 下单后调用 user 更新余额
            try {
                userTransaction.begin();
                orderDao.insertOrder();
                // RPC 调用其他服务
                DebitResponse response1 = userService.debit(new DebitRequest());
                userTransaction.commit();
            } catch (Exception e) {
                try {
                    userTransaction.rollback();
                } catch (SystemException e1) {
                    e1.printStackTrace();
                }
            }
            response.setCode("000000");
            return response;
        }
    }
    

钢性事务与柔性事务

一般而言,满足 ACID 的事务的为钢性事务,满足 BASE 理论的为柔性事务。其中,柔性事务大致可以分为以下四种:

  • 两阶段型
  • 补偿型
  • 异步确保型
  • 最大努力通知型

柔性事务:相当于 ACID 的强一制事务(刚性事务)而来,最终一致性、TCC 事务、补偿机制

互联网分布式事务的解决方案

目前互联网领域里有几种流行的分布式事务解决方案,但都没有像之前所说的 XA 事务一样形成 X/Open DTP 这样的工业规范,而是仅仅在具体的行业里获得较多的认可

业务接口整合,避免分布式事务

此方案是把一个业务流程中需要在一个事务里执行的多个相关业务接口,整合为一个事务中。比如将 A、B、C 三个业务整合为一个服务 D 来实现单一事务的业务流程服务

最终一致性方案之 eBay 模式

eBay 在 2008 年公布了一个关于 BASE 准则,提到了一个分布式解决方案。eBay 的方案是一个最终一致性方案,它主要采用消息队列来辅助实现事务控制流程,方案的核心是将需要分布式处理的任务通过消息队列的方式异步执行。如果事务失败,则可以发起人工重试的纠正流程。人工重试被更多的应用于支付场景,通过对账系统对事后问题进行处理。

比如一个很常见的场景:某个用户产生了一笔交易,那么需要在交易表中增加记录,同时需要修改用户表的金额(余额),由于这两个表属于不同的远程服务,所以会涉及到分布式事务与数据一致性问题

-- 用户表:user(id, name, amt_sold, amt_bought) 
-- 交易表:transaction(xid, seller_id, buyer_id, amount)
begin;
  INSERT INTO transaction VALUES(xid, $seller_id, $buyer_id, $amount);
  UPDATE user SET amt_sold = amt_sold + $amount WHERE id = $seller_id;
  UPDATE user SET amt_bought = amt_bought + $amount WHERE id = $buyer_id;
commit;

可以使用消息队列(MQ)实现。先启动一个事务,更新交易表(transaction)后,并不直接更新 user 表,而是对 user 表的更新操作插入到消息队列中。目标系统收到消息后,启动本地事务去对用户表的余额进行操作

1
2
3
4
5
// 伪代码
boolean result = dao.updateTransaction();
if (result) {
  mq.sendUser();
}

根据上面的伪代码的实现方案,可能出现几种情况:

  1. 数据库操作成功,向 MQ 投递消息也成功
  2. 数据库操作失败,不会向 MQ 投递消息
  3. 数据库操作成功,但是向 MQ 中投递消息失败,向外抛出异常,数据库操作回滚。

对于上面的几种情况,问题都不大。分析消费端的问题:

  1. 消息出队以后,消费者对应的业务操作执行成功。如果执行失败,消息不能失效或丢失。需要保证消息和业务操作一致
    • 现在用的比较普遍的 MQ 都具有持久化消息的功能,如果消费者宕机或消费失败,都可以执行重试机制
  2. 尽量避免消息重复消费,如果重复消费,也不能影响业务的执行结果
    1. 保证消费者的幂等性,也就是说如果队列中的消息因为网络异常导致发送多次的情况下,仍然需要保证消息被应用多次与应用一次产生的效果是一样的
    2. 通过消费日志表来记录消费状态。增加一个 message_applied(msg_id)表,用来记录已经执行成功的消息。在目标系统执行更新操作之前,先检测该消息是否已经被消费过,消费完成后通过本地事务来更新此消费表状态,用来避免消息重复消费问题

上面这种方式是非常经典的实现,基本避免了分布式事务,实现了最终一致性。各大知名的电商平台和互联网公司,几乎都是采用类似的设计思路来实现最终一致性。这种方式适合的业务场景广泛,而且比较可靠。不过这种方式技术实现上的难度比较大。

保证最终一致性模式

  1. 查询模式
    • 任何一个服务操作都提供一个查询接口,用来向外部输出操作执行的状态。服务操作的使用方可以通过接口得知服务操作执行的状态,然后根据不同状态做不同的处理操作。为了能够实现查询,每个服务操作都需要有唯一的流水号衰减查询
  2. 补偿模式
    • 有了查询模式,就能够得知操作所处的具体状态。如果整个操作处于不正常状态,我们需要修正操作中出现问题的子操作。也许是要重新执行,或者取消已完成的操作。通过修复使得整个分布式系统达到最终一致。这个过程就是补偿模式。
    • 根据发起形式可分为以下几种:
      • 自动恢复:自动重试,通过对发生失败操作的接口自动重试,或者回滚已经完成的操作
      • 通知运营:人工补偿,如果程序无法自动完成恢复,则通知运营人员手动进行补偿
      • 通知技术:监控、预警。通过监控或告警通知到技术人员,通过技术手段进行修复

TCC 事务

TCC 属于补偿柔性事务,本质也是一个两阶段型事务,这与 JTA 极其相似。但是与 JTA 的不同是,JTA 属于资源层事务,而 TCC 是服务层事务。

TCC事务模型

TCC 分为三个阶段 TRYING-CONFIRMING-CANCELING。每个阶段做不同的处理。TRYING、CONFIRMING、CANCELIING 大致可以理解为 SQL 事务中的 LOCK、COMMIT、ROLLBACK

  1. TRYING 阶段主要是对业务系统做检测及资源预留
  2. CONFIRMING 阶段主要是对业务系统做确认提交,TRYING 阶段执行成功并开始执行 CONFIRMING 阶段时,默认 CONFIRMING 阶段是不会出错的。即:只要 TRYING 成功,CONFIRMING 一定成功。
  3. CANCELING 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

以上所有的操作需要满足幂等性,幂等性的实现方式可以是:

  1. 通过唯一键值做处理,即每次调用的时候传入唯一键值,通过唯一键值判断业务是否被操作,如果已被操作,则不再重复操作
  2. 通过状态机处理,给业务数据设置状态,通过业务状态判断是否需要重复执行

如何更通俗的理解 TCC 事务模型

支付系统接收到会员的支付请求后,需要扣减会员账户余额、增加会员积分(暂时假设需要同步实现)、增加商户余额

会员系统、商户系统、积分系统是独立的三个子系统,无法通过传统的事务方式进行处理。

try 阶段:会员资金账户的资金预留,即:冻结会员账户的金额(订单金额)

confirm 阶段:会员积分账户增加积分余额,商户账户增加账户余额

cancel 阶段:解冻/释放扣减的会员余额

开源的 TCC 框架:tcc-transaction、bytetcc

海信 HICS 技术团队压测评估

支付宝的 X/Open DTP 模型的 DTS 架构

  • DTS(Distributed Transaction Service)框架是由支付宝在 X/Open DTP 模型的基础上改进的一个设计,定义了类似 2PC 的标准两阶段接口,业务系统只需要实现对应的接口就可以使用 DTS 的事务功能。
  • DTS 最大的特点是放宽了数据库的强一致约束,保证了数据的最终一致性。

最大努力通知型

支付包回调页面和接口中解密参数,然后调用系统中更新交易状态相关服务,将订单更新为付款成功。同时,只有当回调页面中输出 success 字样或者标识业务处理成功响应状态码时,支付宝才会停止回调请求。否则,支付宝会每隔一段时间后,再向客户方发起回调请求,知道输出成功标识为止。

这就是一个很典型的补偿例子,与一些 MQ 重试补偿机制很类似。