C++ 并发编程指南 并发设计模式:Actor vs. CSP (生活场景版)

发布于:2025-09-07 ⋅ 阅读:(15) ⋅ 点赞:(0)

并发设计模式:Actor vs. CSP (生活场景版)

想象一下,你正在规划一个大型活动的后勤工作。传统的“共享内存+锁”模式就像是一个共享的白板:所有工作人员都在上面读写任务,但必须轮流使用(加锁),经常造成拥堵(性能差),并且一个工作人员的失误可能擦掉别人的数据(状态混乱,耦合度高)。

Actor 和 CSP 提供了更优雅的解决方案。


1. Actor 模式:就像公司里的邮件系统

核心思想: “人(Actor)与人之间直接发邮件(消息)”

生活场景模拟:
假设你的活动有三个核心部门:

  • 前台 (Front Desk Actor):负责接收外界送来的物资,并通知相应部门来取。
  • 厨房 (Kitchen Actor):负责处理食材,制作餐食。
  • 舞台 (Stage Actor):负责管理音响、灯光设备。

工作流程 (Actor 模式):

  1. 隔离与邮箱: 每个部门(Actor)都在自己的办公室(独立线程/进程)里工作,互不干扰。每个部门门口都有一个专属邮箱(Mailbox)
  2. 异步通信: 厨房需要面粉了,他们不会直接跑去仓库翻找(共享内存)。而是给前台发一封邮件(Send("前台", NeedFlourMsg)):“请送10袋面粉来”,然后继续做自己的事(异步)。
  3. 顺序处理: 前台的员工一次只从自己的邮箱里取出一封邮件进行处理。比如他先处理舞台的“需要新麦克风”请求,处理完后,再处理厨房的“需要面粉”请求。他一次只做一件事,所以绝不会搞混(无需锁,内部状态安全)。
  4. 直接寻址: 厨房知道这封邮件是发给“前台”的,前台收到邮件后也知道它来自“厨房”。就像邮件有明确的发件人和收件人。

Actor 模式的特点:

  • 重点在“人”(Actor):系统由多个独立的 Actor 组成。
  • 直接通信:知道消息发给谁,也知道消息来自谁。
  • 优点:职责清晰,部门间完全隔离,一个部门崩溃不会直接影响另一个(容错性强)。

现实中的例子: 公司的邮件系统、客服工单系统(每个工单分配给特定客服处理)。


2. CSP 模式:就像快递站的储物格

核心思想: “人把东西放到公共格子(Channel),别人再从格子里取”

生活场景模拟:
同样是那个活动,现在我们换一种后勤模式。我们设置一个中央快递站,站里有一排带编号的储物格(Channel)。

  • 格子的类型:
    • 小件格 (无缓冲 Channel):只能放一件物品。如果格子里有东西,快递员必须等别人取走才能放新的(直接阻塞等待)。
    • 货架格 (有缓冲 Channel):比如 10 号货架可以放最多 10 箱水。放满后,送货员必须等待;取空后,取货员也必须等待。

工作流程 (CSP 模式):

  1. 关注通道: 工作人员不关心是谁把东西放进格子,也不关心最终是谁取走的。他们只认格子(Channel)
  2. 解耦通信: 农夫送来 10 箱矿泉水。他不需要知道活动方是谁,他只需根据指示,把水放到“10号货架”(10号货架 <- 10箱水)。
  3. 匿名处理: 活动现场的志愿者渴了,他也不知道水是谁送的,他只需走到“10号货架”取一箱水(水 <- 10号货架)。
  4. 同步协调: 如果志愿者来取水时货架是空的,他就会在那里等着,直到有农夫送来水(同步等待)。反之,如果货架满了,农夫也会等着。

CSP 模式的特点:

  • 重点在“格子”(Channel):Channel 是第一公民,是通信的核心。
  • 间接通信:发送者和接收者不知道对方的存在,完全通过 Channel 解耦。
  • 优点:灵活性极高,可以轻松组合出复杂的数据流管道(如:格A -> 工作人员1 -> 格B -> 工作人员2 -> 格C)。

现实中的例子: 工厂的流水线、餐厅的传菜窗口(厨师做好菜放到窗口,服务员从窗口端走,他们不需要直接交流)。


总结与对比:如何选择?

