Appearance
MQTT程序分层
我们主要关注的是客户端(记者和用户): 客户端只需要把信息发送给电视台(服务器), 或者从电视台接收信息就行了
- 初始化以及连接
- 发出订阅信息
- 有一个线程读传感器, 有某种情况发生以后发布消息
- 另一个线程等待消息, 在等到消息以后处理消息
c
/*
 * Example client entry point.
 *
 * Leases a client handle, configures the broker address and credentials,
 * connects, subscribes to three topics and starts a thread that runs
 * mqtt_publish_thread. The main thread then just sleeps forever.
 */
int main(void)
{
    int res;
    pthread_t thread1;
    mqtt_client_t *client = NULL;

    printf("\nwelcome to mqttclient test...\n");

    mqtt_log_init();

    client = mqtt_lease();                      /* obtain a client handle */
    if (NULL == client) {
        /* mqtt_lease() returns NULL on failure; every call below needs a valid handle */
        MQTT_LOG_E("lease mqtt client fail");
        exit(EXIT_FAILURE);
    }

#ifdef TEST_USEING_TLS
    /* TLS variant, not used in this walkthrough */
    mqtt_set_port(client, "8883");
    mqtt_set_ca(client, (char*)test_ca_get());
#else
    mqtt_set_port(client, "1883");              /* broker port */
#endif

    mqtt_set_host(client, "www.jiejie01.top");  /* broker host */
    mqtt_set_client_id(client, random_string(10));
    mqtt_set_user_name(client, random_string(10));
    mqtt_set_password(client, random_string(10));
    mqtt_set_clean_session(client, 1);

    mqtt_connect(client);                       /* connect to the broker */

    /* register the topics of interest; only topic1 supplies its own handler */
    mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
    mqtt_subscribe(client, "topic2", QOS1, NULL);
    mqtt_subscribe(client, "topic3", QOS2, NULL);

    /* this thread runs mqtt_publish_thread, i.e. it publishes messages */
    res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
    if(res != 0) {
        MQTT_LOG_E("create mqtt publish thread fail");
        exit(res);
    }

    while (1) {
        sleep(100);
    }
}
- 获取一个句柄
c
/*
 * Allocate a client object, zero it and run mqtt_init() on it.
 *
 * Returns the new client, or NULL when the allocation or the
 * initialisation fails. On an initialisation failure the client object
 * is released again, so a NULL return never leaks it.
 */
mqtt_client_t *mqtt_lease(void)
{
    int rc;
    mqtt_client_t* c;

    c = (mqtt_client_t *)platform_memory_alloc(sizeof(mqtt_client_t));
    if (NULL == c)
        return NULL;

    memset(c, 0, sizeof(mqtt_client_t));

    rc = mqtt_init(c);
    if (MQTT_SUCCESS_ERROR != rc) {
        /* the caller only sees NULL, so the object has to be freed here */
        platform_memory_free(c);
        return NULL;
    }

    return c;
}
申请一个MQTT的句柄, 并进行初始化
c
/*
 * Put a client object into its default state.
 *
 * Allocates and zeroes the network object, fills in the default protocol
 * settings, allocates the read/write buffers and initialises the handler
 * lists, mutexes and timers.
 *
 * Returns MQTT_MEM_NOT_ENOUGH_ERROR when the network object cannot be
 * allocated, MQTT_SUCCESS_ERROR otherwise.
 */
