ABP VNext + MongoDB 数据存储:多模型支持与 NoSQL 扩展

发布于:2025-06-24 ⋅ 阅读:(19) ⋅ 点赞:(0)

🚀 ABP VNext + MongoDB 数据存储:多模型支持与 NoSQL 扩展(生产级实践)



🎯 引言

在高并发、快速迭代的业务环境中,传统 RDBMS 因结构僵硬、事务开销大而难以应对。MongoDB 以其灵活文档模型、高吞吐与分布式能力,成为 ABP 应用的理想补充。本文将示范如何在 ABP VNext 中生产级地集成 MongoDB——从配置、DI、仓储,到事务、多模型设计与监控全覆盖。

💬 业务痛点

  • 频繁迭代导致表结构变更成本高
  • 大规模写入时事务与锁竞争瓶颈明显
  • 多租户隔离需高扩展性

🧰 环境与依赖

  • 🖥️ .NET 8
  • 📦 ABP v6+
  • 🌐 MongoDB Server 6.x(Replica Set / Sharded Cluster)
  • 📦 NuGet 包
    • MongoDB.Driver
    • Volo.Abp.MongoDB

⚙️ appsettings.json

{
  "ConnectionStrings": {
    "MongoDb": "mongodb://localhost:27017/?maxPoolSize=200&minPoolSize=50"
  },
  "MongoDb": {
    "DatabaseName": "MyProjectDb"
  }
}

🏗️ 架构概述

📩 HTTP 请求
🔧 Application Service
📦 Domain Service
📚 MongoRepository
🗄️ MyMongoDbContext
🌐 MongoDB Server

🤖 集成与配置

📑 模块注册

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    Configure<AbpMongoDbContextOptions>(options =>
    {
        options.ConnectionStringName = "MongoDb";
    });
}

public override void ConfigureServices(ServiceConfigurationContext context)
{
    context.Services.AddMongoDbContext<MyMongoDbContext>(builder =>
    {
        // includeAllEntities: false 仅为聚合根生成仓储
        builder.AddDefaultRepositories(includeAllEntities: false);
    });
}

💡 可根据项目需要,将 includeAllEntities 设置为 truefalse

📘 DbContext 定义

[ConnectionStringName("MongoDb")]
[MultiTenant]
public class MyMongoDbContext : AbpMongoDbContext
{
    public IMongoCollection<Order> Orders => Database.GetCollection<Order>("Orders");
    public IMongoCollection<Address> Addresses => Database.GetCollection<Address>("Addresses");

    public MyMongoDbContext(IAbpMongoDbContextOptions<MyMongoDbContext> options)
        : base(options) { }
}
  • 建议:在模块 PreConfigureServices 注入 ICurrentTenant 控制数据库路由。

📦 自定义仓储实现

public interface IMongoRepository<TEntity, TKey> : IRepository<TEntity, TKey>
    where TEntity : class, IEntity<TKey>
{
    Task BulkInsertAsync(IEnumerable<TEntity> entities, bool isOrdered = false);
    Task<IEnumerable<TResult>> AggregateLookupAsync<TForeign, TResult>(
        Expression<Func<TEntity, object>> localField,
        Expression<Func<TForeign, object>> foreignField,
        PipelineDefinition<TEntity, TResult> pipeline
    );
}

public class MongoRepository<TEntity, TKey>
    : MongoDbRepository<MyMongoDbContext, TEntity, TKey>, IMongoRepository<TEntity, TKey>
    where TEntity : class, IEntity<TKey>
{
    private readonly IMongoCollection<TEntity> _collection;

    public MongoRepository(IDbContextProvider<MyMongoDbContext> dbContextProvider)
        : base(dbContextProvider)
    {
        _collection = dbContextProvider.GetDbContext()
            .Database.GetCollection<TEntity>(typeof(TEntity).Name);
    }

    public async Task BulkInsertAsync(IEnumerable<TEntity> entities, bool isOrdered = false)
    {
        var models = entities.Select(e => new InsertOneModel<TEntity>(e));
        await _collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = isOrdered });
    }

    public async Task<IEnumerable<TResult>> AggregateLookupAsync<TForeign, TResult>(
        Expression<Func<TEntity, object>> localField,
        Expression<Func<TForeign, object>> foreignField,
        PipelineDefinition<TEntity, TResult> pipeline)
    {
        return await _collection.Aggregate(pipeline).ToListAsync();
    }
}

🔐 事务处理与一致性

🔄 UnitOfWork 流程

Controller AppService UnitOfWork Repository CreateAsync(input) BeginTransaction() InsertAsync(order) Acknowledged Commit() Completed Return OrderDto Controller AppService UnitOfWork Repository
public class OrderAppService : ApplicationService, IOrderAppService
{
    private readonly IMongoRepository<Order, Guid> _orderRepository;

    public OrderAppService(IMongoRepository<Order, Guid> orderRepository)
        => _orderRepository = orderRepository;

    [UnitOfWork]
    public async Task<OrderDto> CreateAsync(CreateOrderDto input)
    {
        var order = ObjectMapper.Map<Order>(input);
        await _orderRepository.InsertAsync(order);
        return ObjectMapper.Map<OrderDto>(order);
    }
}

🗺️ 分片与模型设计

🔑 Shard Key 评估

sh.shardCollection("MyProjectDb.Orders", { CustomerId: 1, CreatedAt: 1 });

⚠️ 复合键示例,可有效避免单一热点。

