【设计模式】如何用C++实现观察者模式【发布订阅机制】

发布于:2024-12-18 ⋅ 阅读:(145) ⋅ 点赞:(0)

【设计模式】如何用C++实现观察者模式【发布订阅机制】

一、问题背景

代码质量影响生活质量。最近工作中频繁接触各种设计模式,深刻体会到优秀的设计模式不仅能显著降低后续维护的压力,还能提升开发效率。观察者模式作为一种降低耦合度、提高扩展性的利器,其设计模式思想让人受益匪浅。

过去曾读过一本《练习的心态》,书中提出了这样的问题:当我们专注于过程时,往往会偏离目标,越专注离目标也就越远;当我们关注目标时,要么无法专注于过程,要么因目标与现实的差距而产生放弃的念头。

如何把握过程与目标的关系,也许需要我们用一种观察者的智慧和心态,即同时使用两个视角来思考:

  1. 第一种视角作为被观察者,允许自身专注,沉浸于过程,最好进入心流状态。
  2. 第二种视角作为观察者,观察者观察沉浸于过程的执行者,时常检查执行者是否偏离目标,然后提醒执行者调整策略。

当掌握这两种视角后,做到极致的情况下,自身始终存在一种理性的观察者视角:当自身愤怒时,观察者能提供一种理性的视角,以避免行为造成无法接受的后果。如果能做到这一点,仅仅是愤怒的话又有何不可呢。

观察者模式中这两种视角是低耦合的,而作为普通人,难免经常会将两种视角混淆在一起。

”破山中贼易,破心中贼难“,要让心中始终存在一种理性的智慧,也许还需要在事上学、心上练。把握人性的度量,从而实现“从心所欲而不逾矩”的理想人格。

二、什么是观察者模式?

观察者模式(Observer Pattern)是一种常见的设计模式,属于行为型模式。它定义了一种一对多的依赖关系,使得当一个对象的状态发生改变时,其所有依赖对象(观察者)都会自动收到通知并更新。这种模式常用于事件驱动的系统。

观察者模式的核心概念:

  • 被观察者(Subject):也称为发布者,它维护着一个观察者列表,当它的状态发生变化时,会通知所有观察者。
  • 观察者(Observer):也称为订阅者,它订阅了某个被观察者,当被观察者状态发生变化时,它会收到通知并执行相应的操作。

三、观察者模式工作原理

  1. 注册观察者: 观察者向被观察者注册,把自己添加到被观察者的观察者列表中。

  2. 状态改变: 被观察者的状态发生变化。

  3. 通知观察者: 被观察者遍历观察者列表,依次通知所有注册的观察者。

  4. 更新: 观察者收到通知后,根据自己的逻辑进行更新。

四、为什么使用观察者模式?

  1. 低耦合: 被观察者和观察者之间是低耦合的,它们之间不需要知道对方的具体实现细节。

  2. 可扩展性: 可以动态地增加或删除观察者,而不需要修改被观察者或其他观察者的代码。

  3. 复用性: 观察者模式可以被应用于各种场景,提高代码的复用性。

五、实现步骤

以发布订阅为例,需要实现以下三个组件:

  1. Publisher:用于发布订阅事件,承担了观察者模式中的Subject的状态改变功能。
  2. TopicServer:用于中心化管理Topic订阅事件和Subscriber观察者列表,承担了中的Subject的通知功能。
  3. Subscriber:用于订阅Topic,当发布者Publisher发布消息时,订阅服务器TopicServer会通知所有订阅该Topic的订阅者Subscriber。

1. 订阅者类Subscriber

./subscribe/Subscribe.h

#ifndef SUBSCRIBE_H
#define SUBSCRIBE_H
#include <string>
namespace Observer
{
    class Subscriber
    {
    public:
        virtual ~Subscriber() = default;
        virtual void Update(const std::string &topic, const std::string &message) = 0;
    };
}
#endif

./subscribe/SubscribeImpl.h

#ifndef SUBSCRIBEIMPL_H
#define SUBSCRIBEIMPL_H
#include <Subscriber.h>
namespace Observer
{
    class SubscriberImpl : public Subscriber
    {
    public:
        explicit SubscriberImpl(const std::string& subscriberName);
        ~SubscriberImpl() override;
        void Update(const std::string &topic, const std::string &message) override;
    private:
        std::string m_name;
    };
}
#endif

