33.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--单体转微服务--财务服务--记账

发布于:2025-08-01 ⋅ 阅读:(21) ⋅ 点赞:(0)

这篇文章我们一起把记账模块从单体应用迁移到微服务架构中。记账模块的功能想必大家都已经了解了,主要是记录用户的收入和支出,以及对这些记录的删除修改和查询等操作。具体的功能可以参考单体应用专栏,在这里就不多讲了。我们现在一起开始迁移记账模块的代码吧。

一、小修改

和前面的功能一样,我们把记账模块的代码从单体应用中抽离出来,放到微服务项目中。但是需要做一些小修改,这些修改包括前面几篇文章中提到的将接口修改为标准的restful接口、将Controller继承自ControllerBase而不是BaseController,以及调整路由地址等,在记账模块中,还要对新增记账、修改记账、删除记账这三个功能进行调整。我们先来看一下单体应用中这三个功能在Service层中的代码:

// more code ...

/// <summary>
/// 收支记录实现类
/// </summary>
public class IncomeExpenditureRecordImp : IIncomeExpenditureRecordServer
{
    // more code ...

    /// <summary>
    /// 新增收支记录
    /// </summary>
    /// <param name="incomeExpenditureRecord"></param>
    public void Add(IncomeExpenditureRecord incomeExpenditureRecord)
    {
        //开启事务
        using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction())
        {
            try
            {
                // 查找记录范围内的预算
                var budget = _sporeAccountingDbContext.Budgets
                    .FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId
                                         && x.StartTime <= incomeExpenditureRecord.RecordDate &&
                                         x.EndTime >= incomeExpenditureRecord.RecordDate
                                         && x.IncomeExpenditureClassificationId ==
                                         incomeExpenditureRecord.IncomeExpenditureClassificationId);

                if (budget != null)
                {
                    // 查询分类
                    var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications
                        .FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);
                    if (classification.Type
                        == IncomeExpenditureTypeEnmu.Income)
                    {
                        budget.Remaining -= incomeExpenditureRecord.AfterAmount;
                        // 获取包含支出记录记录日期的报表记录
                        var reports = _sporeAccountingDbContext.Reports
                            .Where(x => x.UserId == incomeExpenditureRecord.UserId
                                        && x.Year <= incomeExpenditureRecord.RecordDate.Year &&
                                        x.Month >= incomeExpenditureRecord.RecordDate.Month &&
                                        x.ClassificationId ==
                                        incomeExpenditureRecord.IncomeExpenditureClassificationId);
                        // 如果没有就说明程序还未将其写入报表,那么就不做任何处理
                        for (int i = 0; i < reports.Count(); i++)
                        {
                            var report = reports.ElementAt(i);
                            report.Amount += incomeExpenditureRecord.AfterAmount;
                            _sporeAccountingDbContext.Reports.Update(report);
                        }
                    }

                    _sporeAccountingDbContext.Budgets.Update(budget);
                }

