.NET CORE 分布式事务(三) DTM实现Saga及高并发下的解决方案

发布于:2024-03-29 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录(结尾附加项目代码资源地址)

引言:

1. SAGA事务模式

2. 拆分为子事务

3. 失败回滚

4. 如何做补偿

4.1 失败的分支是否需要补偿

5. 异常

6. 异常与子事务屏障

6.1 NPC的挑战

6.2 现有方案的问题

6.3 子事务屏障

6.4 原理

7. 更多高级场景

7.1 部分第三方操作无法回滚(go语言写了一点,不要在意这些细节,下面开始 .NET)

7.2 超时回滚

8.0 .NET CORE结合DTM实现Saga(C#启动)

8.1 准备工作(和前两期的注册环节、数据库差不多的操作,看过前两期的小伙伴可跳过8.1阶段)

8.1.1 Nuget引入Dtmcli

8.1.2 生成转账数据库(EF_CORE)

8.1.3 DbContext

8.1.4 数据库持久化

8.1.5 数据库最终生成

8.1.6 appsettings.json

8.1.7 Program.cs

8.2 主程序事务API控制器

8.3 用户1转账事务API控制器

8.4 用户2转账事务API控制器

9. 开始运行

9.1 先给A和B两位用户各1000块钱。

9.2 执行转账

10. 并发下执行Saga分布式事务

10.1 Program.cs代码修改

10.2 用户1转账事务API控制器代码修改

10.3 用户2转账事务API控制器代码修改

10.4 Redis启动

小结


引言:

紧接前两期   .NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客)  .NET CORE 分布式事务(二) DTM实现TCC(.NET CORE 分布式事务(二) DTM实现TCC-CSDN博客)  本期讲解Saga分布式事务,并探讨如何在高并发下使用Saga分布式事务。

1. SAGA事务模式

SAGA事务模式是DTM中最常用的模式,主要是因为SAGA模式简单易用,工作量少,并且能够解决绝大部分业务的需求。SAGA最初出现在1987年Hector Garcaa-Molrna & Kenneth Salem发表的论文SAGAS里。其核心思想是将长事务拆分为多个短事务,由Saga事务协调器协调,如果每个短事务都成功提交完成,那么全局事务就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

2. 拆分为子事务

例如我们要进行一个类似于银行跨行转账的业务,将A中的30元转给B,根据Saga事务的原理,我们将整个全局事务,切分为以下服务:

  • 转出(TransOut)服务,这里转出将会进行操作A-30
  • 转出补偿(TransOutCompensate)服务,回滚上面的转出操作,即A+30
  • 转入(TransIn)服务,转入将会进行B+30
  • 转入补偿(TransInCompensate)服务,回滚上面的转入操作,即B-30

整个SAGA事务的逻辑是:

执行转出成功=>执行转入成功=>全局事务完成

如果在中间发生错误,例如转入B发生错误,则会调用已执行分支的补偿操作,即:

执行转出成功=>执行转入失败=>执行转入补偿成功=>执行转出补偿成功=>全局事务回滚完成

下面我们看一个成功完成的SAGA事务典型的时序图:

在这个图中,我们的全局事务发起人,将整个全局事务的编排信息,包括每个步骤的正向操作和反向补偿操作定义好之后,提交给服务器,服务器就会按步骤执行前面SAGA的逻辑。

3. 失败回滚

如果有正向操作失败,例如账户余额不足或者账户被冻结,那么dtm会调用各分支的补偿操作,进行回滚,最后事务成功回滚。失败的时序图如下:

补偿执行顺序:

dtm的SAGA事务在1.10.0及之前,补偿操作是并发执行的,1.10.1之后,是根据用户指定的分支顺序,进行回滚的。

如果是普通SAGA,没有打开并发选项,那么SAGA事务的补偿分支是完全按照正向分支的反向顺序进行补偿的。

如果是并发SAGA,补偿分支也会并发执行,补偿分支的执行顺序与指定的正向分支顺序相反。假如并发SAGA指定A分支之后才能执行B,那么进行并发补偿时,DTM保证A的补偿操作在B的补偿操作之后执行

4. 如何做补偿

当SAGA对分支A进行失败补偿时,A的正向操作可能1. 已执行;2. 未执行;3. 甚至有可能处于执行中,最终执行成功或者失败是未知的。那么对A进行补偿时,要妥善处理好这三种情况,难度很大。