static int mqtt_init(mqtt_client_t* c)
{
    /* the network object comes first: nothing else is set up if it fails */
    c->mqtt_network = (network_t*) platform_memory_alloc(sizeof(network_t));
    if (NULL == c->mqtt_network) {
        MQTT_LOG_E("%s:%d %s()... malloc memory failed...", __FILE__, __LINE__, __FUNCTION__);
        RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
    }
    memset(c->mqtt_network, 0, sizeof(network_t));

    /* protocol level defaults */
    c->mqtt_version = MQTT_VERSION;
    c->mqtt_keep_alive_interval = MQTT_KEEP_ALIVE_INTERVAL;   /* keep-alive period */
    c->mqtt_cmd_timeout = MQTT_DEFAULT_CMD_TIMEOUT;
    c->mqtt_reconnect_try_duration = MQTT_RECONNECT_DEFAULT_DURATION;
    c->mqtt_clean_session = 0;                                /* session is kept by default */
    c->mqtt_will_flag = 0;

    /* runtime state */
    c->mqtt_client_state = CLIENT_STATE_INITIALIZED;
    c->mqtt_packet_id = 1;
    c->mqtt_ping_outstanding = 0;
    c->mqtt_ack_handler_number = 0;

    /* no credentials configured yet */
    c->mqtt_client_id_len = 0;
    c->mqtt_user_name_len = 0;
    c->mqtt_password_len = 0;

    /* optional data and callbacks start out unset */
    c->mqtt_will_options = NULL;
    c->mqtt_reconnect_data = NULL;
    c->mqtt_reconnect_handler = NULL;
    c->mqtt_interceptor_handler = NULL;

    /* read and write buffers */
    mqtt_read_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);
    mqtt_write_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);

    /* handler lists, locks and timers */
    mqtt_list_init(&c->mqtt_msg_handler_list);
    mqtt_list_init(&c->mqtt_ack_handler_list);

    platform_mutex_init(&c->mqtt_write_lock);
    platform_mutex_init(&c->mqtt_global_lock);

    platform_timer_init(&c->mqtt_last_sent);
    platform_timer_init(&c->mqtt_last_received);

    RETURN_ERROR(MQTT_SUCCESS_ERROR);
}
主要是设置一些参数
- 网络连接
c
/*
 * Connect the client to its server.
 *
 * Simply forwards to mqtt_connect_with_results() and hands its result
 * back to the caller.
 */
int mqtt_connect(mqtt_client_t* c)
{
    int result = mqtt_connect_with_results(c);

    return result;
}
c
/*
 * Open the transport for network object `n`.
 *
 * Unless the build defines MQTT_NETWORK_TYPE_NO_TLS, a non-zero
 * n->channel selects nettype_tls_connect(); in every other case
 * nettype_tcp_connect() is used. The chosen function's result is returned.
 */
int network_connect(network_t *n)
{
    int result;

#ifndef MQTT_NETWORK_TYPE_NO_TLS
    if (0 != n->channel) {
        result = nettype_tls_connect(n);
        return result;
    }
#endif

    result = nettype_tcp_connect(n);
    return result;
}
c
/*
 * Open a TCP connection to n->host / n->port through the platform layer
 * and store the result in n->socket.
 *
 * Returns MQTT_SUCCESS_ERROR when the stored socket value is not
 * negative; otherwise the negative socket value itself is returned.
 */
int nettype_tcp_connect(network_t* n)
{
    /* platform specific socket connect */
    n->socket = platform_net_socket_connect(n->host, n->port, PLATFORM_NET_PROTO_TCP);

    if (n->socket >= 0)
        RETURN_ERROR(MQTT_SUCCESS_ERROR);

    RETURN_ERROR(n->socket);
}
platform_net_socket_connect 是移植时主要需要自己实现的函数: 在这里它通过串口发送 AT 命令来完成网络连接
创建线程
调用过程:
c
/*
 * Outline of the connect path (an excerpt, not compilable code).
 * It starts at main; the original nesting indentation has been lost.
 */
main
mqtt_connect(client);
mqtt_connect_with_results(c);
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
rc = network_connect(c->mqtt_network); // the platform-provided connect function is called from inside here
/* send connect packet */
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
goto exit;
/* wait for the CONNACK reply; the body of this branch is omitted in the excerpt */
if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
}
/* connect success, and need init mqtt thread */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);
发布消息
调用过程:
c
/*
 * Outline of the publish path (an excerpt, not compilable code).
 * It starts at main; the original nesting indentation has been lost.
 */
main
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
mqtt_publish_thread
mqtt_publish(client, "topic1", &msg);
// 1. build the message
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf; // point the message at the payload buffer
msg.payloadlen = xxx; // set the payload length
mqtt_publish(client, "topic1", &msg);
// 1.1 build the packet according to the MQTT protocol
// 1.2 send the packet through the platform-specific functions
mqtt_send_packet
network_write
nettype_tcp_write
platform_net_socket_write_timeout
4.4 最复杂:订阅消息
消息何时到来?不知道!
所以,必定是某个内核线程不断查询网卡:
读网卡数据
- 得到数据的话就判断、处理
c
/*
 * Subscribe to three topics, one per QoS level. The last argument is the
 * message handler for the topic; topic2 and topic3 pass NULL here.
 */
