Skip to content

MQTT程序分层

我们主要关注的是客户端(相当于记者和用户): 客户端只需要把信息发送给服务器(相当于电视台), 或者从服务器接收信息就行了

  1. 初始化以及连接
  2. 发出订阅信息
  3. 有一个线程读传感器, 有某种情况发生以后发布消息
  4. 另一个线程等待消息, 在等到消息以后处理消息
c
/*
 * Demo entry point: lease an MQTT client, configure it, connect to the
 * broker, subscribe to three topics and start the publisher thread.
 * The main thread then sleeps forever while the other threads do the work.
 */
int main(void)
{
    int res;
    pthread_t thread1;
    mqtt_client_t *client = NULL;
    
    printf("\nwelcome to mqttclient test...\n");

    mqtt_log_init();

    client = mqtt_lease();	/* obtain a client handle */
    if (NULL == client) {
        /* mqtt_lease() returns NULL on failure; every call below would dereference it */
        MQTT_LOG_E("lease mqtt client fail");
        exit(EXIT_FAILURE);
    }

#ifdef TEST_USEING_TLS
    /* TLS variant, not used in this walkthrough */
    mqtt_set_port(client, "8883");
    mqtt_set_ca(client, (char*)test_ca_get());
#else
    mqtt_set_port(client, "1883");	/* broker port */
#endif

    mqtt_set_host(client, "www.jiejie01.top");		/* broker host */
    mqtt_set_client_id(client, random_string(10));
    mqtt_set_user_name(client, random_string(10));
    mqtt_set_password(client, random_string(10));
    mqtt_set_clean_session(client, 1);

    mqtt_connect(client);	/* connect to the broker */
    
    /* topic1 gets its own handler; topic2/topic3 register no handler (NULL) */
    mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
    mqtt_subscribe(client, "topic2", QOS1, NULL);
    mqtt_subscribe(client, "topic3", QOS2, NULL);
    
    /* start the thread that publishes messages */
    res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
    if(res != 0) {
        MQTT_LOG_E("create mqtt publish thread fail");
        exit(res);
    }

    /* nothing left to do here; keep the process alive */
    while (1) {
        sleep(100);
    }
}
  • 获取一个句柄
c
/**
 * Allocate a new MQTT client handle, zero it and initialise it with mqtt_init().
 *
 * @return the initialised client, or NULL when the allocation or the
 *         initialisation fails.
 */
mqtt_client_t *mqtt_lease(void)
{
    mqtt_client_t* c;

    c = (mqtt_client_t *)platform_memory_alloc(sizeof(mqtt_client_t));
    if (NULL == c)
        return NULL;

    memset(c, 0, sizeof(mqtt_client_t));

    if (MQTT_SUCCESS_ERROR != mqtt_init(c)) {
        /* release the handle instead of leaking it when initialisation fails */
        platform_memory_free(c);
        return NULL;
    }
    
    return c;
}

申请一个MQTT的句柄, 并进行初始化

c
/*
 * Bring a zeroed client into its default state: allocate the network object,
 * load the default session/protocol settings, allocate the read and write
 * buffers and set up the handler lists, locks and keep-alive timers.
 *
 * Returns MQTT_SUCCESS_ERROR, or MQTT_MEM_NOT_ENOUGH_ERROR when the network
 * object cannot be allocated.
 */