dtm提供了子事务屏障技术,自动处理上述三种情况,开发人员只需要编写好针对1的补偿操作情况即可,相关工作大幅简化,详细原理,参见下面的异常章节。

4.1 失败的分支是否需要补偿

dtm 常被问到的一个问题是,TransIn返回失败,那么这个时候是否还需要调用TransIn的补偿操作?DTM 的做法是,统一进行一次调用,这种的设计考虑点如下:

  • XA, TCC 等事务模式是必须要的,SAGA 为了保持简单和统一,设计为总是调用补偿
  • DTM 支持单服务多数据源,可能出现数据源1成功,数据源2失败,这种情况下,需要确保补偿被调用,数据源1的补偿被执行
  • DTM 提供的子事务屏障,自动处理了补偿操作中的各种情况,用户只需要执行与正向操作完全相反的补偿即可

5. 异常

在事务领域,异常是需要重点考虑的问题,例如宕机失败,进程crash都有可能导致不一致。当我们面对分布式事务,那么分布式中的异常出现更加频繁,对于异常的设计和处理,更是重中之重。

我们将异常分为以下几类:

  • 偶发失败: 在微服务领域,由于网络抖动、机器宕机、进程Crash会导致微小比例的请求失败。这类问题的解决方案是重试,第二次进行重试,就能够成功,因此微服务框架或者网关类的产品,都会支持重试,例如配置重试3次,每次间隔2s。DTM的设计对重试非常友好,应当支持幂等的各个接口都已支持幂等,不会发生因为重试导致事务bug的情况
  • 故障宕机: 大量公司内部都有复杂的多项业务,这些业务中偶尔有一两个非核心业务故障也是常态。DTM也考虑了这样的情况,在重试方面做了指数退避算法,如果遇见了故障宕机情况,那么指数退避可以避免大量请求不断发往故障应用,避免雪崩。
  • 网络乱序: 分布式系统中,网络延时是难以避免的,所以会发生一些乱序的情况,例如转账的例子中,可能发生服务器先收到撤销转账的请求,再收到转账请求。这类的问题是分布式事务中的一个重点难点问题。

业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚,重试毫无意义。分布式事务中,有很多模式的某些阶段,要求最终成功。例如dtm的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。

6. 异常与子事务屏障

分布式事务之所以难,主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题,然后介绍这些问题带给分布式事务的挑战,接下来指出现有各种常见用法的问题,最后给出正确的方案。

6.1 NPC的挑战

分布式系统最大的敌人可能就是NPC了,在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么:

  • Network Delay,网络延迟。虽然网络在多数情况下工作的还可以,虽然TCP保证传输顺序和不会丢失,但它无法消除网络延迟问题。
  • Process Pause,进程暂停。有很多种原因可以导致进程暂停:比如编程语言中的GC(垃圾回收机制)会暂停所有正在运行的线程;再比如,我们有时会暂停云服务器,从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长,你以为持续几百毫秒已经很长了,但实际上持续数分钟之久进程暂停并不罕见。
  • Clock Drift,时钟漂移。现实生活中我们通常认为时间是平稳流逝,单调递增的,但在计算机中不是。计算机使用时钟硬件计时,通常是石英钟,计时精度有限,同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间,通常使用NTP协议将本地设备的时间与专门的时间服务器对齐,这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。

分布式事务既然是分布式的系统,自然也有NPC问题。因为没有涉及时间戳,带来的困扰主要是NP。

6.2 现有方案的问题

我们看到开源项目dtm之外,包括各云厂商,各开源项目,他们给出的业务实现建议大多类似如下(这也是大多数用户最容易想到的方案):

  • 空补偿: “针对该问题,在服务设计时,需要允许空补偿,即在没有找到要补偿的业务主键时,返回补偿成功,并将原业务主键记录下来,标记该业务流水已补偿成功。”
  • 防悬挂: “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝执行该笔服务,以免造成数据不一致。”

事实上,NPC里的P和C,以及P和C的组合,有很多种的场景,都可以导致上述竞态情况,就不一一赘述了。

虽然这种情况发生的概率不高,但是在金融领域,一旦涉及金钱账目,那么带来的影响可能是巨大的。