特性 Actor 模式 (邮件系统) CSP 模式 (快递站格子)
通信对象 明确的参与者 (Actor) 抽象的通道 (Channel)
耦合关系 发送方和接收方互相知晓 发送方和接收方互相不知
关注点 (Who) 来处理消息 在哪里 (Where) 交换消息
同步方式 消息发送通常是异步 通过 Channel 进行同步(等待)
生活比喻 部门邮件往来 快递站、流水线、传菜窗口
典型语言 Erlang, Akka (Java/Scala) Go (原生支持), Clojure

选择建议:

  • 如果你的系统由许多独立的、有状态的、需要明确身份的实体组成(比如游戏中的每个玩家、电商系统中的每个订单),Actor 模式更直观。
  • 如果你需要构建一个数据流管道,处理步骤清晰,且希望最大程度解耦生产者和消费者(比如数据处理管道、网络爬虫的各个阶段),CSP 模式更灵活。

它们的共同伟大之处在于实现了那句名言:

“不要通过共享内存来通信;而要通过通信来共享内存。”
(Do not communicate by sharing memory; instead, share memory by communicating.)*

这意味着我们不再需要小心翼翼地守护一个“共享白板”,而是通过“邮件”或“快递”这种更自然、更安全的方式来协作,从而大大降低了并发编程的复杂性。

C++ 实现 Actor 与 CSP 模式的生活化示例

下面我将使用 C++ 实现餐厅点餐场景的 Actor 和 CSP 模式示例。

场景描述

在一个餐厅中,有顾客、服务员和厨师三种角色。顾客下单,服务员接收订单并传递给厨师,厨师烹饪完成后通知服务员上菜。

1. CSP 模式实现 (使用 Channel)

首先,我们需要实现一个简单的 Channel 类来模拟 Go 语言的 Channel:

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>

// 简单的 Channel 实现
template <typename T>
class Channel {
private:
    std::queue<T> queue_;
    std::mutex mtx_;
    std::condition_variable cv_producer_;
    std::condition_variable cv_consumer_;
    size_t capacity_;
    bool closed_ = false;

public:
    Channel(size_t capacity = 0) : capacity_(capacity) {}

    bool send(T value) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_producer_.wait(lock, [this]() {
            return (capacity_ == 0 && queue_.empty()) || 
                   queue_.size() < capacity_ || 
                   closed_;
        });
        
        if (closed_) {
            return false;
        }
        
        queue_.push(value);
        cv_consumer_.notify_one();
        return true;
    }

    bool receive(T& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_consumer_.wait(lock, [this]() { 
            return !queue_.empty() || closed_; 
        });
        
        if (closed_ && queue_.empty()) {
            return false;
        }
        
        value = queue_.front();
        queue_.pop();
        cv_producer_.notify_one();
        return true;
    }

    void close() {
        std::unique_lock<std::mutex> lock(mtx_);
        closed_ = true;
        cv_producer_.notify_all();
        cv_consumer_.notify_all();
    }
};

// 定义消息类型
struct Order {
    int customerId;
    std::string dish;
};

struct CookedDish {
    int customerId;
    std::string dish;
};

// 厨师函数
void chef(Channel<Order>& orderChannel, Channel<CookedDish>& cookedChannel) {
    Order order;
    while (orderChannel.receive(order)) {
        std::cout << "厨师收到订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;
        
        // 模拟烹饪时间
        std::this_thread::sleep_for(std::chrono::seconds(2));
        
        CookedDish cookedDish{order.customerId, order.dish};
        std::cout << "厨师完成烹饪: " << cookedDish.dish << std::endl;
        cookedChannel.send(cookedDish);
    }
    std::cout << "厨师结束工作" << std::endl;
}

// 服务员函数
void waiter(Channel<Order>& customerToWaiter, Channel<Order>& waiterToChef, 
            Channel<CookedDish>& chefToWaiter, Channel<CookedDish>& waiterToCustomer) {
    while (true) {
        // 从顾客接收订单
        Order order;
        if (!customerToWaiter.receive(order)) {
            break;
        }
        std::cout << "服务员接收订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;
        
        // 将订单发送给厨师
        waiterToChef.send(order);
        
        // 从厨师接收烹饪完成的菜品
        CookedDish cookedDish;
        if (!chefToWaiter.receive(cookedDish)) {
            break;
        }
        std::cout << "服务员取菜: " << cookedDish.dish << std::endl;
        
        // 将菜品发送给顾客
        waiterToCustomer.send(cookedDish);
    }
    std::cout << "服务员结束工作" << std::endl;
}

