重学Java-分布式事务实现方案
1. XA 规范
XA (也常称为 X/Open AX, 是 eXtended Architecture 的缩写) 是 X/Open (后来合并为 The Open Group )于 1991 年发布的分布式事务处理规范。
- XA 使用两阶段提交(2PC) 来确保事务的所有更改都生效(提交)或不(回滚),即通过 2PC 保证原子性,从而保证数据的完整性。可以把 XA 理解为一个强一致性的中心化原子提交协议。
- XA 描述了全局事务管理器™和(局部)资源管理器(RM,即特定应用程序)之间的接口:
- 想要使用 XA 的应用程序使用库或单独的服务使用 XA 事务管理器。
- 事务管理器跟踪事务中的参与者(即应用程序写入的各种数据存储),并与他们一起执行两阶段提交。
- 换句话说,XA 事务管理器与应用程序与服务器的交互是分开的。XA 维护其提交或回滚决策的日志,可用于在系统中断时进行恢复。
- 从本地事务角度来看,XA 方案的本质是在从“执行 SQL”到“Commit/Rollback”这两步之间始终握着 DB 事务不提交,直到所有分支全部 ready 之后再 Commit。所以,XA 是通过“长事务”来保证一致性的。
- 在 CAP 定理中,“长事务”是典型的偏向 CP 侧的强一致性方案
- XA 规范基于 1980 年代开发的Tuxedo系统中使用的接口,但此后被多个系统采用。
- 大多数银行系统和上了年纪的老系统最喜欢使用的分布式事务解决方案,这套方案依赖于底层数据库的支持,DB 这层首先得要实现 XA 协议,目前主流的数据库基本都支持XA事务,包括mysql InnoDB、oracle、sqlserver、postgre等
缺点:
- 由于 2PC 是一个阻塞协议,如果事务执行期间等待时间过长,会被卡住并持有相关数据库锁,最终可能会对使用相同数据库的其它应用程序造成影响。
- 另外如果事务管理器崩溃且无法恢复其中的决策记录,可能需要手动干预等。
XA 是通过“长事务”来保证一致性的,针对其的优化方案就是将长变短,将一个“长事务”分解成多个短事务,将偏向 CP 侧的强一致性方案,变成最终一致性方案,这也是目前 Seata 等事务框架实现的主要方向。
更多介绍可见wiki-X/Open XA
2. AT
AT 是阿里开源的 Seata 主推的一套分布式事务解决方案,也是大多数 Seata 使用者选用的方案。AT 方案备受推崇,一个最主要的原因就在于省心。因为是一种“无入侵”式的编程方案,不需要改变任何业务代码,只需要一个注解@GlobalTransactional
和少量配置信息就可以实现分布式事务。根据官方说明,AT模式支持的数据库有:MySQL、Oracle、PostgreSQL、 TiDB、MariaDB。
假如目前有两个服务 Customer 和 Template,我们需要删除用户的优惠券,此时需要 Customer 调用 Template 做一些优惠券删除等操作,在这个过程增加使用分布式事务的场景:
- Customer 服务负责用户领取优惠券、删除优惠券等功能。
- Template 服务负责增删改查优惠券以及模板等功能。
Seata 框架的三个重要角色,TC、TM 和 RM。
- TC 全称是 Transaction Coordinator,TC 扮演了一个中心化的事务协调者的角色,负责协调全局事务的提交和回滚,并维护全局和分支事务的状态。在 Seata 中指的就是 Seata Server。
- TM 全称是 Transaction Manager,它是事务管理器,主要作用是发起一个全局事务,对全局事务的提交和回滚做出决议。在 AT 方案中,TM 通常是由发起全局事务的那个微服务所扮演的,比如在“删除券模板”这个场景里,TM 的扮演者就是 Customer 服务。
- RM 全称是 Resource Manager,它是资源管理器,向 TC 注册分支事务并上报事务状态,同时负责对当前分支事务进行提交和回滚。每一个分支事务都是全局事务的参与者,这些分支事务的所属应用扮演了 RM 的角色。
Seata AT 的业务流程分为两个阶段来执行。
- 一阶段:执行核心业务逻辑(即代码中的 CRUD 操作)。Seata 会根据 DB 操作自动生成相应的回滚日志,并将回滚日志添加到 RM 对应的 undo_log 表中。执行业务代码和添加回滚日志这两步都是在同一个本地事务中提交的。
- 二阶段:如果全局事务的最终决议是 Commit,则更新分支事务状态并清回滚日志;如果最终决议是 Rollback,则根据 undo_log 中的回滚日志进行 rollback 操作。二阶段是以异步化的方式来执行的。
Seata AT 方案的核心在于这个 undo_log。正是有了这个记录回滚日志的 undo_log 表,我们才能将一阶段和二阶段剥离成两个独立的本地事务来执行。而 Seata AT 之所以执行效率高,主要原因有两个。一是核心业务逻辑可以在一阶段得到快速提交,DB 资源被快速释放;二是全局事务的 Commit 和 Rollback 是异步执行。Seata 实现 2PC 与传统 2PC 的差别:
- 架构层次方面:传统 2PC 方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而 Seata 的 RM 是以 jar 包的形式作为中间件层部署在应用程序这一侧的。
- 两阶段提交方面:传统 2PC无论第二阶段的决议是 commit 还是 rollback ,事务性资源的锁都要保持 到 Phase2 完成才释放。而 Seata 的做法是在 Phase1 就将本地事务提交,这样就可以省去 Phase2 持锁的时间,整体提高效率。
以上面删除优惠券的场景为例:
- Customer 服务作为分布式事务的起点,扮演了一个 TM 的角色,它会向 TC 注册并发起一个全局事务。全局事务会生成一个 XID,它是全局唯一的 ID 标识,所有分支事务都会和这个 XID 进行绑定。
- XID 在服务内部(非跨服务调用)的传播机制是基于 ThreadLocal 构建的,即 XID 在当前线程的上下文中进行透传。
- 对于跨服务调用来说,则依赖 seata-all 组件内置的各个适配器(如 Interceptor 和 Filter)将 XID 传递给对象服务。
- Customer 服务调用了 Template 服务进行模板注销流程,Template 服务的 RM 开启了一个分支事务,并注册到 TC。
- 在执行分支事务的过程中,RM 还会生成回滚日志并提交到 undo_log 表中。
- RM 还需要获取到两个特殊的 Lock。其中一个是 Local Lock(本地锁),另一个是 Global Lock(全局锁)。
- Template 服务调用成功,Customer 服务开始执行自己的本地事务,流程都大同小异就不说了。TM 端根据业务的执行情况,最终做出二阶段决议,Commit 或 Rollback。
- 最后,TC 向各个分支下达了二阶段决议。
- 如果最终决议是 Commit,那么各个 RM 会执行一段异步操作,删除 undo_log;
- 如果最终决议是 Rollback ,那么 RM 端会根据 undo_log 中记录的回滚日志做反向补偿。
下面是目前理解的简易的 AT 服务简易执行图:
上面提到的锁( Lock )信息存放在 seata 服务 的 lock_table
这张表里,它会记录待修改的资源 ID 以及它的全局事务和分支事务 ID 等信息。
- 无论一阶段提交还是二阶段回滚,RM 在执行修改记录前都需要先获取本地锁,才会执行 CRUD 操作。
- RM 在一阶段提交前还会尝试获取 Global Lock(全局锁),防止多个分布式事务对同一条记录进行修改。如果事务tx1和事务tx2,tx1先获取到了全局锁,则在tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁 。
- 对于两个事务同时修改的一个记录A,只有同时获取全局锁和本地锁的事务才能正常提交一阶段事务。
- 本地锁会随一阶段事务的提交 / 回滚而释放,而全局锁只有等到全局事务提交 / 回滚之后才会被释放。
- 在一阶段中,如果某一个事务在一定的尝试次数后仍然无法获取全局锁,它会知难而退,执行本地事务回滚操作。
- 而如果在二阶段回滚的时候,RM 无法获取本地锁,它会原地打转不停重试,直到成功获取本地锁并完成重试。
针对读隔离,在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
- 目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理申请到全局锁才能实现全局的读已提交。这种方式在整个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。
- 默认的全局读未提交可能会导致脏回滚的情况产生,一旦产生会导致数据不一致,必须要人工介入处理,如:
- 全局事务tx1对数据行A1进行修改 v1 -> v2
- 此时另一个服务将对数据行A1进行修改 v2 -> v3
- 全局事务tx1回滚,发现数据行A1的当前数据为v3,不符合之前的v2,回滚失败
seata 大概实现过程:
- 添加 seata 依赖项
- 增加 yaml 配置
- 声明数据源代理,如首先声明一个 druidDataSource 数据源 ,之后使用 seata 提供的
DataSourceProxy
数据源代理类包含 druidDataSource 数据源,使用Seata 特有的数据源,作为当前项目的 DataSrouce 代理@Configuration public class SeataConfiguration { // 初始数据源 @Bean @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } // 使用 Seata 特有的数据源,作为当前项目的 DataSrouce 代理 @Bean("dataSource") @Primary public DataSource dataSourceDelegation(DruidDataSource druidDataSource) { return new DataSourceProxy(druidDataSource); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 - 增加指定注解 : 在服务请求 api 接口方法上增加
@GlobalTransactional
对服务开启全局事务,之后在服务调用类上增加@Transactional
即可。@GlobalTransactional(name = "coupon-customer-serv", rollbackFor = Exception.class
通过@GlobalTransactional
注解的rollbackFor
方法指定了该全局事务碰到任何 Exception 异常,都会触发全局事务回滚操作。
3. TCC
相比 AT 来说, TCC 的实现就比较复杂了,它是一个基于“补偿模式”的解决方案,需要通过编写业务逻辑代码实现事务控制。 TCC 是由 Try、Confirm 和 Cancel 三个单词首字母缩写组成,该概念最早由 Pat Helland 于2007年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出,当时还是以 Tentative-Confirmation-Cancellation命名,之后Atomikos公司以 Try-Confirm-Cancel 正式作为名称。
Try、Confirm 和 Cancel 分别对应了 TCC 模式的三个执行阶段,每一个阶段都是独立的本地事务。
- Try 阶段完成的工作是预定操作资源(Prepare)。
- 尝试执行, 完成所有业务检查(一致性),预留必要的业务资源(准隔离性)。
- 说白了就是“占座”的意思,在正式开始执行业务逻辑之前,先把要操作的资源占上座。
- Confirm 阶段完成的工作是执行主要业务逻辑(Commit),它类似于事务的 Commit 操作。在这个阶段中,你可以对 Try 阶段锁定的资源进行各种 CRUD 操作。如果 Confirm 阶段被成功执行,就宣告当前分支事务提交成功。
- 如果所有分支的 Try 都执行成功,则走进 Confirm 阶段。该阶段执行真正的业务操作,不做任何业务检测,仅使用 Try 阶段预留的资源。
- 一旦在此阶段失败,会一直自动重试直到成功。
- Cancel 阶段的工作是事务回滚(Rollback),它类似于事务的 Rollback 操作。在这个阶段中,你可没有 AT 方案的 undo_log 帮你做自动回滚,你需要通过业务代码,对 Confirm 阶段执行的操作进行人工回滚。
- 如果分支中有一个在 try 阶段 执行失败,则进入 Cancel 阶段,在该阶段需要执行手动编写的业务回滚操作,同时释放 try 阶段申请的资源。
- 一旦在此阶段失败,会一直自动重试直到成功。
Try 阶段成功后,不立即进入 Confirm/Cancel 阶段,而是认为全局事务已经结束了,启动定时任务来异步执行 Confirm/Cancel,扣减或释放资源,这样会有很大的性能提升。
下面是目前理解的简易的 TCC 服务简易执行图:
在 seata 中如果使用 TCC 方案,需要在服务接口中 @LocalTCC
和 @TwoPhaseBusinessAction
两个注解,同时为执法服务方法声明增加 Commit 和 Cancel 阶段要调用的方法声明:
@LocalTCC
注解被用来修饰实现了二阶段提交的本地 TCC 接口@TwoPhaseBusinessAction
注解标识当前方法使用 TCC 模式管理事务提交。@TwoPhaseBusinessAction
注解修饰的是 Try 阶段要执行的方法。- 在
@TwoPhaseBusinessAction
注解内:- 通过
name
属性给当前事务注册了一个全局唯一的 TCC bean name。 commitMethod
属性指定了它在 Confirm 阶段要执行的方法。rollbackMethod
属性指定了它在 Cancel 阶段要执行的方法。
- 通过
- 在
commitMethod
和rollbackMethod
指定的方法中可以使用一个特殊的入参BusinessActionContext
来传递查询参数。在 TCC 模式下,查询参数将作为BusinessActionContext
的一部分,在事务上下文中进行传递。
声明阶段参考代码如下:
@LocalTCC
public interface CouponTemplateServiceTCC extends CouponTemplateService {
@TwoPhaseBusinessAction(
name = "deleteTemplateTCC",
commitMethod = "deleteTemplateCommit",
rollbackMethod = "deleteTemplateCancel"
)
void deleteTemplateTCC(@BusinessActionContextParameter(paramName = "id") Long id);
void deleteTemplateCommit(BusinessActionContext context);
void deleteTemplateCancel(BusinessActionContext context);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
具体实现中:
- 在 try 阶段申请资源,如表中某字段作为锁标记,查询可用的且且未加锁的某条记录并对其加锁。
- 如果 try 阶段执行成功,进入 Confirm 阶段的方法中执行业务逻辑,如更新该记录为被删除状态。如果该阶段执行期间报错, seata server 作为 TC 角色会隔一段时间重试一次,直到成功。
- 如果 try 阶段执行失败,进入 Cancel 阶段的方法中执行释放在 try 中申请的资源,如将该记录的加锁状态移除。如果该阶段执行期间报错, seata server 作为 TC 角色会隔一段时间重试一次,直到执行成功。
@Transactional
void deleteTemplateTCC(@BusinessActionContextParameter(paramName = "id") Long id){
// 查询 为id值的某条可用数据
// 如果存在则更新其锁状态为加锁中
// 如果查找失败则报错
}}
@Transactional
void deleteTemplateCommit(BusinessActionContext context) {
// try 阶段执行成功时执行
// 从上下文 context 中获取参数
Long id = Long.parseLong(context.getActionContext("id").toString())
// 根据id执行相关记录更新操作
// 如果更新操作失败,seata server 一直重试,直到执行成功。
}
@Transactional
void deleteTemplateCancel(BusinessActionContext context) {
// try 阶段执行失败时执行
// 查询id对应的数据是否存在,存在则更新其锁字段为未加锁,从而释放 try 阶段执行的锁资源。
// 如果该阶段执行失败,seata server 一直重试,直到执行成功。
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
这种方案也存在的问题
- TCC 空回滚 :是在没有执行 Try 方法的情况下,TC 下发了回滚指令并执行了 Cancel 逻辑。比如某个分支事务的一阶段 Try 方法因为网络不可用发生了 Timeout 异常,或者 Try 阶段执行失败,这时候 TM 端会判定全局事务回滚,TC 端向各个分支事务发送 Cancel 指令,这就产生了一次空回滚。
- 简单解决方案是在 Cancel 阶段,你应当先判断一阶段 Try 有没有执行成功。如果成功再执行资源释放操作。
- 更为完善的一种做法是,引入独立的事务控制表,在 Try 阶段中将 XID 和分支事务 ID 落表保存,如果 Cancel 阶段查不到事务控制记录,那么就说明 Try 阶段未被执行。
- TCC 倒悬:倒悬又被叫做“悬挂”,它是指 TCC 三个阶段没有按照先后顺序执行。如果因为网络卡顿导致执行了一次空回滚,原先超时的 Try 方法经过网关层的重试,又被后台服务接收到了,这就产生了一次倒悬场景,即一阶段 Try 在二阶段回滚之后被触发。此时会导致当前资源被长期锁定,从而造成一种类似死锁的情况。
- 可以考虑引入事务控制表,Cancel 阶段执行成功后,也可以在事务控制表中记录回滚状态,并在 Try 阶段中检查该状态,如果二阶段回滚完毕,那么就直接跳过一阶段 Try。
- TCC 相比 AT 开发量至少是双倍的,以开发量为代价换取事务高可控性。涉及资源的锁定流程、各资源之间的关联性等问题,需要开发团队对业务流程的每一个步骤了如指掌,才能设计出高效的 TCC 流程。
- 关于幂等性,通过本地事务控制表来确保幂等性是一种简单有效的低成本方案。
- Seata 框架在 1.5.1 版本便引入了
tcc_fence_log
表作为事务控制表,具体可见《阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题》
4. Saga
SAGA最初出现在1987年 Hector Garcaa-Molrna 和 Kenneth Salem发表的论文 《SAGAS》里面。其核心思想是将长事务拆分成多个短事务(亦即子事务),由Saga事务协调器协调,子事务本身可以与其它事务交错执行,且依然需要保证所有的事务要么全部执行成功,要么全部执行失败。每个子事务 Tx
都有一个对应的事务补偿 Cx
,补偿事务在需要回滚的时候执行,如事务T1执行完成,T2时执行失败,会从 T2 的事务补偿 C2一直反向执行到最初的实物补偿 C1 。
seata 中的saga方案是基于状态机引擎来实现的。机制是:
- 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
- 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
- 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚
- 注意: 异常发生时是否进行补偿也可由用户自定义决定
- 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能
这种方案实现比较复杂且在 Seata 中不是主流,这里不再多说,详情可见《SEATA Saga 模式》。
在 DTM 框架中有另一种相对比较简单的实现,且成了该框架的主流分布式事务实现方案。如一个银行转账的简单实例中,A 转账 30元给 B,根据 saga 事务原理,将整个全局事务 拆分成如下的服务:
- 转出(TransOut)服务,这里转出将会进行操作 A-30
- 转出补偿(TransOutCompensate)服务,回滚上面的转出操作,即 A+30
- 转入(TransIn)服务,转入将会进行 B+30
- 转入补偿(TransInCompensate)服务,回滚上面的转入操作,即 B-30
整个SAGA事务的逻辑是: - 【转出(TransOut)服务】执行成功 => 【转入(TransIn)服务】执行成功 => 全局事务执行成功。
- 如果中间事务如【转入(TransIn)服务】执行失败,则会对已执行的分支执行事务补偿:
- 【转出(TransOut)服务】执行成功 => 【转入(TransIn)服务】执行失败 => 执行【转入(TransIn)补偿服务】成功 => 执行【转出(TransOut)补偿服务】成功 =>全局事务回滚完成
- dtm 的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。
DTM 主要用于 Golang 中,目前看其官方也有了 java 版的,这里以 Golang 版的简单说一下其实现:
req := &busi.ReqHTTP{Amount: 30} // 微服务的请求Body
// dtmutil.DefaultHTTPServer 为DTM服务的地址
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, shortuuid.New()).
// 添加一个 TransOut 的子事务,正向操作为url: busi.Busi+"/TransOut", 逆向操作为url: busi.Busi+"/TransOutRevert"
Add(busi.Busi+"/TransOut", busi.Busi+"/TransOutRevert", req).
// 添加一个 TransIn 的子事务,正向操作为url: busi.Busi+"/TransIn", 逆向操作为url: busi.Busi+"/TransInRevert"
Add(busi.Busi+"/TransIn", busi.Busi+"/TransInRevert", req)
logger.Debugf("saga busi trans submit")
// 提交saga事务,dtm会完成所有的子事务/回滚所有的子事务
err := saga.Submit()
logger.Debugf("result gid is: %s", saga.Gid)
logger.FatalIfError(err)
2
3
4
5
6
7
8
9
10
11
12
DTM 框架 和 Seata 框架在角色方面差不多,也是三个角色, TM-事务管理器、AP-应用程序、RM-资源管理器。
- RM-资源管理器:RM是一个应用服务,负责管理全局事务中的本地事务,他通常会连接到一个数据库,负责相关数据的修改、提交、回滚、补偿等操作。如上面被调用的 转出(TransOut)服务和 转入(TransIn)服务都各是一个 RM。这个和 Seata 的 RM 完全一样。
- AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。如上面程序代码本身就是一个单独的AP,它负责编排了一个包含TransOut、TransIn的全局事务,然后提交给TM。这个同 Seata 的 TM 一样。
- TM-事务管理器:DTM服务,负责全局事务的管理,每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM,将同一个全局事务的不同分支,全部提交或全部回滚。 这个同 Seata 的 TC一样。
Saga的优点是既能满足业务需求,又能保持系统松耦合架构。而问题是无法保证隔离性。
5. 消息服务
使用消息服务实现的分布式事务管理是一种 Best Effort,即尽最大努力交付,主要用于在不同的服务平台之间的事务性保证的这样一种场景。比较常见的方案是使用MQ的事务消息模式 + 事务日志表记录执行状态 + batch job扫表做反向补偿。所有努力送达的模型,必须是先预扣(预占资源)的做法。
事务消息简单理解就是一个可以让两个或多个事务最终实现的一个消息凭证。如在A给B转账30元过程中。A先通过银行转出30元,得到一个凭证,之后排队在某个时间点银行给B的账户上增加30元。只要这个凭证(消息)能可靠保存,我们能依靠这个凭证(消息)完成最终一致性。
- 在服务A中执行对A账户扣款的操作,并将转账消息服务保存到某数据库表或者消息队列
- 之后服务B中消费这条转账消息,并执行为账户B增加30元的操作。
要解决消息可靠存储,我们实际上需要解决的问题是,本地的 数据库记录的存储和 message 存储的一致性问题。保证可靠的保存消息凭证的常用方式有:
- Transactional outbox 事务收件箱
- Polling publisher
- Transaction log tailing
- 2PC Message Queue
事务消息一旦被可靠的持久化,我们整个分布式事务,变为了最终一致性,消息被消费才能保障最终业务数据的完整性,所以我们要尽最大努力把消息送达到下游的业务消费方,称为:Best Effort。只有消息被消费,整个交易才能算是完整完结。
- 即如果收到失败就会隔一段时间重试一次,直到成功为止。
- 原则上不应该出现失败回滚的情况,所以才要尽最大努力实现。所以扣了钱不会因为失败退回,会一直尝试直到购买等后续操作返回成功。
- 假设回调成功,但返回响应时失败,再次调用时要考虑幂等性,比如通过接口付款成功回调给用户发道具,多次刷新这个接口需要防止发送多次道具。
Transactional outbox 在完成扣款时,同时记录消息数据,这个消息数据与业务数据保存在同一个数据库中,此时这个消息记录表(如表名 msg)就是一个事务收件箱。
- 在同一个事务中完成扣款与消息记录(如用户A通过支付宝消费30元),保证了只要扣了钱就一定能把消息保存下来。
- 在事务提交成功后,想办法通知下一步操作,如服务B向用户A发送xxx游戏道具,服务B处理成功后发送回复消息。
- 服务A收到回复消息后删除在1中存储的该条消息数据。
Polling publisher 主动拖取消息,定时的轮训 msg 表,将待消费的(如 state=1)的消息按id自增等排序后拿出来,保证顺序消费。可以 publish 给下游服务的消息队列让其自己消费消息,或者直接通过直接 rpc 发送给下游服务。 Pull 的模型,从延迟来说不够好:
- Pull 太猛对 Database 有一定压力
- Pull 频次低了,延迟比较高
上面这些保存消息的方式使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。
Transaction log tailing 使用 canal 订阅msg消息表的binlog,之后 msg的任何变更通过 canal 监听发送到消息队列,之后消费者服务去消息队列中直接消费。
- 使用 canal 订阅以后,是实时流式消费数据
- 在消费者必须努力送达到
幂等是为了解决消息重复投递,如果相同的消息被重复投递两次或者消费两次,会导致数据加倍。解决方案:
- 全局唯一 ID + 去重表
- 在消费端加消息应用状态表 msg_apply,通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务)。
- 再一种方式是在订单增加状态字段,基于状态判定。
- 版本号
- 全局自增ID+版本号等。
2PC Message Queue,比如使用 RocketMQ 集群。
- 发送 Perpared 准备消息给MQ消息集群,此时仅发送暂存了,还未提交。
- 执行本地事务如支付宝先扣钱。
- 如果执行成功,进入 Commit 阶段,提交事务。此阶段也可能会失败,提交失败或者回滚失败。
- 一旦 Commit 成功,RocketMQ 会认为这个消息被交付了, 便继续可通知下游操作或者下游者来消费它,可能是push的或者publisher,最终被消费者消费到。只有 Commot 成功,RocketMQ。
[图片来源《极客时间-Go进阶训练营》]
关于报错:
- 如果是第一阶段报错,直接结束,告诉用户报错了,交易失败。
- 假设第一阶段执行成功,第二阶段执行本地事务失败。
- RocketMQ 在以前有 LocalTransactionState 状态的回查机制(3.1.5版本后做了阉割,需要自己做一些事务补偿)。当有一条 Perpared 消息长时间在队列中无人响应,将该消息捞出来反向询问事务是否需要做以及成功, 确认 MQ 集群是否收到了 LocalTransactionState 状态消息。如果询问发现失败了,连记录都没有,直接失败丢弃。
- 假如执行本地事务是成功的,但第三步 Commit 失败,不管哪阶段的指令都无法直接提交到 MQ 实例中,所以回查是返回失败就直接失败,成功就直接成功。从而保证生产者这边的事务。
- 消费方收到消息执行本地事务,幂等性需要消费方自己搞定,而不是在 RocketMQ 中搞定的。
- 如果本地事务执行成功,第三步发送确认消息(即 ACK),该ACK告知 MQ 这条消息被正确消费了,让其从RocketMQ消息集群中删除。
- 针对消费者集群执行本地事务失败的问题,如果数据回滚会涉及数据等大量操作,RocketMQ 目前需要人工解决。
6. 参考资料
这里仅放出文章中未提到过得资料,提到过的均已同时备注了查阅地址。
- 《深入理解分布式系统》
- Spring Cloud 微服务项目实战
- 《极客时间-Go进阶训练营》
- dtm官方文档
- seata官方文档
除特别注明外,本站所有文章均为 windcoder 原创,转载请注明出处来自: zhongxuejava-fenbushishiwushixianfangan

暂无数据