MQTT Client源码分析
目录
之前基于杰杰的mqttclient代码和韦东山老师的教程,把MQTTClient程序移植到STM32F103开发板,F103的开发板串口连接ESP8266模组实现终端连接到MQTT服务器的功能,仅仅是对着韦老师的教程移植和使用杰杰的mqttclient代码,简单的将mqttclient\platform\FreeRTOS\platform_net_socket.c文件中的接口绑定到ESP8266的TCP AT命令,使用ESP8266的Socket,对于杰杰的mqttclient代码并没有深入分析和理解。
1. mqttclient架构
如下图:
1.1 API
mqttclient的API接口:
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_yield(mqtt_client_t* c, int timeout_ms);
1.2 mqtt_client_t结构体
typedef struct mqtt_client {
char *mqtt_client_id; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
char *mqtt_user_name; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
char *mqtt_password; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
char *mqtt_host; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
char *mqtt_port; //由用户传入,在连接MQTT时(mqtt_connect_with_results)序列化连接报文
char *mqtt_ca; //TLS才会用到,暂时不分析
void *mqtt_reconnect_data; //MQTT需要重连服务器时用到
uint8_t *mqtt_read_buf; //读数据缓冲区
uint8_t *mqtt_write_buf; //写数据缓冲区
uint16_t mqtt_keep_alive_interval; //MQTT保活超时时间
uint16_t mqtt_packet_id; //报文标识符
uint32_t mqtt_will_flag : 1; //遗嘱标志
uint32_t mqtt_clean_session : 1; //清理会话标志
uint32_t mqtt_ping_outstanding : 2; //PINGREQ后是否正在等待PINGRESP标志
uint32_t mqtt_version : 4; //MQTT协议版本
uint32_t mqtt_ack_handler_number : 24; //用于QOS1和QOS2中ACK记录
uint32_t mqtt_cmd_timeout; //命令超时时间(主要是读写阻塞时间、等待响应的时间、重连等待时间)
uint32_t mqtt_read_buf_size; //读数据缓冲区大小
uint32_t mqtt_write_buf_size; //写数据缓冲区大小
uint32_t mqtt_reconnect_try_duration; //客户端在尝试重新连接到MQTT服务器时所允许的最大尝试时间
size_t mqtt_client_id_len; //clientID最大长度
size_t mqtt_user_name_len; //userName最大长度
size_t mqtt_password_len; //password最大长度
mqtt_will_options_t *mqtt_will_options; //遗嘱消息配置
client_state_t mqtt_client_state; //客户端状态(INVALID、INITIALIZED、CONNECTED、DISCONNECTED、CLEAN_SESSION)
platform_mutex_t mqtt_write_lock; //写数据锁
platform_mutex_t mqtt_global_lock; //全局锁,比如转换客户端状态时需要先锁
mqtt_list_t mqtt_msg_handler_list; //所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息),mqtt协议必须实现的内容
mqtt_list_t mqtt_ack_handler_list; //所有等待响应的报文都会被挂载到这个链表上,异步实现的核心
network_t *mqtt_network; //网卡接口,保存网络相关信息(host、port、socket)
platform_thread_t *mqtt_thread; //内部线程mqtt_yield_thread,所有来自服务器的mqtt包都会在这里被处理
platform_timer_t mqtt_reconnect_timer; //掉线重连定时器
platform_timer_t mqtt_last_sent; //用于保活定时器
platform_timer_t mqtt_last_received; //保活定时器
reconnect_handler_t mqtt_reconnect_handler; //mqtt重连处理
interceptor_handler_t mqtt_interceptor_handler; //publish数据时调用,个人理解是要发送的数据与client绑定
} mqtt_client_t;
1.3 mqtt_yield_thread内部线程
mqtt_yield_thread
线程中主要执行mqtt_yield(c, c->mqtt_cmd_timeout)
函数
mqtt_yield
中主要执行mqtt_packet_handle(c, &timer)
处理MQTT接收到的消息
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
int rc = MQTT_SUCCESS_ERROR;
int packet_type = 0;
rc = mqtt_read_packet(c, &packet_type, timer);
//printf("read packet rc = %d, packet_type = %d, g_remain_len = %d\r\n", rc, packet_type, g_remain_len);
switch (packet_type) {
case 0: /* timed out reading packet */
break;
case CONNACK: /* has been processed */
goto exit;
case PUBACK:
case PUBCOMP:
rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
break;
case SUBACK:
rc = mqtt_suback_packet_handle(c, timer);
break;
case UNSUBACK:
rc = mqtt_unsuback_packet_handle(c, timer);
break;
case PUBLISH:
rc = mqtt_publish_packet_handle(c, timer);
break;
case PUBREC:
case PUBREL:
rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
break;
case PINGRESP:
c->mqtt_ping_outstanding = 0; /* keep alive ping success */
break;
default:
goto exit;
}
rc = mqtt_keep_alive(c); /* Keep the treatment alive */
exit:
if (rc == MQTT_SUCCESS_ERROR)
rc = packet_type;
RETURN_ERROR(rc);
}
1.4 keepalive
当第一次发生超时时,会在mqtt_keep_alive
中序列还一个心跳包发送给服务器,并将mqtt_ping_outstanding
加1,当第二次超时时会设置client状态为CLIENT_STATE_DISCONNECTED尝试重连,若重连成功后需要重新订阅
1.5 ack链表
需要等待服务器应答消息时会加入ack链表(参考2.2中SUBACK),每次接收到服务器消息时,会对ack链表进行扫描,超时后会销毁链表节点,如果是PUBACK、PUBREC、PUBREL、PUBCOMP则需要重发
2. mqttclient流程
2.1 MQTT CONNECT
- 用户初始化mqtt_client_t参数后,调用
int mqtt_connect(mqtt_client_t* c);
连接MQTT服务器
- 调用到底层
rc = network_connect(c->mqtt_network);
连接MQTT服务器
- 最终调用到平台层的
n->socket = platform_net_socket_connect(n->host, n->port, PLATFORM_NET_PROTO_TCP);
发送连接请求,其中实现了与ESP8266 TCP AT SOCKET绑定
- 之后使用
MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)
序列化mqtt的CONNECT报文,并使用mqtt_send_packet(c, len, &connect_timer)
发送出去
mqtt_send_packet
也是调用网络接口network_write(c->mqtt_network, &c->mqtt_write_buf[sent], length, platform_timer_remain(timer)
最终调用平台接口platform_net_socket_write_timeout(n->socket, write_buf, len, timeout)
- 发送完毕后在
mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK)
等待服务器回复的CONNACK报文
- 连接服务器成功后创建一个MQTT内部线程
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)
并启动,所有来自服务器的mqtt包都会在这里被处理
- 当需要进行重连时,不需要重新创建MQTT内部线程,只需要改变MQTT Client的状态即可
2.2 MQTT SUBSCRIBE
- 连接MQTT服务器后,用户可以直接调用
mqtt_subscribe(client, "home", QOS0, smarthome_msg_handler);
订阅主题
mqtt_subscribe
会调用MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, packet_id, 1, &topic, (int*)&qos)
序列化订阅报文,并调用mqtt_send_packet(c, len, &timer)
发送订阅消息
- 然后使用
mqtt_msg_handler_create(topic_filter, qos, handler)
创建对应的消息处理节点,这个消息节点在收到服务器的SUBACK订阅应答报文后会挂在到消息处理列表msg_handler上
- 在
mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler)
中记录等待服务器响应的SUBACK
- 收到服务器响应的SUBACK回复后,在
mqtt_suback_packet_handle(c, timer)
中会取消ack_list里的SUBACK记录,并安装到msg_handler_list
MQTT UNSUBSCRIBE的流程与MQTT SUBSCRIBE的流程差不多。
2.3 MQTT PUBLISH
- 连接服务器后,用户可以直接调用
mqtt_publish(client, "home", &msg)
向某主题发布消息
- 与订阅流程类似,先使用
MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id, topic, (uint8_t*)msg->payload, msg->payloadlen)
序列化发布报文,在使用mqtt_send_packet(c, len, &timer)
并发布到服务器
- 对于QOS1或QOS2需要将PUBACK或PUBREC加入到ack_list中,与SUBACK类似,并提前设置了DUP重发标志位
2.4 接收服务器PUBLISH消息
- 服务器发送的PUBLISH消息会在client的内部线程
mqtt_yield_thread
中的mqtt_packet_handle
中处理
- 先对收到的消息进行反序列化,QOS1和QOS2类型需要回复ACK,让后处理收到的消息,注意QOS还需要先等待服务器的PUBREL ACK后再处理接收的消息
- 在
mqtt_get_msg_handler(c, topic_name)
中获取当前主题的处理函数,并将接收的数据在数据处理函数msg_handler
中处理,该处理函数在订阅主题时定义
以上是杰杰mqttclient代码分析,后期如果能够有更深的认识再继续补充,欢迎各位大佬补充和指正!