stm32开发之netxduo组件之mqtt客户端的使用记录

发布于:2024-04-26 ⋅ 阅读:(23) ⋅ 点赞:(0)

前言

1. 使用 MQTT 协议的简单示例记录

代码

MQTT服务端(C# 编写,使用MQTTnet提供的示例代码)

主程序


namespace ConsoleApp1;

/// <summary>
/// Console host for a minimal MQTT broker based on the MQTTnet sample code.
/// </summary>
/// <remarks>
/// MqttFactory and MqttServerOptionsBuilder are MQTTnet types; ConsoleLogger is
/// presumably the logger class shipped with the MQTTnet samples and must be
/// available in the project (TODO confirm).
/// </remarks>
public class Program
{
    /// <summary>Application entry point: runs the broker until Enter is pressed.</summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static async Task Main(string[] args)
    {
        await Run_Server_With_Logging();
    }

    /// <summary>
    /// Starts an MQTT server on the default endpoint, logs to the console,
    /// waits for Enter and then stops the server.
    /// </summary>
    public static async Task Run_Server_With_Logging()
    {
        /*
         * This sample starts a simple MQTT server and prints the logs to the output.
         *
         * IMPORTANT! Do not enable logging in live environment. It will decrease performance.
         *
         * See sample "Run_Minimal_Server" for more details.
         */

        var mqttFactory = new MqttFactory(new ConsoleLogger());

        var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

        using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
        {
            await mqttServer.StartAsync();

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            // Stop and dispose the MQTT server if it is no longer needed!
            await mqttServer.StopAsync();
        }
    }
}

NetX Duo 组件的编写

/*
 * Copyright (c) 2024-2024,shchl
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2024-4-15     shchl   first version
 */
#include "includes.h"

#if APP_COMPONENT_NX_ENABLE
#include "nx_api.h"
#include "nx_stm32_eth_driver.h"

/* Payload size of a single packet in the pool. */
#define     APP_NX_PACKET_SIZE              1536 /* packet size */
/* Bytes reserved for the pool: 16 packets, each a NX_PACKET header plus payload. */
#define     APP_NX_PACKET_POOL_SIZE         ((sizeof(NX_PACKET) + APP_NX_PACKET_SIZE) * 16)

/* Size of the memory area and the priority handed to nx_ip_create(). */
#define     APP_NX_IP_STACK_SIZE            2048
#define     APP_NX_IP_PRIORITY              10

/* Size of the memory area handed to nx_arp_enable() as ARP cache. */
#define     APP_NX_ARP_CACHE_SIZE           1024

/* Static IPv4 address (192.168.8.9) and network mask (255.255.255.0) of this device. */
#define     APP_NX_IP_ADDR                  IP_ADDRESS(192,168,8,9)
#define     APP_NX_IP_SUB_MASK              IP_ADDRESS(255,255,255,0)

/*
*******************************************************************************************************
*                               外部引入变量
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               变量
*******************************************************************************************************
*/

/* Memory areas obtained from app_malloc() in nx_application_define(). */
void *loc_ip_area;      /* memory passed to nx_ip_create() */
void *packet_pool_area; /* backing storage passed to nx_packet_pool_create() */
void *arp_cache_area;   /* memory passed to nx_arp_enable() */

NX_PACKET_POOL g_packet_pool;/* global packet pool */
NX_IP g_loc_ip; /* global local IP instance */
/*
*********************************************************************************************************
*                                       静态全局变量
*********************************************************************************************************
*/

/*
*********************************************************************************************************
*                                      函数声明
*********************************************************************************************************
*/


/*
*********************************************************************************************************
*                                      外部函数
*********************************************************************************************************
*/
/**
 * @brief Allocate the NetX Duo working memory, initialise NetX Duo and
 *        create the global IP instance.
 *
 * @return NX_SUCCESS on success; -1 when a memory area could not be
 *         allocated; otherwise the non-zero status reported by
 *         app_nx_ip_instance_create().
 */
int nx_application_define(void) {
    UINT status;

    /* Memory for the packet pool, the IP instance and the ARP cache. */
    packet_pool_area = app_malloc(APP_NX_PACKET_POOL_SIZE);
    loc_ip_area = app_malloc(APP_NX_IP_STACK_SIZE);
    arp_cache_area = app_malloc(APP_NX_ARP_CACHE_SIZE);

    /* Assumes app_malloc() reports failure with a null pointer - TODO confirm. */
    if (!packet_pool_area || !loc_ip_area || !arp_cache_area) {
        tx_printf("nx_application_define: app_malloc failed\r\n");
        return -1;
    }

    nx_system_initialize();

    status = app_nx_ip_instance_create();
    if (status != NX_SUCCESS) {
        tx_printf("app_nx_ip_instance_create error:%d\r\n", status);
        /* Report the failure instead of claiming success. */
        return (int) status;
    }

    return NX_SUCCESS;
}

TX_APP_DEFINE_EXPORT(nx_application_define); /* register this module so it is created first */




UINT app_nx_ip_instance_create() {
    UINT stat = 0;
    /*创建packet pool*/
    stat += nx_packet_pool_create(&g_packet_pool, "NX PACKET POOL",
                                  APP_NX_PACKET_SIZE,
                                  packet_pool_area, APP_NX_PACKET_POOL_SIZE);

    stat += nx_ip_create(&g_loc_ip,
                         "NX IP",
                         APP_NX_IP_ADDR,
                         APP_NX_IP_SUB_MASK,
                         &g_packet_pool,
                         nx_stm32_eth_driver,
                         loc_ip_area,
                         APP_NX_IP_STACK_SIZE,
                         APP_NX_IP_PRIORITY);

    /*ip 相关配置*/
    {
        /* 启用 ARP 并为 IP 实例  提供 ARP 缓存内存。 */
        stat += nx_arp_enable(&g_loc_ip, arp_cache_area, APP_NX_ARP_CACHE_SIZE);
        /* 启用 ICMP */
        stat += nxd_icmp_enable(&g_loc_ip);
        /* 为IP 实例启用 TCP 处理.  */
        stat += nx_tcp_enable(&g_loc_ip);
        /*禁止分包 */
        stat += nx_ip_fragment_disable(&g_loc_ip);
    }

    return stat;
}
/*
*********************************************************************************************************
*                                      内部函数
*********************************************************************************************************
*/











#endif

MQTT 客户端任务线程

/*
 * Copyright (c) 2024-2024,shchl
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2024-4-19     shchl   first version
 *
 * @description mqtt 客户端任务
 */
#include "includes.h"
#include "nxd_mqtt_client.h"

/* Quality-of-service levels passed to the subscribe helper. */
#define QOS0 0
#define QOS1 1
/* Opens the conditional section that is closed by the #endif at the end of this file. */
#if 1

/* Stack size and priority of the application thread running net_mqtt_client_thread_entry(). */
#define APP_TASK_NET_MQTT_CLIENT_STACK_SIZE (4096)
#define APP_TASK_NET_MQTT_CLIENT_PRIORITY (10)

/* Stack size and priority handed to nxd_mqtt_client_create(). */
#define APP_MQTT_CLIENT_STACK_SIZE 4096
#define APP_MQTT_CLIENT_PRIORITY 2

/* MQTT related macro definitions */
#define MQTT_CLIENT_ID "client_id_stm32"
#define MQTT_SERVER_IP_ADDR IP_ADDRESS(192, 168, 8, 2)

/* Keep-alive argument passed to nxd_mqtt_client_connect(). */
#define MQTT_KEEP_ALIVE_TIMER 300
/* Length of a string literal or char array without its terminating NUL; not valid for pointers. */
#define STRLEN(p) (sizeof(p) - 1)
#define MQTT_SUB_TOPIC "/sub/topic1"
#define MQTT_SUB_TOPIC2 "/sub/topic2"
#define MQTT_PUB_TOPIC "/pub/topic"

//------------------------------- event flag bits
#define MQTT_APP_CONNECT_ERROR_EVENT ((1) << 0)      // client failed to connect (not raised in the code visible here)
#define MQTT_APP_SUB_TOPIC_RECEIVED_EVENT ((1) << 1) // data arrived on a subscribed topic
#define MQTT_APP_CLIENT_DISCONNECT_EVENT ((1) << 2)  // client was disconnected

/* Mask of all event bits above. The name is spelled "MATT" (sic); it is kept
   because the thread entry function refers to it under this name. */
#define MATT_APP_ALL_EVENT (MQTT_APP_CONNECT_ERROR_EVENT | MQTT_APP_SUB_TOPIC_RECEIVED_EVENT \
                                                               |MQTT_APP_CLIENT_DISCONNECT_EVENT)