./subscribe/SubscribeImpl.cpp

#include <SubscriberImpl.h>
#include <iostream>
using namespace Observer;

SubscriberImpl::SubscriberImpl(const std::string& subscriberName) : m_name(subscriberName) {}

SubscriberImpl::~SubscriberImpl() {}

void SubscriberImpl::Update(const std::string &topic, const std::string &message)
{
    std::cout << "Subscriber [" << m_name << "] received message on topic [" << topic << "]: " << message << std::endl;
}

2. 发布订阅中心类TopicServer

./topicServer/TopicServer.h

#ifndef TOPICSERVER_H
#define TOPICSERVER_H
#include <string>
#include <memory>
#include <Subscriber.h>
namespace Observer
{
    class TopicServer
    {
    public:
        virtual ~TopicServer() = default;
        virtual void Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) = 0;
        virtual void Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) = 0;
        virtual void Notify(const std::string& topic, const std::string& message) = 0;
    };
}
#endif

./topicServer/TopicServerImpl.h

#ifndef TOPICSERVERIMPL_H
#define TOPICSERVERIMPL_H
#include <TopicServer.h>
#include <unordered_map>
#include <vector>
namespace Observer
{
    class TopicServerImpl : public TopicServer
    {
    public:
        explicit TopicServerImpl();
        ~TopicServerImpl() override;
        void Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) override;
        void Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) override;
        void Notify(const std::string& topic, const std::string& message) override;
    private:
        std::unordered_map<std::string, std::vector<std::shared_ptr<Subscriber>>> m_topicSubscriber;
    };
}
#endif

./topicServer/TopicServerImpl.cpp

#include <TopicServerImpl.h>
#include <algorithm>
using namespace Observer;

TopicServerImpl::TopicServerImpl() {}

TopicServerImpl::~TopicServerImpl() {}

void TopicServerImpl::Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber)
{
    m_topicSubscriber[topic].push_back(subscriber);
}

void TopicServerImpl::Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber)
{
    auto& subscribersList = m_topicSubscriber[topic];
    auto itBegin = std::remove(subscribersList.begin(), subscribersList.end(), subscriber);
    subscribersList.erase(itBegin, subscribersList.end());
    if (subscribersList.size() == 0) {
        m_topicSubscriber.erase(topic);
    }
}

void TopicServerImpl::Notify(const std::string& topic, const std::string& message)
{
    if (m_topicSubscriber.contains(topic)) {
        for (auto subscriber : m_topicSubscriber[topic]) {
            subscriber->Update(topic, message);
        }
    }
}

3. 发布者类Publisher

./publisher/Publisher.h

#ifndef PUBLISHER_H
#define PUBLISHER_H
#include <string>
namespace Observer
{
    class Publisher
    {
    public:
        virtual ~Publisher() = default;
        virtual void PublishMessage(const std::string& topic, const std::string& message) = 0;
    };
}
#endif

/publisher/PublisherImpl.h

#ifndef PUBLISHERIMPL_H
#define PUBLISHERIMPL_H
#include <Publisher.h>
#include <TopicServer.h>
#include <memory>
namespace Observer
{
    class PublisherImpl : public Publisher
    {
    public:
        explicit PublisherImpl(std::shared_ptr<TopicServer> topicServer);
        ~PublisherImpl() override;
        void PublishMessage(const std::string& topic, const std::string& message) override;
    private:
        std::shared_ptr<TopicServer> m_server;
    };
}
#endif

/publisher/PublisherImpl.cpp

#include <PublisherImpl.h>
using namespace Observer;

PublisherImpl::PublisherImpl(std::shared_ptr<TopicServer> topicServer) : m_server(topicServer) {}

PublisherImpl::~PublisherImpl() {}

void PublisherImpl::PublishMessage(const std::string& topic, const std::string& message) {
    m_server->Notify(topic, message);
}

4. main函数调用

./main.cpp

