Skip to content

MQTT

服务器

c
# 安装mosquitto
sudo apt-get install mosquitto
# 安装客户端
sudo apt-get install mosquitto-clients
# 安装设备端
sudo apt-get install mosquitto-dev

关防火墙和安全组设置

bash
mosquitto_sub -v -t hello    # 订阅安全组
-h 指定域名
-t 指定订阅的主题,主题为:hello
-v 详细模式 打印调试信息
bash
mosquitto_pub -t hello -m world  # 发布订阅
-t 指定订阅的主题,主题为:hello
-m 指定发布的消息的内容

esp32

流程

  1. wifi初始化以及设置参数
  2. wifi连接
  3. mqtt初始化
  4. 订阅监听事件
  1. 初始化esp_mqtt_client_config_t结构体, 使用esp_mqtt_client_init进行初始化
  2. esp_mqtt_client_register_event注册一下事件处理的函数
  3. esp_mqtt_client_start开启服务
  4. 在事件回调函数里面处理事件

事件

  • MQTT_EVENT_BEFORE_CONNECT:客户端已初始化并即将开始连接至服务器。
  • MQTT_EVENT_CONNECTED:客户端已成功连接至服务器。客户端已准备好收发数据。
  • MQTT_EVENT_DISCONNECTED:由于无法读取或写入数据,例如因为服务器无法使用,客户端已终止连接。
  • MQTT_EVENT_SUBSCRIBED:服务器已确认客户端的订阅请求。事件数据将包含订阅消息的消息 ID。
  • MQTT_EVENT_UNSUBSCRIBED:服务器已确认客户端的退订请求。事件数据将包含退订消息的消息 ID。
  • MQTT_EVENT_PUBLISHED:服务器已确认客户端的发布消息。消息将仅针对 QoS 级别 1 和 2 发布,因为级别 0 不会进行确认。事件数据将包含发布消息的消息 ID。
  • MQTT_EVENT_DATA:客户端已收到发布消息。事件数据包含:消息 ID、发布消息所属主题名称、收到的数据及其长度。对于超出内部缓冲区的数据,将发布多个 MQTT_EVENT_DATA,并更新事件数据的 current_data_offsettotal_data_len 以跟踪碎片化消息。
  • MQTT_EVENT_ERROR:客户端遇到错误。使用事件数据 error_handle 字段中的 error_type,可以发现错误。错误类型决定 error_handle 结构体的哪些部分会被填充。

遗嘱消息 (LWT)

初始化结构体里面有一个last_will, 可以设置这一个的参数从而在意外断开里面的时候通过遗嘱通知其他的设备

  • topic:指向 LWT 消息主题的指针
  • msg:指向 LWT 消息的指针
  • msg_len:LWT 消息的长度,msg 不以空字符结尾时需要该字段
  • qos:LWT 消息的服务质量
  • retain:指定 LWT 消息的保留标志

API

常用

c
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config);

初始化, config里面最少需要设置.broker.address.uri="mqtt://110.41.39.131/"这一个参数

c
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)

开启mqtt服务

c
int esp_mqtt_client_subscribe_single(esp_mqtt_client_handle_t client, const char *topic, int qos);

订阅一个主题

c
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)

取消订阅

c
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)

发送消息

c
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void *event_handler_arg)

注册事件, 可以使用ESP_EVENT_ANY_ID注册所有的事件

其他

c
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri)

重新设置URL

示例

c
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "mqtt_client.h"
#include "esp_event.h"
#include "esp_wifi.h"
#include "nvs_flash.h"
 
void Z_WiFi_Init(void);
void Z_Mqtt_Init(void);
 
bool Z_mqtt_connect_flag=false;             //记录是否连接上MQTT服务器的一个标志,如果连接上了才可以发布信息
esp_mqtt_client_handle_t emcht;             //MQTT客户端句柄
 