PS:幂等控制如果也采用“先查再改”,也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引,“以改代查”来避免竞态条件。

6.3 子事务屏障

 在dtm中,首创了子事务屏障技术,使用该技术,能够非常便捷的解决异常问题,极大的降低了分布式事务的使用门槛。

子事务屏障能够达到下面这个效果,看示意图:

所有这些请求,到了子事务屏障后:不正常的请求,会被过滤;正常请求,通过屏障。开发者使用子事务屏障之后,前面所说的各种异常全部被妥善处理,业务开发人员只需要关注实际的业务逻辑,负担大大降低。 子事务屏障提供了方法BranchBarrier.Call,业务开发人员,在busiCall里面编写自己的相关逻辑,调用 BranchBarrier.Call 。 BranchBarrier.Call保证,在空回滚、悬挂等场景下,busiCall不会被调用;在业务被重复调用时,有幂等控制,保证只被提交一次。子事务屏障会管理TCC、SAGA、事务消息等,也可以扩展到其他领域

6.4 原理

子事务屏障技术的原理是,在本地数据库,建立分支操作状态表dtm_barrier,唯一键为全局事务id-分支id-分支操作(try|confirm|cancel)

  1. 开启本地事务
  2. 对于当前操作op(try|confirm|cancel),insert ignore一条数据gid-branchid-op,如果插入不成功,提交事务返回成功(常见的幂等控制方法)
  3. 如果当前操作是cancel,那么在insert ignore一条数据gid-branchid-try,如果插入成功(注意是成功),则提交事务返回成功
  4. 调用屏障内的业务逻辑,如果业务返回成功,则提交事务返回成功;如果业务返回失败,则回滚事务返回失败

在此机制下,解决了乱序相关的问题

  • 空补偿控制--如果Try没有执行,直接执行了Cancel,那么3中Cancel插入gid-branchid-try会成功,不走屏障内的逻辑,保证了空补偿控制
  • 幂等控制--2中任何一个操作都无法重复插入唯一键,保证了不会重复执行
  • 防悬挂控制--Try在Cancel之后执行,那么Cancel会在3中插入gid-branchid-try,导致Try在2中不成功,就不执行屏障内的逻辑,保证了防悬挂控制

 对于SAGA、二阶段消息,也是类似的机制。

7. 更多高级场景

在实际应用中,还遇见过一些业务场景,需要一些额外的技巧进行处理

7.1 部分第三方操作无法回滚(go语言写了一点,不要在意这些细节,下面开始 .NET)

例如一个订单中的发货,一旦给出了发货指令,那么涉及线下相关操作,那么很难直接回滚。对于涉及这类情况的saga如何处理呢?

我们把一个事务中的操作分为可回滚的操作,以及不可回滚的操作。那么把可回滚的操作放到前面,把不可回滚的操作放在后面执行,那么就可以解决这类问题

	saga := dtmcli.NewSaga(DtmServer, shortuuid.New()).
			Add(Busi+"/CanRollback1", Busi+"/CanRollback1Revert", req).
			Add(Busi+"/CanRollback2", Busi+"/CanRollback2Revert", req).
			Add(Busi+"/UnRollback1", "", req).
			Add(Busi+"/UnRollback2", "", req).
			EnableConcurrent().
			AddBranchOrder(2, []int{0, 1}). // 指定step 2,需要在0,1完成后执行
			AddBranchOrder(3, []int{0, 1}) // 指定step 3,需要在0,1完成后执行

示例中的代码,指定Step 2,3 中的 UnRollback 操作,必须在Step 0,1 完成后执行。

对于不可回滚的操作,DTM的设计建议是,不可回滚的操作在业务上也不允许返回失败。可以这么思考,如果发货的操作返回了失败,那么这个失败的含义是不够清晰的,调用方不知道这个失败是修改了部分数据的失败,还是修改数据前的业务校验失败,因为这个操作不可回滚,所以调用方收到这个失败,是不知道如何正确处理这个错误的。

另外当你的一个全局事务中,如果出现了两个既不可回滚的又可能返回失败的操作,那么到了实际运行中,一个执行成功,一个执行失败,此时执行成功的那个事务无法回滚,那么这个事务的一致性就不可能保证了。