static int mqtt_init(mqtt_client_t* c)
{
    network_t *net;

    /* network object */
    net = (network_t*) platform_memory_alloc(sizeof(network_t));
    c->mqtt_network = net;

    if (NULL == net) {
        MQTT_LOG_E("%s:%d %s()... malloc memory failed...", __FILE__, __LINE__, __FUNCTION__);
        RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
    }
    memset(net, 0, sizeof(network_t));

    /* protocol and timing defaults */
    c->mqtt_version = MQTT_VERSION;
    c->mqtt_cmd_timeout = MQTT_DEFAULT_CMD_TIMEOUT;
    c->mqtt_keep_alive_interval = MQTT_KEEP_ALIVE_INTERVAL;    /* keep-alive period */
    c->mqtt_reconnect_try_duration = MQTT_RECONNECT_DEFAULT_DURATION;

    /* session state */
    c->mqtt_client_state = CLIENT_STATE_INITIALIZED;
    c->mqtt_packet_id = 1;
    c->mqtt_clean_session = 0;      /* session is not cleaned by default */
    c->mqtt_will_flag = 0;
    c->mqtt_ping_outstanding = 0;
    c->mqtt_ack_handler_number = 0;

    /* credentials are empty until the setters are called */
    c->mqtt_client_id_len = 0;
    c->mqtt_user_name_len = 0;
    c->mqtt_password_len = 0;

    /* optional features and callbacks start out disabled */
    c->mqtt_will_options = NULL;
    c->mqtt_reconnect_data = NULL;
    c->mqtt_reconnect_handler = NULL;
    c->mqtt_interceptor_handler = NULL;

    /* read / write buffers */
    mqtt_read_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);
    mqtt_write_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);

    /* handler lists */
    mqtt_list_init(&c->mqtt_msg_handler_list);
    mqtt_list_init(&c->mqtt_ack_handler_list);

    /* locks */
    platform_mutex_init(&c->mqtt_write_lock);
    platform_mutex_init(&c->mqtt_global_lock);

    /* timers checked by the keep-alive logic */
    platform_timer_init(&c->mqtt_last_sent);
    platform_timer_init(&c->mqtt_last_received);

    RETURN_ERROR(MQTT_SUCCESS_ERROR);
}

主要是设置一些参数

  • 网络连接
c
/*
 * Connect to the server; this simply forwards to mqtt_connect_with_results()
 * and hands its result back to the caller.
 */
int mqtt_connect(mqtt_client_t* c)
{
    int rc = mqtt_connect_with_results(c);

    return rc;
}
c
/*
 * Open the transport for network object n.
 * When TLS support is compiled in and n->channel is non-zero the TLS
 * connector is used; in every other case the plain TCP connector is used.
 */
int network_connect(network_t *n)
{
#ifndef MQTT_NETWORK_TYPE_NO_TLS
    if (0 != n->channel) {
        return nettype_tls_connect(n);
    }
#endif

    return nettype_tcp_connect(n);
}
c

/*
 * Open a TCP connection to n->host:n->port through the platform socket layer
 * and store the resulting descriptor in n->socket.
 * Returns MQTT_SUCCESS_ERROR, or the negative value reported by the platform
 * layer when the connection fails.
 */
int nettype_tcp_connect(network_t* n)
{
    int rc = MQTT_SUCCESS_ERROR;

    /* platform-specific socket connect */
    n->socket = platform_net_socket_connect(n->host, n->port, PLATFORM_NET_PROTO_TCP);

    if (n->socket < 0) {
        rc = n->socket;
    }

    RETURN_ERROR(rc);
}

其中 platform_net_socket_connect 是移植时主要需要自己实现的函数: 本例中网络模块通过串口连接, 因此需要通过串口发送 AT 命令来实现它

创建线程

调用过程:

c
/* Simplified call chain for connecting (outline only, not compilable code). */
main
	mqtt_connect(client);
		mqtt_connect_with_results(c);
		    rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
		    rc = network_connect(c->mqtt_network);	// this ends up calling the platform-provided connect function

            /* send connect packet */
            if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
                goto exit;

		    if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
            }

            /* connect success, and need init mqtt thread */
            c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);

发布消息

调用过程:

c
/* Simplified call chain for publishing (outline only, not compilable code). */
main
    res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
				mqtt_publish_thread
                    mqtt_publish(client, "topic1", &msg);

// 1. build the message
mqtt_message_t msg;

memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf;	// payload to send
msg.payloadlen = xxx;	// length of the payload

mqtt_publish(client, "topic1", &msg);
	// 1.1 build the packet according to the MQTT protocol

    // 1.2 send the packet through the platform-specific functions
    mqtt_send_packet
        network_write
        	nettype_tcp_write
        		platform_net_socket_write_timeout

4.4 最复杂:订阅消息

消息何时到来?不知道!

所以, 必定有某个后台线程(即连接成功后创建的 mqtt_yield_thread)在不断查询网卡:

  • 读网卡数据

    • 得到数据的话就判断、处理
c
    /* Subscribe to three topics, one per QoS level. Only topic1 registers a
     * handler; topic2 and topic3 pass NULL as the handler argument. */
    mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
    mqtt_subscribe(client, "topic2", QOS1, NULL);
    mqtt_subscribe(client, "topic3", QOS2, NULL);

