文章目录
并发设计模式:Actor vs. CSP (生活场景版)
想象一下,你正在规划一个大型活动的后勤工作。传统的“共享内存+锁”模式就像是一个共享的白板:所有工作人员都在上面读写任务,但必须轮流使用(加锁),经常造成拥堵(性能差),并且一个工作人员的失误可能擦掉别人的数据(状态混乱,耦合度高)。
Actor 和 CSP 提供了更优雅的解决方案。
1. Actor 模式:就像公司里的邮件系统
核心思想: “人(Actor)与人之间直接发邮件(消息)”。
生活场景模拟:
假设你的活动有三个核心部门:
- 前台 (Front Desk Actor):负责接收外界送来的物资,并通知相应部门来取。
- 厨房 (Kitchen Actor):负责处理食材,制作餐食。
- 舞台 (Stage Actor):负责管理音响、灯光设备。
工作流程 (Actor 模式):
- 隔离与邮箱: 每个部门(Actor)都在自己的办公室(独立线程/进程)里工作,互不干扰。每个部门门口都有一个专属邮箱(Mailbox)。
- 异步通信: 厨房需要面粉了,他们不会直接跑去仓库翻找(共享内存)。而是给前台发一封邮件(
Send("前台", NeedFlourMsg)
):“请送10袋面粉来”,然后继续做自己的事(异步)。 - 顺序处理: 前台的员工一次只从自己的邮箱里取出一封邮件进行处理。比如他先处理舞台的“需要新麦克风”请求,处理完后,再处理厨房的“需要面粉”请求。他一次只做一件事,所以绝不会搞混(无需锁,内部状态安全)。
- 直接寻址: 厨房知道这封邮件是发给“前台”的,前台收到邮件后也知道它来自“厨房”。就像邮件有明确的发件人和收件人。
Actor 模式的特点:
- 重点在“人”(Actor):系统由多个独立的 Actor 组成。
- 直接通信:知道消息发给谁,也知道消息来自谁。
- 优点:职责清晰,部门间完全隔离,一个部门崩溃不会直接影响另一个(容错性强)。
现实中的例子: 公司的邮件系统、客服工单系统(每个工单分配给特定客服处理)。
2. CSP 模式:就像快递站的储物格
核心思想: “人把东西放到公共格子(Channel),别人再从格子里取”。
生活场景模拟:
同样是那个活动,现在我们换一种后勤模式。我们设置一个中央快递站,站里有一排带编号的储物格(Channel)。
- 格子的类型:
- 小件格 (无缓冲 Channel):只能放一件物品。如果格子里有东西,快递员必须等别人取走才能放新的(直接阻塞等待)。
- 货架格 (有缓冲 Channel):比如 10 号货架可以放最多 10 箱水。放满后,送货员必须等待;取空后,取货员也必须等待。
工作流程 (CSP 模式):
- 关注通道: 工作人员不关心是谁把东西放进格子,也不关心最终是谁取走的。他们只认格子(Channel)。
- 解耦通信: 农夫送来 10 箱矿泉水。他不需要知道活动方是谁,他只需根据指示,把水放到“10号货架”(
10号货架 <- 10箱水
)。 - 匿名处理: 活动现场的志愿者渴了,他也不知道水是谁送的,他只需走到“10号货架”取一箱水(
水 <- 10号货架
)。 - 同步协调: 如果志愿者来取水时货架是空的,他就会在那里等着,直到有农夫送来水(同步等待)。反之,如果货架满了,农夫也会等着。
CSP 模式的特点:
- 重点在“格子”(Channel):Channel 是第一公民,是通信的核心。
- 间接通信:发送者和接收者不知道对方的存在,完全通过 Channel 解耦。
- 优点:灵活性极高,可以轻松组合出复杂的数据流管道(如:
格A -> 工作人员1 -> 格B -> 工作人员2 -> 格C
)。
现实中的例子: 工厂的流水线、餐厅的传菜窗口(厨师做好菜放到窗口,服务员从窗口端走,他们不需要直接交流)。
总结与对比:如何选择?
特性 | Actor 模式 (邮件系统) | CSP 模式 (快递站格子) |
---|---|---|
通信对象 | 明确的参与者 (Actor) | 抽象的通道 (Channel) |
耦合关系 | 发送方和接收方互相知晓 | 发送方和接收方互相不知 |
关注点 | 谁 (Who) 来处理消息 | 在哪里 (Where) 交换消息 |
同步方式 | 消息发送通常是异步的 | 通过 Channel 进行同步(等待) |
生活比喻 | 部门邮件往来 | 快递站、流水线、传菜窗口 |
典型语言 | Erlang, Akka (Java/Scala) | Go (原生支持), Clojure |
选择建议:
- 如果你的系统由许多独立的、有状态的、需要明确身份的实体组成(比如游戏中的每个玩家、电商系统中的每个订单),Actor 模式更直观。
- 如果你需要构建一个数据流管道,处理步骤清晰,且希望最大程度解耦生产者和消费者(比如数据处理管道、网络爬虫的各个阶段),CSP 模式更灵活。
它们的共同伟大之处在于实现了那句名言:
“不要通过共享内存来通信;而要通过通信来共享内存。”
(Do not communicate by sharing memory; instead, share memory by communicating.)*
这意味着我们不再需要小心翼翼地守护一个“共享白板”,而是通过“邮件”或“快递”这种更自然、更安全的方式来协作,从而大大降低了并发编程的复杂性。
C++ 实现 Actor 与 CSP 模式的生活化示例
下面我将使用 C++ 实现餐厅点餐场景的 Actor 和 CSP 模式示例。
场景描述
在一个餐厅中,有顾客、服务员和厨师三种角色。顾客下单,服务员接收订单并传递给厨师,厨师烹饪完成后通知服务员上菜。
1. CSP 模式实现 (使用 Channel)
首先,我们需要实现一个简单的 Channel 类来模拟 Go 语言的 Channel:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>
// 简单的 Channel 实现
template <typename T>
class Channel {
private:
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable cv_producer_;
std::condition_variable cv_consumer_;
size_t capacity_;
bool closed_ = false;
public:
Channel(size_t capacity = 0) : capacity_(capacity) {}
bool send(T value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_producer_.wait(lock, [this]() {
return (capacity_ == 0 && queue_.empty()) ||
queue_.size() < capacity_ ||
closed_;
});
if (closed_) {
return false;
}
queue_.push(value);
cv_consumer_.notify_one();
return true;
}
bool receive(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_consumer_.wait(lock, [this]() {
return !queue_.empty() || closed_;
});
if (closed_ && queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
cv_producer_.notify_one();
return true;
}
void close() {
std::unique_lock<std::mutex> lock(mtx_);
closed_ = true;
cv_producer_.notify_all();
cv_consumer_.notify_all();
}
};
// 定义消息类型
struct Order {
int customerId;
std::string dish;
};
struct CookedDish {
int customerId;
std::string dish;
};
// 厨师函数
void chef(Channel<Order>& orderChannel, Channel<CookedDish>& cookedChannel) {
Order order;
while (orderChannel.receive(order)) {
std::cout << "厨师收到订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;
// 模拟烹饪时间
std::this_thread::sleep_for(std::chrono::seconds(2));
CookedDish cookedDish{order.customerId, order.dish};
std::cout << "厨师完成烹饪: " << cookedDish.dish << std::endl;
cookedChannel.send(cookedDish);
}
std::cout << "厨师结束工作" << std::endl;
}
// 服务员函数
void waiter(Channel<Order>& customerToWaiter, Channel<Order>& waiterToChef,
Channel<CookedDish>& chefToWaiter, Channel<CookedDish>& waiterToCustomer) {
while (true) {
// 从顾客接收订单
Order order;
if (!customerToWaiter.receive(order)) {
break;
}
std::cout << "服务员接收订单: 顾客 " << order.customerId << " 点了 " << order.dish << std::endl;
// 将订单发送给厨师
waiterToChef.send(order);
// 从厨师接收烹饪完成的菜品
CookedDish cookedDish;
if (!chefToWaiter.receive(cookedDish)) {
break;
}
std::cout << "服务员取菜: " << cookedDish.dish << std::endl;
// 将菜品发送给顾客
waiterToCustomer.send(cookedDish);
}
std::cout << "服务员结束工作" << std::endl;
}
// 顾客函数
void customer(int id, Channel<Order>& waiterChannel, Channel<CookedDish>& customerChannel) {
std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};
std::string dish = dishes[id % 4];
// 下单
Order order{id, dish};
std::cout << "顾客 " << id << " 下单: " << dish << std::endl;
waiterChannel.send(order);
// 等待菜品
CookedDish cookedDish;
if (customerChannel.receive(cookedDish)) {
std::cout << "顾客 " << id << " 收到: " << cookedDish.dish << std::endl;
}
}
int main() {
// 创建各种Channel
Channel<Order> customerToWaiter(5); // 顾客到服务员的订单通道
Channel<Order> waiterToChef(5); // 服务员到厨师的订单通道
Channel<CookedDish> chefToWaiter(5); // 厨师到服务员的菜品通道
Channel<CookedDish> waiterToCustomer(5); // 服务员到顾客的菜品通道
// 启动厨师线程
std::thread chefThread(chef, std::ref(waiterToChef), std::ref(chefToWaiter));
// 启动服务员线程
std::thread waiterThread(waiter,
std::ref(customerToWaiter), std::ref(waiterToChef),
std::ref(chefToWaiter), std::ref(waiterToCustomer));
// 模拟多个顾客
std::thread customers[4];
for (int i = 0; i < 4; i++) {
customers[i] = std::thread(customer, i,
std::ref(customerToWaiter), std::ref(waiterToCustomer));
}
// 等待所有顾客完成
for (int i = 0; i < 4; i++) {
customers[i].join();
}
// 关闭通道并等待线程结束
customerToWaiter.close();
waiterToChef.close();
chefToWaiter.close();
waiterToCustomer.close();
waiterThread.join();
chefThread.join();
std::cout << "餐厅打烊" << std::endl;
return 0;
}
2. Actor 模式实现
接下来,我们使用 Actor 模式实现相同的餐厅场景:
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <string>
#include <functional>
#include <memory>
#include <unordered_map>
// 消息基类
struct Message {
virtual ~Message() = default;
};
// 订单消息
struct OrderMessage : Message {
int customerId;
std::string dish;
OrderMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};
// 完成烹饪消息
struct CookedMessage : Message {
int customerId;
std::string dish;
CookedMessage(int id, const std::string& d) : customerId(id), dish(d) {}
};
// 简单的Actor基类
class Actor {
protected:
std::queue<std::unique_ptr<Message>> mailbox_;
std::mutex mailboxMutex_;
std::condition_variable mailboxCV_;
bool running_ = false;
std::thread thread_;
public:
virtual ~Actor() {
stop();
}
void start() {
running_ = true;
thread_ = std::thread(&Actor::run, this);
}
void stop() {
running_ = false;
mailboxCV_.notify_all();
if (thread_.joinable()) {
thread_.join();
}
}
void send(std::unique_ptr<Message> message) {
std::lock_guard<std::mutex> lock(mailboxMutex_);
mailbox_.push(std::move(message));
mailboxCV_.notify_one();
}
void run() {
while (running_) {
std::unique_ptr<Message> message;
{
std::unique_lock<std::mutex> lock(mailboxMutex_);
mailboxCV_.wait(lock, [this]() {
return !mailbox_.empty() || !running_;
});
if (!running_ && mailbox_.empty()) {
break;
}
if (!mailbox_.empty()) {
message = std::move(mailbox_.front());
mailbox_.pop();
}
}
if (message) {
processMessage(*message);
}
}
}
virtual void processMessage(Message& message) = 0;
};
// 顾客Actor
class CustomerActor : public Actor {
private:
int id_;
std::string dish_;
Actor* waiter_;
public:
CustomerActor(int id, const std::string& dish, Actor* waiter)
: id_(id), dish_(dish), waiter_(waiter) {}
void processMessage(Message& message) override {
if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {
std::cout << "顾客 " << id_ << " 收到: " << cookedMsg->dish << std::endl;
}
}
void placeOrder() {
std::cout << "顾客 " << id_ << " 下单: " << dish_ << std::endl;
waiter_->send(std::make_unique<OrderMessage>(id_, dish_));
}
};
// 服务员Actor
class WaiterActor : public Actor {
private:
Actor* chef_;
std::unordered_map<int, CustomerActor*> customers_;
public:
WaiterActor(Actor* chef) : chef_(chef) {}
void addCustomer(int customerId, CustomerActor* customer) {
customers_[customerId] = customer;
}
void processMessage(Message& message) override {
if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {
std::cout << "服务员接收订单: 顾客 " << orderMsg->customerId
<< " 点了 " << orderMsg->dish << std::endl;
chef_->send(std::make_unique<OrderMessage>(*orderMsg));
}
else if (auto cookedMsg = dynamic_cast<CookedMessage*>(&message)) {
std::cout << "服务员取菜: " << cookedMsg->dish << std::endl;
auto it = customers_.find(cookedMsg->customerId);
if (it != customers_.end()) {
it->second->send(std::make_unique<CookedMessage>(*cookedMsg));
}
}
}
};
// 厨师Actor
class ChefActor : public Actor {
public:
void processMessage(Message& message) override {
if (auto orderMsg = dynamic_cast<OrderMessage*>(&message)) {
std::cout << "厨师收到订单: 顾客 " << orderMsg->customerId
<< " 点了 " << orderMsg->dish << std::endl;
// 模拟烹饪时间
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "厨师完成烹饪: " << orderMsg->dish << std::endl;
// 发送完成消息给服务员(通过消息中的sender字段)
// 这里简化处理,直接通过构造函数传递waiter引用
}
}
};
int main() {
// 创建厨师
ChefActor chef;
chef.start();
// 创建服务员
WaiterActor waiter(&chef);
waiter.start();
// 创建顾客
std::string dishes[] = {"披萨", "意面", "沙拉", "牛排"};
const int numCustomers = 4;
std::unique_ptr<CustomerActor> customers[numCustomers];
for (int i = 0; i < numCustomers; i++) {
customers[i] = std::make_unique<CustomerActor>(i, dishes[i % 4], &waiter);
waiter.addCustomer(i, customers[i].get());
customers[i]->start();
}
// 顾客下单
for (int i = 0; i < numCustomers; i++) {
customers[i]->placeOrder();
}
// 等待一段时间让订单处理完成
std::this_thread::sleep_for(std::chrono::seconds(10));
// 停止所有Actor
for (int i = 0; i < numCustomers; i++) {
customers[i]->stop();
}
waiter.stop();
chef.stop();
std::cout << "餐厅打烊" << std::endl;
return 0;
}
两种模式的对比分析
CSP 模式特点:
- 通信通过 Channel:各个角色之间通过 Channel 进行通信,不直接知道对方的存在
- 同步点:Channel 的发送和接收操作可以是同步的,形成天然的同步点
- 解耦:生产者和消费者完全解耦,只需知道 Channel 接口
- 数据流清晰:数据流动路径明确,易于理解和调试
Actor 模式特点:
- 实体为中心:每个 Actor 是一个独立的实体,有自己的状态和行为
- 异步消息传递:Actor 之间通过异步消息进行通信
- 封装状态:每个 Actor 封装自己的状态,避免了共享状态的问题
- 位置透明:Actor 可以分布在不同的进程中甚至不同的机器上
适用场景:
- CSP 更适合需要明确数据流和控制流的场景
- Actor 更适合需要模拟现实世界实体和分布式系统的场景
这两个示例展示了如何使用 C++ 实现并发编程中的两种重要模式,通过餐厅点餐的生活化场景使得这些概念更加容易理解。