对于发货操作,如果可能在校验数据上可能发生失败,那么将发货操作拆分为发货校验、发货两个服务则会清晰很多,发货校验可回滚,发货不可回滚同时也不会失败。

7.2 超时回滚

saga属于长事务,因此持续的时间跨度很大,可能是100ms到1天,因此saga没有默认的超时时间。dtm支持saga事务单独指定超时时间,到了超时时间,全局事务就会回滚。

saga.TimeoutToFail = 1800

在saga事务中,设置超时时间一定要注意,这类事务里不能够包含无法回滚的事务分支,因为超时回滚时,已执行的无法回滚的分支,数据就是错的。

8.0 .NET CORE结合DTM实现Saga(C#启动)

8.1 准备工作(和前两期的注册环节、数据库差不多的操作,看过前两期的小伙伴可跳过8.1阶段)

8.1.1 Nuget引入Dtmcli

  <ItemGroup>
    <PackageReference Include="Dtmcli" Version="1.4.0" />
  </ItemGroup>

8.1.2 生成转账数据库(EF_CORE)

//模型
public partial class UserMoney
{
    public int id { get; set; }
    public int money { get; set; }
    public int trading_balance { get; set; }
    public int balance { get; set; }
    public int trymoney { get; set; }
    public string guid { get; set; }
}

8.1.3 DbContext

 public class DtmDbContext : DbContext
 {
     public DtmDbContext() { }
     public DtmDbContext(DbContextOptions<DtmDbContext> options) : base(options) { }
 
     public virtual DbSet<UserMoney> UserMoney { get; set; }
 
     protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
     {
         optionsBuilder
             .UseMySql("server=localhost;port=3307;user id=root;password=123;database=DTM_Test", ServerVersion.Parse("8.0.23-mysql"))
             .UseLoggerFactory(LoggerFactory.Create(option =>
             {
                 option.AddConsole();
             }));
     }
 
     protected override void OnModelCreating(ModelBuilder modelBuilder)
     {
         modelBuilder
             .UseCollation("utf8_general_ci")
             .HasCharSet("utf8");
 
         modelBuilder.Entity<UserMoney>(entity =>
         {
             entity.ToTable("UserMoney");
         });
     }
 }

8.1.4 数据库持久化

CREATE TABLE
IF
	NOT EXISTS DTM_Test.barrier (
	id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,
	trans_type VARCHAR ( 45 ) DEFAULT '',
	gid VARCHAR ( 128 ) DEFAULT '',
	branch_id VARCHAR ( 128 ) DEFAULT '',
	op VARCHAR ( 45 ) DEFAULT '',
	barrier_id VARCHAR ( 45 ) DEFAULT '',
	reason VARCHAR ( 45 ) DEFAULT '' COMMENT 'the branch type who insert this record',
	create_time datetime DEFAULT now( ),
	update_time datetime DEFAULT now( ),
	KEY ( create_time ),
	KEY ( update_time ),
	UNIQUE KEY ( gid, branch_id, op, barrier_id ) 
	) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;

8.1.5 数据库最终生成

8.1.6 appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "ConnectionString": "server=localhost;port=3307;user id=root;password=123;database=DTM_Test",
  "DtmSettings": {
    "TransactionUrl": "http://localhost:5271",
    "CompensateUrl": "http://localhost:5271"
  }
}

8.1.7 Program.cs

  // 注册DbContext
  builder.Services.AddDbContext<DtmDbContext>(options =>
  {
      options.UseMySql(builder.Configuration.GetValue<string>("ConnectionString"), ServerVersion.Parse("8.0.23-mysql"));
  });

  builder.Services.Configure<DtmSettings>(builder.Configuration.GetSection("DtmSettings"));

  builder.Services.AddDtmcli(dtm =>
  {
      dtm.DtmUrl = "http://localhost:36789";
      dtm.SqlDbType = "mysql";
      dtm.BarrierSqlTableName = "dtm_test.barrier";
  });

8.2 主程序事务API控制器

using DTM_EF;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using System.Data.Common;
using MySqlConnector;
using DTM_EF.Model;
using Dtm_Saga;
using DtmCommon;
using Microsoft.CodeAnalysis.Operations;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Newtonsoft.Json;