/*
*******************************************************************************************************
*                               外部引入变量
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               变量
*******************************************************************************************************
*/

TX_EVENT_FLAGS_GROUP mqtt_app_event_group; /* MQTT event group, set by the notify callbacks */
NXD_ADDRESS mqtt_server_ip;                /* MQTT server address */

/*
*********************************************************************************************************
*                                       Static global variables
*********************************************************************************************************
*/
static NXD_MQTT_CLIENT mqtt_client; /* the MQTT client instance */
static TX_THREAD net_mqtt_client_thread; /* control block of the application thread */
static void *area; /* stack memory of net_mqtt_client_thread */

static void *mqtt_client_area; /* stack memory handed to nxd_mqtt_client_create() */
/* Destination buffers for nxd_mqtt_client_message_get(). */
static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
/* Topic length and message length reported by the last nxd_mqtt_client_message_get() call. */
UINT topic_actual_len, topic_actual_msg_len;

/*
*********************************************************************************************************
*                                      函数声明
*********************************************************************************************************
*/
/* Entry function of the MQTT client application thread. */
static void net_mqtt_client_thread_entry(ULONG par);

/* Callback registered with nxd_mqtt_client_disconnect_notify_set(). */
static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client);

/* Callback registered with nxd_mqtt_client_receive_notify_set(). */
static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count);