                _sporeAccountingDbContext.IncomeExpenditureRecords.Add(incomeExpenditureRecord);
                _sporeAccountingDbContext.SaveChanges();
                //提交事务
                transaction.Commit();
            }
            catch (Exception e)
            {
                //回滚事务
                transaction.Rollback();
                throw;
            }
        }
    }

    /// <summary>
    /// 删除收支记录
    /// </summary>
    /// <param name="incomeExpenditureRecordId"></param>
    /// <returns></returns>
    public void Delete(string incomeExpenditureRecordId)
    {
        //开启事务
        using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction())
        {
            try
            {
                var incomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords
                    .FirstOrDefault(x => x.Id == incomeExpenditureRecordId);
                if (incomeExpenditureRecord != null)
                {
                    // 查找记录范围内的预算
                    var budget = _sporeAccountingDbContext.Budgets
                        .FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId
                                             && x.StartTime <= incomeExpenditureRecord.RecordDate &&
                                             x.EndTime >= incomeExpenditureRecord.RecordDate
                                             && x.IncomeExpenditureClassificationId == incomeExpenditureRecord
                                                 .IncomeExpenditureClassificationId);
                    if (budget != null)
                    {
                        // 查询分类
                        var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications
                            .FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);
                        if (classification.Type
                            == IncomeExpenditureTypeEnmu.Income)
                        {
                            budget.Remaining += incomeExpenditureRecord.AfterAmount;
                            // 获取包含支出记录记录日期的报表记录
                            var reports = _sporeAccountingDbContext.Reports
                                .Where(x => x.UserId == incomeExpenditureRecord.UserId
                                            && x.Year <= incomeExpenditureRecord.RecordDate.Year &&
                                            x.Month >= incomeExpenditureRecord.RecordDate.Month &&
                                            x.ClassificationId ==
                                            incomeExpenditureRecord.IncomeExpenditureClassificationId);
                            // 如果没有就说明程序还未将其写入报表,那么就不做任何处理
                            for (int i = 0; i < reports.Count(); i++)
                            {
                                var report = reports.ElementAt(i);
                                report.Amount -= incomeExpenditureRecord.AfterAmount;
                                _sporeAccountingDbContext.Reports.Update(report);
                            }
                        }

                        _sporeAccountingDbContext.Budgets.Update(budget);
                    }

                    _sporeAccountingDbContext.IncomeExpenditureRecords.Remove(incomeExpenditureRecord);
                    _sporeAccountingDbContext.SaveChanges();
                    //提交事务
                    transaction.Commit();
                }
            }
            catch (Exception e)
            {
                //回滚事务
                transaction.Rollback();
                throw;
            }
        }
    }

    /// <summary>
    /// 修改收支记录
    /// </summary>
    /// <param name="incomeExpenditureRecord"></param>
    /// <returns></returns>
    public void Update(IncomeExpenditureRecord incomeExpenditureRecord)
    {
        using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction())
        {
            try
            {
                // 查询原记录
                var oldIncomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords
                    .FirstOrDefault(x => x.Id == incomeExpenditureRecord.Id);
                // 查找记录范围内的预算
                var budget = _sporeAccountingDbContext.Budgets
                    .FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId
                                         && x.StartTime <= incomeExpenditureRecord.RecordDate &&
                                         x.EndTime >= incomeExpenditureRecord.RecordDate
                                         && x.IncomeExpenditureClassificationId ==
                                         incomeExpenditureRecord.IncomeExpenditureClassificationId);
                if (budget != null)
                {
                    // 查询分类
                    var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications
                        .FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);
                    if (classification.Type
                        == IncomeExpenditureTypeEnmu.Income)
                    {
                        //如果是支出,需要减去原来的金额
                        budget.Remaining = (budget.Amount - incomeExpenditureRecord.AfterAmount);
                        // 根据旧的支出记录判断是否修改了记录日期
                        // 如果是修改了记录日期,那么就将原记录日期所在的报表对应的分类金额减去,将新记录日期所在报表对应的分类金额加上
                        if (oldIncomeExpenditureRecord.RecordDate != incomeExpenditureRecord.RecordDate)
                        {
                            // 获取包含支出记录记录日期的报表记录
                            var oldReports = _sporeAccountingDbContext.Reports
                                .Where(x => x.UserId == incomeExpenditureRecord.UserId
                                            && x.Year <= oldIncomeExpenditureRecord.RecordDate.Year &&
                                            x.Month >= oldIncomeExpenditureRecord.RecordDate.Month &&
                                            x.ClassificationId == oldIncomeExpenditureRecord
                                                .IncomeExpenditureClassificationId);
                            // 如果没有就说明程序还未将其写入报表,那么就不做任何处理
                            for (int i = 0; i < oldReports.Count(); i++)
                            {
                                var oldReport = oldReports.ElementAt(i);
                                oldReport.Amount -= oldIncomeExpenditureRecord.AfterAmount;
                                _sporeAccountingDbContext.Reports.Update(oldReport);
                            }

                            // 获取包含支出记录记录日期的报表记录
                            var newReport = _sporeAccountingDbContext.Reports
                                .Where(x => x.UserId == incomeExpenditureRecord.UserId
                                            && x.Year <= incomeExpenditureRecord.RecordDate.Year &&
                                            x.Month >= incomeExpenditureRecord.RecordDate.Month &&
                                            x.ClassificationId ==
                                            incomeExpenditureRecord.IncomeExpenditureClassificationId);
                            // 如果没有就说明程序还未将其写入报表,那么就不做任何处理
                            for (int i = 0; i < newReport.Count(); i++)
                            {
                                var report = newReport.ElementAt(i);
                                report.Amount += incomeExpenditureRecord.AfterAmount;
                                _sporeAccountingDbContext.Reports.Update(report);
                            }
                        }
                    }
                    else
                    {
                        //如果是收入,需要加上原来的金额
                        budget.Remaining = (budget.Amount + incomeExpenditureRecord.AfterAmount);
                    }

                    _sporeAccountingDbContext.Budgets.Update(budget);
                }

                oldIncomeExpenditureRecord.AfterAmount = incomeExpenditureRecord.AfterAmount;
                oldIncomeExpenditureRecord.BeforAmount = incomeExpenditureRecord.BeforAmount;
                oldIncomeExpenditureRecord.RecordDate = incomeExpenditureRecord.RecordDate;
                oldIncomeExpenditureRecord.Remark = incomeExpenditureRecord.Remark;
                oldIncomeExpenditureRecord.CurrencyId = incomeExpenditureRecord.CurrencyId;
                oldIncomeExpenditureRecord.IncomeExpenditureClassificationId =
                    incomeExpenditureRecord.IncomeExpenditureClassificationId;
                _sporeAccountingDbContext.IncomeExpenditureRecords.Update(oldIncomeExpenditureRecord);
                _sporeAccountingDbContext.SaveChanges();
                //提交事务
                transaction.Commit();
            }
            catch (Exception e)
            {
                //回滚事务
                transaction.Rollback();
                throw;
            }
        }
    }

    // more code ...
}