通过上面的函数调用来订阅主题; 之后当收到某个已订阅主题的消息时, 就会调用订阅时注册的处理函数

c
/* One subscription record: ties a topic filter to the function that handles
 * messages for it. */
def_class(message_handlers_t,
    private_member(
        mqtt_list_t         list;/* list node used to link this record into a list */
        mqtt_qos_t          qos;/* quality of service */
        const char*         topic_filter;/* topic name */
        message_handler_t   handler;/* handler function pointer */
    )
)

之后会创建这样一个结构体

关键线程

  1. 处理收到的数据
  2. 保持心跳
c
    /* Service loop: call mqtt_yield() repeatedly until a clean-session
     * condition is reported. */
    while (1) {
        rc = mqtt_yield(c, c->mqtt_cmd_timeout); /* receive and process incoming packets */
        if (MQTT_CLEAN_SESSION_ERROR == rc) { /* session must be cleaned: disconnect, clean up and leave the loop */
            MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
            network_disconnect(c->mqtt_network);
            mqtt_clean_session(c);
            goto exit;
        } else if (MQTT_RECONNECT_TIMEOUT_ERROR == rc) {
            /* reconnect timed out: only log it, the loop keeps running */
            MQTT_LOG_W("%s:%d %s()..., mqtt reconnect timeout....", __FILE__, __LINE__, __FUNCTION__);
        }
    }
  • 接收信息的函数
c
/*
 * Service the client for at most timeout_ms milliseconds (0 selects the
 * client's own mqtt_cmd_timeout): reconnect while the client is not
 * connected, otherwise read and dispatch packets and scan the ack list.
 *
 * Returns MQTT_FAILED_ERROR for a NULL client, MQTT_CLEAN_SESSION_ERROR when
 * the client is in the clean-session state, MQTT_RECONNECT_TIMEOUT_ERROR when
 * reconnecting timed out, otherwise the result of the last operation.
 */
static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
{
    int rc = MQTT_SUCCESS_ERROR;
    platform_timer_t timer;

    if (NULL == c) {
        RETURN_ERROR(MQTT_FAILED_ERROR);
    }

    if (0 == timeout_ms) {
        timeout_ms = c->mqtt_cmd_timeout;
    }

    platform_timer_cutdown(&timer, timeout_ms);

    while (!platform_timer_is_expired(&timer)) {
        client_state_t current = mqtt_get_client_state(c);

        if (CLIENT_STATE_CLEAN_SESSION == current) {
            RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
        }

        if (CLIENT_STATE_CONNECTED != current) {
            /* not connected: try to reconnect before doing anything else */
            rc = mqtt_try_reconnect(c);
            if (MQTT_RECONNECT_TIMEOUT_ERROR == rc) {
                RETURN_ERROR(rc);
            }
            continue;
        }

        /* connected: read one packet and dispatch it */
        rc = mqtt_packet_handle(c, &timer);

        if (rc < 0) {
            if (MQTT_NOT_CONNECT_ERROR != rc) {
                break;      /* any other failure ends this yield */
            }
            MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
            continue;
        }

        /* scan ack list, destroy ack handlers that have timed out or resend them */
        mqtt_ack_list_scan(c, 1);
    }

    RETURN_ERROR(rc);
}
  • 处理信息
c
/*
 * Read one packet and dispatch it by type, then run the keep-alive check.
 *
 * Returns the packet type when the final result is MQTT_SUCCESS_ERROR,
 * otherwise the error code. For every packet type except CONNACK the value
 * returned by the per-type handler is replaced by the result of
 * mqtt_keep_alive() before it is evaluated.
 * NOTE(review): that overwrite hides handler errors from the caller - confirm
 * it is intended.
 */
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
    int rc = MQTT_SUCCESS_ERROR;
    int packet_type = 0;
    
    rc = mqtt_read_packet(c, &packet_type, timer); /* read one packet; its type is stored in packet_type */

    switch (packet_type) {
        case 0: /* timed out reading packet or an error occurred while reading data*/
            if (MQTT_BUFFER_TOO_SHORT_ERROR == rc) { /* the packet does not fit into the read buffer */
                MQTT_LOG_E("the client read buffer is too short, please call mqtt_set_read_buf_size() to reset the buffer size");
                /* don't return directly, you need to stay active, because there is data readable now, but the buffer is too small */
            }
            break;
		/* dispatch by packet type */
        case CONNACK: /* has been processed */
            goto exit; /* skips the keep-alive check below */

        case PUBACK:
        case PUBCOMP:
            rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
            break;

        case SUBACK:
            rc = mqtt_suback_packet_handle(c, timer);
            break;
            
        case UNSUBACK:
            rc = mqtt_unsuback_packet_handle(c, timer);
            break;

        case PUBLISH:
            rc = mqtt_publish_packet_handle(c, timer);
            break;

        case PUBREC:
        case PUBREL:
            rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
            break;

        case PINGRESP:
            c->mqtt_ping_outstanding = 0;    /* keep alive ping success */
            break;

        default:
            break;
    }

    rc = mqtt_keep_alive(c); /* keep-alive check; replaces the rc set above */

