Day37 MQTT协议 多客户端服务器模型

发布于:2025-09-07 ⋅ 阅读:(14) ⋅ 点赞:(0)

day37 MQTT协议 多客户端服务器模型

服务器/多客户端模型

在服务器开发中,处理多个客户端连接是常见需求。根据不同的应用场景和资源限制,有多种实现方式:

多客户端处理模型

  1. 多路IO多客户端模型:使用selectepoll等IO多路复用技术,单线程处理多个客户端连接
  2. 并发多客户端模型:使用fork创建子进程或pthread创建子线程处理每个客户端
  3. 循环服务器模型:简单的while(1)循环中依次处理accept()recv()send(),但只能处理单个客户端

线程模型实现 (tcp_thread)

服务器端代码 (ser.c)
#include <netinet/in.h>   // 提供Internet地址族相关定义,如sockaddr_in结构体
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <pthread.h>      // 提供线程操作相关函数和数据类型
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数,如内存分配、程序退出等
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数和数据类型
#include <sys/types.h>    // 提供基本系统数据类型
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数,如close、read、write等
#include <semaphore.h>    // 提供信号量相关函数和数据类型

// 定义一个信号量,用于同步主线程和子线程对连接套接字的处理
sem_t sem_conn;

// 线程处理函数:用于处理客户端连接的子线程
void *th(void *arg)
{
    // 将传递过来的连接套接字描述符转换为int类型
    int conn = *(int *)arg;
    
    // 发送信号量,通知主线程可以继续接受新的连接
    sem_post(&sem_conn);
    
    // 分离当前线程,使其在结束时自动释放资源,无需主线程调用pthread_join
    pthread_detach(pthread_self());
    
    time_t tm;  // 用于存储时间的变量
    
    // 循环处理客户端请求
    while (1)
    {
        char buf[1024] = {0};  // 缓冲区,用于接收和发送数据
        // 从客户端接收数据,存入buf缓冲区
        int ret = recv(conn, buf, sizeof(buf), 0);
        
        // 如果接收失败或客户端关闭连接(ret <= 0)
        if (ret <= 0)
        {
            close(conn);  // 关闭连接套接字
            break;        // 退出循环,结束线程
        }
        
        // 获取当前系统时间
        time(&tm);
        // 将客户端发送的内容与当前时间拼接在一起
        sprintf(buf, "%s %s", buf, ctime(&tm));
        // 将拼接后的内容发送回客户端
        send(conn, buf, strlen(buf), 0);
    }
    return NULL;
}

// 定义一个类型别名SA,代表struct sockaddr*,简化代码书写
typedef struct sockaddr *(SA);

int main(int argc, char **argv)
{
    // 创建监听套接字
    // AF_INET:使用IPv4地址族
    // SOCK_STREAM:使用面向连接的TCP协议
    // 0:自动选择合适的协议(此处为TCP)
    int listfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == listfd)  // 如果套接字创建失败
    {
        perror("scoket error\n");  // 打印错误信息
        return 1;                  // 异常退出程序
    }
    
    // 定义服务器和客户端的地址结构体
    struct sockaddr_in ser, cli;
    // 初始化地址结构体为0
    bzero(&ser, sizeof(ser));
    bzero(&cli, sizeof(cli));

    // 设置服务器地址信息
    ser.sin_family = AF_INET;                // 使用IPv4地址族
    ser.sin_port = htons(50000);             // 设置端口号为50000,htons用于主机字节序转网络字节序
    ser.sin_addr.s_addr = INADDR_ANY;        // 绑定到所有可用的网络接口(所有IP地址)

    // 将监听套接字绑定到指定的地址和端口
    int ret = bind(listfd, (SA)&ser, sizeof(ser));
    if (-1 == ret)  // 如果绑定失败
    {
        perror("bind");  // 打印错误信息
        return 1;        // 异常退出程序
    }
    
    // 开始监听套接字,等待客户端连接
    // 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)
    listen(listfd, 3);
    
    socklen_t len = sizeof(cli);  // 用于存储客户端地址结构体的长度
    
    // 初始化信号量,第二个参数0表示线程间共享,第三个参数0表示初始值
    sem_init(&sem_conn, 0, 0);
    
    // 循环接受客户端连接
    while (1)
    {
        // 接受客户端连接,返回一个新的连接套接字用于与该客户端通信
        // listfd:监听套接字
        // (SA)&cli:用于存储客户端地址信息
        // &len:用于存储客户端地址结构体的长度
        int conn = accept(listfd, (SA)&cli, &len);
        if (-1 == conn)  // 如果接受连接失败
        {
            perror("accept");  // 打印错误信息
            close(conn);       // 关闭连接套接字
            continue;          // 继续接受下一个连接
        }
        
        // 创建子线程,用于处理当前客户端的请求
        pthread_t tid;
        // 第一个参数:线程ID
        // 第二个参数:线程属性,NULL表示使用默认属性
        // 第三个参数:线程处理函数
        // 第四个参数:传递给线程处理函数的参数(连接套接字描述符)
        pthread_create(&tid, NULL, th, &conn);
        
        // 等待信号量,确保子线程已经获取了连接套接字描述符
        sem_wait(&sem_conn);
    }
    
    // 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)
    close(listfd);
    // 销毁信号量
    sem_destroy(&sem_conn);
    
    return 0;
}
客户端代码 (cli.c)
#include <netinet/in.h>   // 提供Internet地址族相关定义
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数
#include <sys/types.h>    // 提供基本系统数据类型
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数(如sleep)

// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);

int main(int argc, char **argv)
{
    // 创建TCP套接字
    // AF_INET: 使用IPv4地址族
    // SOCK_STREAM: 使用面向连接的TCP协议
    // 0: 自动选择合适的协议(此处为TCP)
    int conn = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == conn)  // 检查套接字创建是否失败
    {
        perror("socket");  // 输出错误信息
        return 1;          // 异常退出
    }
    
    // 定义服务器地址结构体
    struct sockaddr_in ser;
    // 初始化地址结构体为0
    bzero(&ser, sizeof(ser));

    // 设置服务器地址信息
    ser.sin_family = AF_INET;                // 使用IPv4地址族
    ser.sin_port = htons(50000);             // 服务器端口号(50000),转换为网络字节序
    ser.sin_addr.s_addr = INADDR_ANY;        // 连接到本地所有可用接口(实际应指定服务器IP)

    // 连接到服务器
    int ret = connect(conn, (SA)&ser, sizeof(ser));
    if (-1 == ret)  // 检查连接是否失败
    {
        perror("connect error\n");  // 输出错误信息
        return 1;                   // 异常退出
    }
    
    // 循环发送10次数据
    int i = 10;
    while (i)
    {
        char buf[1024] = "hello,this is tcp test";  // 要发送的消息
        // 发送数据到服务器
        send(conn, buf, strlen(buf), 0);
        
        // 清空缓冲区,准备接收数据
        bzero(buf, sizeof(buf));
        
        // 接收服务器返回的数据
        recv(conn, buf, sizeof(buf), 0);
        
        // 打印服务器返回的内容
        printf("from ser:%s\n", buf);
        
        // 休眠1秒
        sleep(1);
        
        i--;  // 减少循环计数
    }
    
    // 关闭连接套接字
    close(conn);

    return 0;
}

理想运行结果

  • 服务器启动后监听50000端口
  • 客户端连接成功后,每秒发送一条"hello,this is tcp test"消息
  • 服务器接收消息后添加当前时间戳返回给客户端
  • 客户端输出类似:from ser:hello,this is tcp test Thu Jun 20 14:30:45 2023

进程模型实现 (tcp_fork)

#include <netinet/in.h>   // 提供Internet地址族相关定义
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <signal.h>       // 提供信号处理相关函数
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数
#include <sys/types.h>    // 提供基本系统数据类型
#include <sys/wait.h>     // 提供进程等待相关函数
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数

// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);

// 信号处理函数:用于处理子进程退出信号,回收僵尸进程
void myhandle(int num)
{
    // 等待子进程结束,回收其资源,防止僵尸进程
    wait(NULL);
}