/*
*********************************************************************************************************
*                                      外部函数
*********************************************************************************************************
*/
int app_task_net_mqtt_client_create() {

    area = app_malloc(APP_TASK_NET_MQTT_CLIENT_STACK_SIZE);
    tx_thread_create(&net_mqtt_client_thread,             /* 任务控制块地址 */
                     "app net server",                    /* 任务名 */
                     net_mqtt_client_thread_entry,        /* 启动任务函数地址 */
                     0,                                   /* 传递给任务的参数 */
                     area,                                /* 堆栈基地址 */
                     APP_TASK_NET_MQTT_CLIENT_STACK_SIZE, /* 堆栈空间大小 */
                     APP_TASK_NET_MQTT_CLIENT_PRIORITY,   /* 任务优先级*/
                     APP_TASK_NET_MQTT_CLIENT_PRIORITY,   /* 任务抢占阀值 */
                     TX_NO_TIME_SLICE,                    /* 不开启时间片 */
                     TX_AUTO_START);                      /* 创建后立即启动 */

    /*内存分配*/
    mqtt_client_area = app_malloc(APP_MQTT_CLIENT_STACK_SIZE);

    /*创建mqtt 事件*/
    tx_event_flags_create(&mqtt_app_event_group, "mqtt event");

    /*服务端连接信息配置*/
    mqtt_server_ip.nxd_ip_version = 4;
    mqtt_server_ip.nxd_ip_address.v4 = MQTT_SERVER_IP_ADDR;

    return TX_SUCCESS;
}

TX_THREAD_EXPORT(app_task_net_mqtt_client_create);

/*
*********************************************************************************************************
*                                      内部函数
*********************************************************************************************************
*/
/**
 * @brief Subscribe the file-local MQTT client to a topic.
 *
 * @param topic NUL-terminated topic name.
 * @param qos   Quality-of-service level forwarded to the subscribe call.
 * @return Status returned by nxd_mqtt_client_subscribe().
 */
static inline UINT mqtt_client_sub_topic(CHAR *topic, UINT qos) {
    const UINT topic_len = strlen(topic);

    return nxd_mqtt_client_subscribe(&mqtt_client, topic, topic_len, qos);
}
/**
 * @brief Unsubscribe the file-local MQTT client from a topic.
 *
 * @param topic NUL-terminated topic name.
 * @return Status returned by nxd_mqtt_client_unsubscribe().
 */
static inline UINT mqtt_client_unsub_topic(CHAR *topic) {
    const UINT topic_len = strlen(topic);

    return nxd_mqtt_client_unsubscribe(&mqtt_client, topic, topic_len);
}