namespace Dtm_Saga.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class DtmSagaController : ControllerBase
    {
        private readonly ILogger<DtmSagaController> _logger;
        private readonly IDtmClient _dtmClient;
        private readonly IDtmTransFactory _transFactory;
        private readonly DtmSettings _settings;
        private readonly IBranchBarrierFactory _factory;
        private readonly DtmDbContext _dtmDbContext;

        public DtmSagaController(ILogger<DtmSagaController> logger,
            IDtmClient dtmClient,
            IDtmTransFactory transFactory,
            IOptions<DtmSettings> settings,
            IBranchBarrierFactory factory,
            DtmDbContext dtmDbContext)
        {
            _logger = logger;
            _dtmClient = dtmClient;
            _transFactory = transFactory;
            _settings = settings.Value;
            _factory = factory;
            _dtmDbContext = dtmDbContext;
        }

        [HttpPost("dtm-Saga")]
        public async Task<IActionResult> Get(int Money, CancellationToken cancellationToken)
        {
            var obj = TransResponse.BuildFailureResponse();
            try
            {
                //1. 创建gid。
                var gid = await _dtmClient.GenGid(cancellationToken);

                //2. 用户模型。
                UserMoney bodyA = new UserMoney() { id = 1, trymoney = -Money, guid = string.Empty };
                UserMoney bodyB = new UserMoney() { id = 2, trymoney = Money, guid = string.Empty };

                //3. 设置分支事务和补偿事务。
                var saga = _transFactory.NewSaga(gid)
                    .Add(_settings.TransactionUrl + "/Saga/UserATransactionUrl", _settings.CompensateUrl + "/Saga/UserACompensateUrl", bodyA)
                    .Add(_settings.TransactionUrl + "/Saga/UserBTransactionUrl", _settings.CompensateUrl + "/Saga/UserBCompensateUrl", bodyB)
                    .EnableWaitResult();//开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。

                //4. 执行submit
                await saga.Submit();

                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("result gid is {0}", gid);
                Console.ResetColor();

                obj = TransResponse.BuildSucceedResponse();
            }
            catch (DtmException ex)
            {
                obj = TransResponse.BuildFailureResponse();
            }
            return Ok(obj);
        }
    }
}

8.3 用户1转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;

namespace Dtm_Saga.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class SagaUserAController : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<SagaUserAController> _Logger;
        private readonly DtmDbContext _dtmDbContext;
        private readonly IRedisClient _redisClient;
        private readonly RedisService _redisService;

        public SagaUserAController(IBranchBarrierFactory barrierFactory,
            ILogger<SagaUserAController> Logger,
            DtmDbContext dtmDbContext,
            IRedisClient redisClient,
            RedisService redisService)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
            _redisClient = redisClient;
            _redisService = redisService;
        }

        [HttpPost]
        [Route("/Saga/UserATransactionUrl")]
        public async Task<IActionResult> UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //获取用户账户信息
                        var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                        if (UserMoney is null)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--不存在");
                        }

                        if (UserMoney.money + body.trymoney < 0)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--金额不足");
                        }

                        //前序判断都通过,修改信息准备提交   
                        UserMoney!.money += body.trymoney;
                        _dtmDbContext.SaveChanges();

                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
            }
            return Ok(obj);
        }

        [HttpPost]
        [Route("/Saga/UserACompensateUrl")]
        public async Task<IActionResult> UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            //var branchBarrier = _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //获取用户账户信息
                        var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
                        if (UserMoney is null)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--不存在");
                        }
                        //前序判断都通过,修改信息准备提交 
                        UserMoney!.money -= body.trymoney;
                        _dtmDbContext.SaveChanges();

                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
            }
            return Ok(obj);
        }
    }
}

8.4 用户2转账事务API控制器

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;