int main(int argc, char **argv)
{
    // 注册SIGCHLD信号的处理函数为myhandle
    // 当子进程退出时会产生SIGCHLD信号,触发该函数回收资源
    signal(SIGCHLD, myhandle);
    
    // 创建监听套接字
    // AF_INET: 使用IPv4地址族
    // SOCK_STREAM: 使用面向连接的TCP协议
    // 0: 自动选择合适的协议(此处为TCP)
    int listfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == listfd)  // 检查套接字创建是否失败
    {
        perror("scoket error\n");  // 输出错误信息
        return 1;                  // 异常退出
    }
    
    // 定义服务器和客户端的地址结构体
    struct sockaddr_in ser, cli;
    // 初始化地址结构体为0
    bzero(&ser, sizeof(ser));
    bzero(&cli, sizeof(cli));

    // 设置服务器地址信息
    ser.sin_family = AF_INET;                // 使用IPv4地址族
    ser.sin_port = htons(50000);             // 服务器端口号(50000),转换为网络字节序
    ser.sin_addr.s_addr = INADDR_ANY;        // 绑定到所有可用的网络接口(所有IP地址)
    
    // 将监听套接字绑定到指定的地址和端口
    int ret = bind(listfd, (SA)&ser, sizeof(ser));
    if (-1 == ret)  // 检查绑定是否失败
    {
        perror("bind");  // 输出错误信息
        return 1;        // 异常退出
    }
    
    // 开始监听套接字,等待客户端连接
    // 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)
    listen(listfd, 3);
    socklen_t len = sizeof(cli);  // 用于存储客户端地址结构体的长度
    
    time_t tm;  // 用于存储时间的变量
    
    // 循环接受客户端连接
    while (1)
    {
        // 接受客户端连接,返回一个新的连接套接字用于与该客户端通信
        // listfd:监听套接字
        // (SA)&cli:用于存储客户端地址信息
        // &len:用于存储客户端地址结构体的长度
        int conn = accept(listfd, (SA)&cli, &len);
        if (-1 == conn)  // 检查接受连接是否失败
        {
            perror("accept");  // 输出错误信息
            close(conn);       // 关闭连接套接字
            continue;          // 继续接受下一个连接
        }
        
        // 创建子进程,用于处理当前客户端的请求
        pid_t pid = fork();
        if (pid > 0)  // 父进程分支
        {
            // 父进程不需要连接套接字,关闭它
            close(conn);
            // wait();  // 注释掉的等待方式,改用信号处理
        }
        else if (0 == pid)  // 子进程分支
        {
            // 子进程不需要监听套接字,关闭它
            close(listfd);
            
            // 循环处理客户端请求
            while (1)
            {
                char buf[1024] = {0};  // 缓冲区,用于接收和发送数据
                // 从客户端接收数据
                int ret = recv(conn, buf, sizeof(buf), 0);
                
                // 如果接收失败或客户端关闭连接(ret <= 0)
                if (ret <= 0)
                {
                    break;  // 退出循环
                }
                
                // 获取当前系统时间
                time(&tm);
                // 将客户端发送的内容与当前时间拼接
                sprintf(buf, "%s %s", buf, ctime(&tm));
                // 将拼接后的内容发送回客户端
                send(conn, buf, strlen(buf), 0);
            }
            
            // 处理完毕,退出子进程
            exit(1);
        }
        else  // fork失败分支
        {
            perror("fork");  // 输出错误信息
            continue;        // 继续接受下一个连接
        }
    }
    
    // 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)
    close(listfd);
    
    return 0;
}

理想运行结果

  • 服务器启动后监听50000端口
  • 每个客户端连接都会创建一个子进程处理
  • 信号处理函数自动回收僵尸进程
  • 客户端连接后,服务器会回复带时间戳的消息
  • 多个客户端可以同时连接并获得服务

MQTT协议详解

MQTT概述

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,构建于TCP/IP协议之上。该协议由IBM在1999年发布,具有轻量、简单、开放和易于实现的特点,在物联网、小型设备、移动应用等领域应用广泛。