// 顾客函数
void customer(int id, Channel<Order>& waiterChannel, Channel<CookedDish>& customerChannel) {
    std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};
    std::string dish = dishes[id % 4];
    
    // 下单
    Order order{id, dish};
    std::cout << "顾客 " << id << " 下单: " << dish << std::endl;
    waiterChannel.send(order);
    
    // 等待菜品
    CookedDish cookedDish;
    if (customerChannel.receive(cookedDish)) {
        std::cout << "顾客 " << id << " 收到: " << cookedDish.dish << std::endl;
    }
}

int main() {
    // 创建各种Channel
    Channel<Order> customerToWaiter(5);    // 顾客到服务员的订单通道
    Channel<Order> waiterToChef(5);        // 服务员到厨师的订单通道
    Channel<CookedDish> chefToWaiter(5);   // 厨师到服务员的菜品通道
    Channel<CookedDish> waiterToCustomer(5); // 服务员到顾客的菜品通道
    
    // 启动厨师线程
    std::thread chefThread(chef, std::ref(waiterToChef), std::ref(chefToWaiter));
    
    // 启动服务员线程
    std::thread waiterThread(waiter, 
        std::ref(customerToWaiter), std::ref(waiterToChef),
        std::ref(chefToWaiter), std::ref(waiterToCustomer));
    
    // 模拟多个顾客
    std::thread customers[4];
    for (int i = 0; i < 4; i++) {
        customers[i] = std::thread(customer, i, 
            std::ref(customerToWaiter), std::ref(waiterToCustomer));
    }
    
    // 等待所有顾客完成
    for (int i = 0; i < 4; i++) {
        customers[i].join();
    }
    
    // 关闭通道并等待线程结束
    customerToWaiter.close();
    waiterToChef.close();
    chefToWaiter.close();
    waiterToCustomer.close();
    
    waiterThread.join();
    chefThread.join();
    
    std::cout << "餐厅打烊" << std::endl;
    return 0;
}

2. Actor 模式实现

接下来,我们使用 Actor 模式实现相同的餐厅场景:

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>
#include <functional>
#include <memory>
#include <unordered_map>

// 消息基类
struct Message {
    virtual ~Message() = default;
};

// 订单消息
struct OrderMessage : Message {
    int customerId;
    std::string dish;
    
    OrderMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};

// 完成烹饪消息
struct CookedMessage : Message {
    int customerId;
    std::string dish;
    
    CookedMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};

// 简单的Actor基类
class Actor {
protected:
    std::queue<std::unique_ptr<Message>> mailbox_;
    std::mutex mailboxMutex_;
    std::condition_variable mailboxCV_;
    bool running_ = false;
    std::thread thread_;
    
public:
    virtual ~Actor() {
        stop();
    }
    
    void start() {
        running_ = true;
        thread_ = std::thread(&Actor::run, this);
    }
    
    void stop() {
        running_ = false;
        mailboxCV_.notify_all();
        if (thread_.joinable()) {
            thread_.join();
        }
    }
    
    void send(std::unique_ptr<Message> message) {
        std::lock_guard<std::mutex> lock(mailboxMutex_);
        mailbox_.push(std::move(message));
        mailboxCV_.notify_one();
    }
    
    void run() {
        while (running_) {
            std::unique_ptr<Message> message;
            {
                std::unique_lock<std::mutex> lock(mailboxMutex_);
                mailboxCV_.wait(lock, [this]() { 
                    return !mailbox_.empty() || !running_; 
                });
                
                if (!running_ && mailbox_.empty()) {
                    break;
                }
                
                if (!mailbox_.empty()) {
                    message = std::move(mailbox_.front());
                    mailbox_.pop();
                }
            }
            
            if (message) {
                processMessage(*message);
            }
        }
    }
    
    virtual void processMessage(Message& message) = 0;
};