在上面的代码中,我们看到这三个功能,都有两个共同的操作:一个是对预算的操作,另一个是对报表的操作。在这个单体应用代码中,我们都是直接调用预算和报表的DbSet进行操作,并且启用了事务来保证数据的一致性。这里看似合理,但是实际上并不符合微服务的设计原则,因为微服务应该是独立的,不能直接操作其他服务的数据,同时我们也要保证方法的单一原则,因此在这三个方法里操作其他数据是不合理的。并且,如果我们存储记账记录时,预算数据或者报表数据没有存储成功,就会触发回滚操作,这样就会导致记账记录没有存储成功,对于这种设计来不符合用户体验,因此我们需要对这三个方法进行修改。

我们要做的是将预算增删操作抽离出来,让新增记账、修改记账、删除记账这三个功能在每次存储数据成功后,发送MQ消息,然后预算订阅MQ消息,进行相应的增删改操作。这样就可以保证记账模块的独立性,同时也能保证数据的一致性,即使是预算服务出现问题,也不会影响到记账模块的正常运行。修改后的Service层代码如下:

// more code ...

/// <summary>
/// 记账服务实现类
/// </summary>
public class AccountingServerImpl : IAccountingServer
{
    /// <summary>
    /// RabbitMQ消息处理
    /// </summary>
    private readonly RabbitMqMessage _rabbitMqMessage;

    /// <summary>
    /// 新增记账
    /// </summary>
    /// <param name="accountBookId">账本ID</param>
    /// <param name="request">记账添加请求</param>
    /// <returns></returns>
    public long Add(long accountBookId, AccountingAddRequest request)
    {
        // more code ...

        //通过MQ发送记账数据到消息队列,从预算中扣除金额
        MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),
            MqExchange.BudgetExchange,
            MqRoutingKey.BudgetRoutingKey,
            MqQueue.BudgetQueue, MessageType.BudgetDeduct, ExchangeType.Direct);
        _rabbitMqMessage.SendAsync(mqPublisher).Start();

        // 返回新增的记账ID
        return accounting.Id;
    }

    /// <summary>
    /// 删除记账
    /// </summary>
    /// <param name="accountBookId">账本ID</param>
    /// <param name="id">记账ID</param>
    public void Delete(long accountBookId, long id)
    {
        // more code ...

        //通过MQ发送删除记账数据到消息队列,把预算中的金额恢复
        MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),
            MqExchange.BudgetExchange,
            MqRoutingKey.BudgetRoutingKey,
            MqQueue.BudgetQueue, MessageType.BudgetAdd, ExchangeType.Direct);
        _rabbitMqMessage.SendAsync(mqPublisher).Start();
    }

    /// <summary>
    /// 修改记账
    /// </summary>
    /// <param name="accountBookId">账本ID</param>
    /// <param name="request">修改请求</param>
    public void Edit(long accountBookId, AccountingEditRequest request)
    {
        // more code ...

        // 通过MQ发送修改记账数据到消息队列,更新预算中的金额
        MqPublisher mqPublisher = new MqPublisher(amountDifference.ToString("F2"),
            MqExchange.BudgetExchange,
            MqRoutingKey.BudgetRoutingKey,
            MqQueue.BudgetQueue, MessageType.BudgetUpdate, ExchangeType.Direct);
        _rabbitMqMessage.SendAsync(mqPublisher).Start();
    }

    // more code ...
}