核心概念

  1. 发布/订阅模式

    • 客户端分为发布者和订阅者
    • 发布者将消息发布到特定主题(Topic)
    • 订阅者通过订阅感兴趣的主题来接收消息
    • 这种模式实现了应用程序之间的解耦
    • 多个订阅者可以同时接收来自同一主题的消息
  2. 服务质量(QoS)

    • QoS 0:最多分发一次,消息可能丢失或重复,适用于对消息可靠性要求不高的场景,如环境传感器数据采集
    • QoS 1:至少分发一次,确保消息到达,但可能出现重复,常用于设备控制指令传输
    • QoS 2:仅分发一次,保证消息只到达一次,适用于对消息可靠性要求极高的场景,如金融交易数据传输
  3. 主题(Topic)与主题过滤器(Topic Filter)

    • 主题是消息的分类标识,是UTF-8编码字符串
    • 不能超过65535字节,层级数量无限制
    • 区分大小写
    • 主题过滤器用于订阅时筛选感兴趣的主题,可包含通配符
      • +:单层通配符,只能用于单个主题层级匹配
      • #:多层通配符,用于匹配主题中任意层级,但必须位于主题过滤器的最后
  4. 会话(Session)

    • 客户端与服务器建立连接后形成一个会话
    • 会话存在于网络连接期间,也可能跨越多个连续的网络连接
    • 会话用于存储客户端和服务器之间的交互状态,如客户端的订阅信息、未确认的消息等
  5. 遗嘱(Last Will and Testament)

    • 客户端可设置遗嘱消息,当客户端异常断开连接时,服务器会发布该遗嘱消息
    • 需在Connect时由客户端指定相关设置项
    • 包括Will Flag(开启或关闭遗嘱功能)、Will QoS(遗嘱消息的服务质量等级)、Will Retain(遗嘱是否保留)、Will Topic(遗嘱话题)和Will Payload(遗嘱消息内容)

协议格式

MQTT数据包由三部分构成:

1. 固定头(Fixed Header)
  • 存在于所有MQTT数据包中
  • 包含数据包类型(如CONNECT、PUBLISH等)
  • 包含标识位(如DUP、QoS、RETAIN)
  • 剩余长度(表示可变头和消息体的总大小)
2. 可变头(Variable Header)
  • 部分MQTT数据包包含可变头
  • 内容因数据包类型而异
  • 一些数据包(如PUBLISH (QoS > 0)、PUBACK等)的可变头中包含2字节的数据包标识字段
3. 消息体(Payload)
  • 部分MQTT数据包包含消息体
  • 不同类型的消息体内容不同
    • CONNECT消息体:包含客户端的ClientID、订阅的Topic、Message以及用户名和密码
    • SUBSCRIBE消息体:是一系列要订阅的主题以及QoS
    • SUBACK消息体:是服务器对SUBSCRIBE申请的主题及QoS的确认和回复
    • UNSUBSCRIBE消息体:是要取消订阅的主题

服务器与客户端工作流程

连接建立阶段
  1. 客户端发起连接

    • 客户端通过TCP/IP建立底层连接
    • 发送CONNECT报文(首个报文)
    • 报文中含协议名、协议级别、连接标志、保持连接时长等
    • 有效载荷包含客户端标识符(ClientId)、用户名/密码等
  2. 服务器确认连接

    • 服务器验证CONNECT报文合法性
    • 发送CONNACK报文响应
    • 报文中含"当前会话标志"(Session Present)和"连接返回码"
    • 连接返回码0表示连接成功,非0表示拒绝原因
消息交互阶段(发布-订阅核心流程)
  1. 客户端订阅主题

    • 客户端发送SUBSCRIBE报文
    • 有效载荷含"主题过滤器+请求QoS"
    • 服务器发送SUBACK报文确认,有效载荷含对应主题的"授权QoS"
  2. 客户端发布消息

    • 发布端客户端发送PUBLISH报文
    • 可变报头含"主题名"和"报文标识符"(仅QoS>0时需含)
    • 服务器接收后,根据主题名匹配所有订阅该主题的客户端
    • 按订阅的授权QoS,向每个匹配客户端转发PUBLISH报文
  3. 客户端取消订阅

    • 客户端发送UNSUBSCRIBE报文
    • 有效载荷含待取消的主题过滤器
    • 服务器发送UNSUBACK报文确认