namespace Dtm_Saga.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class SagaUserBController : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<SagaUserBController> _Logger;
        private readonly DtmDbContext _dtmDbContext;
        private readonly IRedisClient _redisClient;
        private readonly RedisService _redisService;

        public SagaUserBController(IBranchBarrierFactory barrierFactory,
            ILogger<SagaUserBController> Logger,
            DtmDbContext dtmDbContext,
            IRedisClient redisClient,
            RedisService redisService)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
            _redisClient = redisClient;
            _redisService = redisService;
        }

        [HttpPost]
        [Route("/Saga/UserBTransactionUrl")]
        public async Task<IActionResult> UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //获取用户账户信息
                        var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                        if (UserMoney is null)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--不存在");
                        }

                        if (UserMoney.money + body.trymoney < 0)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--金额不足");
                        }

                        //前序判断都通过,修改信息准备提交   
                        UserMoney!.money += body.trymoney;
                        _dtmDbContext.SaveChanges();

                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
            }
            return Ok(obj);
        }

        [HttpPost]
        [Route("/Saga/UserBCompensateUrl")]
        public async Task<IActionResult> UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //获取用户账户信息
                        var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                        if (UserMoney == null)
                        {
                            obj = TransResponse.BuildFailureResponse();
                            throw new Exception($"用户{body.id}--不存在");
                        }

                        //修改信息准备提交       
                        UserMoney!.money -= body.trymoney;
                        _dtmDbContext.SaveChanges();

                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
            }
            return Ok(obj);
        }
    }
}

9. 开始运行

9.1 先给A和B两位用户各1000块钱。

9.2 执行转账

转100。

转-200。 

此时我们可以看到Saga分布式事务已经正常执行,并完成了转100和-200的操作。

10. 并发下执行Saga分布式事务

我们思考一个问题,每次只有一个请求的时候Saga分布式事务完美运行,但是在高并发下也能正常运行吗?我们测试一下。

打开apipost,输入请求地址,选择一键压测。

每次请求转账10元,10个并发。难道真的能转账成功吗?(执行之前恢复数据,A:1000元;B:1000元)。我们开始运行。

执行之后可以看到A:960元,B:1080元。按照常理来说每次转账10块钱,并发10次。10*10=100元,应该是A900元,B1100元。但是为什么会出现这个情况呢?

原因就是上一个并发还没执行完,当前并发也会访问数据库资源。数据库执行冲突,导致金额转账失败。如果你的程序写成这样基本就可以卷铺盖走人了。

那我们应该如何解决呢?有的小伙伴会说可以使用RabbitMQ消息队列(.NET CORE消息队列RabbitMQ-CSDN博客)。这确实是一个解决方案,把数据交给RabbitMQ。然后订阅一个一个数据的执行,一个一个用户进行扣款或转账。确实可以完美解决这个并发的问题。解决并发有很多解决方案。今天呢就不用RabbitMQ了,换一个。用Redis缓存数据库的分布式锁来解决这个并发的问题。(针对于Redis缓存数据库的分布式锁的非阻塞锁、阻塞锁、红锁以及锁的续命.NET CORE使用Redis分布式锁续命(续期)问题-CSDN博客,缓存数据类型,缓存api,Lua脚本,主从模式,读写分离,哨兵模式,集群模式等........之后陆续会推出文章,现在不做过多赘述,先解决这个并发问题。)

10.1 Nuget引入StackExchange.Redis

  <ItemGroup>
    <PackageReference Include="StackExchange.Redis" Version="2.7.4" />
  </ItemGroup>

ServiceStack.Redis自3.9版本以后开始收费,我们坚持一贯的作风,能不花钱的就不花钱。但是ServiceStack.Redis提供的分布式锁api写的是真的很好,StackExchange.Redis里没有阻塞锁。我们需要自己手写。

using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;

namespace Dtm_Saga
{
    public class RedisService
    {
        private readonly ConnectionMultiplexer _redis;
        private readonly IDatabase _database;

        /// <summary>
        /// 初始化 <see cref="RedisService"/> 类的新实例。
        /// </summary>
        /// <param name="connectionMultiplexer">连接多路复用器。</param>
        public RedisService(string connectionString)
        {
            _redis = ConnectionMultiplexer.Connect(connectionString);
            _database = _redis.GetDatabase();
        }

        #region 分布式锁...

        #region 阻塞锁

        public bool RedisLock(string key, int expireMilliSeconds, int timeout)
        {
            var script = @"local isNX = redis.call('SETNX', KEYS[1], ARGV[1])
                           if isNX == 1 then
                               redis.call('PEXPIRE', KEYS[1], ARGV[2])
                               return 1
                           end
                           return 0";
            RedisKey[] scriptkey = { key };
            RedisValue[] scriptvalues = { key, expireMilliSeconds * 1000 };
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.Elapsed.TotalSeconds < timeout)
            {
                if (_database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() == "1")
                {
                    stopwatch.Stop();
                    return true;
                }
            }
            Console.WriteLine($"[{DateTime.Now}]{key}--阻塞锁超时");
            stopwatch.Stop();
            return false;
        }

