传统查询
在传统的 MongoDB 查询中,我们通常使用find方法:
List<Document> results = mongoTemplate.find(query, Document.class, "collection");
这种方式会直接将查询结果全部加载到内存中,当数据量较大(如百万级文档)时,会导致严重的内存问题甚至 OOM。
所以会考虑利用skip
考虑分页。
skip分页查询
使用skip和limit,来分页处理数据:
int pageSize = 1000;
for (int page = 0; ; page++) {
Query query = new Query()
.skip(page * pageSize)
.limit(pageSize);
List<Document> results = mongoTemplate.find(query, Document.class, "large_collection");
if (results.isEmpty()) {
break;
}
// 处理当前页数据
processResults(results);
}
这种方法在数据量几万条左右可能工作良好,但当数据量达到百万级时,还是不行:
MongoDB 的skip操作是通过遍历并丢弃前面的文档来实现的。
例如:
skip(10000).limit(100) 需要先遍历 10000 条文档,然后只返回后面的 100 条
当数据量达到百万级时,skip(500000) 意味着要遍历 50 万条文档,即使使用索引也会非常缓慢。
实现基于 ID 的分页查询
id字段有索引,将其作为条件分批次查询,每次记录最后一个id:
public void processAllDocuments(String collectionName, int batchSize) {
String lastId = null;
int totalProcessed = 0;
while (true) {
// 创建查询条件:ID > lastId
Query query = new Query();
if (lastId != null) {
query.addCriteria(Criteria.where("_id").gt(lastId));
}
// 按ID升序排序,并限制批处理大小
query.with(Sort.by(Sort.Direction.ASC, "_id"));
query.limit(batchSize);
// 执行查询
List<Document> documents = mongoTemplate.find(query, Document.class, collectionName);
// 处理当前批次的数据
if (documents.isEmpty()) {
break;
} else {
processBatch(documents);
totalProcessed += documents.size();
// 更新lastId为当前批次的最后一个文档ID
Document lastDocument = documents.get(documents.size() - 1);
lastId = lastDocument.getObjectId("_id").toString();
}
}
}
这种方式已经很不错了,但是需要我们精确控制lastId。如果在操作时动态更改了数据,可能会造成数据遗漏。
mongoTemplate的stream方法
使用stream方法,可以按需逐批从数据库获取数据,每次只在内存中处理少量数据,适用于大数据量的读取和处理场景:
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.cursor.MongoCursor;
// 假设已注入MongoTemplate
@Autowired
private MongoTemplate mongoTemplate;
public void processLargeData() {
// 定义查询条件
Query query = new Query(Criteria.where("status").is("active"))
.sort(Sort.by(Sort.Direction.ASC, "createTime"));
MongoCursor<Document> cursor = null;
try {
// 执行查询获取游标
cursor = mongoTemplate.stream(query, Document.class, "collection");
int count = 0;
while (cursor.hasNext()) {
Document doc = cursor.next();
// 处理单个文档
processDocument(doc);
count++;
if (count % 100 == 0) {
System.out.println("已处理 " + count + " 条记录");
}
}
System.out.println("总共处理 " + count + " 条记录");
} catch (Exception e) {
// 处理可能的异常
log.error("数据处理失败", e);
} finally {
// 确保游标资源被关闭
if (cursor != null) {
cursor.close();
}
}
}
原理:MongoDB 中 Cursor 的批处理机制与性能优化
什么是 MongoDB Cursor 的批处理?
当你执行一个查询时,MongoDB 不会一次性返回所有匹配的文档,而是返回一个Cursor(游标)。Cursor 是一个指向查询结果集的指针,它采用 分批(Batch) 的方式从服务器获取数据。步骤如下:
- 客户端(应用程序)向 MongoDB 服务器发送查询请求
- 服务器返回一个 Cursor ID 和第一批数据(默认 101 条记录或 1MB,取较小值)
- 客户端通过Cursor 逐个处理这些数据(调用.next())
- 当客户端处理完这批数据后,再通过 Cursor 请求下一批数据
- 重复步骤 3-4,直到处理完所有数据或关闭 Cursor
这种机制的核心优势是避免一次性传输大量数据,从而减少内存占用和网络开销。
在 MongoDB 中,默认的批处理大小是由客户端驱动决定的。对于 Java 驱动(Spring Data MongoDB 基于此),默认批大小规则如下:
- 第一批数据:默认返回 101 条记录(或直到达到 1MB 大小限制,以较小者为准)
- 后续批次:默认返回 4MB 大小的数据(或该批次的所有数据,以较小者为准)
这个默认值(101)是一个经过权衡的选择:
- 足够小,避免一次性加载过多数据
- 足够大,减少客户端与服务器之间的往返次数
- 对于大多数应用场景,101 条记录是一个合理的初始批次大小
可以通过Query对象调整批大小:
Query query = new Query().batchSize(500); // 自定义批大小为500
try (MongoCursor<Document> cursor = mongoTemplate.stream(query, Document.class, "collection")) {
// 处理数据
}
为什么少量分批反而更快?
这个问题的核心在于理解数据库查询的性能瓶颈。对于大数据集,性能瓶颈通常不是单次查询的速度,而是:
- 网络传输开销:一次性传输大量数据会占用更多网络带宽,导致延迟增加
- 内存占用:一次性加载大量数据到内存会导致频繁 GC,甚至 OOM
- 服务器负载:数据库服务器需要一次性准备和传输大量数据,增加 CPU 和内存压力
调用.next () 方法时发生了什么?
当调用cursor.next()时,实际发生的流程如下:
- 检查当前批次:查看 Cursor 中是否还有未处理的文档
- 如果有:直接返回下一个文档,内存和网络无额外开销
- 如果没有:
- 自动向服务器发送请求,获取下一批数据
- 服务器返回下一批数据(默认 4MB 或剩余全部数据,取较小值)
- 将新批次数据加载到客户端内存
- 返回新批次的第一个文档
这个过程对开发者是透明的,我们只需要调用.next(),Cursor 会自动管理批处理和数据加载
总结
综上所述,当数据量很大时,可以考虑使用id分页或者stream流式处理。
特性 | 基于 ID 分页查询 | 流式处理(MongoTemplate.stream) |
---|---|---|
核心原理 | 按 ID 范围分批查询,每次查询ID > lastId | 使用数据库游标(Cursor)逐批获取数据 |
内存占用 | 每批数据加载到内存,处理后释放 | 每次仅加载当前批次数据,内存占用更低 |
性能表现 | 深层分页性能稳定,不受页码影响 | 全程性能稳定,略优于 ID 分页(减少查询次数) |
实现复杂度 | 需要手动管理 lastId 和分页逻辑 | 简单,自动管理游标生命周期 |
数据一致性 | 适合静态数据集,动态插入可能导致漏读 | 适合实时数据集,一次性遍历不中断 |
适用场景 | 分页展示、分批处理、断点续传任务 | 连续流式处理、大数据分析、实时数据处理 |
网络 IO 模式 | 主动请求下一页数据 | 自动请求下一批数据(管道化执行) |
批大小控制 | 通过limit()手动设置 | 通过batchSize()设置(默认 101/4MB) |