MongoDB 事务管理:多文档操作如何保证 ACID?

发布于:2025-09-03 ⋅ 阅读:(20) ⋅ 点赞:(0)

引言:MongoDB 事务的重要性

在现代应用开发中,数据一致性是系统设计的核心挑战之一。MongoDB 作为领先的 NoSQL 数据库,从 4.0 版本开始支持多文档 ACID 事务,为开发者提供了处理复杂业务场景的强大工具。本文将深入探讨 MongoDB 事务的工作原理,并通过实际代码演示展示如何实现 ACID 特性。

第一章:MongoDB 事务基础

1.1 事务的基本概念

事务是数据库操作的逻辑单元,它必须满足 ACID 特性:

  • 原子性(Atomicity):事务中的所有操作要么全部成功,要么全部失败
  • 一致性(Consistency):事务将数据库从一个一致状态转变为另一个一致状态
  • 隔离性(Isolation):并发事务之间互不干扰
  • 持久性(Durability):事务一旦提交,其结果就是永久性的

1.2 MongoDB 事务的发展历程

  • MongoDB 4.0:支持副本集上的多文档事务
  • MongoDB 4.2:将事务支持扩展到分片集群
  • MongoDB 4.4+:优化事务性能,提高事务限制

第二章:MongoDB 事务 API 详解

2.1 基本事务操作

以下是使用 Node.js 驱动程序进行 MongoDB 事务的基本代码结构:

const { MongoClient } = require('mongodb');

async function runTransaction() {
    const uri = 'mongodb://localhost:27017';
    const client = new MongoClient(uri);
    
    try {
        await client.connect();
        
        // 1. 开始会话
        const session = client.startSession();
        
        try {
            // 2. 开始事务
            session.startTransaction({
                readConcern: { level: 'snapshot' },
                writeConcern: { w: 'majority' }
            });
            
            // 3. 事务内操作
            const db = client.db('bank');
            const accountsCollection = db.collection('accounts');
            
            // 转账操作:从账户A转100到账户B
            await accountsCollection.updateOne(
                { _id: 'A' },
                { $inc: { balance: -100 } },
                { session }
            );
            
            await accountsCollection.updateOne(
                { _id: 'B' },
                { $inc: { balance: 100 } },
                { session }
            );
            
            // 4. 提交事务
            await session.commitTransaction();
            console.log('Transaction committed successfully');
        } catch (error) {
            // 5. 发生错误时中止事务
            await session.abortTransaction();
            console.error('Transaction aborted:', error);
            throw error;
        } finally {
            // 6. 结束会话
            session.endSession();
        }
    } finally {
        await client.close();
    }
}

runTransaction().catch(console.error);

2.2 事务中的错误处理

正确处理事务中的错误对于保证数据一致性至关重要:

async function transferFunds(senderId, receiverId, amount) {
    const session = client.startSession();
    
    try {
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' }
        });
        
        const db = client.db('bank');
        const accounts = db.collection('accounts');
        
        // 检查发送方余额是否充足
        const sender = await accounts.findOne(
            { _id: senderId },
            { session }
        );
        
        if (sender.balance < amount) {
            throw new Error('Insufficient funds');
        }
        
        // 执行转账
        await accounts.updateOne(
            { _id: senderId },
            { $inc: { balance: -amount } },
            { session }
        );
        
        await accounts.updateOne(
            { _id: receiverId },
            { $inc: { balance: amount } },
            { session }
        );
        
        await session.commitTransaction();
        return { success: true };
    } catch (error) {
        // 处理特定错误类型
        if (error.errorLabels && error.errorLabels.includes('TransientTransactionError')) {
            console.log('Transient error, retrying transaction...');
            return transferFunds(senderId, receiverId, amount);
        }
        
        await session.abortTransaction();
        return { success: false, error: error.message };
    } finally {
        session.endSession();
    }
}

第三章:实现 ACID 特性的代码实践

3.1 原子性(Atomicity)的实现

原子性确保事务中的所有操作要么全部成功,要么全部失败。以下代码演示了原子性的实现:

async function atomicOrderProcessing(userId, productId, quantity) {
    const session = client.startSession();
    
    try {
        session.startTransaction();
        
        const db = client.db('ecommerce');
        const users = db.collection('users');
        const products = db.collection('products');
        const orders = db.collection('orders');
        
        // 1. 检查库存
        const product = await products.findOne(
            { _id: productId },
            { session }
        );
        
        if (product.stock < quantity) {
            throw new Error('Insufficient stock');
        }
        
        // 2. 减少库存
        await products.updateOne(
            { _id: productId },
            { $inc: { stock: -quantity } },
            { session }
        );
        
        // 3. 创建订单
        const order = {
            userId,
            productId,
            quantity,
            date: new Date(),
            status: 'completed'
        };
        
        await orders.insertOne(order, { session });
        
        // 4. 更新用户订单历史
        await users.updateOne(
            { _id: userId },
            { $push: { orders: order._id } },
            { session }
        );
        
        await session.commitTransaction();
        return order;
    } catch (error) {
        await session.abortTransaction();
        console.error('Order processing failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

3.2 一致性(Consistency)的实现

一致性确保事务将数据库从一个有效状态转变为另一个有效状态。以下代码展示了如何维护一致性:

async function consistentUserRegistration(userData) {
    const session = client.startSession();
    
    try {
        session.startTransaction({
            writeConcern: { w: 'majority' }
        });
        
        const db = client.db('app');
        const users = db.collection('users');
        const profiles = db.collection('profiles');
        const counters = db.collection('counters');
        
        // 验证邮箱唯一性
        const existingUser = await users.findOne(
            { email: userData.email },
            { session }
        );
        
        if (existingUser) {
            throw new Error('Email already exists');
        }
        
        // 使用计数器生成用户ID
        const counter = await counters.findOneAndUpdate(
            { _id: 'userId' },
            { $inc: { seq: 1 } },
            { 
                upsert: true,
                returnDocument: 'after',
                session 
            }
        );
        
        const userId = counter.value.seq;
        
        // 创建用户记录
        await users.insertOne({
            _id: userId,
            email: userData.email,
            password: userData.password,
            createdAt: new Date()
        }, { session });
        
        // 创建用户资料记录
        await profiles.insertOne({
            userId,
            name: userData.name,
            age: userData.age,
            preferences: {}
        }, { session });
        
        await session.commitTransaction();
        return userId;
    } catch (error) {
        await session.abortTransaction();
        console.error('User registration failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

3.3 隔离性(Isolation)的实现

隔离性确保并发事务不会相互干扰。MongoDB 默认使用快照隔离级别:

async function isolatedInventoryCheck(productId, quantity) {
    const session = client.startSession();
    
    try {
        // 使用快照隔离级别
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' }
        });
        
        const db = client.db('store');
        const products = db.collection('products');
        
        // 获取产品信息(基于事务开始时的快照)
        const product = await products.findOne(
            { _id: productId },
            { session }
        );
        
        if (product.stock < quantity) {
            throw new Error('Not enough stock');
        }
        
        // 预留库存
        await products.updateOne(
            { _id: productId },
            { $inc: { stock: -quantity, reserved: quantity } },
            { session }
        );
        
        await session.commitTransaction();
        return { success: true };
    } catch (error) {
        await session.abortTransaction();
        
        // 处理写冲突
        if (error.errorLabels && error.errorLabels.includes('TransientTransactionError')) {
            console.log('Write conflict detected, retrying...');
            return isolatedInventoryCheck(productId, quantity);
        }
        
        throw error;
    } finally {
        session.endSession();
    }
}

3.4 持久性(Durability)的实现

持久性确保一旦事务提交,其结果就是永久性的:

async function durablePaymentProcessing(paymentData) {
    const session = client.startSession();
    
    try {
        // 使用 majority write concern 确保数据持久性
        session.startTransaction({
            writeConcern: { w: 'majority', j: true }
        });
        
        const db = client.db('payments');
        const payments = db.collection('payments');
        const accounts = db.collection('accounts');
        const auditLog = db.collection('audit_log');
        
        // 记录支付
        const payment = {
            amount: paymentData.amount,
            from: paymentData.fromAccount,
            to: paymentData.toAccount,
            timestamp: new Date(),
            status: 'processing'
        };
        
        const result = await payments.insertOne(payment, { session });
        
        // 更新账户余额
        await accounts.updateOne(
            { _id: paymentData.fromAccount },
            { $inc: { balance: -paymentData.amount } },
            { session }
        );
        
        await accounts.updateOne(
            { _id: paymentData.toAccount },
            { $inc: { balance: paymentData.amount } },
            { session }
        );
        
        // 更新支付状态
        await payments.updateOne(
            { _id: result.insertedId },
            { $set: { status: 'completed' } },
            { session }
        );
        
        // 记录审计日志
        await auditLog.insertOne({
            event: 'payment_processed',
            paymentId: result.insertedId,
            timestamp: new Date()
        }, { session });
        
        await session.commitTransaction();
        return { success: true, paymentId: result.insertedId };
    } catch (error) {
        await session.abortTransaction();
        console.error('Payment processing failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

第四章:高级事务模式

4.1 重试机制

由于网络问题或临时错误可能导致事务失败,实现自动重试机制很重要:

async function runWithRetry(txnFn, maxRetries = 3, delayMs = 100) {
    let attempt = 0;
    let lastError;
    
    while (attempt < maxRetries) {
        try {
            return await txnFn();
        } catch (error) {
            lastError = error;
            attempt++;
            
            if (error.errorLabels && 
                error.errorLabels.includes('TransientTransactionError')) {
                console.log(`Attempt ${attempt}: Transient error, retrying...`);
                await new Promise(resolve => setTimeout(resolve, delayMs * attempt));
            } else {
                break;
            }
        }
    }
    
    throw lastError;
}

// 使用示例
async function reliableTransfer(senderId, receiverId, amount) {
    return runWithRetry(async () => {
        return transferFunds(senderId, receiverId, amount);
    });
}

4.2 跨集合事务

MongoDB 事务可以跨多个集合操作:

async function crossCollectionOperation() {
    const session = client.startSession();
    
    try {
        session.startTransaction();
        
        const db = client.db('blog');
        const users = db.collection('users');
        const posts = db.collection('posts');
        const comments = db.collection('comments');
        
        // 创建新用户
        const user = {
            _id: new ObjectId(),
            name: 'John Doe',
            email: 'john@example.com',
            createdAt: new Date()
        };
        
        await users.insertOne(user, { session });
        
        // 创建第一篇博客文章
        const post = {
            _id: new ObjectId(),
            title: 'My First Post',
            content: 'Hello world!',
            authorId: user._id,
            createdAt: new Date()
        };
        
        await posts.insertOne(post, { session });
        
        // 添加评论
        const comment = {
            _id: new ObjectId(),
            postId: post._id,
            userId: user._id,
            text: 'Great first post!',
            createdAt: new Date()
        };
        
        await comments.insertOne(comment, { session });
        
        // 更新用户文档
        await users.updateOne(
            { _id: user._id },
            { 
                $push: { 
                    posts: post._id,
                    comments: comment._id 
                } 
            },
            { session }
        );
        
        await session.commitTransaction();
        return { user, post, comment };
    } catch (error) {
        await session.abortTransaction();
        console.error('Cross-collection operation failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

4.3 分片集群事务

在分片集群上使用事务需要特别注意:

async function shardedClusterTransaction() {
    const session = client.startSession();
    
    try {
        // 分片集群事务需要特别的配置
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' },
            readPreference: 'primary'
        });
        
        const db = client.db('sharded_db');
        const users = db.collection('users');  // 假设按 userId 分片
        const orders = db.collection('orders'); // 假设按 orderId 分片
        
        // 创建用户
        const userId = new ObjectId();
        await users.insertOne({
            _id: userId,
            name: 'Alice',
            email: 'alice@example.com'
        }, { session });
        
        // 创建订单
        const orderId = new ObjectId();
        await orders.insertOne({
            _id: orderId,
            userId,
            items: ['item1', 'item2'],
            total: 100,
            status: 'pending'
        }, { session });
        
        // 更新用户订单历史
        await users.updateOne(
            { _id: userId },
            { $push: { orders: orderId } },
            { session }
        );
        
        await session.commitTransaction();
        return { userId, orderId };
    } catch (error) {
        await session.abortTransaction();
        console.error('Sharded transaction failed:', error);
        
        // 处理分片特定错误
        if (error.code === 263 && error.codeName === 'ShardKeyNotFound') {
            console.error('Shard key not provided in document');
        }
        
        throw error;
    } finally {
        session.endSession();
    }
}

第五章:事务性能优化

5.1 批量操作

在事务中使用批量操作可以提高性能:

async function bulkOperationsInTransaction() {
    const session = client.startSession();
    
    try {
        session.startTransaction();
        
        const db = client.db('inventory');
        const products = db.collection('products');
        
        // 批量更新多个产品
        const bulkUpdates = [
            {
                updateOne: {
                    filter: { _id: 'prod1' },
                    update: { $inc: { stock: -5 } }
                }
            },
            {
                updateOne: {
                    filter: { _id: 'prod2' },
                    update: { $inc: { stock: -3 } }
                }
            },
            {
                updateOne: {
                    filter: { _id: 'prod3' },
                    update: { $inc: { stock: -2 } }
                }
            }
        ];
        
        await products.bulkWrite(bulkUpdates, { session });
        
        await session.commitTransaction();
        return { success: true };
    } catch (error) {
        await session.abortTransaction();
        console.error('Bulk operations failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

5.2 事务超时控制

长时间运行的事务会影响性能,应该设置超时:

async function timedTransaction() {
    const session = client.startSession();
    
    try {
        // 设置事务超时时间为5秒
        const transactionOptions = {
            maxCommitTimeMS: 5000,
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' }
        };
        
        session.startTransaction(transactionOptions);
        
        const db = client.db('timed_db');
        const collection = db.collection('data');
        
        // 执行一些操作
        await collection.insertOne({ value: 1 }, { session });
        await collection.insertOne({ value: 2 }, { session });
        
        // 模拟长时间运行的操作
        await new Promise(resolve => setTimeout(resolve, 6000));
        
        await collection.insertOne({ value: 3 }, { session });
        
        await session.commitTransaction();
        return { success: true };
    } catch (error) {
        await session.abortTransaction();
        
        if (error.code === 50 && error.codeName === 'MaxTimeMSExpired') {
            console.error('Transaction timed out');
        } else {
            console.error('Transaction failed:', error);
        }
        
        throw error;
    } finally {
        session.endSession();
    }
}

5.3 只读事务

对于需要一致读取但不修改数据的场景,可以使用只读事务:

async function readOnlyTransaction() {
    const session = client.startSession();
    
    try {
        // 只读事务配置
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            readPreference: 'primary'
        });
        
        const db = client.db('reporting');
        const orders = db.collection('orders');
        const customers = db.collection('customers');
        
        // 获取订单数据
        const recentOrders = await orders.find(
            { date: { $gte: new Date('2023-01-01') } },
            { session }
        ).toArray();
        
        // 获取相关客户信息
        const customerIds = recentOrders.map(o => o.customerId);
        const orderCustomers = await customers.find(
            { _id: { $in: customerIds } },
            { session }
        ).toArray();
        
        // 生成报告数据
        const reportData = recentOrders.map(order => {
            const customer = orderCustomers.find(c => c._id.equals(order.customerId));
            return {
                orderId: order._id,
                amount: order.amount,
                customerName: customer ? customer.name : 'Unknown'
            };
        });
        
        // 只读事务不需要提交
        await session.abortTransaction();
        
        return reportData;
    } catch (error) {
        await session.abortTransaction();
        console.error('Read-only transaction failed:', error);
        throw error;
    } finally {
        session.endSession();
    }
}

第六章:事务监控与调试

6.1 监控活动事务

async function monitorActiveTransactions() {
    const adminDb = client.db('admin');
    
    // 获取当前活动事务
    const result = await adminDb.command({
        currentOp: true,
        $or: [
            { op: 'command', 'command.startTransaction': { $exists: true } },
            { op: 'command', 'command.commitTransaction': { $exists: true } },
            { op: 'command', 'command.abortTransaction': { $exists: true } }
        ]
    });
    
    return result.inprog;
}

6.2 事务性能分析

async function analyzeTransactionPerformance() {
    const session = client.startSession();
    
    try {
        // 启用分析
        await client.db('admin').command({
            profile: 2,
            slowms: 100
        });
        
        session.startTransaction();
        
        const db = client.db('perf_test');
        const collection = db.collection('test_data');
        
        // 执行一些操作
        await collection.insertOne({ value: 'test1' }, { session });
        await collection.insertOne({ value: 'test2' }, { session });
        await collection.updateOne(
            { value: 'test1' },
            { $set: { updated: true } },
            { session }
        );
        
        await session.commitTransaction();
        
        // 获取分析数据
        const profileData = await db.collection('system.profile').find().toArray();
        return profileData;
    } catch (error) {
        await session.abortTransaction();
        throw error;
    } finally {
        session.endSession();
        
        // 关闭分析
        await client.db('admin').command({
            profile: 0
        });
    }
}

第七章:事务最佳实践

7.1 设计原则

  1. 尽量缩短事务持续时间
  2. 减少事务中的操作数量
  3. 避免在事务中执行耗时操作
  4. 合理设置事务超时
  5. 实现健壮的重试机制

7.2 常见陷阱与解决方案

陷阱1:事务过大

// 不好的做法:事务中包含太多操作
async function badLargeTransaction() {
    const session = client.startSession();
    session.startTransaction();
    
    try {
        for (let i = 0; i < 1000; i++) {
            await collection.insertOne({ value: i }, { session });
        }
        
        await session.commitTransaction();
    } catch (error) {
        await session.abortTransaction();
        throw error;
    } finally {
        session.endSession();
    }
}

// 好的做法:分批处理
async function goodBatchProcessing() {
    const batchSize = 100;
    const total = 1000;
    
    for (let i = 0; i < total; i += batchSize) {
        const session = client.startSession();
        
        try {
            session.startTransaction();
            
            for (let j = i; j < i + batchSize && j < total; j++) {
                await collection.insertOne({ value: j }, { session });
            }
            
            await session.commitTransaction();
        } catch (error) {
            await session.abortTransaction();
            throw error;
        } finally {
            session.endSession();
        }
    }
}

陷阱2:忽略错误处理

// 不好的做法:忽略错误处理
async function badErrorHandling() {
    const session = client.startSession();
    session.startTransaction();
    
    await collection.insertOne({ value: 1 }, { session });
    await collection.insertOne({ value: 2 }, { session });
    
    await session.commitTransaction();
    session.endSession();
}

// 好的做法:完整的错误处理
async function goodErrorHandling() {
    const session = client.startSession();
    
    try {
        session.startTransaction();
        
        await collection.insertOne({ value: 1 }, { session });
        await collection.insertOne({ value: 2 }, { session });
        
        await session.commitTransaction();
    } catch (error) {
        console.error('Transaction failed:', error);
        
        try {
            await session.abortTransaction();
        } catch (abortError) {
            console.error('Failed to abort transaction:', abortError);
        }
        
        throw error;
    } finally {
        try {
            session.endSession();
        } catch (endError) {
            console.error('Failed to end session:', endError);
        }
    }
}

第八章:实际应用案例

8.1 电商订单处理系统

async function processOrder(orderData) {
    const session = client.startSession();
    
    try {
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' }
        });
        
        const db = client.db('ecommerce');
        const orders = db.collection('orders');
        const products = db.collection('products');
        const users = db.collection('users');
        const inventory = db.collection('inventory');
        
        // 1. 验证库存
        for (const item of orderData.items) {
            const product = await products.findOne(
                { _id: item.productId },
                { session }
            );
            
            if (!product) {
                throw new Error(`Product ${item.productId} not found`);
            }
            
            const stock = await inventory.findOne(
                { productId: item.productId },
                { session }
            );
            
            if (!stock || stock.quantity < item.quantity) {
                throw new Error(`Insufficient stock for product ${item.productId}`);
            }
        }
        
        // 2. 扣减库存
        for (const item of orderData.items) {
            await inventory.updateOne(
                { productId: item.productId },
                { $inc: { quantity: -item.quantity } },
                { session }
            );
        }
        
        // 3. 创建订单
        const order = {
            _id: new ObjectId(),
            userId: orderData.userId,
            items: orderData.items,
            total: orderData.total,
            status: 'pending',
            createdAt: new Date()
        };
        
        await orders.insertOne(order, { session });
        
        // 4. 更新用户订单历史
        await users.updateOne(
            { _id: orderData.userId },
            { $push: { orders: order._id } },
            { session }
        );
        
        // 5. 记录审计日志
        await db.collection('audit_log').insertOne({
            type: 'order_created',
            orderId: order._id,
            timestamp: new Date()
        }, { session });
        
        await session.commitTransaction();
        return order;
    } catch (error) {
        await session.abortTransaction();
        
        if (error.errorLabels && error.errorLabels.includes('TransientTransactionError')) {
            console.log('Transient error, retrying...');
            return processOrder(orderData);
        }
        
        throw error;
    } finally {
        session.endSession();
    }
}

8.2 银行转账系统

async function transferMoney(fromAccount, toAccount, amount) {
    const session = client.startSession();
    
    try {
        session.startTransaction({
            readConcern: { level: 'snapshot' },
            writeConcern: { w: 'majority' }
        });
        
        const db = client.db('banking');
        const accounts = db.collection('accounts');
        const transactions = db.collection('transactions');
        
        // 1. 检查账户是否存在
        const from = await accounts.findOne(
            { _id: fromAccount },
            { session }
        );
        
        const to = await accounts.findOne(
            { _id: toAccount },
            { session }
        );
        
        if (!from || !to) {
            throw new Error('One or both accounts not found');
        }
        
        // 2. 检查余额是否充足
        if (from.balance < amount) {
            throw new Error('Insufficient funds');
        }
        
        // 3. 执行转账
        await accounts.updateOne(
            { _id: fromAccount },
            { $inc: { balance: -amount } },
            { session }
        );
        
        await accounts.updateOne(
            { _id: toAccount },
            { $inc: { balance: amount } },
            { session }
        );
        
        // 4. 记录交易
        const transaction = {
            _id: new ObjectId(),
            fromAccount,
            toAccount,
            amount,
            timestamp: new Date(),
            status: 'completed'
        };
        
        await transactions.insertOne(transaction, { session });
        
        // 5. 更新账户交易历史
        await accounts.updateOne(
            { _id: fromAccount },
            { $push: { transactions: transaction._id } },
            { session }
        );
        
        await accounts.updateOne(
            { _id: toAccount },
            { $push: { transactions: transaction._id } },
            { session }
        );
        
        await session.commitTransaction();
        return transaction;
    } catch (error) {
        await session.abortTransaction();
        
        if (error.errorLabels && error.errorLabels.includes('TransientTransactionError')) {
            console.log('Transient error, retrying...');
            return transferMoney(fromAccount, toAccount, amount);
        }
        
        throw error;
    } finally {
        session.endSession();
    }
}

第九章:MongoDB 事务的限制与替代方案

9.1 事务限制

  1. 操作限制:
    • 不能创建或删除集合
    • 不能创建或删除索引
    • 不能操作 system.* 集合
  2. 性能限制:
    • 事务持续时间不应过长(建议 < 1秒)
    • 事务中操作数量不应过多
    • 分片集群事务性能开销较大
  3. 大小限制:
    • 单个事务的修改不能超过 16MB
    • 事务中操作的总执行时间不能超过 transactionLifetimeLimitSeconds(默认60秒)

9.2 替代方案

当不适合使用事务时,可以考虑以下模式:

  1. 嵌入式文档:

// 将相关数据嵌入单个文档
{
_id: “order123”,
user: {
id: “user1”,
name: “John Doe”
},
items: [
{ productId: “prod1”, quantity: 2 },
{ productId: “prod2”, quantity: 1 }
],
total: 100
}

2. 两阶段提交模式:
    ```javascript
async function twoPhaseCommit() {
    // 阶段1:准备
    await collection.updateOne(
        { _id: "task1" },
        { $set: { state: "pending", prepared: true } }
    );

    await collection.updateOne(
        { _id: "task2" },
        { $set: { state: "pending", prepared: true } }
    );

    // 阶段2:提交
    await collection.updateOne(
        { _id: "task1" },
        { $set: { state: "completed", prepared: false } }
    );

    await collection.updateOne(
        { _id: "task2" },
        { $set: { state: "completed", prepared: false } }
    );
}
  1. 补偿事务模式:

async function compensatingTransaction() {
try {
// 执行主操作
await collection.insertOne({ value: 1 });
await collection.insertOne({ value: 2 });
} catch (error) {
// 执行补偿操作
await collection.deleteOne({ value: 1 });
await collection.deleteOne({ value: 2 });
throw error;
}
}

## 第十章:未来发展与总结
### 10.1 MongoDB 事务的未来
1. 性能优化:持续减少事务开销
2. 功能扩展:支持更多操作类型
3. 分布式增强:改进跨分片事务性能
4. 与聚合框架集成:支持事务中的复杂聚合操作
### 10.2 总结
MongoDB 的多文档事务为开发者提供了强大的数据一致性保障。通过本文的详细代码示例,我们展示了如何:
- 正确使用 MongoDB 事务 API
- 实现 ACID 特性
- 处理错误和重试
- 优化事务性能
- 避免常见陷阱
记住以下关键点:
1. 总是正确处理事务错误和会话生命周期
2. 为生产环境配置适当的 readConcern 和 writeConcern
3. 实现健壮的重试机制处理临时错误
4. 优先考虑单文档原子性设计,只在必要时使用多文档事务
通过合理应用这些技术,您可以在 MongoDB 中构建既灵活又可靠的应用系统。

网站公告

今日签到

点亮在社区的每一天
去签到