在上面的代码中,我们对新增记账、删除记账、修改记账这三个方法进行了修改。新增记账时,我们将记账数据发送到MQ消息队列中,预算服务会订阅这个消息,并进行相应的扣款操作。删除记账时,我们同样发送消息到MQ消息队列中,预算服务会将金额恢复到预算中。修改记账时,我们计算出原金额与新金额的差额,并发送到MQ消息队列中,预算服务会根据差额进行相应的更新操作。下面是新增的预算订阅MQ消息处理类的代码:

using SP.Common.Message.Model;
using SP.Common.Message.Mq;
using SP.Common.Message.Mq.Model;
using SP.FinanceService.Models.Entity;
using SP.FinanceService.Models.Enumeration;
using SP.FinanceService.Service;

namespace SP.FinanceService.Mq;

/// <summary>
/// Budget 消息消费者服务
/// </summary>
public class BudgetConsumerService : BackgroundService
{
    /// <summary>
    /// RabbitMq 消息
    /// </summary>
    private readonly RabbitMqMessage _rabbitMqMessage;

    /// <summary>
    /// 日志记录器
    /// </summary>
    private readonly ILogger<BudgetConsumerService> _logger;

    /// <summary>
    /// 预算服务
    /// </summary>
    private readonly IBudgetServer _budgetService;

    /// <summary>
    ///  Budget 消息消费者服务
    /// </summary>
    /// <param name="rabbitMqMessage"></param>
    /// <param name="logger"></param>
    /// <param name="budgetService"></param>
    public BudgetConsumerService(RabbitMqMessage rabbitMqMessage, ILogger<BudgetConsumerService> logger,
        IBudgetServer budgetService)
    {
        _logger = logger;
        _rabbitMqMessage = rabbitMqMessage;
        _budgetService = budgetService;
    }