#include <PublisherImpl.h>
#include <TopicServerImpl.h>
#include <SubscriberImpl.h>
namespace {
    constexpr std::string TOPICA {"TopicA"};
    constexpr std::string TOPICB {"TopicB"};
}
using namespace Observer;
int main()
{
    std::shared_ptr<TopicServer> server = std::make_shared<TopicServerImpl>();

    // 创建观察者
    std::shared_ptr<Subscriber> subscriber1 = std::make_shared<SubscriberImpl>("subscriber1");
    std::shared_ptr<Subscriber> subscriber2 = std::make_shared<SubscriberImpl>("subscriber2");
    std::shared_ptr<Subscriber> subscriber3 = std::make_shared<SubscriberImpl>("subscriber3");
    std::shared_ptr<Subscriber> subscriber4 = std::make_shared<SubscriberImpl>("subscriber4");

    // 订阅主题
    server->Subscribe(TOPICA, subscriber1);
    server->Subscribe(TOPICA, subscriber2);
    server->Subscribe(TOPICA, subscriber4);
    server->Subscribe(TOPICB, subscriber1);
    server->Subscribe(TOPICB, subscriber3);
    server->Subscribe(TOPICB, subscriber4);

    std::shared_ptr<Publisher> publisher = std::make_shared<PublisherImpl>(server);

    // 发布消息
    publisher->PublishMessage(TOPICA, "Hello from TopicA!");
    publisher->PublishMessage(TOPICB, "Greetings from TopicB!");

    // 移除订阅者
    server->Unsubscribe(TOPICA, subscriber4);
    server->Unsubscribe(TOPICB, subscriber4);

    // 发布消息
    publisher->PublishMessage(TOPICA, "Update from TopicA after unsubscribe!");
    publisher->PublishMessage(TOPICB, "Update from TopicB after unsubscribe!");

    return 0;
}

5. 编写CMakeLists.txt

cmake_minimum_required(VERSION 3.10)
set(ProjectName Observer)
project(${ProjectName})

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)

include_directories(
    ./publisher
    ./subscriber
    ./topicServer
)

file(GLOB LIB_FILE
    publisher/*
    subscriber/*
    topicServer/*)

message(${LIB_FILE})
add_executable(${ProjectName}
    main.cpp
    ${LIB_FILE})

此时文件树结构如下:

.
├── CMakeLists.txt
├── main.cpp
├── publisher
│   ├── Publisher.h
│   ├── PublisherImpl.cpp
│   └── PublisherImpl.h
├── subscriber
│   ├── Subscriber.h
│   ├── SubscriberImpl.cpp
│   └── SubscriberImpl.h
└── topicServer
    ├── TopicServer.h
    ├── TopicServerImpl.cpp
    └── TopicServerImpl.h

6. 编译运行

mkdir build
cd build
cmake ..
make -j12
./Observer

运行结果如下

Subscriber [subscriber1] received message on topic [TopicA]: Hello from TopicA!
Subscriber [subscriber2] received message on topic [TopicA]: Hello from TopicA!
Subscriber [subscriber4] received message on topic [TopicA]: Hello from TopicA!
Subscriber [subscriber1] received message on topic [TopicB]: Greetings from TopicB!
Subscriber [subscriber3] received message on topic [TopicB]: Greetings from TopicB!
Subscriber [subscriber4] received message on topic [TopicB]: Greetings from TopicB!
Subscriber [subscriber1] received message on topic [TopicA]: Update from TopicA after unsubscribe!
Subscriber [subscriber2] received message on topic [TopicA]: Update from TopicA after unsubscribe!
Subscriber [subscriber1] received message on topic [TopicB]: Update from TopicB after unsubscribe!
Subscriber [subscriber3] received message on topic [TopicB]: Update from TopicB after unsubscribe!

运行结果说明:

订阅了TOPICAsubscriber1、subscriber2、subscriber4均收到了Publisher发布的Hello from TopicA!

订阅了TOPICBsubscriber1、subscriber3、subscriber4均收到了Publisher发布的Greetings from TopicB!

subscriber4TOPICATOPICB去订阅后,再次发布消息则只有subscriber1、subscriber2、subscriber3能收到订阅信息

本文基于观察者模式,侧重于于阐述设计模式的核心思想,实现了一个简化的发布订阅系统。这种设计模式在实际生产环境中,往往需要更复杂的实现,比如涉及到不同进程之间的通信、负载均衡等,以满足高并发、高可用性的要求。


网站公告

今日签到

点亮在社区的每一天
去签到