mqtt_subscribe(client, "topic1", QOS0, topic1_handler);
mqtt_subscribe(client, "topic2", QOS1, NULL);
mqtt_subscribe(client, "topic3", QOS2, NULL);
之后调用这个函数订阅主题, 当收到某一个主题的消息的时候会调用为该主题注册的处理函数
c
/*
 * One subscription record: it ties a topic filter and a QoS level to the
 * handler function for that topic, and carries a list node so the record
 * can be linked into a list.
 */
def_class(message_handlers_t,
private_member(
mqtt_list_t list;// node used to insert this record into a list
mqtt_qos_t qos;// quality of service
const char* topic_filter;// topic name
message_handler_t handler;// function pointer to the handler
)
)
之后会创建这样一个结构体
关键线程
- 处理收到的数据
- 保持心跳
c
/*
 * Endless loop that keeps calling mqtt_yield() and reacts to its result
 * (presumably the body of mqtt_yield_thread - confirm). The `exit` label
 * targeted below is outside this excerpt.
 */
while (1) {
rc = mqtt_yield(c, c->mqtt_cmd_timeout);// receive and process incoming data
if (MQTT_CLEAN_SESSION_ERROR == rc) {// error handling
MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
/* drop the connection and the session, then leave the loop */
network_disconnect(c->mqtt_network);
mqtt_clean_session(c);
goto exit;
} else if (MQTT_RECONNECT_TIMEOUT_ERROR == rc) {
/* a reconnect timeout is only logged; the loop keeps running */
MQTT_LOG_W("%s:%d %s()..., mqtt reconnect timeout....", __FILE__, __LINE__, __FUNCTION__);
}
}
- 接收信息的函数
c
/*
 * Service the client for up to timeout_ms milliseconds.
 *
 * c          - client to service; NULL yields MQTT_FAILED_ERROR.
 * timeout_ms - time budget; 0 selects c->mqtt_cmd_timeout.
 *
 * Until the timer expires, each iteration looks at the client state:
 *   - clean-session state: return MQTT_CLEAN_SESSION_ERROR;
 *   - any other state than connected: call mqtt_try_reconnect(), return
 *     MQTT_RECONNECT_TIMEOUT_ERROR if that is its result, otherwise retry;
 *   - connected: call mqtt_packet_handle() and, on a non-negative result,
 *     scan the ack list.
 *
 * Returns the last result code seen.
 */
static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
{
int rc = MQTT_SUCCESS_ERROR;
client_state_t state;
platform_timer_t timer;
if (NULL == c)
RETURN_ERROR(MQTT_FAILED_ERROR);
/* a timeout of 0 means "use the client's command timeout" */
if (0 == timeout_ms)
timeout_ms = c->mqtt_cmd_timeout;
platform_timer_cutdown(&timer, timeout_ms);
while (!platform_timer_is_expired(&timer)) {
state = mqtt_get_client_state(c);
if (CLIENT_STATE_CLEAN_SESSION == state) {
RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
} else if (CLIENT_STATE_CONNECTED != state) {
/* mqtt not connect, need reconnect */
rc = mqtt_try_reconnect(c);
if (MQTT_RECONNECT_TIMEOUT_ERROR == rc)
RETURN_ERROR(rc);
continue;
}
/* mqtt connected, handle mqtt packet */
rc = mqtt_packet_handle(c, &timer);// read and process one packet
if (rc >= 0) {
/* scan ack list, destroy ack handler that have timed out or resend them */
mqtt_ack_list_scan(c, 1);
} else if (MQTT_NOT_CONNECT_ERROR == rc) {
/* only logged here; the next iteration re-checks the client state */
MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
} else {
/* any other negative result ends the loop early */
break;
}
}
RETURN_ERROR(rc);
}
- 处理信息
c
/*
 * Read one packet and dispatch it by packet type, then run the
 * keep-alive check.
 *
 * c     - client whose connection is read.
 * timer - timer passed on to the read call and to the packet handlers.
 *
 * Returns the packet type when the final result code is
 * MQTT_SUCCESS_ERROR, otherwise that result code.
 *
 * NOTE(review): except for CONNACK, which jumps straight to `exit`, the
 * result of the per-packet handler is overwritten by the result of
 * mqtt_keep_alive() before it is returned.
 */
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
{
int rc = MQTT_SUCCESS_ERROR;
int packet_type = 0;
rc = mqtt_read_packet(c, &packet_type, timer);// reads a packet here; the packet type is stored through the second argument
switch (packet_type) {
case 0: /* timed out reading packet or an error occurred while reading data*/
if (MQTT_BUFFER_TOO_SHORT_ERROR == rc) {// the read buffer is too small for the incoming packet
MQTT_LOG_E("the client read buffer is too short, please call mqtt_set_read_buf_size() to reset the buffer size");
/* don't return directly, you need to stay active, because there is data readable now, but the buffer is too small */
}
break;
// dispatch on the packet type
case CONNACK: /* has been processed */
goto exit;
case PUBACK:
case PUBCOMP:
rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
break;
case SUBACK:
rc = mqtt_suback_packet_handle(c, timer);
break;
case UNSUBACK:
rc = mqtt_unsuback_packet_handle(c, timer);
break;
case PUBLISH:
rc = mqtt_publish_packet_handle(c, timer);
break;
case PUBREC:
case PUBREL:
rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
break;
case PINGRESP:
c->mqtt_ping_outstanding = 0; /* keep alive ping success */
break;
default:
break;
}
rc = mqtt_keep_alive(c);// keep the heartbeat going
exit:
/* on success the caller gets the packet type instead of the success code */
if (rc == MQTT_SUCCESS_ERROR)
rc = packet_type;
RETURN_ERROR(rc);
}
- 心跳处理
c
/*
 * Keep-alive (heartbeat) handling.
 *
 * Does nothing while neither the last-sent nor the last-received timer
 * has expired. Once one of them has:
 *   - if an earlier ping is still unanswered, the network is released,
 *     the client is marked disconnected and MQTT_NOT_CONNECT_ERROR is
 *     returned;
 *   - otherwise a PINGREQ is serialised and sent, and the outstanding
 *     ping counter is incremented when the send succeeds.
 *
 * Returns the result of mqtt_is_connected() when the client is not
 * connected, otherwise MQTT_SUCCESS_ERROR, MQTT_NOT_CONNECT_ERROR or the
 * error reported by mqtt_send_packet().
 */