    /// <summary>
    /// RabbitMq 消息
    /// </summary>
    /// <param name="stoppingToken"></param>
    /// <returns></returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        MqSubscriber subscriber = new MqSubscriber(MqExchange.BudgetExchange,
            MqRoutingKey.BudgetRoutingKey, MqQueue.BudgetQueue);
        await _rabbitMqMessage.ReceiveAsync(subscriber, async message =>
        {
            // 验证消息
            MqMessage mqMessage = ValidateMessage(message);
            if (mqMessage == null) return;

            // 获取当前预算
            List<Budget> budgets = GetCurrentBudgets();
            if (budgets == null || budgets.Count == 0) return;

            decimal amount = decimal.Parse(message.Body);

            // 根据消息类型处理预算
            switch (mqMessage.Type)
            {
                case MessageType.BudgetAdd:
                    _logger.LogInformation("接收到预算增加处理消息, {Message}", mqMessage);
                    UpdateBudgetsByAmount(budgets, amount, true);
                    break;
                case MessageType.BudgetUpdate:
                    _logger.LogInformation("接收到预算更新消息, {Message}", mqMessage);
                    UpdateBudgetsByAmount(budgets, amount, true);
                    break;
                case MessageType.BudgetDeduct:
                    _logger.LogInformation("接收到预算扣除消息, {Message}", mqMessage);
                    UpdateBudgetsByAmount(budgets, amount, false);
                    break;
                default:
                    _logger.LogWarning("未知的消息类型: {Type}", mqMessage.Type);
                    break;
            }

            await Task.CompletedTask;
        });
    }

    /// <summary>
    /// 验证消息
    /// </summary>
    /// <param name="message">原始消息</param>
    /// <returns>验证后的 MqMessage,如果验证失败返回 null</returns>
    private MqMessage ValidateMessage(object message)
    {
        MqMessage mqMessage = message as MqMessage;
        if (mqMessage == null)
        {
            _logger.LogError("消息转换失败");
            return null;
        }
        return mqMessage;
    }

    /// <summary>
    /// 获取当前预算
    /// </summary>
    /// <returns>当前预算列表,如果没有可用预算返回空列表</returns>
    private List<Budget> GetCurrentBudgets()
    {
        List<Budget> budgets = _budgetService.QueryCurrentBudgets();
        if (budgets == null || budgets.Count == 0)
        {
            _logger.LogInformation("当前没有可用的预算,不执行处理");
            return new List<Budget>();
        }
        return budgets;
    }

    /// <summary>
    /// 根据金额更新预算
    /// </summary>
    /// <param name="budgets">预算列表</param>
    /// <param name="amount">金额</param>
    /// <param name="isAdd">是否为增加操作,true为增加,false为扣除</param>
    private void UpdateBudgetsByAmount(List<Budget> budgets, decimal amount, bool isAdd)
    {
        string operation = isAdd ? "增加" : "扣除";
        decimal operationAmount = isAdd ? amount : -amount;

        // 更新月度预算
        UpdateBudgetByPeriod(budgets, PeriodEnum.Month, operationAmount, operation);
        
        // 更新年度预算
        UpdateBudgetByPeriod(budgets, PeriodEnum.Year, operationAmount, operation);
        
        // 更新季度预算
        UpdateBudgetByPeriod(budgets, PeriodEnum.Quarter, operationAmount, operation);

        // 保存更改到数据库
        _budgetService.UpdateBudgets(budgets);
    }

    /// <summary>
    /// 根据周期更新预算
    /// </summary>
    /// <param name="budgets">预算列表</param>
    /// <param name="period">预算周期</param>
    /// <param name="amount">金额变化</param>
    /// <param name="operation">操作类型描述</param>
    private void UpdateBudgetByPeriod(List<Budget> budgets, PeriodEnum period, decimal amount, string operation)
    {
        Budget budget = budgets.FirstOrDefault(b => b.Period == period);
        if (budget != null)
        {
            budget.Amount += amount;
            string periodName = GetPeriodName(period);
            _logger.LogInformation("{PeriodName}预算{Operation}成功,{Operation}金额: {Amount}", 
                periodName, operation, operation, budget.Amount);
        }
    }

    /// <summary>
    /// 获取周期名称
    /// </summary>
    /// <param name="period">预算周期</param>
    /// <returns>周期名称</returns>
    private string GetPeriodName(PeriodEnum period)
    {
        return period switch
        {
            PeriodEnum.Month => "月度",
            PeriodEnum.Year => "年度",
            PeriodEnum.Quarter => "季度",
            _ => "未知"
        };
    }
}

在上面的代码中,我们创建了一个BudgetConsumerService类,它继承自BackgroundService,用于处理预算相关的MQ消息。我们在ExecuteAsync方法中订阅了MQ消息,并根据消息类型进行相应的处理。我们还添加了一些日志记录,以便于调试和监控。

对于调用报表的部分,我们暂时先将这部分代码删除掉,因为这个设计其实是不符合业务逻辑的,我们的报表都是定时生成的,而不是实时生成的,因此对报表的修改其实是没必要的,因此我们在这里不对报表进行任何操作。等到后面我们实现了报表模块后,再来处理报表相关的逻辑。

二、总结

在这篇文章中,我们将记账模块从单体应用迁移到微服务架构中,并对新增记账、删除记账、修改记账这三个功能进行了调整。我们将预算的增删改操作抽离出来,让记账模块通过MQ消息与预算服务进行交互,从而实现了模块的独立性和数据的一致性。我们还创建了一个BudgetConsumerService类,用于处理预算相关的MQ消息。

通过这些修改,我们使得记账模块能够更好地适应微服务架构的设计原则,同时也提高了系统的可维护性和可扩展性。接下来,我们将继续实现记账模块的其他功能,并将其与其他模块进行集成。


网站公告

今日签到

点亮在社区的每一天
去签到