// 顾客Actor
class CustomerActor : public Actor {
private:
    int id_;
    std::string dish_;
    Actor* waiter_;
    
public:
    CustomerActor(int id, const std::string& dish, Actor* waiter) 
        : id_(id), dish_(dish), waiter_(waiter) {}
    
    void processMessage(Message& message) override {
        if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {
            std::cout << "顾客 " << id_ << " 收到: " << cookedMsg->dish << std::endl;
        }
    }
    
    void placeOrder() {
        std::cout << "顾客 " << id_ << " 下单: " << dish_ << std::endl;
        waiter_->send(std::make_unique<OrderMessage>(id_, dish_));
    }
};

// 服务员Actor
class WaiterActor : public Actor {
private:
    Actor* chef_;
    std::unordered_map<int, CustomerActor*> customers_;
    
public:
    WaiterActor(Actor* chef) : chef_(chef) {}
    
    void addCustomer(int customerId, CustomerActor* customer) {
        customers_[customerId] = customer;
    }
    
    void processMessage(Message& message) override {
        if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {
            std::cout << "服务员接收订单: 顾客 " << orderMsg->customerId 
                      << " 点了 " << orderMsg->dish << std::endl;
            chef_->send(std::make_unique<OrderMessage>(*orderMsg));
        } 
        else if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {
            std::cout << "服务员取菜: " << cookedMsg->dish << std::endl;
            auto it = customers_.find(cookedMsg->customerId);
            if (it != customers_.end()) {
                it->second->send(std::make_unique<CookedMessage>(*cookedMsg));
            }
        }
    }
};

// 厨师Actor
class ChefActor : public Actor {
public:
    void processMessage(Message& message) override {
        if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {
            std::cout << "厨师收到订单: 顾客 " << orderMsg->customerId 
                      << " 点了 " << orderMsg->dish << std::endl;
            
            // 模拟烹饪时间
            std::this_thread::sleep_for(std::chrono::seconds(2));
            
            std::cout << "厨师完成烹饪: " << orderMsg->dish << std::endl;
            
            // 发送完成消息给服务员(通过消息中的sender字段)
            // 这里简化处理,直接通过构造函数传递waiter引用
        }
    }
};

int main() {
    // 创建厨师
    ChefActor chef;
    chef.start();
    
    // 创建服务员
    WaiterActor waiter(&chef);
    waiter.start();
    
    // 创建顾客
    std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};
    const int numCustomers = 4;
    std::unique_ptr<CustomerActor> customers[numCustomers];
    
    for (int i = 0; i < numCustomers; i++) {
        customers[i] = std::make_unique<CustomerActor>(i, dishes[i % 4], &waiter);
        waiter.addCustomer(i, customers[i].get());
        customers[i]->start();
    }
    
    // 顾客下单
    for (int i = 0; i < numCustomers; i++) {
        customers[i]->placeOrder();
    }
    
    // 等待一段时间让订单处理完成
    std::this_thread::sleep_for(std::chrono::seconds(10));
    
    // 停止所有Actor
    for (int i = 0; i < numCustomers; i++) {
        customers[i]->stop();
    }
    waiter.stop();
    chef.stop();
    
    std::cout << "餐厅打烊" << std::endl;
    return 0;
}

两种模式的对比分析

CSP 模式特点:

  1. 通信通过 Channel:各个角色之间通过 Channel 进行通信,不直接知道对方的存在
  2. 同步点:Channel 的发送和接收操作可以是同步的,形成天然的同步点
  3. 解耦:生产者和消费者完全解耦,只需知道 Channel 接口
  4. 数据流清晰:数据流动路径明确,易于理解和调试

Actor 模式特点:

  1. 实体为中心:每个 Actor 是一个独立的实体,有自己的状态和行为
  2. 异步消息传递:Actor 之间通过异步消息进行通信
  3. 封装状态:每个 Actor 封装自己的状态,避免了共享状态的问题
  4. 位置透明:Actor 可以分布在不同的进程中甚至不同的机器上

适用场景:

  • CSP 更适合需要明确数据流和控制流的场景
  • Actor 更适合需要模拟现实世界实体和分布式系统的场景

这两个示例展示了如何使用 C++ 实现并发编程中的两种重要模式,通过餐厅点餐的生活化场景使得这些概念更加容易理解。


网站公告

今日签到

点亮在社区的每一天
去签到