int mqtt_keep_alive(mqtt_client_t* c)
{
    int rc = mqtt_is_connected(c);

    if (MQTT_SUCCESS_ERROR != rc)
        RETURN_ERROR(rc);

    if (platform_timer_is_expired(&c->mqtt_last_sent) || platform_timer_is_expired(&c->mqtt_last_received)) {
        if (c->mqtt_ping_outstanding) {
            MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
            /* the socket file descriptor must be released here */
            network_release(c->mqtt_network);

            mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
            rc = MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
        } else {
            platform_timer_t timer;
            int len;

            /*
             * The send timer has to be armed before it is handed to
             * mqtt_send_packet(); it used to be passed uninitialised.
             */
            platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);

            len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);
            if (len > 0 && (rc = mqtt_send_packet(c, len, &timer)) == MQTT_SUCCESS_ERROR) /* send the ping packet */
                c->mqtt_ping_outstanding++;
        }
    }

    RETURN_ERROR(rc);
}
- 处理头部
c
/*
 * Bit-field view of a one-byte header: the four fields add up to 8 bits.
 * NOTE(review): the order in which bit-fields are laid out within the
 * byte is implementation-defined; this declaration order presumably
 * targets compilers that allocate from the least significant bit -
 * confirm for the target toolchain.
 */
struct
{
unsigned int retain : 1; /**< retained flag bit */
unsigned int qos : 2; /**< QoS value, 0, 1 or 2 */
unsigned int dup : 1; /**< DUP flag bit */
unsigned int type : 4; /**< message type nibble */
} bits;
使用位域对头部进行解析
- 其中一个处理函数
c
/*
 * Handle a PUBLISH packet that is sitting in the client's read buffer.
 *
 * c     - client that received the packet.
 * timer - timer used when sending the acknowledgement.
 *
 * Steps: check the connection, deserialise the packet, send PUBACK
 * (QoS1) or PUBREC (QoS2) under the write lock, then deliver the message.
 * A QoS2 message is first recorded in the ack list and is delivered only
 * if that record did not already exist.
 *
 * Returns MQTT_PUBLISH_PACKET_ERROR when deserialisation fails, a
 * negative code when the acknowledgement cannot be built or sent, and
 * otherwise the current rc (for QoS2 the result of mqtt_ack_list_record()).
 */