        public bool RedisUnLock(string key)
        {
            var script = @"local getLock = redis.call('GET', KEYS[1])
                            if getLock == ARGV[1] then
                              redis.call('DEL', KEYS[1])
                              return 1
                            end
                            return 0"
            ;
            RedisKey[] scriptkey = { key };
            RedisValue[] scriptvalues = { key };
            return _database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() == "1";
        }

        #endregion

        #endregion
    }
}

 用Redis的Lua脚本来实现阻塞锁的机制,Lua脚本在Redis中被认为是原子性的。在执行Lua脚本期间,Redis不会并行处理其他客户端的命令,而是将它们排队等待。因此,Lua脚本中的所有指令都会连续无中断地执行,不会与其他任何命令交错。

10.2 Program.cs代码修改

Program中注册Redis,这里连接超时加了500000秒,是要高并发时程序一直在请求Redis,实际生产环境需要多次测试,选择一个最佳的超时时间。

            builder.Services.AddSingleton<RedisService>(provider =>
            {
                //你的Redis连接字符串
                string redisConnectionString = "127.0.0.1:6379,abortConnect=false,syncTimeout=500000";
                return new RedisService(redisConnectionString);
            });

10.3 用户1转账事务API控制器代码修改

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;

namespace Dtm_Saga.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class SagaUserAController : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<SagaUserAController> _Logger;
        private readonly DtmDbContext _dtmDbContext;
        private readonly IRedisClient _redisClient;
        private readonly RedisService _redisService;

        public SagaUserAController(IBranchBarrierFactory barrierFactory,
            ILogger<SagaUserAController> Logger,
            DtmDbContext dtmDbContext,
            IRedisClient redisClient,
            RedisService redisService)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
            _redisClient = redisClient;
            _redisService = redisService;
        }

        [HttpPost]
        [Route("/Saga/UserATransactionUrl")]
        public async Task<IActionResult> UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //Redis分布式锁,锁定
                        if (_redisService.RedisLock("DataLock:UserATransactionUrl", 2000, 2000))
                        {
                            //获取用户账户信息
                            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                            if (UserMoney is null)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--不存在");
                            }

                            if (UserMoney.money + body.trymoney < 0)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--金额不足");
                            }

                            //前序判断都通过,修改信息准备提交   
                            UserMoney!.money += body.trymoney;
                            _dtmDbContext.SaveChanges();
                        }

                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
                finally
                {
                    //Redis分布式锁,释放锁
                    _redisService.RedisUnLock("DataLock:UserATransactionUrl");
                }
            }
            return Ok(obj);
        }

        [HttpPost]
        [Route("/Saga/UserACompensateUrl")]
        public async Task<IActionResult> UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            //var branchBarrier = _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //Redis分布式锁,锁定
                        if (_redisService.RedisLock("DataLock:UserACompensateUrl", 2000, 2000))
                        {
                            //获取用户账户信息
                            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();
                            if (UserMoney is null)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--不存在");
                            }
                            //前序判断都通过,修改信息准备提交 
                            UserMoney!.money -= body.trymoney;
                            _dtmDbContext.SaveChanges();
                        }
                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex) { _Logger.LogError(ex.Message); }
                finally
                {
                    //Redis分布式锁,释放锁
                    _redisService.RedisUnLock("DataLock:UserACompensateUrl");
                }
            }
            return Ok(obj);
        }
    }
}

10.4 用户2转账事务API控制器代码修改

using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;