static void net_mqtt_client_thread_entry(ULONG par) {


    UINT status = 0;
    ULONG actual_status = 0;
    ULONG actual_event_flag;
    do {
        /* 等待 1 秒钟,让 内部 IP 线程完成其初始化。. */
        status = nx_ip_status_check(&g_loc_ip,
                                    NX_IP_INITIALIZE_DONE,
                                    &actual_status,
                                    NX_IP_PERIODIC_RATE);
    } while (status != NX_SUCCESS);
    /* 创建客户端实例. */
    status = nxd_mqtt_client_create(&mqtt_client,
                                    "mqtt client",
                                    MQTT_CLIENT_ID, strlen(MQTT_CLIENT_ID),
                                    &g_loc_ip, &g_packet_pool,
                                    mqtt_client_area,
                                    APP_MQTT_CLIENT_STACK_SIZE,
                                    APP_MQTT_CLIENT_PRIORITY,
                                    NX_NULL, 0);

    /* 设置通知回调函数 */
    // 客户端断开连接通知
    nxd_mqtt_client_disconnect_notify_set(&mqtt_client, mqtt_disconnect_notify);
    // 设置接收到数据通知回调
    nxd_mqtt_client_receive_notify_set(&mqtt_client, mqtt_receive_notify);
    while (1) {
        /* 开始与服务器的连接*/
        status = nxd_mqtt_client_connect(&mqtt_client,
                                         &mqtt_server_ip,
                                         NXD_MQTT_PORT,
                                         MQTT_KEEP_ALIVE_TIMER,
                                         0,
                                         NX_WAIT_FOREVER);

        if (status == NXD_MQTT_CONNECT_FAILURE) {

            logError("mqtt unable connect mqtt server[%d,%d,%d,%d:%d]", NX_IP_FMT(MQTT_SERVER_IP_ADDR), NXD_MQTT_PORT);

            tx_thread_sleep(1000);
            continue;
        }
        /* 订阅主题 */
        status = mqtt_client_sub_topic(MQTT_SUB_TOPIC, QOS0);
        status = mqtt_client_sub_topic(MQTT_SUB_TOPIC2, QOS0);
        switch (status) {
            case 0:
                break;
            case NXD_MQTT_NOT_CONNECTED:
            default:
                logError("nxd_mqtt_client_subscribe error:%#x", status);
                tx_thread_sleep(200);
                continue;
        }


        while (1) {
            status = tx_event_flags_get(&mqtt_app_event_group, MATT_APP_ALL_EVENT, TX_OR_CLEAR, &actual_event_flag,
                                        100);
            if (status == TX_SUCCESS) {/*接收来自topic 的数据*/
                if (MQTT_APP_SUB_TOPIC_RECEIVED_EVENT == actual_event_flag) {

                    status = nxd_mqtt_client_message_get(&mqtt_client,
                                                         topic_buffer, sizeof(topic_buffer), &topic_actual_len,
                                                         message_buffer, sizeof(message_buffer), &topic_actual_msg_len);
                    if (status == NXD_MQTT_SUCCESS) {
                        /*添加结束符*/
                        topic_buffer[topic_actual_len] = 0;
                        message_buffer[topic_actual_msg_len] = 0;
                        logInfo("topic = %s, message = %s", topic_buffer, message_buffer);
                    }
                } else if (actual_event_flag == MQTT_APP_CLIENT_DISCONNECT_EVENT) {

                    logInfo("client disconnect.....");
                    break;
                }

            } else if (status == TX_NO_EVENTS) {/*没有事件通知*/
                // todo

            } else {
                break;
            }

            /*todo 推送状态数据*/
            /*向发布主题推送数据*/
            //    const char *send_data = "this is publish msg";
            //    status += nxd_mqtt_client_publish(&mqtt_client,
            //                                      MQTT_PUB_TOPIC,
            //                                      STRLEN(MQTT_PUB_TOPIC),
            //                                      (CHAR *) send_data,
            //                                      strlen(send_data),
            //                                      0,
            //                                      QOS1,
            //                                      NX_WAIT_FOREVER); /*等待超时*/

        }
        mqtt_client_unsub_topic(MQTT_SUB_TOPIC); /* Now unsubscribe the topic. */
        mqtt_client_unsub_topic(MQTT_SUB_TOPIC2); /* Now unsubscribe the topic. */
        nxd_mqtt_client_disconnect(&mqtt_client); /* Disconnect from the broker. */
    }
    /*删除客户端,释放资源(需要重新创建) */
    nxd_mqtt_client_delete(&mqtt_client);


}

/**
 * @brief Disconnect callback: logs the client id and raises the disconnect
 *        event for the client thread.
 *
 * @param client MQTT client whose connection was closed.
 */
static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client) {
    const ULONG raised_flag = MQTT_APP_CLIENT_DISCONNECT_EVENT;

    logInfo("mqtt_disconnect_notify :%s", client->nxd_mqtt_client_id);

    /* OR the flag into the group so other pending events are kept. */
    tx_event_flags_set(&mqtt_app_event_group, raised_flag, TX_OR);
}

/**
 * @brief Receive callback: logs the reported message count and raises the
 *        "topic data received" event for the client thread.
 *
 * @param client_ptr    MQTT client that received data (unused).
 * @param message_count Number of messages reported by the MQTT client.
 */
static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count) {
    const ULONG raised_flag = MQTT_APP_SUB_TOPIC_RECEIVED_EVENT;

    NX_PARAMETER_NOT_USED(client_ptr);

    logInfo("rec msg num:%d", message_count);

    /* OR the flag into the group so other pending events are kept. */
    tx_event_flags_set(&mqtt_app_event_group, raised_flag, TX_OR);
}

#endif

测试结果(ok)

在这里插入图片描述