static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
{
int len = 0, rc = MQTT_SUCCESS_ERROR;
MQTTString topic_name;
mqtt_message_t msg;
int qos;
/* cleared up front; the deserialiser below writes payloadlen through an (int*) cast */
msg.payloadlen = 0;
// make sure the client is connected
rc = mqtt_is_connected(c);
if (MQTT_SUCCESS_ERROR != rc)
RETURN_ERROR(rc);
if (MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
(uint8_t**)&msg.payload, (int*)&msg.payloadlen, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
RETURN_ERROR(MQTT_PUBLISH_PACKET_ERROR);
msg.qos = (mqtt_qos_t)qos;
/* for qos1 and qos2, you need to send a ack packet */
if (msg.qos != QOS0) {
/* the write buffer is filled and sent while holding the write lock */
platform_mutex_lock(&c->mqtt_write_lock);
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = MQTT_SERIALIZE_PUBLISH_ACK_PACKET_ERROR;
else
rc = mqtt_send_packet(c, len, timer);
platform_mutex_unlock(&c->mqtt_write_lock);
}
/* a failed acknowledgement means the message is not delivered */
if (rc < 0)
RETURN_ERROR(rc);
if (msg.qos != QOS2)
mqtt_deliver_message(c, &topic_name, &msg);
else {
/* record the received of a qos2 message and only processes it when the qos2 message is received for the first time */
if ((rc = mqtt_ack_list_record(c, PUBREL, msg.id, len, NULL)) != MQTT_ACK_NODE_IS_EXIST_ERROR)
mqtt_deliver_message(c, &topic_name, &msg); // the message is examined further here and the matching handler is called
}
RETURN_ERROR(rc);
}
c
/*
 * Hand a received message to the handler registered for its topic.
 *
 * c          - client that received the message.
 * topic_name - topic of the message.
 * message    - the message; payload and payloadlen describe its data.
 *
 * If a handler is found for the topic it is called; otherwise the
 * client's interceptor handler is called when one is set. Afterwards the
 * payload bytes and the topic bytes are cleared.
 *
 * Returns MQTT_SUCCESS_ERROR when some handler was called, otherwise
 * MQTT_FAILED_ERROR.
 */
static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_message_t* message)
{
    int rc = MQTT_FAILED_ERROR;
    message_handlers_t *msg_handler;

    /* get mqtt message handler */
    msg_handler = mqtt_get_msg_handler(c, topic_name);

    if (NULL != msg_handler) {
        message_data_t md;
        mqtt_new_message_data(&md, topic_name, message);    /* make a message data */
        msg_handler->handler(c, &md);                       /* deliver the message */
        rc = MQTT_SUCCESS_ERROR;
    } else if (NULL != c->mqtt_interceptor_handler) {
        message_data_t md;
        mqtt_new_message_data(&md, topic_name, message);    /* make a message data */
        c->mqtt_interceptor_handler(c, &md);
        rc = MQTT_SUCCESS_ERROR;
    }

    /*
     * Clear exactly the payload bytes. The payload is raw packet data with
     * an explicit length and no guaranteed NUL terminator, so strlen()
     * could stop early or run past the end of the data.
     */
    if (NULL != message->payload && message->payloadlen > 0)
        memset(message->payload, 0, message->payloadlen);

    if (NULL != topic_name->lenstring.data && topic_name->lenstring.len > 0)
        memset(topic_name->lenstring.data, 0, topic_name->lenstring.len);

    RETURN_ERROR(rc);
}
c
/*
 * Walk a circular list whose nodes have a `next` member, in a way that
 * tolerates removal of the current node from inside the loop body.
 *
 * curr - cursor variable, set to each node in turn.
 * nxt  - scratch variable holding the successor of `curr`; it is read
 *        before the body runs, so the body may unlink or free `curr`.
 * list - pointer to the list head (the head itself is not visited).
 *
 * The second parameter is deliberately not called `next`: a parameter of
 * that name would also be substituted into the `->next` member accesses,
 * so the macro would only work when the caller's variable happened to be
 * named `next`.
 */
#define LIST_FOR_EACH_SAFE(curr, nxt, list) \
    for ((curr) = (list)->next, (nxt) = (curr)->next; (curr) != (list); \
         (curr) = (nxt), (nxt) = (curr)->next)
- 订阅消息调用函数, 告诉他要订阅的消息以及接收到消息以后的处理函数
- 这个订阅会把这个消息加到一个链表里面, 链表包含名称以及处理函数
- 创建一个线程进行处理接收到的消息
- 线程里面会进行读数据包, 之后根据数据包的类型进行处理
实际的移植
使用的是FreeRTOS, 里面的时钟等已经被实现, 需要自己实现网络连接的接口
移植的是platform_net_socket.c文件里面的connect函数, write函数, read函数