ABP VNext + RediSearch: Microservice-Level Full-Text Search 🚀
✨ TL;DR
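- 🚀 Use Redis Stack (RediSearch built in) with Redis.OM to add millisecond-level full-text search to ABP VNext microservices
- 🐳 Docker Compose & 🎯 Kubernetes manifests: persistence, ACL/password authentication, RedisInsight visualization
- 🏷️ Global Prefix + dynamic IndexName to isolate multi-tenant indexes and data
- 🔄 Full feature set: index create/drop, single and bulk writes, real-time upsert/delete sync, bulk rebuild, Polly retries
- 🔍 Full-text, Tag, numeric, geo, and Facet aggregation queries; 📈 performance comparison of PostgreSQL LIKE/FTS vs. RediSearch
- 🔒 Production advice: AOF/RDB, ACL, pre-commit/SAST, monitoring & slow queries, Testcontainers integration tests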
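📚 1. Background and Motivation 🚀
Full-text search in a traditional relational database (LIKE '%keyword%' or FTS) often runs into the following problems in high-concurrency microservice scenarios:
- Performance bottleneck: latency of 100+ ms once the dataset reaches millions of documents;
- Limited features: geo-radius queries and Facet aggregation have to be built in-house;
- Complex scaling: sharding and high availability carry a high operational cost.
RediSearch is built on an in-memory inverted index. It supports sub-millisecond responses, real-time updates, and geo and aggregation queries, which suits high-throughput, low-latency search workloads.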
🛠️ 2. Environment and Dependencies 🐳
2.1 Starting Redis Stack with Docker Compose
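version: "3.8"
services:
  redis:
    image: redis/redis-stack:latest
    container_name: redis-stack
    ports:
      - "6379:6379"   # Redis
      - "8001:8001"   # RedisInsight
    volumes:
      - redis-data:/data
    # Pass server flags through REDIS_ARGS so the image's entrypoint still
    # loads the RediSearch module and starts RedisInsight; overriding
    # `command` with plain redis-server would bypass that entrypoint.
    environment:
      REDIS_ARGS: "--requirepass YourStrong!Pass --appendonly yes"
volumes:
  redis-data: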
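- 🔐 Security: --requirepass enforces authentication
- 💾 Persistence: --appendonly yes enables AOF
- 🔍 GUI: open http://localhost:8001 to use RedisInsight
Tip: with the Compose v3 file format, resource-limit fields are not applied by classic docker-compose outside Swarm mode. To cap memory, deploy on Swarm or pass --memory on the docker CLI.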
docker-compose up -d
2.2 Kubernetes Deployment (Example Manifest)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-stack
spec:
  replicas: 1
  selector: { matchLabels: { app: redis-stack } }
  template:
    metadata: { labels: { app: redis-stack } }
    spec:
      containers:
        - name: redis
          image: redis/redis-stack:latest
          # REDIS_ARGS keeps the image entrypoint (module loading, RedisInsight) intact
          env:
            - name: REDIS_ARGS
              value: "--requirepass YourStrong!Pass --appendonly yes"
          ports:
            - containerPort: 6379
            - containerPort: 8001
          volumeMounts:
            - mountPath: /data
              name: redis-data
      volumes:
        - name: redis-data
          persistentVolumeClaim:
            claimName: redis-pvc   # assumes this PVC already exists
---
apiVersion: v1
kind: Service
metadata:
  name: redis-stack
spec:
  type: ClusterIP
  ports:
    - name: redis      # port names are required when a Service exposes several ports
      port: 6379
    - name: insight
      port: 8001
  selector:
    app: redis-stack
2.3 ABP VNext & NuGet Packages
dotnet add package Redis.OM
dotnet add package StackExchange.Redis
dotnet add package Volo.Abp.Caching.StackExchangeRedis
dotnet add package Polly
appsettings.json:
{
  "Abp": {
    "DistributedCache": {
      "Redis": {
        "Configuration": "localhost:6379,password=YourStrong!Pass,allowAdmin=true",
        "InstanceName": "MyApp:"
      }
    }
  }
}
🏗️ 3. Architecture and Flow Diagram 🏗️
🔧 4. Index Model and Dependency Injection 🔧
4.1 Model Definition
using Redis.OM.Modeling;

[Document(IndexName = "product-idx")] // base index name
public class ProductIndex
{
    [RedisIdField]               // primary key
    public string Id { get; set; }

    [Searchable]                 // full-text field
    public string Name { get; set; }

    [Indexed]                    // a string with [Indexed] becomes a TAG field
    public string Category { get; set; }

    [Indexed(Sortable = true)]   // numeric, sortable
    public decimal Price { get; set; }

    [Indexed]                    // a GeoLoc with [Indexed] becomes a GEO field
    public GeoLoc Location { get; set; }
}
4.2 Service Registration
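using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Redis.OM;
using StackExchange.Redis;
using Volo.Abp.Modularity;
using Volo.Abp.MultiTenancy;

public override void ConfigureServices(ServiceConfigurationContext context)
{
    // 1. ABP Redis cache
    context.Services.AddStackExchangeRedisCache(options => { … });

    // 2. ConnectionMultiplexer (one shared instance per process)
    context.Services.AddSingleton(sp =>
        ConnectionMultiplexer.Connect(
            sp.GetRequiredService<IConfiguration>()
              .GetSection("Abp:DistributedCache:Redis:Configuration")
              .Value
        )
    );

    // 3. Redis.OM provider. Scoped rather than singleton, so the tenant prefix
    //    is resolved per request instead of being frozen at first resolution.
    context.Services.AddScoped(sp =>
    {
        var mux = sp.GetRequiredService<ConnectionMultiplexer>();
        // ICurrentTenant.Id is null on the host side; GetId() would throw there
        var tenantId = sp.GetService<ICurrentTenant>()?.Id?.ToString() ?? "global";
        return new RedisConnectionProvider(new RedisConnectionProviderOptions
        {
            RedisConnection = mux,
            Prefix = $"tenant:{tenantId}:"
        });
    });

    // 4. Index and search services
    context.Services.AddTransient<IIndexService, RedisOmIndexService>();
    context.Services.AddTransient<ISearchService, RedisOmSearchService>();
}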
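Note: Prefix applies only to document hash keys; it does not change the index name used by FT.CREATE. To isolate indexes per tenant, build the index name manually in CreateIndexAsync / DropIndexAsync: var indexName = $"{prefix}product-idx";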
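The naming rule in the note can be kept in one place so that key prefixes and index names never drift apart. The helper below is a minimal sketch, not part of Redis.OM or ABP: the class `TenantIndexNames` and its methods are hypothetical, and it assumes the `tenant:{id}:` prefix convention and the lowercase type-name key segment used elsewhere in this article.

```csharp
using System;

public static class TenantIndexNames
{
    // Key prefix shared by all documents of one tenant, e.g. "tenant:global:"
    // for the host side (no tenant id) or "tenant:<guid>:" for a tenant.
    public static string KeyPrefix(Guid? tenantId) =>
        $"tenant:{(tenantId.HasValue ? tenantId.Value.ToString() : "global")}:";

    // Index name to pass to FT.CREATE / FT.DROPINDEX for this tenant.
    public static string IndexName(Guid? tenantId, string baseIndexName) =>
        KeyPrefix(tenantId) + baseIndexName;

    // Value for the PREFIX argument of FT.CREATE, so the index only
    // covers this tenant's documents of the given type.
    public static string DocumentPrefix(Guid? tenantId, Type documentType) =>
        KeyPrefix(tenantId) + documentType.Name.ToLowerInvariant() + ":";
}
```

With this in place, `CreateIndexAsync` and `DropIndexAsync` could both call `TenantIndexNames.IndexName(...)` instead of concatenating strings separately.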
🛠️ 5. IndexService & SearchService Implementation 🛠️
Interfaces
public interface IIndexService
{
    Task CreateIndexAsync<T>() where T : class;
    Task DropIndexAsync<T>() where T : class;
    Task UpsertAsync<T>(T doc) where T : class;
    Task DeleteAsync<T>(string id) where T : class;
    Task BulkInsertAsync<T>(IEnumerable<T> docs) where T : class;
}

public interface ISearchService
{
    Task<SearchResult<T>> SearchAsync<T>(
        string query, int skip = 0, int take = 20) where T : class;
    Task<SearchResult<T>> SearchAsync<T>(
        SearchDefinition def) where T : class;
}
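The interfaces above return `SearchResult<T>`, which the article uses but never defines. A minimal sketch of such a result type might look like the following; the `Items` and `Total` members match how the search service populates it, while `HasMore` is a hypothetical convenience added here for paging.

```csharp
using System.Collections.Generic;

public class SearchResult<T>
{
    // The current page of matching documents.
    public IReadOnlyList<T> Items { get; set; } = new List<T>();

    // Total number of matches reported by the search engine,
    // which is usually larger than Items.Count when paging.
    public long Total { get; set; }

    // True when more results exist beyond this page, given the
    // offset (skip) that was used to fetch it.
    public bool HasMore(int skip) => skip + Items.Count < Total;
}
```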
RedisOmIndexService
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class RedisOmIndexService : IIndexService
{
    private readonly RedisConnectionProvider _prov;
    private readonly IDatabase _db;
    private readonly string _prefix;

    public RedisOmIndexService(
        RedisConnectionProvider prov,
        ConnectionMultiplexer mux)
    {
        _prov = prov;
        _db = mux.GetDatabase();
        _prefix = prov.Prefix; // e.g. "tenant:1:"
    }

    public Task CreateIndexAsync<T>() where T : class
    {
        var baseIdx = _prov.RedisCollection<T>().IndexName;
        var idxName = $"{_prefix}{baseIdx}";
        // Index only this tenant's hashes; the schema follows the Redis.OM model
        return _db.ExecuteAsync("FT.CREATE",
            idxName, "ON", "HASH",
            "PREFIX", "1", $"{_prefix}{typeof(T).Name.ToLowerInvariant()}:",
            "SCHEMA", /* ... schema args ... */);
    }

    public async Task DropIndexAsync<T>() where T : class
    {
        var idxName = $"{_prefix}{_prov.RedisCollection<T>().IndexName}";
        // Drop only if the index exists, so the call is idempotent
        var rl = (RedisResult[])await _db.ExecuteAsync("FT._LIST");
        var list = rl.Select(r => (string)r).ToArray();
        if (list.Contains(idxName))
            // DD also deletes the indexed documents, not just the index
            await _db.ExecuteAsync("FT.DROPINDEX", idxName, "DD");
    }

    public Task UpsertAsync<T>(T doc) where T : class
        => _prov.RedisCollection<T>().InsertAsync(doc);

    public Task DeleteAsync<T>(string id) where T : class
        => _prov.RedisCollection<T>().DeleteAsync(id);

    public async Task BulkInsertAsync<T>(IEnumerable<T> docs) where T : class
    {
        // Cap concurrency at 50 in-flight writes so a large batch
        // does not overwhelm Redis
        using var sem = new SemaphoreSlim(50);
        var tasks = docs.Select(async d =>
        {
            await sem.WaitAsync();
            try { await _prov.RedisCollection<T>().InsertAsync(d); }
            finally { sem.Release(); }
        });
        await Task.WhenAll(tasks);
    }
}
RedisOmSearchService
public class RedisOmSearchService : ISearchService
{
    private readonly RedisConnectionProvider _prov;

    public RedisOmSearchService(RedisConnectionProvider prov) => _prov = prov;

    public async Task<SearchResult<T>> SearchAsync<T>(
        string query, int skip = 0, int take = 20) where T : class
    {
        var col = _prov.RedisCollection<T>();
        var res = await col.SearchAsync(
            new SearchDefinition(query).Limit(skip, take)
        );
        return new SearchResult<T>
        {
            Items = res.Documents.Select(d => d.Object).ToList(),
            Total = res.TotalResults
        };
    }

    public async Task<SearchResult<T>> SearchAsync<T>(
        SearchDefinition def) where T : class
    {
        var col = _prov.RedisCollection<T>();
        var res = await col.SearchAsync(def);
        return new SearchResult<T>
        {
            Items = res.Documents.Select(d => d.Object).ToList(),
            Total = res.TotalResults
        };
    }
}
⚙️ 6. Data Synchronization Strategy 🔄
6.1 EF Core Batching Extension
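using System.Collections.Generic;
using System.Linq;
using Microsoft.EntityFrameworkCore;

public static class IQueryableExtensions
{
    // Streams the query in pages of `size` rows. The source should carry a
    // stable OrderBy; without one, Skip/Take paging may skip or repeat rows.
    public static async IAsyncEnumerable<List<T>> BatchAsync<T>(
        this IQueryable<T> source, int size)
    {
        var total = await source.CountAsync();
        for (int i = 0; i < total; i += size)
            yield return await source.Skip(i).Take(size).ToListAsync();
    }
}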
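6.2 Real-Time Create/Update/Delete
using Polly;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events;
using Volo.Abp.EventBus;

// Create/update. EntityChangedEventData is also raised on delete, so this
// handler subscribes to the created and updated events explicitly.
public class ProductChangedHandler
    : ILocalEventHandler<EntityCreatedEventData<Product>>,
      ILocalEventHandler<EntityUpdatedEventData<Product>>,
      ITransientDependency
{
    private readonly IIndexService _idx;
    // Retry twice on any exception, waiting 50 ms and then 100 ms
    private readonly AsyncPolicy _retry = Policy
        .Handle<Exception>()
        .WaitAndRetryAsync(new[]
        {
            TimeSpan.FromMilliseconds(50),
            TimeSpan.FromMilliseconds(100)
        });

    public ProductChangedHandler(IIndexService idx) => _idx = idx;

    public Task HandleEventAsync(EntityCreatedEventData<Product> e) => SyncAsync(e.Entity);
    public Task HandleEventAsync(EntityUpdatedEventData<Product> e) => SyncAsync(e.Entity);

    private Task SyncAsync(Product p)
    {
        var doc = new ProductIndex {
            Id = p.Id.ToString(),
            Name = p.Name,
            Category = p.Category,
            Price = p.Price,
            Location = new GeoLoc(p.Lat, p.Lng)
        };
        return _retry.ExecuteAsync(() => _idx.UpsertAsync(doc));
    }
}

// Delete
public class ProductDeletedHandler
    : ILocalEventHandler<EntityDeletedEventData<Product>>, ITransientDependency
{
    private readonly IIndexService _idx;

    public ProductDeletedHandler(IIndexService idx) => _idx = idx;

    // The event data exposes the deleted entity, not a separate id property
    public Task HandleEventAsync(EntityDeletedEventData<Product> e)
        => _idx.DeleteAsync<ProductIndex>(e.Entity.Id.ToString());
}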
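The handlers above retry on failure but put no upper bound on how long a single attempt may take. One way to add that bound is to wrap a Polly timeout inside the retry, so each attempt gets its own time budget. This is a minimal sketch assuming the classic Polly policy API (`Policy.TimeoutAsync`, `Policy.WrapAsync`); the class name `IndexSyncPolicies` is hypothetical, and the optimistic timeout only takes effect if the wrapped operation observes the `CancellationToken` it is given.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;
using Polly.Timeout;

public static class IndexSyncPolicies
{
    // Builds "retry around timeout": every attempt is cancelled after
    // perAttemptTimeout, and failed or timed-out attempts are retried
    // twice (after 50 ms and 100 ms).
    public static IAsyncPolicy Create(TimeSpan perAttemptTimeout)
    {
        // Optimistic timeout: signals the CancellationToken passed to the
        // delegate and surfaces a TimeoutRejectedException on expiry.
        var timeout = Policy.TimeoutAsync(perAttemptTimeout, TimeoutStrategy.Optimistic);

        // Do not retry when the caller itself cancelled the operation.
        var retry = Policy
            .Handle<Exception>(ex => !(ex is OperationCanceledException))
            .WaitAndRetryAsync(new[]
            {
                TimeSpan.FromMilliseconds(50),
                TimeSpan.FromMilliseconds(100)
            });

        // The first policy is the outermost one, so retry wraps timeout.
        return Policy.WrapAsync(retry, timeout);
    }
}
```

A handler could then call `policy.ExecuteAsync(ct => ..., cancellationToken)`, provided the index service is extended to accept and honor the token.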
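6.3 Bulk Rebuild: RebuildIndexJob
// Placeholder args type: ABP background jobs are generic over their arguments
public class RebuildIndexArgs { }

public class RebuildIndexJob
    : AsyncBackgroundJob<RebuildIndexArgs>, ITransientDependency
{
    private readonly IRepository<Product, Guid> _repo;
    private readonly IIndexService _idx;

    public RebuildIndexJob(IRepository<Product, Guid> repo, IIndexService idx)
    {
        _repo = repo; _idx = idx;
    }

    public override async Task ExecuteAsync(RebuildIndexArgs args)
    {
        // Start from an empty index, then reload everything from the database
        await _idx.DropIndexAsync<ProductIndex>();
        await _idx.CreateIndexAsync<ProductIndex>();

        var q = (await _repo.GetQueryableAsync())
            .OrderBy(p => p.Id)   // stable order for Skip/Take batching
            .Select(p => new ProductIndex {
                Id = p.Id.ToString(),
                Name = p.Name,
                Category = p.Category,
                Price = p.Price,
                Location = new GeoLoc(p.Lat, p.Lng)
            });

        await foreach (var batch in q.BatchAsync(500))
            await _idx.BulkInsertAsync(batch);
    }
}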
📄 7. Complex Query Examples 🔍
// 1. Simple full-text search (exact phrase)
var r1 = await _search.SearchAsync<ProductIndex>(
    "\"wireless headphones\"", 0, 20);

// 2. Tag + range + geo radius + sorting
var def = new SearchDefinition()
    .FilterByTag(nameof(ProductIndex.Category), "Audio")
    .FilterByRange(nameof(ProductIndex.Price), 50, 200)
    .FilterByGeo(nameof(ProductIndex.Location), lat, lng, 10)
    .OrderByDescending(nameof(ProductIndex.Price))
    .Limit(0, 20);
var r2 = await _search.SearchAsync<ProductIndex>(def);

// 3. Facet aggregation
var fdef = new SearchDefinition("headphones")
    .AddFacet(nameof(ProductIndex.Category));
var agg = await _search.SearchAsync<ProductIndex>(fdef);
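For reference, the Tag + range + geo filter in example 2 corresponds to a raw RediSearch query string that could be sent with FT.SEARCH. The helper below is a minimal sketch that builds such a string; the class `RediSearchQuery` and its methods are hypothetical, it assumes the indexed field names match the property names, and it does not escape special characters in tag values. Note that the geo filter takes longitude before latitude.

```csharp
using System;

public static class RediSearchQuery
{
    // TAG filter: @field:{value}
    public static string TagEquals(string field, string value) =>
        $"@{field}:{{{value}}}";

    // NUMERIC range filter (inclusive): @field:[min max]
    public static string NumericRange(string field, double min, double max) =>
        FormattableString.Invariant($"@{field}:[{min} {max}]");

    // GEO radius filter: @field:[lon lat radius unit], longitude first
    public static string GeoRadius(
        string field, double lon, double lat, double radius, string unit = "km") =>
        FormattableString.Invariant($"@{field}:[{lon} {lat} {radius} {unit}]");
}

public static class RediSearchQueryDemo
{
    // Clauses separated by a space are combined with AND.
    public static string Build() => string.Join(" ",
        RediSearchQuery.TagEquals("Category", "Audio"),
        RediSearchQuery.NumericRange("Price", 50, 200),
        RediSearchQuery.GeoRadius("Location", 116.4, 39.9, 10));
    // => "@Category:{Audio} @Price:[50 200] @Location:[116.4 39.9 10 km]"
}
```

Invariant-culture formatting is used so that decimal separators stay as dots regardless of the server's locale.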
📊 8. Sample Performance Comparison Script 📈
// Single-shot timings: the first query also pays EF Core's warm-up cost,
// so treat the numbers as rough indications only.
public async Task TestPerformanceAsync()
{
    await using var db = new MyAppDbContext();
    var idx = _search;

    var sw = Stopwatch.StartNew();
    await db.Products
        .Where(p => EF.Functions.Like(p.Name, "%headphones%"))
        .ToListAsync();
    Console.WriteLine($"SQL LIKE: {sw.ElapsedMilliseconds} ms");

    sw.Restart();
    await db.Products
        .Where(p => EF.Functions.ToTsVector("english", p.Name)
            .Matches(EF.Functions.PlainToTsQuery("english", "headphones")))
        .ToListAsync();
    Console.WriteLine($"PostgreSQL FTS: {sw.ElapsedMilliseconds} ms");

    sw.Restart();
    await idx.SearchAsync<ProductIndex>("headphones");
    Console.WriteLine($"RediSearch: {sw.ElapsedMilliseconds} ms");
}
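A single timed run per backend is sensitive to cold caches and connection setup. A slightly more robust comparison discards a few warm-up calls and reports the median of repeated runs. The helper below is a minimal sketch of that idea; the `Bench` class is hypothetical and is not a replacement for a dedicated benchmarking tool.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Bench
{
    // Runs `action` `warmup` times without timing it, then `runs` times
    // with timing, and returns the median duration in milliseconds.
    public static async Task<double> MedianMsAsync(
        Func<Task> action, int warmup = 3, int runs = 20)
    {
        if (runs <= 0) throw new ArgumentOutOfRangeException(nameof(runs));

        for (var i = 0; i < warmup; i++)
            await action();

        var samples = new List<double>(runs);
        for (var i = 0; i < runs; i++)
        {
            var sw = Stopwatch.StartNew();
            await action();
            sw.Stop();
            samples.Add(sw.Elapsed.TotalMilliseconds);
        }
        return Median(samples);
    }

    // Median of a non-empty list; averages the two middle values
    // when the count is even. The input list is not modified.
    public static double Median(List<double> samples)
    {
        var sorted = new List<double>(samples);
        sorted.Sort();
        var mid = sorted.Count / 2;
        return sorted.Count % 2 == 1
            ? sorted[mid]
            : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }
}
```

Each of the three queries in the script could be passed as the `action` delegate, for example `await Bench.MedianMsAsync(() => idx.SearchAsync<ProductIndex>("headphones"))`.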
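🚦 9. Production Best Practices & Pitfalls ⚠️
Persistence & Security
- --appendonly yes + a mounted /data volume;
- ACL/requirepass + the password configured on the client;
Multi-Tenant Index Isolation
- Prefix applies only to document keys; build the index name manually:
var idxName = $"{prefix}product-idx";
Exceptions & Retries
- Polly retries + CancellationToken timeouts;
Monitoring & Alerting
- FT.PROFILE for per-query analysis, Redis SLOWLOG;
- RedisInsight/Prometheus Exporter;
Security Scanning & Quality
- Pre-commit: dotnet-format, StyleCop;
- Dependency scanning: OWASP Dependency-Check;
- SAST: GitHub CodeQL/SonarQube;
Integration Tests
- Testcontainers starts Redis Stack and covers CRUD/Search;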
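The integration-test item can be illustrated with a small smoke test. This is a minimal sketch, assuming the Testcontainers for .NET, StackExchange.Redis, and xUnit packages and a local Docker daemon; the test class, index name, and key names are hypothetical. It talks to RediSearch with raw commands rather than through the article's services, and uses the `redis/redis-stack-server` image, which ships the modules without RedisInsight.

```csharp
using System.Threading.Tasks;
using DotNet.Testcontainers.Builders;
using StackExchange.Redis;
using Xunit;

public class RediSearchSmokeTests
{
    [Fact]
    public async Task Index_and_search_roundtrip()
    {
        // Start a throwaway Redis Stack server on a random free host port.
        var container = new ContainerBuilder()
            .WithImage("redis/redis-stack-server:latest")
            .WithPortBinding(6379, true)
            .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(6379))
            .Build();

        await container.StartAsync();
        try
        {
            var endpoint = $"{container.Hostname}:{container.GetMappedPublicPort(6379)}";
            using var mux = await ConnectionMultiplexer.ConnectAsync(endpoint);
            var db = mux.GetDatabase();

            // Create a small index over hashes whose keys start with "product:".
            await db.ExecuteAsync("FT.CREATE", "test-idx", "ON", "HASH",
                "PREFIX", "1", "product:",
                "SCHEMA", "Name", "TEXT", "Category", "TAG");

            // Write one document; it is indexed as part of the write.
            await db.HashSetAsync("product:1", new[]
            {
                new HashEntry("Name", "wireless headphones"),
                new HashEntry("Category", "Audio")
            });

            // The first element of the FT.SEARCH reply is the total hit count.
            var reply = (RedisResult[])await db.ExecuteAsync(
                "FT.SEARCH", "test-idx", "headphones");
            Assert.Equal(1L, (long)reply[0]);
        }
        finally
        {
            await container.DisposeAsync();
        }
    }
}
```

The same container setup could back tests of `IIndexService` and `ISearchService` by pointing their connection at the mapped endpoint.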