CppCon 2016 学习:Embedded MQTT Client in C++14

发布于:2025-06-22 ⋅ 阅读:(18) ⋅ 点赞:(0)

ISO Standard Protocol

  • 表示该协议是由 国际标准化组织(ISO) 制定或认可的标准协议。
  • 通常意味着它是经过国际认可的,具有一致性、兼容性和稳定性的通信方法。

Publisher/Subscriber Model

  • 是一种 发布/订阅架构
    • Publisher(发布者) 发送消息;
    • Subscriber(订阅者) 订阅并接收感兴趣的消息;
    • 它们之间通常不直接通信,而是通过一个中间件(如消息代理)。
  • 好处是解耦:发布者不需要知道谁会接收消息,系统更灵活可扩展。

Considered Lightweight

  • 说明该协议占用资源较少(如 CPU、内存、带宽),适合:
    • 嵌入式系统
    • 物联网(IoT)设备
    • 移动应用
  • 一般传输效率高、负载小。

Requires a Reliable Connection

  • 表明该协议需要稳定、可靠的网络连接;
  • 例如:使用 TCP 协议而不是 UDP;
  • 丢包或不稳定的连接可能导致通信失败或数据丢失。

综合理解

这个协议可能是像 MQTT 这样的协议:

协议 是否为 ISO 标准 Pub/Sub 是否轻量 是否需要可靠连接
MQTT 是(ISO/IEC 20922) (基于 TCP)

MQTT(Message Queuing Telemetry Transport) 的应用场景。下面我来逐步解释你所提供的关键词和它们之间的关系,帮助你全面理解。

背景:你描述的是一个基于 MQTT 协议的场景

MQTT 是一个轻量级的发布/订阅(Pub/Sub)通信协议,广泛用于物联网(IoT)系统中。

关键词逐条解释

Devices & Clients

  • toaster / cl342r / switch1 / light1 / nuke-plant-power
    → 这些是设备或客户端的标识符。它们通过 MQTT 协议连接到一个Broker

MQTT Broker

  • Broker 是 MQTT 的核心组件,负责接收所有消息、筛选消息、决定谁订阅了哪些主题并将消息转发给他们。
  • 设备不会直接通信,而是通过 Broker 进行消息传递。

Topics & Subscriptions

Topics 是消息的通道,结构像路径:
town/plant/toaster/power_control
  • 这里是一个典型的 MQTT Topic,类似路径一样的结构。
通配符说明:
  • +:单层通配符,例如:
    town/plant/+/power_control
    
    匹配:
    • town/plant/toaster/power_control
    • town/plant/light1/power_control 等等。
  • #:多层通配符,例如:
    town/plant/toaster/#
    
    匹配:
    • town/plant/toaster/status
    • town/plant/toaster/power_control
    • town/plant/toaster/logs/error 等等。
示例订阅:
  • toaster 这个客户端:
    subscribe to town/plant/toaster/#
    
    → 订阅关于 toaster 的所有消息。
  • switch1 这个客户端:
    publish to town/plant/toaster/power_control
    
    → 控制 toaster 的电源。
  • 也可以:
    subscribe to town/plant/+/power_control
    
    → 接收任何设备的电源控制消息。

Payload(消息内容)

  • on
    → 表示打开设备,例如 toaster 或 light。

举个完整的操作流程:

  1. Broker 启动,所有客户端连接上来。
  2. toaster(cl342r) 订阅:
    town/plant/toaster/#
    
    → 它会接收所有发往 toaster 的消息。
  3. switch1 发送一条消息到:
    town/plant/toaster/power_control
    
    内容是:
    "on"
    
  4. Broker 接收到这条消息,并转发给 toaster。
  5. toaster 收到 "on" 指令,开启加热。

总结理解

概念 含义
Publisher 发送消息的客户端(如 switch1)
Subscriber 接收消息的客户端(如 toaster)
Topic 消息通道,例如 town/plant/toaster/power_control
Broker 中介服务器,管理所有通信
Payload 传送的实际消息内容,如 "on"
通配符 +/# 用于订阅多个主题的快捷方式

MQTT 协议中的基本消息类型,这对于理解设备如何在 MQTT 网络中进行通信非常重要。下面是详细解释:

MQTT 基本消息类型及其作用

MQTT 协议中,通信过程是通过一系列“控制报文(Control Packets)”完成的,下面是最基本的几种消息类型:

1. CONNECT

  • 用途:客户端(如 IoT 设备)连接到 Broker 时发送。
  • 包含内容
    • 客户端 ID
    • 用户名/密码(如果有认证)
    • 保持连接间隔(keep-alive)
  • 例子:toaster 启动后发送 CONNECT 报文到 Broker。

2. CONNACK

  • 用途:Broker 对 CONNECT 的响应。
  • 说明:如果连接成功,返回成功;否则返回错误代码(如认证失败、协议不支持等)。

3. DISCONNECT

  • 用途:客户端主动断开连接时发送。
  • 说明:让 Broker 知道设备是正常下线,而不是掉线崩溃。

4. SUBSCRIBE

  • 用途:客户端告诉 Broker 想要接收哪些 Topic 的消息。
  • 说明:可以使用通配符 +#
  • 例子:toaster 订阅 town/plant/toaster/power_control

5. UNSUBSCRIBE

  • 用途:客户端取消对某些 Topic 的订阅。
  • 说明:例如设备不再关心某类消息时使用。