连接维护与断开
  1. 心跳保活

    • 客户端需确保控制报文发送间隔不超过"保持连接时长"
    • 无报文可发时需发送PINGREQ报文
    • 服务器收到后必须回复PINGRESP报文
    • 若1.5倍保持连接时长内无客户端报文,服务器需断开连接
  2. 正常断开

    • 客户端需发送DISCONNECT报文,之后关闭TCP连接
    • 服务器收到DISCONNECT后,需丢弃该客户端的遗嘱消息(若存在)

发布-订阅机制核心规则

  1. 主题与主题过滤器

    • 主题名是消息标识(如/sensor/temp,UTF-8编码,无通配符)
    • 主题过滤器是订阅时的匹配规则(支持+单层通配符,如/sensor/+#多层通配符,如/sensor/#
    • 服务器按"逐层级匹配"规则,将PUBLISH报文转发给所有主题过滤器匹配的订阅客户端
  2. 保留消息(Retain)

    • 客户端发布PUBLISH时若设RETAIN=1,服务器需存储该消息
    • 新订阅匹配主题的客户端会立即收到该保留消息
    • 若发布RETAIN=1且有效载荷为空的消息,服务器会删除对应主题的保留消息
  3. 遗嘱消息(Will)

    • 客户端CONNECT时设Will Flag=1,需指定"遗嘱主题"、“遗嘱消息”、“遗嘱QoS”、“遗嘱Retain”
    • 若客户端异常断开(如网络故障、超时),服务器会自动将遗嘱消息发布到遗嘱主题
    • 若客户端正常发送DISCONNECT,服务器需丢弃遗嘱消息

QoS(服务质量)等级与流程

  1. QoS 0(最多一次)

    • 流程:发布端发送PUBLISH(QoS=0,无报文标识符,DUP=0),接收端无需回复
    • 消息可能丢失(如网络中断),不重发
    • 适用场景:环境传感器数据(如实时温度,丢失一次可容忍)
  2. QoS 1(至少一次)

    • 流程:
      1. 发布端发送PUBLISH(QoS=1,含报文标识符,DUP=0),并存储消息
      2. 接收端接收后,发送PUBACK报文(含相同标识符),并将消息交给应用
      3. 发布端收到PUBACK后,删除存储的消息
      4. 若未收到PUBACK,发布端需重发PUBLISHDUP=1),接收端可能收到重复消息
    • 适用场景:设备控制指令(如开灯,需确保到达,重复可通过应用层去重)
  3. QoS 2(仅一次)

    • 流程(四次握手):
      1. 发布端发送PUBLISH(QoS=2,含标识符,DUP=0),存储消息
      2. 接收端接收后,发送PUBREC报文(含相同标识符),存储标识符,不交给应用
      3. 发布端收到PUBREC后,删除PUBLISH消息,发送PUBREL报文(含相同标识符)
      4. 接收端收到PUBREL后,将消息交给应用,发送PUBCOMP报文(含相同标识符)
      5. 发布端收到PUBCOMP后,删除PUBREL相关状态
      6. 各环节未收到确认均需重发,确保消息仅到达一次
    • 适用场景:计费数据、交易指令(不允许丢失或重复)

应用场景

  1. 物联网场景

    • 智能家居(智能家电、传感器通信)
    • 工业物联网(工厂设备数据交互)
    • 农业监测(土壤湿度等传感器数据传输)
    • 设备通过MQTT实现数据上报与远程控制
  2. 传感器数据传输

    • 环境监测(气象站、水质传感器)
    • 农业监测等场景
    • 传感器借MQTT将采集数据发送至服务器,供相关人员订阅获取实时数据
  3. 设备监控管理

    • 服务器集群(CPU、内存等状态监控)
    • 移动设备(基站、智能电表状态上报)
    • 运维/维护人员订阅主题掌握设备状态
  4. 消息推送

    • 轻量级即时通讯软件
    • 移动应用(新闻、社交类)用MQTT推送消息
    • 用户订阅主题接收实时信息

Wireshark抓包分析