🏗️ 多模型建模

引用模型
AddressId
Order
Address 集合
嵌入式
Address 嵌入
Order
// 示例 $lookup 聚合
var results = await _context.Orders.Aggregate()
    .Lookup<Address, LookupResult>(
        _context.Addresses,
        o => o.AddressId,
        a => a.Id,
        result => result.Addresses
    )
    .ToListAsync();

🚀 性能优化指南

📈 索引创建

var orderCollection = context.Database.GetCollection<Order>("Orders");
await orderCollection.Indexes.CreateManyAsync(new[]
{
    new CreateIndexModel<Order>(
        Builders<Order>.IndexKeys.Ascending(o => o.CustomerId)
    ),
    new CreateIndexModel<Order>(
        Builders<Order>.IndexKeys.Descending(o => o.CreatedAt)
    )
});

⚡ 批量写入

await repository.BulkInsertAsync(largeOrderList, isOrdered: false);

🛠️ 捕获 BulkWriteException 并重试或补偿处理。


📊 监控与可观测性

🐢 慢查询检测(CommandSucceededEvent)

Configure<AbpMongoOptions>(options =>
{
    options.ClusterConfigurator = cb =>
    {
        cb.Subscribe<CommandSucceededEvent>(e =>
        {
            if (e.CommandName == "find" && e.Duration > TimeSpan.FromMilliseconds(100))
            {
                Logger.LogWarning("🐢 Slow MongoDB query: {Command}", e.Command);
            }
        });
    };
});

🌐 Application Insights

services.AddApplicationInsightsTelemetry();
var telemetryClient = serviceProvider.GetRequiredService<TelemetryClient>();
// 示例上报连接池使用率
var poolUsage = /* 读取连接池状态 */;
telemetryClient.TrackMetric("mongo.connectionPoolUsage", poolUsage);

🛠️ Controller 全 CRUD 示例

[ApiController]
[Route("api/orders")]
public class OrdersController : AbpController
{
    private readonly IOrderAppService _service;
    public OrdersController(IOrderAppService service) => _service = service;

    [HttpPost]
    [ProducesResponseType(typeof(OrderDto), 201)]
    public async Task<OrderDto> Create(CreateOrderDto input)
    {
        return await _service.CreateAsync(input);
    }

    [HttpGet("{id}")]
    [ProducesResponseType(typeof(OrderDto), 200)]
    [ProducesResponseType(404)]
    public Task<OrderDto> Get(Guid id) => _service.GetAsync(id);

    [HttpPut("{id}")]
    [ProducesResponseType(typeof(OrderDto), 200)]
    public Task<OrderDto> Update(Guid id, UpdateOrderDto input)
    {
        input.Id = id;
        return _service.UpdateAsync(input);
    }

    [HttpDelete("{id}")]
    [ProducesResponseType(204)]
    public Task Delete(Guid id) => _service.DeleteAsync(id);
}

🧪 单元测试示例(xUnit + Mongo2Go + DI)

public class OrderRepositoryTests : IClassFixture<ServiceFixture>
{
    private readonly IMongoRepository<Order, Guid> _repository;

    public OrderRepositoryTests(ServiceFixture fixture)
    {
        _repository = fixture.ServiceProvider.GetRequiredService<IMongoRepository<Order, Guid>>();
    }

    [Fact]
    public async Task BulkInsert_Should_Insert_All_Orders()
    {
        var orders = Enumerable.Range(1, 10)
            .Select(i => new Order { Id = Guid.NewGuid(), CustomerId = $"C{i}" })
            .ToList();
        await _repository.BulkInsertAsync(orders);
        var count = await _repository.GetCountAsync();
        Assert.Equal(10, count);
    }

    [Fact]
    public async Task Update_Should_Modify_Order()
    {
        var order = await _repository.InsertAsync(new Order { Id = Guid.NewGuid(), CustomerId = "C0" });
        order.CustomerId = "C0-Updated";
        await _repository.UpdateAsync(order);
        var fetched = await _repository.GetAsync(order.Id);
        Assert.Equal("C0-Updated", fetched.CustomerId);
    }

    [Fact]
    public async Task Delete_Should_Remove_Order()
    {
        var order = await _repository.InsertAsync(new Order { Id = Guid.NewGuid(), CustomerId = "C1" });
        await _repository.DeleteAsync(order.Id);
        await Assert.ThrowsAsync<EntityNotFoundException>(() => _repository.GetAsync(order.Id));
    }
}

public class ServiceFixture : IDisposable
{
    public ServiceProvider ServiceProvider { get; }

    public ServiceFixture()
    {
        var runner = MongoDbRunner.Start();
        var services = new ServiceCollection();
        services.Configure<AbpMongoDbContextOptions<MyMongoDbContext>>(options =>
        {
            options.ConnectionStringName = "MongoDb";
        });
        services.AddMongoDbContext<MyMongoDbContext>(builder =>
        {
            builder.AddDefaultRepositories(includeAllEntities: false);
        });
        services.AddTransient(typeof(IMongoRepository<,>), typeof(MongoRepository<,>));
        services.AddSingleton(runner);
        ServiceProvider = services.BuildServiceProvider();
    }

    public void Dispose()
    {
        var runner = ServiceProvider.GetRequiredService<MongoDbRunner>();
        runner.Dispose();
    }
}

📚 附录


网站公告

今日签到

点亮在社区的每一天
去签到