6. PUBLISH

  • 用途:客户端向 Broker 发送消息,Broker 再转发给相应的订阅者。
  • 包含内容
    • Topic(消息发往的路径)
    • Payload(消息内容,如 "on"
    • QoS(消息质量级别)

7. PINGREQ / PINGRESP

  • PINGREQ(ping 请求):客户端定时发送,告诉 Broker “我还活着”。
  • PINGRESP(ping 响应):Broker 返回,表示连接还在。
  • 用途:维持连接活性,防止连接被误判为断开。

一个简化的通信流程图(文字版)

[Client] --- CONNECT ---> [Broker]
          <-- CONNACK ---
[Client] --- SUBSCRIBE ---> topic: "plant/toaster"
[Client] <--- PUBLISH --- (from other client)
[Client] --- PINGREQ ---> [Broker]
          <-- PINGRESP ---
[Client] --- DISCONNECT ---> [Broker]

小结表格

消息类型 用途
CONNECT 客户端请求连接 Broker
CONNACK Broker 确认连接结果
DISCONNECT 客户端主动断开连接
SUBSCRIBE 订阅某个或多个 Topic
UNSUBSCRIBE 取消订阅
PUBLISH 发布消息到某个 Topic
PINGREQ/RESP 心跳保活,维持连接

MQTT 的三种服务质量等级(QoS, Quality of Service)。这是 MQTT 的核心机制之一,决定了消息在网络上传输的可靠性。

MQTT 的三种 QoS 等级详解

QoS 等级 名称 交付次数 是否重复发送 可靠性 使用场景
0 At most once 最多一次 不重试 传感器数据、低优先级消息
1 At least once 至少一次 可能重复 控制命令,允许去重处理
2 Exactly once 恰好一次 严格控制 金融交易、重要命令

QoS 0 – At most once delivery(最多一次传送)

  • 行为:消息最多发送一次,不保证送达。
  • 协议机制
    • 不确认、不重发;
    • 使用 TCP 的基本传输;
  • 适合场景
    • 非关键数据,如温度传感器的定期上报;
    • 丢一两条没关系。

QoS 1 – At least once delivery(至少一次传送)

  • 行为:Broker 确保消息至少送达一次,但可能会重复送达
  • 协议机制
    • 发送方等待 ACK 确认;
    • 若未收到,自动重试;
  • 接收方需要处理重复消息
  • 适合场景
    • 关键但可以去重的操作,如“打开灯”命令。

QoS 2 – Exactly once delivery(恰好一次传送)

  • 行为:消息只会被传递一次,不多、不少。
  • 协议机制
    • 最复杂的握手流程:
      1. PUBLISH(含 Message ID)
      2. PUBREC(收到)
      3. PUBREL(发布者确认)
      4. PUBCOMP(最终确认)
  • 适合场景
    • 不能容忍重复的任务,如银行转账、订单处理。

可视化流程对比

QoS 0:
Client --> Broker    [PUBLISH]
(no ACK)
QoS 1:
Client --> Broker    [PUBLISH (dup可能为0或1)]
Broker --> Client    [PUBACK]
QoS 2:
Client --> Broker    [PUBLISH]
Broker --> Client    [PUBREC]
Client --> Broker    [PUBREL]
Broker --> Client    [PUBCOMP]

小结建议

  • QoS 0:速度最快,最省资源,但不可靠。
  • QoS 1:实用性最高,是大多数 MQTT 项目的默认选择。
  • QoS 2:最可靠但最耗资源,使用时要小心性能。

MQTT 的实际应用场景。下面是你列出的内容的整理和解释,帮助你全面理解 MQTT 在现实中的用途。

MQTT 被用在哪里?

MQTT 因为它 轻量、基于发布/订阅、支持低带宽与高延迟环境,被广泛用于大大小小的系统中:

1. 大型平台系统

  • Amazon IoT CoreMicrosoft Azure IoT HubGoogle Cloud IoT
    • 这些云平台使用 MQTT 作为主要协议之一,支持百万级设备通信。
    • 用于设备遥测、远程控制、云端命令分发等。

2. 小型/嵌入式设备

  • MQTT 可运行在资源受限的设备上,如:
    • 低功耗 MCU(如 ESP8266, STM32)
    • 电池供电设备
    • 无需操作系统的设备

3. RabbitMQ 等消息中间件系统

  • 虽然 RabbitMQ 原生使用 AMQP,但它也提供 MQTT 插件
  • 可以用作 MQTT Broker,把 IoT 设备接入企业消息流。

4. 智能家居 / IoT 产品

  • 灯泡、烤面包机、咖啡机、空调、插座等
  • 通过 MQTT 控制开关、状态监测、自动化规则触发
  • 如:开门 → 开灯;咖啡煮好 → 发通知

5. 医疗设备

  • 实时心率监测、血压仪、远程诊断系统等
  • MQTT 用于低延迟地将数据发送到医院系统或医生手机

6. 传感器系统

  • 环境传感器(温湿度、烟雾、空气质量)
  • 工业传感器(震动、电流、液位)
  • 数据可实时推送到云端进行分析

7. 移动应用

  • 用于聊天、实时数据推送、设备状态显示
  • 例如:手机 App 控制灯光、查看温度、推送警报

小结:MQTT 的使用广泛性

场景 设备例子 MQTT 的优势
云平台 Amazon IoT Core, Azure IoT 高可扩展性、可靠性
嵌入式设备 ESP32, Arduino, STM32 轻量、低功耗
智能家居 灯泡、插座、音响、咖啡机 实时控制、自动化
医疗设备 心率监测、远程诊断 稳定传输、低延迟
工业监控 温度、压力、液位、报警器 支持大量数据流、高可靠性
移动 App 状态推送、命令控制 实时通信、节省带宽

MQTT 订阅流程的时序图(sequence diagram)

时序图解读(MQTT 订阅 & 消息接收流程)

User MQTT Client Broker 用户发起订阅 subscribe subscribe (Topic) suback (ACK) 订阅成功 gos_level (Data) gos_level User MQTT Client Broker

各步骤说明

步骤 描述
1. User → MQTT_Client
subscribe(订阅请求)
用户(或上层程序)指示客户端订阅某个 Topic,如 plant/toaster/power_control
2. MQTT_Client → Broker
subscribe(转发订阅)
客户端将订阅请求发送到 MQTT Broker。
3. Broker → MQTT_Client
suback(订阅确认)
Broker 返回 SUBACK 报文,表示成功(或失败)订阅。
4. Broker → MQTT_Client
gos_level(推送数据)
当某个 Publisher 发布了该 Topic 的消息,Broker 将该消息推送到客户端。这里的 gos_level 是一个示例消息内容。
5. MQTT_Client → User
转发 gos_level
客户端把接收到的消息内容传给用户或上层程序处理。

场景举例

  • 用户设备(如手机 App)调用客户端去订阅一个主题;
  • 例如订阅消息 town/plant/toaster/power_control
  • 以后只要 Broker 收到该主题的消息,就会转发给客户端,然后客户端再交给用户处理(如展示、触发操作等)。

小结

你这个时序图非常准确地描述了:

  • MQTT 的订阅生命周期;
  • 消息如何从发布者通过 Broker 到订阅者;
  • MQTT 是事件驱动的异步通信模型,客户端只需订阅,Broker 自动推送数据。

时序图解读(订阅确认流程)

User MQTT Client Broker subscribe subscribe suback suback User MQTT Client Broker

每一步的意义

步骤 描述
1. User → MQTT Client
subscribe
用户或上层程序发出订阅指令,例如:订阅 sensor/temp
2. MQTT Client → Broker
subscribe
MQTT 客户端将订阅请求发送给 Broker
3. Broker → MQTT Client
suback
Broker 返回 SUBACK 报文,表示确认已经成功订阅
4. MQTT Client → User
suback
MQTT 客户端将订阅确认结果反馈给用户或上层应用程序

小结

  • 这个图主要描述了 订阅建立的握手过程
  • 数据还没有真正流动,真正的数据是在订阅完成后通过 PUBLISH 推送的;
  • suback 是订阅成功的“回执”,代表订阅通道建立好了,后续就可以接收该 Topic 的消息了。

使用另一个库(或框架)的常见原因,特别是在处理 MQTT网络通信异步任务 时。下面我逐一解释这些概念,帮你理解它们为什么是选择其他库的理由:

为什么使用另一个库?(例如替代默认的 MQTT 客户端)

你写到的几个关键词对应的理由如下:

1. Blocking(阻塞)

  • 问题:默认的 MQTT 客户端(例如 Paho)可能在某些模式下会 阻塞主线程,特别是在连接、订阅或等待消息时。
  • 原因使用其他库
    • 你可能需要 非阻塞异步(async/await) 支持的库,例如:asyncio-mqttRxMQTT 等。

2. Dependency Injection(依赖注入)

  • 问题:默认客户端不一定支持现代应用程序架构(如 Spring Boot、NestJS、Dagger)。
  • 为什么需要库支持 DI
    • 某些框架/库允许你将 MQTT 客户端作为依赖自动注入,便于测试、模块化、热替换等。
    • 例如在 Java 的 Spring 中,你可能想使用 @Autowired 注入一个封装好的 MQTTService。

3. Event Notification(事件通知机制)

  • 问题:传统客户端通过轮询或回调处理消息,可能不够灵活。
  • 高级做法
    • 使用事件驱动机制,例如基于 Observer、事件总线、消息分发器等;
    • 允许你订阅某个“事件”,自动触发处理函数。
    • 例如:mqtt.on("message", handler)、或使用 EventEmitter 风格。

4. Executor(线程池执行器)

  • 问题:需要更强的并发控制。
  • 高级用法
    • 用线程池、任务队列等方式调度 MQTT 相关任务;
    • 某些库允许自定义执行器(Executor)或调度策略;
    • 在 Java 中,这样可以避免每个消息都启动一个线程,提升性能和控制。

举例:为何选择另一个 MQTT 库

需求 默认库不足 替代库/理由
需要非阻塞 默认阻塞 I/O 使用 asyncio-mqtt(Python)或 Eclipse Vert.x(Java)
想要依赖注入 无 DI 支持 使用 Spring 的 MQTT 模块(Java),NestJS(Node)
需要事件驱动 回调难维护 EventEmitter 模式或 Rx(响应式)MQTT
并发可控 无线程池管理 自定义 Executor,适用于高吞吐设备集群

总结

你写的四个点不是具体库,而是选择其他库的关键技术需求。你可以这样理解:

“为什么要用另一个 MQTT 库?因为我需要它能支持非阻塞、依赖注入、事件通知和线程池管理。”

C++ 中实现依赖注入(Dependency Injection) 的简单讲解与示例。虽然 C++ 没有内建的 DI 框架(不像 Java 的 Spring),但依赖注入思想是一样可以实现的,通常通过构造函数注入。

假设场景:一个控制器依赖于一个服务

没有依赖注入(强耦合)

#include <iostream>
class Service {
public:
    void doSomething() {
        std::cout << "Service is working\n";
    }
};
class Controller {
private:
    Service service;  // 强耦合:内部创建了 Service
public:
    void run() {
        service.doSomething();
    }
};
int main() {
    Controller controller;
    controller.run();
}

问题Controller 自己创建了 Service,无法轻松替换、测试或模拟。

使用依赖注入(构造函数注入)

#include <iostream>
#include <memory>
// 接口(抽象类)定义
class IService {
public:
    virtual void doSomething() = 0;
    virtual ~IService() {}
};
// 真实实现
class RealService : public IService {
public:
    void doSomething() override {
        std::cout << "RealService is working\n";
    }
};
// 控制器使用接口
class Controller {
private:
    std::shared_ptr<IService> service;  // 依赖注入
public:
    Controller(std::shared_ptr<IService> s) : service(s) {}
    void run() {
        service->doSomething();
    }
};
int main() {
    std::shared_ptr<IService> service = std::make_shared<RealService>();
    Controller controller(service);  // 注入依赖
    controller.run();
}

说明

技术点 描述
IService 接口(抽象类),用于解耦实现
RealService 具体实现类
Controller 依赖于接口,而不是实现(面向接口编程)
构造函数注入 通过构造函数传入依赖
std::shared_ptr 用于共享和管理生命周期

依赖注入的好处

  • 你可以轻松地创建 MockService 来测试 Controller,而不用真实运行逻辑;
  • 更加模块化,便于扩展(换不同实现不用改 Controller);
  • 更适合大型系统架构(比如插件式、策略模式等)。

想更高级?

在大型 C++ 项目中,可以引入 DI 框架,比如:

  • Boost.DI — Boost 社区的依赖注入库
  • Fruit — Google 提供的依赖注入框架(基于模板)

你列出的这些关键词都和“依赖注入”和“库设计”相关,尤其是在构建可扩展、灵活的软件系统时。下面帮你拆解每个词的含义以及它们之间的关系,方便你理解:

1. Injection(注入)

  • 指把“依赖的组件”从外部传入到代码里,而不是自己内部创建。
  • 这是**依赖注入(Dependency Injection)**的核心动作。
  • 作用是减少耦合,让系统更灵活易维护。

2. Library Interface(库接口)

  • 你开发的“库”对外提供的接口(函数、类、方法等)。
  • 接口定义了外部如何调用库的功能
  • 它屏蔽了内部实现细节,只暴露必要的操作。

3. Connection(连接)

  • 一般指网络连接、数据库连接、消息队列连接等。
  • 在 MQTT 这样的通信库中,连接是客户端与 Broker 之间的通信通道。
  • 连接管理(建立、重连、关闭)是库逻辑重要部分。

4. Library Logic(库逻辑)

  • 库内部实现的业务逻辑、功能处理代码。
  • 包括处理请求、解析数据、维护状态、调度任务等。
  • 对外提供服务的核心部分。

5. Executor(执行器)

  • 用于管理任务执行的组件,比如线程池、事件循环。
  • 控制任务什么时候执行,如何并发执行,保证性能和资源利用。
  • 在异步库或多线程库中非常重要。

这些概念怎么关联?

概念 角色/作用 关系
Injection 把外部依赖传入,增强灵活性 让接口调用者能控制连接、执行器等
Library Interface 提供给外部的调用入口 外部通过接口使用库逻辑
Connection 建立客户端和服务端的通信管道 库逻辑管理连接状态
Library Logic 实现具体功能,比如消息处理、协议解析 由接口触发,控制连接和执行任务
Executor 任务调度与执行,比如线程池或事件循环 由库逻辑调用,保证异步和并发性能

简单类比

  • Injection 是“请别人帮忙给我工具”;
  • Library Interface 是“工具箱的外壳和说明书”;
  • Connection 是“工具箱的连接线”;
  • Library Logic 是“工具箱里真正工作的零件”;
  • Executor 是“调度员,安排谁什么时候用哪个零件”。
User MQTT Client Broker subscribe subscribe suback suback publish publish puback puback publish publish Time out (no response) (timeout error) User MQTT Client Broker

MQTT 客户端建立连接和订阅的流程,而“Chain completion handlers”通常指的是用链式回调或异步处理的方式来一步步完成这些操作。下面我帮你梳理理解:

1. Chain completion handlers(链式完成处理器)

  • 多个异步操作串联起来,前一个完成后触发后一个
  • 适用于网络编程中多步操作,保证操作顺序和正确处理结果。
  • 例如:连接成功后再进行协商,协商成功后订阅主题。

2. Establish connection(建立连接)

  • 客户端首先向 MQTT Broker 发起网络连接请求。
  • 建立 TCP 连接或 WebSocket 连接。
  • 这一步是后续通信的基础。

3. Negotiate with broker(与 Broker 协商)

  • 连接成功后,客户端发送 CONNECT 报文,包含客户端 ID、用户名密码、保持心跳等参数。
  • Broker 返回 CONNACK 确认连接是否成功。
  • 这一步是协议级别的“握手”。

4. Subscribe(订阅)

  • 连接建立并协商成功后,客户端发送 SUBSCRIBE 报文请求订阅某些主题。
  • Broker 返回 SUBACK 确认。
  • 之后,Broker 会将对应主题的消息推送给客户端。

举个例子

connect()
  .then(() => negotiate())
  .then(() => subscribe())
  .catch(error => handleError(error))
  • 这里每个操作完成后,触发下一个操作;
  • 任何一步失败都会跳到错误处理。

总结

  • 链式完成处理器帮助处理异步、多步骤过程,避免回调地狱;
  • MQTT 连接流程依次是建立网络连接 → 协议握手(协商) → 订阅主题;
  • 理解这几个步骤,是理解 MQTT 客户端运行原理的关键。

一个层次化有限状态机(Hierarchical Finite State Machine, HFSM),用于描述 MQTT 客户端与 Broker 的连接状态管理流程。以下是对其理解的分解:

左侧状态机的总体流程

  • NotConnected(未连接状态)
    • 客户端处于未连接状态,准备尝试连接到 MQTT Broker。
  • ConnectBroker(尝试连接状态)
    • 进入连接 Broker 的子状态机(右侧部分)。
  • Connected(已连接状态)
    • 成功建立连接后,客户端进入此状态,可以正常通信。
  • ShuttingDown(关闭状态)
    • 客户端进入关闭流程,可能是主动断开连接或系统关闭。

右侧子状态机(ConnectBroker 子流程)

  • NegotiateBroker(协商 Broker)
    • 客户端与 Broker 协商连接参数,例如协议版本、心跳间隔等。
  • WaitConnectionAck(等待连接确认)
    • 客户端等待来自 Broker 的连接确认消息(CONNACK)。
  • TestRetryCount(测试重试次数)
    • 如果连接失败,检查是否超过最大重试次数。
    • 若未超过,则回到协商 Broker 的状态;否则,触发错误并终止连接尝试。
  • Error(错误处理)
    • 当连接失败或重试次数超出限制时,进入错误状态。

状态机的特点

  1. 层次化结构
    • 主状态机(左侧)包含一个子状态机(右侧),用于处理更复杂的连接逻辑。
  2. 重试机制
    • 在连接失败的情况下,通过 TestRetryCount 检查是否需要重试。
  3. 状态转换条件
    • 状态转换基于事件,例如连接成功、失败、主动断开等。
  4. 灵活性
    • 这种设计使得 MQTT 客户端能够优雅处理连接管理,支持错误恢复和重试。

总结

该图清晰地描述了 MQTT 客户端在不同状态下的行为逻辑,尤其是连接管理过程的细节。通过这种分层状态机设计,客户端可以高效处理连接生命周期中的各种情况,包括初始连接、重试、断开和关闭等。

理解了!你这段代码是用来往 OLED 设备写初始化指令:

原始写法(C 风格)

static int oled_write(uint8_t *bytes, int count) {
    int tx = i2c_master_tx(OLED_I2C_ADDRESS, bytes, count);
    if (tx < 1) return -1;
    return tx;
}
oled_write((uint8_t []){ 0x00, 0xAF }, 2);
oled_write((uint8_t []){ 0x00, 0x8D, 0x14 }, 3);
// ...多次调用
  • 匿名数组强制转换传参,稍显繁琐。
  • 只能传裸指针和长度。
  • 函数参数是非 const 指针,实际不应该修改数据,违反了 const 语义。

改进版(C++ 风格)

int oled_write(uint8_t const * bytes, int count) {
    int tx = i2c_master_tx(OLED_I2C_ADDRESS, const_cast<uint8_t*>(bytes), count);
    if (tx < 1) return -1;
    return tx;
}
// 新增一个重载,接受 initializer_list,调用上面的函数
int oled_write(std::initializer_list<uint8_t> bytes) {
    return oled_write(bytes.begin(), bytes.size());
}
// 调用变得更简洁,使用大括号列表传入
oled_write({ 0x00, 0xAF });
oled_write({ 0x00, 0x8D, 0x14 });
// ...多次调用

关键改进点

项目 说明
const 修饰指针参数 表明函数不会修改传入的数据,更安全,表达语义清晰。
std::initializer_list 允许直接用 {} 列表传入数据,调用简洁易读。
重载函数设计 让调用者用更自然的语法传参,同时内部仍用底层写函数。
const_cast 兼容底层 C 函数接口,实际数据不被修改。

总结

  • 这段改进体现了用现代 C++ 特性 简化代码调用和增强类型安全 的思路;
  • 同时保持和底层硬件接口的兼容性(i2c_master_tx 要求非 const 指针);
  • 让初始化 OLED 指令写入时代码更简洁且易维护。

关于 MQTT 协议里 客户端到服务器(Client to Server)消息类型的结构体定义,以及在 C++ 里对字符串和容器类型的封装和定制:

1. MQTT 消息类型结构体(Client to Server)

  • connect:客户端连接请求,携带客户端 ID。
    struct connect {
        mqtt::string client_id;
    };
    
  • subscribe:客户端订阅主题,包含包ID和主题过滤器列表,每个过滤器是一个 (topic, qos) 的元组。
    struct subscribe {
        uint16_t packet_id;
        using topic_filter_t = std::tuple<mqtt::string, qos_t>;
        using filters_t = mqtt::vector<topic_filter_t>;
        filters_t filters;
    };
    
  • unsubscribe:取消订阅,包含包ID和主题列表。
    struct unsubscribe {
        uint16_t packet_id;
        mqtt::vector<mqtt::string> filters;
    };
    
  • pingreq:心跳请求,结构体为空,表示发送心跳包。
    struct pingreq {};
    
  • disconnect:断开连接请求,带一个布尔变量表示是否强制断开。
    struct disconnect {
        bool force;
    };
    

2. 命名空间 mqtt 对类型的封装

  • 字符串类型mqtt::string 是对 std::string 的别名,方便以后替换或者定制。
  • 向量容器mqtt::vector 是对 std::vector 的模板别名。

3. 定制的内存分配器版本(更高级)

在第二段代码中,mqtt::stringmqtt::vector 是用自定义分配器 my_allocator 的封装版本:

namespace mqtt {
    template <typename Char, typename Traits = std::char_traits<Char>>
    using basic_string = std::basic_string<Char, Traits, my_allocator<Char>>;
    using string = basic_string<char>;
    template <typename T>
    using vector = std::vector<T, my_allocator<T>>;
}
  • 这样设计的好处是可以替换内存管理方式,比如用于嵌入式系统或者特定的性能优化。

总结

  • 你定义了一些 MQTT 客户端发给服务器的消息结构体。
  • 你用类型别名对字符串和向量做了封装,方便后期维护和定制。
  • 进一步使用自定义分配器让容器和字符串在内存管理上更灵活。

你这段内容是关于一个基于模板设计的 mqtt::client,它结合了 连接(Connection)执行器(Executor)发布处理器(PublishHandler),并且内部用了 Boost.MSM(状态机库)实现客户端状态机。下面帮你系统梳理这部分内容:

1. 模板参数解释

template <typename Connection, typename Executor, typename PublishHandler = mqtt_publish_handler_t>
class client
  • Connection:表示网络连接的类型(TCP、WebSocket 等)。
  • Executor:负责任务调度,比如线程池或事件循环。
  • PublishHandler:处理收到的发布消息的回调类型,默认是 mqtt_publish_handler_t

2. 成员变量

detail::client_interface_wrapper<Connection, Executor> client_interface_;
PublishHandler publish_handler_;
mqtt_string_t mqtt_identifier_;
detail::client_machine client_machine_;
  • client_interface_:封装了连接和执行器的接口适配层。
  • publish_handler_:发布消息处理函数对象。
  • mqtt_identifier_:客户端ID。
  • client_machine_:基于 Boost.MSM 的状态机,管理客户端状态。

3. 构造函数

client(mqtt_string_t identifier, Connection & connection, Executor & task_executor)
: client_interface_(connection, task_executor),
  mqtt_identifier_(identifier)
{
    initialize_submachines(client_machine_, &client_interface_);
    client_machine_.start();
}
  • 初始化接口适配层和客户端状态机。
  • 启动状态机,准备进入工作状态。

4. 公开接口(方法)

  • connect(configuration_t):连接到 MQTT Broker,传入连接配置。
  • disconnect(bool force=false):断开连接,可选是否强制断开。
  • subscribe(...) / unsubscribe(...):支持单个或多个主题的订阅与取消,带回调处理。
  • set_publish_handler(F&& handler):设置接收消息的回调处理器。
  • publish(...):支持多种重载,发送消息到指定主题,支持 QoS 和保留标志。
    这些方法都是对 MQTT 协议功能的封装,方便用户调用。

5. Boost.MSM 状态机

struct client_machine_ : public machine_base<client_machine_> { };
using client_machine = boost::msm::back::state_machine<client_machine_>;
  • Boost.MSM 是 Boost 提供的 C++ 状态机库。
  • client_machine_ 是状态机的定义。
  • client_machine 是具体的状态机实例类型,用来管理 MQTT 客户端的连接、订阅、重连等状态和事件。

总结

  • mqtt::client 是一个高度模板化、灵活的 MQTT 客户端类。
  • 依赖传入连接和执行器,支持异步和多种网络层。
  • 通过状态机管理复杂的客户端生命周期和协议交互。
  • 提供丰富的 API 方便用户实现连接、订阅、发布等功能。
  • 支持用户自定义发布消息处理函数,提高扩展性。

基于 Boost.MSM(Meta State Machine) 库实现的 MQTT 客户端状态机示例代码,我帮你系统地梳理一下这部分内容和结构:

1. 状态机的状态(States)

  • NotConnected
    • 表示客户端未连接。
    • on_entry 时调用 fsm.client_->update_connection_status(DISCONNECTED);,更新状态。
  • Connected
    • 表示客户端已连接。
    • on_entry 调用 update_connection_status(CONNECTED);
    • on_exit 调用 update_connection_status(DISCONNECTING);
  • ShuttingDown
    • 表示正在关闭。
    • 使用 deferred_events 机制,暂时延迟处理 connect 事件,等状态机进入下一个状态再处理。

2. 状态机的转换表(transition_table)

  • 表示不同事件触发后,状态如何转移及执行什么动作,格式如下:
    | 当前状态 | 事件 | 目标状态 | 动作 | 保护条件(Guard) |
    | ------------- | ------------------------ | ------------- | ------------ | ----------- |
    | NotConnected | event::connect | ConnectBroker | none | none |
    | ConnectBroker | event::connect | Connected | none | none |
    | Connected | event::publish_out | Connected | send_packet | none |
    | Connected | event::subscribe | Connected | send_packet | none |
    | Connected | event::unsubscribe | Connected | send_packet | none |
    | Connected | event::connect | Connected | none | none |
    | Connected | event::disconnect | ShuttingDown | none | none |
    | ShuttingDown | event::shutdown_timeout | NotConnected | none | none |

send_packet 是一个动作结构体,负责发送对应的 MQTT 报文。

3. 动作(Actions)

struct send_packet
{
    template<class Fsm, class SourceState, class TargetState>
    void operator()( event::publish_out const& evt, Fsm& fsm, SourceState&, TargetState& )
    {
        fsm.client_->send(evt.publish);
    }
    template<class Fsm, class SourceState, class TargetState>
    void operator()( event::subscribe const& evt, Fsm& fsm, SourceState&, TargetState& )
    {
        fsm.client_->send(evt.subscribe);
    }
    template<class Fsm, class SourceState, class TargetState>
    void operator()( event::unsubscribe const& evt, Fsm& fsm, SourceState&, TargetState& )
    {
        fsm.client_->send(evt.unsubscribe);
    }
};
  • 这是执行动作的调用符重载,用来处理不同事件时调用客户端的 send 方法。

4. 状态机定义

struct client_machine_ : public machine_base<client_machine_>
{
    using submachines = Submachines<ConnectBroker>;
    using initial_state = NotConnected;
};
using client_machine = boost::msm::back::state_machine<client_machine_>;
  • client_machine_ 是状态机的主体类,继承自 machine_base
  • 它包含了子状态机 ConnectBroker(这里没贴详细内容,但它是管理连接的子状态机)。
  • 初始状态是 NotConnected

5. 事件(Events)

  • event::connect:连接请求。
  • event::publish_out:发布消息。
  • event::subscribe / event::unsubscribe:订阅和取消订阅。
  • event::disconnect:断开连接请求。
  • event::shutdown_timeout:关闭超时事件。

6. 机制补充

  • 状态切换时执行的钩子on_entryon_exit 用来做状态变化的逻辑处理,比如通知客户端当前状态。
  • 延迟事件ShuttingDown 状态用 deferred_eventsconnect 事件延迟到其他状态处理,避免状态机混乱。

总结

你这是一个典型的 Boost.MSM 状态机写法:

  • 用结构体定义状态;
  • transition_table 定义状态、事件、目标状态、动作和条件;
  • 事件驱动状态流转,动作触发客户端方法;
  • 结构清晰、职责分明,易维护和扩展。

你这段代码是定义了一些 Boost.MSM 状态机辅助模板和基类,帮助构建 MQTT 客户端的状态机。详细解析如下:

1. TransitionTable

template <typename ...T>
using TransitionTable = boost::mpl::vector<T...>;
  • 用 Boost.MPL(Meta Programming Library)的 vector 来定义状态机的转换表。
  • 模板参数包 T... 表示多个转换规则。
  • 这是对 Boost.MSM 要求的转换表类型的封装,方便写法和维护。

2. Submachines

template <typename ...T>
using Submachines = meta::meta_list<T...>;
  • 定义子状态机列表,使用一个元编程列表 meta::meta_list(你代码里的 meta 应该是自定义的元编程工具)。
  • 支持状态机中嵌套子状态机(hierarchical state machines)。
  • 参数包 T... 是多个子状态机类型。

3. machine_base

template <typename Derived>
struct machine_base : public msm::front::state_machine_def<Derived>
{
    mqtt::detail::client_interface * client_ = nullptr;
};
  • 自定义了状态机基类模板,继承自 Boost.MSM 的 state_machine_def,这是定义状态机结构的入口类。
  • 这个基类里增加了一个指针 client_,指向 MQTT 客户端接口,方便状态机内部调用客户端功能。
  • Derived 是 CRTP(Curiously Recurring Template Pattern)用法,编译期把派生类传给基类,实现静态多态。

4. 结合示意

  • 你会用 TransitionTable 定义状态机转换规则。
  • Submachines 定义状态机包含的子状态机。
  • machine_base<Derived> 作为状态机定义的基类,内置客户端接口指针。
  • 这样设计让 MQTT 客户端状态机结构清晰,且可以方便访问和操作底层客户端逻辑。

你给出的代码展示了类型擦除(Type Erasure)接口设计在 MQTT 客户端里的应用,结合了面向接口编程和模板封装,具体分析如下:

1. 抽象接口 client_interface

struct client_interface
{
    virtual void send_to_broker(uint8_t const * data, uint16_t length) = 0;
    virtual void receive_from_broker(/*...*/) = 0;
    virtual void queue_task(/*...*/) = 0;
    virtual void update_connection_status(/*...*/) = 0;
    template <typename Packet>
    void send(Packet const & packet)
    {
        auto buffer = serialize(packet);
        send_to_broker(pointer(buffer), size(buffer));
    }
};
  • 纯虚函数定义了客户端必须实现的操作接口,和具体的实现细节解耦:
    • send_to_broker:向 Broker 发送原始字节数据。
    • receive_from_broker:从 Broker 接收数据(参数省略,具体实现自定)。
    • queue_task:任务排队,用于异步调度。
    • update_connection_status:更新连接状态(连接、断开等)。
  • 模板函数 send(Packet const &)
    • 用于发送一个高级封装的 MQTT 报文。
    • 调用 serialize(packet) 把报文序列化成字节流。
    • 再调用虚函数 send_to_broker 发送实际数据。
    • 这里封装了具体报文序列化与发送的分离,方便扩展和维护。

2. 类型擦除包装器 client_interface_wrapper

template <typename Connection, typename Executor>
struct client_interface_wrapper : client_interface
{
    client_interface_wrapper(Connection & broker, Executor & executor)
    : broker_(broker)
    , executor_(executor)
    {}
    virtual void send_to_broker(uint8_t const * data, uint16_t length) override
    {
        broker_.send(data, length);
    }
private:
    Connection & broker_;
    Executor & executor_;
};
  • 模板结构,泛化了底层具体的连接类型 Connection 和任务执行器类型 Executor
  • 通过组合持有 ConnectionExecutor 的引用。
  • 实现了基类的虚函数 send_to_broker
    • 调用底层 broker_send 方法,完成数据发送。
  • 其他虚函数(receive_from_broker, queue_task, update_connection_status)应该类似地调用对应具体实现(代码未贴出)。
  • 这样通过类型擦除(type erasure),上层状态机和逻辑只依赖抽象接口,不关心具体连接和执行器类型,实现了解耦和灵活性。

3. 设计目的和好处

  • 类型擦除(Type Erasure) 让你用统一接口操作不同类型的连接和执行器。
  • 抽象接口隐藏实现细节,上层代码只通过 client_interface 指针/引用调用。
  • 模板包装器把具体类型适配到接口,方便复用和扩展。
  • 易于测试:可以替换不同实现来模拟网络或执行器行为。
  • 结构清晰,面向接口编程典范。

代码整理与分析

1. mqtt::client 构造函数

template <typename Connection, typename Executor, typename PublishHandler>
client<Connection,Executor,PublishHandler>::client(
    mqtt_string_t identifier,
    Connection & connection,
    Executor & task_executor)
: client_interface_(connection, task_executor),
  mqtt_identifier_(identifier)
{
    initialize_submachines(client_machine_, &client_interface_);
    client_machine_.start();
}
  • 构造时调用 initialize_submachines,初始化顶层状态机及所有子状态机。
  • 传入顶层状态机实例和接口指针 client_interface_
  • 启动状态机 client_machine_.start()

2. initialize_submachines 函数模板

template <typename Machine, typename Interface>
void initialize_submachines(Machine & machine, Interface * interface)
{
    machine.client_ = interface;  // 顶层状态机获得接口指针
    initialize_submachine_impl<Machine, Interface> impl(machine, interface);
    decend<Machine>::apply(&impl);  // 递归初始化所有子状态机
}
  • 给当前状态机成员变量 client_ 赋值接口指针。
  • 创建初始化实现对象 impl
  • 调用 decend<Machine>::apply 递归遍历子状态机。

3. initialize_submachine_impl 模板结构

template <typename MSMBackEnd, typename Interface>
struct initialize_submachine_impl
{
    initialize_submachine_impl(MSMBackEnd & machine, Interface * interface)
        : machine_(machine), interface_(interface) {}
    template <typename SubMach>
    void operator()(SubMach const &)
    {
        SubMach* submachine = machine_.template get_state<SubMach*>();
        submachine->client_ = interface_;  // 给子状态机赋接口指针
        initialize_submachine_impl<SubMach, Interface> impl(*submachine, interface_);
        decend<SubMach>::apply(&impl);  // 递归初始化子状态机的子状态机
    }
    MSMBackEnd & machine_;
    Interface * interface_;
};
  • 接收状态机和接口指针。
  • 针对每个子状态机类型 SubMach
    • 取出子状态机实例。
    • 给子状态机赋接口指针。
    • 递归调用自己,继续初始化子状态机的子状态机。

4. decend 模板(递归遍历子状态机)

template <typename Machine, typename Enable = void>
struct decend
{
    template <typename T>
    static void apply(T*) {}
};
template <typename Machine>
struct decend<Machine,
    std::enable_if_t<meta::size_v<typename Machine::submachines> != 0>>
{
    template <typename T>
    static void apply(T* t)
    {
        meta::for_each<typename Machine::submachines>(*t);
    }
};
  • 基本模板(没有子状态机):apply 空实现。
  • 特化版本(有子状态机):
    • 使用 meta::for_each 遍历 Machine::submachines 类型列表。
    • 对每个子状态机类型调用传入的函数对象。

5. meta_list 及辅助模板元编程工具

template <typename ... T>
struct meta_list {};
template <typename List>
struct size;
template <typename ... T>
struct size<meta_list<T...>>
{
    static const std::size_t value = sizeof...(T);
};
template <class T>
constexpr std::size_t size_v = size<T>::value;
  • meta_list<T...>:存储类型参数包,表示类型列表。
  • size<meta_list<T...>>:计算类型列表中元素数量。
  • size_v<T>:辅助变量模板,简写方便。

6. meta::for_each 模板函数

namespace detail
{
    struct do_nothing
    {
        template <typename ... Args>
        do_nothing(Args && ...) {}
    };
    template <typename ...T, typename Func>
    Func for_each(meta_list<T...> &&, Func && f)
    {
        do_nothing((f(T{}), 0)...);
        return f;
    }
}
template <typename List, typename Func>
Func for_each(Func && f)
{
    return detail::for_each(List{}, std::forward<Func>(f));
}
  • 利用参数包展开 (f(T{}),0)... 依次调用 f,完成对 meta_list 类型的遍历。
  • do_nothing 用于避免参数包展开时的返回值警告。
  • 这是一个零开销的编译期类型列表遍历实现。

总结

  • 这套代码是一个递归遍历状态机层级树的模板元编程实现。
  • 每个状态机维护一个 submachines 类型列表,记录其子状态机类型。
  • 递归调用 initialize_submachines,自动为所有状态机层级赋予公共接口指针 client_
  • meta_list + for_each 实现了静态类型列表遍历。
  • decend 用于区分有无子状态机的递归终止条件。
  • 典型的 C++ TMP + Boost.MSM 状态机结合应用,设计简洁且灵活。

涉及MQTT协议数据类型定义、Boost.MSM状态机设计、Boost.Spirit解析器、以及模板元编程和序列化代码。下面我帮你梳理重点代码片段并分析其设计思路和复杂度,特别针对你关心的 initialize_submachines 的复杂性。

1. MQTT Client 状态机设计示意

struct client_machine_ : public machine_base<client_machine_>
{
    using submachines = Submachines<ConnectBroker>;
    using initial_state = NotConnected;
};
using client_machine = boost::msm::back::state_machine<client_machine_>;
  • client_machine_ 是顶层状态机定义,继承基础状态机模板。
  • 通过 submachines 记录其子状态机类型列表。
  • 初始状态定义为 NotConnected
  • client_machine 是其 Boost.MSM 后端状态机实例。

2. MQTT 数据结构示例

enum class qos_t : uint8_t { QOS0 = 0x00, QOS1 = 0x01, QOS2 = 0x02, FAILURE = 0x80 };
struct publish
{
    bool dup = false;
    uint8_t qos = 0x00;
    bool retain = false;
    mqtt::string topic_name;
    uint16_t packet_id;
    mqtt::vector<uint8_t> payload;
};
struct puback { uint16_t packet_id; };
// 更多控制消息结构体:connack, suback, unsuback, pingresp 等
  • 这些结构体映射MQTT协议中的控制包。
  • 使用强类型和C++ STL容器方便数据封装。

3. Boost.Fusion 适配结构体(方便序列化/解析)

BOOST_FUSION_ADAPT_STRUCT(
    cierelabs::mqtt::packet::puback,
    (uint16_t, packet_id)
)
BOOST_FUSION_ADAPT_STRUCT(
    cierelabs::mqtt::packet::connack,
    (bool, session_pressent)
    (uint8_t, response_code)
)
// 其他类似适配
  • 通过 BOOST_FUSION_ADAPT_STRUCT 把普通结构体适配成Fusion序列,方便Boost.Spirit等库进行解析和序列化。

4. 解析流程示例

bool keepPacketizing = true;
while(keepPacketizing)
{
    switch(parsePhase_)
    {
        case ParsePhase::HEADER_BYTE:
            keepPacketizing = readHeader();
            break;
        case ParsePhase::LENGTH:
            keepPacketizing = readLength();
            break;
        case ParsePhase::BODY:
            keepPacketizing = readBody();
            break;
    }
}
bool MQTTPacketizer::readHeader()
{
    if(!data_.empty())
    {
        currentHeader_ = data_.front();
        data_.pop_front();
        parsePhase_ = ParsePhase::LENGTH;
        return true;
    }
    return false;
}
  • 用分阶段状态机解析MQTT报文(头字节、长度、主体)。
  • 解析阶段用状态机风格编写,逻辑清晰。

5. Boost.Spirit 解析器定义示例

auto const control_packet_def =
    omit[byte_(0x40)] >> puback
  | omit[byte_(0x10)] >> publish
  | omit[byte_(0x20)] >> connack
  | omit[byte_(0x90)] >> suback
  | omit[byte_(0xb0)] >> unsuback
  | omit[byte_(0xc0)] >> attr(packet::pingresp{});
auto const qos_def =
    omit[byte_(0x00)] >> attr(packet::qos_t::QOS0)
  | omit[byte_(0x01)] >> attr(packet::qos_t::QOS1)
  | omit[byte_(0x02)] >> attr(packet::qos_t::QOS2)
  | omit[byte_(0x80)] >> attr(packet::qos_t::FAILURE);
  • 利用Boost.Spirit表达式组合定义了MQTT报文的语法。
  • 轻量级且灵活,编译时生成高效解析代码。

6. 序列化示例

template <typename T>
auto serialize(T && packet)
{
    auto variable_length = get_variable_length(packet);
    uint16_t packet_size = variable_length + 1 + (int)(variable_length/128) + 1;
    auto packet_data = get_packet_buffer(packet_size);
    serialize_impl(packet, std::back_inserter(packet_data), variable_length);
    return packet_data;
}
template <typename Iterator>
Iterator serialize_impl(packet::subscribe const & subscribe, Iterator iter, int variable_length)
{
    *iter++ = 0x82; // control header
    encode_length(variable_length, iter);
    encode(subscribe.packet_id, iter);
    for(auto & topic : subscribe.filters)
    {
        encode(std::get<0>(topic), iter);
        *iter++ = static_cast<uint8_t>(std::get<1>(topic));
    }
    return iter;
}
  • 通过模板和迭代器泛化序列化逻辑。
  • 精准控制二进制数据写入。

7. 重点:initialize_submachines 复杂性分析

  • initialize_submachines 递归初始化所有状态机子状态机,并设置接口指针。
  • 使用模板元编程通过 meta_list 记录子状态机类型,配合 meta::for_each 遍历。
  • 每个状态机的子状态机数量决定递归深度和调用次数。
    为何复杂?
  • 递归模板实例化:编译器需展开每层子状态机类型,实例化大量模板类和函数。
  • 模板元编程库设计meta_list, for_each 等自定义模板库增加理解门槛。
  • 多层状态机树结构:每层状态机拥有多个子状态机,递归深度和分支多,静态展开代码规模增大。
  • 静态多态设计:为了零运行时开销,尽量将关系在编译期确定,导致模板复杂度提升。
    优点:
  • 运行时无额外开销,接口指针绑定完全静态化。
  • 编译期错误检查能力强,类型安全。
  • 灵活支持任意层级嵌套的状态机结构。

总结

  • 这套设计在嵌入式MQTT客户端中提供了灵活且高效的状态机层级管理。
  • 复杂度主要来源于静态递归模板展开,兼顾了性能和灵活性。
  • 代码同时使用了Boost.MSM、Boost.Fusion、Boost.Spirit等多个Boost库,构成强大的协议解析和状态机系统。
  • 模板元编程部分可以看作编译期反射的实现,便于对复杂层级的状态机自动初始化和遍历。

网站公告

今日签到

点亮在社区的每一天
去签到