exit:
    /* on success report which packet type was handled */
    if (rc == MQTT_SUCCESS_ERROR)
        rc = packet_type;

    RETURN_ERROR(rc);
}
  • 心跳处理
c
/*
 * Keep-alive check. Once the last-sent or last-received timer has expired:
 *   - if a ping is still outstanding, the network is released, the client is
 *     marked disconnected and MQTT_NOT_CONNECT_ERROR is returned;
 *   - otherwise a PINGREQ is serialised into the write buffer and sent, and
 *     the outstanding-ping counter is incremented on success.
 *
 * Returns the mqtt_is_connected() error when the client is not connected,
 * the send result when a ping was sent, MQTT_SUCCESS_ERROR otherwise.
 *
 * NOTE(review): the write buffer is used here without taking mqtt_write_lock,
 * unlike mqtt_publish_packet_handle() - confirm whether callers hold it.
 */
int mqtt_keep_alive(mqtt_client_t* c)
{
    int rc = mqtt_is_connected(c);

    if (MQTT_SUCCESS_ERROR != rc)
        RETURN_ERROR(rc);

    if (platform_timer_is_expired(&c->mqtt_last_sent) || platform_timer_is_expired(&c->mqtt_last_received)) {
        if (c->mqtt_ping_outstanding) {
            MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
            /* must release the socket file descriptor */
            network_release(c->mqtt_network);
            
            mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
            rc = MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
        } else {
            platform_timer_t timer;
            int len;

            /* arm the send timer instead of handing an uninitialised one to
             * mqtt_send_packet(); the command timeout bounds the send */
            platform_timer_init(&timer);
            platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);

            len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);
            if (len > 0 && (rc = mqtt_send_packet(c, len, &timer)) == MQTT_SUCCESS_ERROR) /* send the ping packet */
                c->mqtt_ping_outstanding++;
        }
    }

    RETURN_ERROR(rc);
}
  • 处理头部
c
/*
 * Bit-field view of a header byte: one retain bit, a two-bit QoS value, one
 * DUP bit and a four-bit message type.
 * NOTE(review): bit-field allocation order is implementation-defined in C;
 * this declaration order presumably relies on the first field landing in the
 * least-significant bit - confirm for the target compiler.
 */
struct
{
    unsigned int retain : 1;		/**< retained flag bit */
    unsigned int qos : 2;				/**< QoS value, 0, 1 or 2 */
    unsigned int dup : 1;				/**< DUP flag bit */
    unsigned int type : 4;			/**< message type nibble */
} bits;

使用位域对头部进行解析

  • 其中一个处理函数
c
/*
 * Handle an incoming PUBLISH packet held in the client's read buffer:
 * deserialise it, acknowledge it when QoS requires (PUBACK for QoS1, PUBREC
 * for QoS2) and deliver the message to the registered handler.
 * A QoS2 message is recorded in the ack list and delivered unless the record
 * call reports that the entry already exists.
 *
 * Returns a negative/error code when the client is not connected, the packet
 * cannot be deserialised or the acknowledgement cannot be built or sent;
 * otherwise the result of the last operation.
 */