//WiFI事件处理函数
void  wifi_event_fun(void* handler_arg,esp_event_base_t event_base,int32_t event_id,void* event_data){
    printf("%s,%ld\r\n",event_base,event_id);
 
    if(event_id==WIFI_EVENT_STA_START){                 //如果是STA开启了,那么尝试连接
        printf("\r\nstart wifi success\r\n");
        esp_wifi_connect();                         
    }else if(event_id==WIFI_EVENT_STA_CONNECTED){       //连接上WiFI之后
        printf("connect wifi success\r\n");
        Z_Mqtt_Init();                                  //开始连接MQTT服务器
    }else if(event_id==WIFI_EVENT_STA_DISCONNECTED){    //断开WiFi之后
        esp_wifi_connect();                             //尝试重连WiFi
    }
}
 
//MQTT事件处理函数
void mqtt_event_fun(void *event_handler_arg, esp_event_base_t event_base, int32_t event_id, void *event_data){
    printf("%s,%ld\r\n",event_base,event_id);
 
    if(event_id==MQTT_EVENT_CONNECTED){                 //连接上MQTT服务器
        Z_mqtt_connect_flag=true;
        esp_mqtt_client_subscribe_single(emcht,"Z_topic",1);    //订阅一个测试主题
        printf("success connect mqtt\r\n");
    }else if(event_id==MQTT_EVENT_DISCONNECTED){        //断开MQTT服务器连接
        Z_mqtt_connect_flag=false;
        printf("lose connect mqtt\r\n");
    }else if(event_id==MQTT_EVENT_DATA){                //收到订阅信息
        esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t )event_data;   //强转获取存放订阅信息的参数
        printf("receive data : %.*s from %.*s\r\n",event->data_len,event->data,event->topic_len,event->topic);
    }
}
 
void Z_WiFi_Init(void){
    nvs_flash_init();                           //初始化nvs
    esp_netif_init();                           //初始化TCP/IP堆栈
    esp_event_loop_create_default();            //创建默认事件循环
    esp_event_handler_register(WIFI_EVENT,ESP_EVENT_ANY_ID,wifi_event_fun,NULL);       //绑定事件处理函数
    esp_netif_create_default_wifi_sta();        //创建STA
    wifi_init_config_t wict = WIFI_INIT_CONFIG_DEFAULT();
    esp_wifi_init(&wict);                       //初始化WiFI
    esp_wifi_set_mode(WIFI_MODE_STA);           //设为STA模式
    wifi_config_t wct = {
        .sta = {
            .ssid="Xiaomi_5314",
            .password="13838106970"
        }
    };
    esp_wifi_set_config(WIFI_IF_STA,&wct);      //设置WiFi
    printf("\r\nstart wifi\r\n");
    esp_wifi_start();                           //启动WiFi
}
 
void Z_Mqtt_Init(void){
    esp_mqtt_client_config_t emcct = {
        .broker.address.uri="mqtt://110.41.39.131/",  //MQTT服务器的uri
        .broker.address.port=1883                   //MQTT服务器的端口
    };
    emcht = esp_mqtt_client_init(&emcct);           //初始化MQTT客户端获取句柄
    if(!emcht)  printf("mqtt init error!\r\n");
    
    //注册MQTT事件处理函数
    if(esp_mqtt_client_register_event(emcht,ESP_EVENT_ANY_ID,mqtt_event_fun,NULL)!=ESP_OK)  printf("mqtt register error!\r\n");
 
    //开启MQTT客户端
    if(esp_mqtt_client_start(emcht) != ESP_OK)  printf("mqtt start errpr!\r\n");
}
 
void app_main(void){
    Z_WiFi_Init();
    char* data="Hello World";
    while(1){
        //每隔5S发布一次测试消息
        if(Z_mqtt_connect_flag) esp_mqtt_client_publish(emcht,"hello",data,strlen(data),1,0);
        vTaskDelay(3000/portTICK_PERIOD_MS);
    }
}