环境准备
  1. 安装Wireshark

    • 从Wireshark官网(https://www.wireshark.org/download.html)下载并安装
    • 安装过程中选择合适的网络适配器相关选项
  2. 配置MQTT相关参数

    • 打开Wireshark,进入"编辑"菜单选择"首选项"
    • 在"协议"中找到"mqtt",设置对应参数(一般选择3.1.1版本和1883端口)
抓包操作
  1. 选择网络接口

    • 选择合适的网络接口(本地运行选回环地址,局域网环境选对应接口)
  2. 设置捕获过滤器(可选)

    • 如只想捕获MQTT数据包,可设置"tcp.port == 1883"
  3. 开始捕获

    • 点击"开始捕获"按钮或选择"捕获">“开始捕获”
抓包后的操作
  1. 停止捕获

    • 点击"停止捕获"按钮
  2. 保存与导出数据包

    • 选择"文件">“保存”,通常保存为.pcapng格式
  3. 利用显示过滤器筛选分析

    • 过滤所有MQTT连接:输入"mqtt"
    • 过滤特定QoS等级的消息:“mqtt.qos == 1”
    • 过滤特定主题的消息:“mqtt.topic == ‘test/topic’”

MQTT实战应用

库的移植

MQTT库移植步骤:

  1. OpenSSL编译安装

    tar -xvf openssl-1.0.0s.tar.gz
    cd openssl-1.0.0s
    ./config enable-shared -fPIC  # 必须加入-fPIC选项
    make
    sudo make install
    
  2. Paho MQTT C库编译

    unzip paho.mqtt.c-master.zip
    cd paho.mqtt.c-master
    
    • 修改Makefile:
      • 122行:CC ?= gcc
      • 133行:添加
        CFLAGS += -I /usr/local/ssl/include
        LDFLAGS += -L /usr/local/ssl/lib
        
      • 192行:确保路径正确
        CCFLAGS_SO += -Wno-deprecated-declarations -DOSX -I /usr/local/ssl/include
        LDFLAGS_CS += -Wl,-install_name,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -L /usr/local/ssl/lib
        
    • 编译安装:
      make
      sudo make install
      

云平台设备配置

OneNET云平台配置
  1. 注册账号并登录控制台

  2. 产品开发 -> 产品品类 -> 设备接入 -> MQTT协议

  3. 创建产品

    • 产品ID:Qon3io17BJ
    • 设备名称:test1
    • 设备密钥:c2Q2OVJKcW5KNDBQdmFLcm1OZEFmZU56cUJhSkhjd2o=
  4. 生成连接参数

    • products/{产品id}/devices/{设备名字}
    • 示例:products/Qon3io17BJ/devices/test1
    • 签名参数:version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D
  5. 添加物模型

    • 产品开发 -> 详情 -> 设置物模型
    • 添加自定义功能点(如温度)
  6. 设备管理

    • 设备管理 -> 详情 -> 属性 -> 实时刷新

MQTT Demo程序详解

头文件 (head.h)
#ifndef HEAD_H
#define HEAD_H

#include <MQTTAsync.h>
#include <MQTTClient.h>

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

// OneNET MQTT服务器地址
#define NEW_ADDRESS     "tcp://183.230.40.96:1883"
// 设备名称
#define DEV_NAME        "test1"
// 客户端ID(与设备名称相同)
#define CLIENTID        DEV_NAME
// 产品ID
#define PRODUCT_ID      "Qon3io17BJ"
// 连接密码(包含签名信息)
#define PASSWD          "version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D"
// 服务质量等级
#define QOS             0
// 等待消息完成的超时时间(毫秒)
#define TIMEOUT         10000L

#endif // HEAD_H
主程序 (main.c)
//https://eclipse.dev/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#a9a0518d9ca924d12c1329dbe3de5f2b6
#include <stdio.h>
#include "head.h"

// 存储订阅和发布的主题
static char topic[2][200] = {0};
// MQTT客户端实例
static MQTTClient client;
// 消息ID计数器
static int id = 10000;
// 用于存储已确认送达的消息令牌
volatile static MQTTClient_deliveryToken deliveredtoken;

// 构建主题名称
void pack_topic(char * dev_name, char * pro_id)
{
    // 订阅主题格式:$sys/{产品ID}/{设备名称}/thing/property/post/reply
    sprintf(topic[0], "$sys/%s/%s/thing/property/post/reply", pro_id, dev_name);
    // 发布主题格式:$sys/{产品ID}/{设备名称}/thing/property/post
    sprintf(topic[1], "$sys/%s/%s/thing/property/post", pro_id, dev_name);
}

// 发送成功后的回调函数
void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

// 接收到消息的回调函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    int i;
    char* payloadptr;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: ");

    // 打印消息内容
    payloadptr = (char*)message->payload;
    for(i=0; i<message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');
    
    // 释放消息和主题内存
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

// 掉线后的回调函数
void connlost(void *context, char *cause)
{
    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause);
}

// MQTT客户端初始化
int mqtt_init()
{
    // 构建主题
    pack_topic(DEV_NAME, PRODUCT_ID);
    
    // 创建MQTT客户端
    int rc = MQTTClient_create(&client, NEW_ADDRESS, CLIENTID,
                               MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if(MQTTCLIENT_SUCCESS != rc)
    {
        printf("create mqtt client failure...\n");
        exit(1);
    }
    
    // 初始化连接选项
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;  // 保持连接间隔(秒)
    conn_opts.cleansession = 1;         // 清理会话
    conn_opts.username = PRODUCT_ID;    // 用户名(产品ID)
    conn_opts.password = PASSWD;        // 密码(包含签名信息)
    
    // 设置回调函数
    rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
    if(MQTTCLIENT_SUCCESS != rc)
    {
        printf("Failed to set callbacks, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // 连接到MQTT服务器
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    // 订阅主题(当前被注释,可根据需要启用)
#if 0
    MQTTClient_subscribe(client, topic[0], QOS);
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           , topic[0], CLIENTID, QOS);
#endif 

    return rc;
}

// 发送MQTT消息
int mqtt_send(char * key, int value)
{
    MQTTClient_deliveryToken deliveryToken;
    MQTTClient_message test2_pubmsg = MQTTClient_message_initializer;
    
    // 需要发送的正文
    char message[1024] = {0};
    
    // 设置消息属性
    test2_pubmsg.qos = QOS;
    test2_pubmsg.retained = 0;
    test2_pubmsg.payload = message;

    // 构建JSON格式的消息
    sprintf(message,"{\"id\":\"%d\",\"version\":\"1.0\",\"params\":{\"%s\":{\"value\":%d}}}",id++, key, value);
    test2_pubmsg.payloadlen = strlen(message);

    printf("%s\n",message);
    
    // 发布消息
    int rc = MQTTClient_publishMessage(client, topic[1], &test2_pubmsg, &deliveryToken);
    if(MQTTCLIENT_SUCCESS != rc)
    {
        printf("client to publish failure.. %lu\n", pthread_self());
        exit(1);
    }
    
    // 等待消息确认
    printf("Waiting for up to %d seconds for publication on topic %s for client with ClientID: %s\n"
            ,(int)(TIMEOUT/1000), topic[0], CLIENTID);
    MQTTClient_waitForCompletion(client, deliveryToken, TIMEOUT);
    sleep(1);

    return rc;
}

// 释放MQTT资源
void mqtt_deinit()
{
    // 断开连接
    MQTTClient_disconnect(client, 10000);
    // 销毁客户端
    MQTTClient_destroy(&client);
}

// 主函数
int main(void)
{
    // 初始化MQTT客户端
    mqtt_init();
    
    // 持续发送随机温度数据
    while(1)
    {
        int value = rand() % 100 + 1;
        mqtt_send("tmp", value);
    }
    
    // 释放资源(实际不会执行到此处)
    mqtt_deinit();
    return 0;
}

理想运行结果

{"id":"10000","version":"1.0","params":{"tmp":{"value":42}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrived
     topic: $sys/Qon3io17BJ/test1/thing/property/post/reply
   message: {"id":"10000","code":200,"msg":"success"}

{"id":"10001","version":"1.0","params":{"tmp":{"value":75}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrived
     topic: $sys/Qon3io17BJ/test1/thing/property/post/reply
   message: {"id":"10001","code":200,"msg":"success"}

...

程序将不断生成随机温度值(1-100之间),以JSON格式发送到OneNET平台,平台会返回确认消息。在OneNET控制台的设备属性中,可以看到实时更新的温度数据。

wireshark抓包
MQTTtest1
Qon3io17BJyversion=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D