namespace Dtm_Saga.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class SagaUserBController : ControllerBase
    {
        private readonly IBranchBarrierFactory _barrierFactory;
        private readonly ILogger<SagaUserBController> _Logger;
        private readonly DtmDbContext _dtmDbContext;
        private readonly IRedisClient _redisClient;
        private readonly RedisService _redisService;

        public SagaUserBController(IBranchBarrierFactory barrierFactory,
            ILogger<SagaUserBController> Logger,
            DtmDbContext dtmDbContext,
            IRedisClient redisClient,
            RedisService redisService)
        {
            _barrierFactory = barrierFactory;
            _Logger = Logger;
            _dtmDbContext = dtmDbContext;
            _redisClient = redisClient;
            _redisService = redisService;
        }

        [HttpPost]
        [Route("/Saga/UserBTransactionUrl")]
        public async Task<IActionResult> UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //Redis分布式锁,锁定
                        if (_redisService.RedisLock("DataLock:UserBTransactionUrl", 2000, 2000))
                        {
                            //获取用户账户信息
                            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                            if (UserMoney is null)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--不存在");
                            }

                            if (UserMoney.money + body.trymoney < 0)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--金额不足");
                            }

                            //前序判断都通过,修改信息准备提交   
                            UserMoney!.money += body.trymoney;
                            _dtmDbContext.SaveChanges();
                        }
                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex)
                {
                    _Logger.LogError(ex.Message);
                }
                finally
                {
                    //Redis分布式锁,释放锁
                    _redisService.RedisUnLock("DataLock:UserBTransactionUrl");
                }
            }
            return Ok(obj);
        }

        [HttpPost]
        [Route("/Saga/UserBCompensateUrl")]
        public async Task<IActionResult> UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,
                    [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body)
        {
            var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query);
            var obj = TransResponse.BuildSucceedResponse();
            using (MySqlConnection conn = new MySqlConnection("server=localhost;port=3307;user id=root;password=123;database=DTM_Test"))
            {
                try
                {
                    await branchBarrier.Call(conn, async (tx) =>
                    {
                        //Redis分布式锁,锁定
                        if (_redisService.RedisLock("DataLock:UserBCompensateUrl", 2000, 2000))
                        {
                            //获取用户账户信息
                            var UserMoney = _dtmDbContext.Set<UserMoney>().Where(c => c.id == body.id).FirstOrDefault();

                            if (UserMoney == null)
                            {
                                obj = TransResponse.BuildFailureResponse();
                                throw new Exception($"用户{body.id}--不存在");
                            }

                            //修改信息准备提交       
                            UserMoney!.money -= body.trymoney;
                            _dtmDbContext.SaveChanges();
                        }
                        await Task.CompletedTask;
                    });
                }
                catch (Exception ex) { _Logger.LogError(ex.Message); }
                finally
                {
                    //Redis分布式锁,释放锁
                    _redisService.RedisUnLock("DataLock:UserACompensateUrl");
                }
            }
            return Ok(obj);
        }
    }
}

!!!谨记!!!-----释放锁的时候,每个锁的释放代码只能在finally出现一次。不能在try里也写上释放锁。虽然当前并发在try里释放和finally里释放运行并没有问题。但是下一个并发在执行的时候上一个执行到try释放锁之后,立即抢锁,抢锁成功。结果在执行的时候上一个并发在执行finally的时候给释放了,这样是不对的。

10.5 Redis启动

修改好代码之后继续运行。(先启动一个win版本的Redis,linux docker启动等操作之后推出文章。数据进行恢复,A:1000元,B:1000元)。每次转账1元,直接上100,200,500并发。

转账1元,100并发:

转账1元,200并发:(让程序跑一会,喝杯水)

转账1元,500并发:(可以离开工位出去透透气)

为什么A:1627元,B:373元 呢?单次并发有点多导致HTTP请求超时。这时候当前的服务就要部署多个实例,可参考之前文章微服务架构Nacos(.NET CORE微服务之Nacos_nacos .net core-CSDN博客.NET CORE微服务之Ocelot(连接Nacos)_net ocelot + noces-CSDN博客.NET CORE微服务之Polly_polly .net core-CSDN博客),或用Nginx反向代理实现负载均衡。

虽然出现问题,也无法避免人工介入。但是我们最起码保证了用户资产并未出现超减或超加现象,这也是电商中的秒杀,防止商品超卖解决方案。当然应对高并发还有非常多的解决方案。

小结

本文给出了一个完整的 SAGA 事务方案,是一个可以实际运行的 SAGA,并解决高并发的使用场景,您只需要在这个示例的基础上进行简单修改,就能够用于解决您的真实问题

本文项目代码资源地址:【免费】.NETCORE分布式事务(三)DTM实现Saga及高并发下的解决方案资源-CSDN文库

本文含有隐藏内容,请 开通VIP 后查看