static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
{
    int len = 0, rc = MQTT_SUCCESS_ERROR;
    MQTTString topic_name;
    mqtt_message_t msg = {0};
    int qos = 0;
    int payload_len = 0;

    /* make sure the client is connected */
    rc = mqtt_is_connected(c);
    if (MQTT_SUCCESS_ERROR != rc)
        RETURN_ERROR(rc);

    /* The length is read into a real int (same pattern as qos) rather than
     * through an (int*) cast of msg.payloadlen, which is only correct when
     * that field has exactly the size and layout of an int. */
    if (MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
        (uint8_t**)&msg.payload, &payload_len, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
        RETURN_ERROR(MQTT_PUBLISH_PACKET_ERROR);

    if (payload_len < 0)
        RETURN_ERROR(MQTT_PUBLISH_PACKET_ERROR);

    msg.qos = (mqtt_qos_t)qos;
    msg.payloadlen = payload_len;

    /* for qos1 and qos2, you need to send a ack packet */
    if (msg.qos != QOS0) {
        platform_mutex_lock(&c->mqtt_write_lock);
        
        if (msg.qos == QOS1)
            len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBACK, 0, msg.id);
        else if (msg.qos == QOS2)
            len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBREC, 0, msg.id);

        if (len <= 0)
            rc = MQTT_SERIALIZE_PUBLISH_ACK_PACKET_ERROR;
        else
            rc = mqtt_send_packet(c, len, timer);
        
        platform_mutex_unlock(&c->mqtt_write_lock);
    }

    if (rc < 0)
        RETURN_ERROR(rc);

    if (msg.qos != QOS2)
        mqtt_deliver_message(c, &topic_name, &msg);
    else {
        /* record the received of a qos2 message and only processes it when the qos2 message is received for the first time */
        if ((rc = mqtt_ack_list_record(c, PUBREL, msg.id, len, NULL)) != MQTT_ACK_NODE_IS_EXIST_ERROR)
            mqtt_deliver_message(c, &topic_name, &msg);		/* look up and call the matching handler */
    }
    
    RETURN_ERROR(rc);
}
c
/*
 * Deliver a received message: call the handler registered for topic_name, or
 * the client's interceptor handler when no handler matches, then clear the
 * payload and topic bytes in place.
 *
 * Returns MQTT_SUCCESS_ERROR when some handler was called, MQTT_FAILED_ERROR
 * when nobody was registered to take the message.
 */
static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_message_t* message)
{
    int rc = MQTT_FAILED_ERROR;
    message_handlers_t *msg_handler;
    
    /* get mqtt message handler */
    msg_handler = mqtt_get_msg_handler(c, topic_name);
    
    if (NULL != msg_handler) {
        message_data_t md;
        mqtt_new_message_data(&md, topic_name, message);    /* make a message data */
        msg_handler->handler(c, &md);       /* deliver the message */
        rc = MQTT_SUCCESS_ERROR;
    } else if (NULL != c->mqtt_interceptor_handler) {
        message_data_t md;
        mqtt_new_message_data(&md, topic_name, message);    /* make a message data */
        c->mqtt_interceptor_handler(c, &md);
        rc = MQTT_SUCCESS_ERROR;
    }
    
    /* Clear exactly the bytes that belong to this message. The payload is
     * binary data with an explicit length, so strlen() must not be used on
     * it: without a NUL terminator it would run past the end of the payload. */
    if (NULL != message->payload && message->payloadlen > 0)
        memset(message->payload, 0, message->payloadlen);

    if (NULL != topic_name->lenstring.data && topic_name->lenstring.len > 0)
        memset(topic_name->lenstring.data, 0, topic_name->lenstring.len);

    RETURN_ERROR(rc);
}
c
/*
 * Iterate over a circular list whose head is `list`, visiting every node
 * except the head. The successor is cached in `nxt` before the loop body
 * runs, so the body may unlink or free `curr`.
 *
 * The second parameter is deliberately not called `next`: a macro parameter
 * with that name would also replace the `->next` member token, making the
 * macro usable only with a variable literally named `next`.
 */
#define LIST_FOR_EACH_SAFE(curr, nxt, list) \
    for ((curr) = (list)->next, (nxt) = (curr)->next; (curr) != (list); \
            (curr) = (nxt), (nxt) = (curr)->next)
  1. 订阅消息调用函数, 告诉他要订阅的消息以及接收到消息以后的处理函数
  2. 这个订阅会把这个消息加到一个链表里面, 链表包含名称以及处理函数
  3. 创建一个线程进行处理接收到的消息
  4. 线程里面会进行读数据包, 之后根据数据包的类型进行处理

实际的移植

使用的是FreeRTOS, 里面的时钟等已经被实现, 需要自己实现网络连接的接口

移植的是platform_net_socket.c文件里面的connect函数, write函数, read函数