使用Node.js搭建MQTT服务器,实现STM32和ESP8266的消息订阅和发布

从零开始用Nodejs搭建一个MQTT服务器,并且用stm32通过esp8266进行消息订阅和发布

一、项目背景

最近在做一个物联网项目,需要用到服务器进行数据的存储和数据的请求和发送,之前我用过onenet平台上的http服务,虽然能通过get和post请求进行数据的提交和发送,但是平台上的数据发生改变却不能主动推送给esp8266,与我此次的项目不符合,所以pass。然后我了解了下mqtt协议,它可以消息的发布和订阅实现服务器和esp8266的双向通信,而我之前又学过一些js,所以我就想能不能自己搭建一个mqtt服务器,最终还真的实现了。

二、搭建mqtt服务器

1.搭建开发环境

  • 安装nodejs,可以前往nodejs官网下载,网上有许多的安装教程,这里就不介绍了
  • 安装vscode,同样网上也有许多的教程,就不介绍了
  • 2.创建一个项目

  • 创建一个文件夹并用vscode打开
  • 在终端输入npm init -y,初始化项目
  • 下载aedes包,aedes是一个基于Node.js的MQTT(Message Queuing Telemetry Transport)服务器实现,它可以让你轻松地搭建MQTT服务器,用于处理MQTT协议的消息传递。通过在终端输入命令npm i aedes --save进行下载。
  • 创建一个app.js文件
  • 3.代码编写

    //app.js
    //引入mqtt包
    import aedes from "aedes";
    //网络服务包,nodejs自带
    import net from "net";
    
    //创建aedes实例
    /*
    	配置项: mq: 消息队列,用于存储和处理消息。默认情况下,aedes 使用内存消息队列,但你可以使用其他消息队列实现,例如 Redis。
                id: 服务器的唯一标识符。如果未指定,则将自动生成一个唯一标识符。
                persistence: 持久化存储,用于将连接和会话状态存储到磁盘或数据库中。默认情况下,aedes 使用内存持久化存储,但你可以使用其他持久化存储实现,例如 LevelDB 或 MongoDB。
                concurrency: 最大并发连接数。默认情况下,aedes 允许无限制的并发连接。
                heartbeatInterval: 心跳间隔时间,用于检测连接是否处于活动状态。默认情况下,aedes 每 5 分钟发送一次心跳包。
                connectTimeout: 连接超时时间。默认情况下,aedes 允许无限制的连接超时时间。
                queueLimit: 消息队列长度限制,用于限制消息队列的最大长度。默认情况下,aedes 不限制消息队列长度。
                maxClientsIdLength: 最大客户端 ID 长度。默认情况下,aedes 允许客户端 ID 的最大长度为 23 个字符。
                preConnect: 在连接建立之前执行的处理程序。可以用于验证客户端的身份或执行其他预处理操作。
                authenticate: 身份验证处理程序,用于验证客户端的身份。可以使用用户名/密码、证书等进行身份验证。
                authorizePublish: 发布授权处理程序,用于授权客户端发布消息。
                authorizeSubscribe: 订阅授权处理程序,用于授权客户端订阅主题。
                authorizeForward: 转发授权处理程序,用于授权客户端转发消息。
                published: 发布处理程序,用于在消息发布后执行自定义操作,例如记录日志或触发事件。
    */
    
    //我只用到三个配置项,其他配置项有需要可以自行配置
    export const aedesApp = new aedes({
        heartbeatInterval: 60000, //60s发送一次心跳包
        connectTimeout: 120000,   //如果与服务器120s没有收到连接客户端发过来的心跳包,则视为连接断开
    });
    
    //验证账号密码
    aedesApp.authenticate = async function (client, username, password, callback) {
        //client.id是客户端的id,是唯一的,username是客户端的用户名(密码为buffer,需要转化为string),password是客户端的密码
        //我们可以在这里进行用户的身份验证,是否允许客户端的这次连接请求
        const newPassword = password.toString();
        if(username === 'admin' && newPassword === '123456'){
            callback(null, true); //callback函数需要传递两个参数,第一个是错误实例,第二个是是否同意连接
        }else{
            callback(null, false)
        }
    }
    
    //监听MQTT服务器端口,当有客户端连接上时,触发该回调
    aedesApp.on('client', async (client) => {
        console.log('ClientConnect:', client.id)
    })
    
    //监听MQTT服务器端口,当有客户端主动断开连接或者服务器120s内没收到某个客户端的心跳包就会触发该回调
    aedesApp.on('clientDisconnect', async (client) => {
        console.log('clientDisconnect:', client.id)
    })
    
    //订阅主题。该函数第一个参数是要订阅的主题; 第二个是用于处理收到的消息的函,它接受两个参数:packet 和 callback。packet 是一个 AedesPublishPacket 对象,表示收到的消息;callback 是一个函数,用于在消息处理完成后通知 aedes 服务器;第三个参数是订阅成功的回调函数
    aedesApp.subscribe("myMsg", async function (packet, callback) {
        callback();
    }, () => {
        console.log("订阅myMsg成功");
    });
    
    //处理收到的消息,我们订阅所有主题收到的消息都可以通过这个事件获取(我们可以把订阅收到消息的处理函数写在上面订阅主题函数的第二个参数里面,或者统一写在下面)
    aedesApp.on("publish", async function (packet, client){
        //packet.topic表示哪个主题,packet.payload是收到的数据,是一串二进制数据,我们需要用.toString()将它转化为字符串
      	if(packet.topic === 'myMsg'){
            console.log("Received message:", packet.payload.toString());
        }
    })
    
    //发布主题
    setInterval(()=>{          //两秒发布一次
        aedesApp.publish({
            topic: "success",  //发布主题
            payload: "yes",    //消息内容
            qos: 1,            //MQTT消息的服务质量(quality of service)。服务质量是1,这意味着这个消息需要至少一次确认(ACK)才能被认为是传输成功
            retain: false,     // MQTT消息的保留标志(retain flag),它用于控制消息是否应该被保留在MQTT服务器上,以便新的订阅者可以接收到它。保留标志是false,这意味着这个消息不应该被保留
            cmd: "publish",    // MQTT消息的命令(command),它用于控制消息的类型。命令是"publish",这意味着这个消息是一个发布消息
            dup: false         //判断消息是否是重复的
        }, (err) => {          //发布失败的回调
            if (err) {
                console.log('发布失败')
            }
    	});
    },2000)
    
    //创建服务器
    const server = net.createServer(aedesApp.handle);
    // 运行服务器,运行在1883端口
    server.listen(1883, function () {
      console.log('server started and listening on port 1883')
    })
    

    4.启动服务

  • 在终端输入node app.js启动服务

  • 终端输出server started and listening on port 1883,服务启动成功

  • 5.验证服务是否正常

  • 下载mqtt包,在终端输入npm i mqtt --save

  • 创建一个mqtt.js文件

  • 代码编写

    import mqtt from "mqtt";
    
    //连接到 MQTT 服务器
    const client = mqtt.connect("mqtt://localhost",{
      username:'admin',   //用户名
      password:'123456',  //密码
      clientId: '1',      //客户端id
    });
    
    //订阅主题
    client.on("connect", function () {
      client.subscribe("success", function (err) {
        if (!err) {
          console.log("Subscribed to success");
        }
      });
    });
    
    //处理收到的消息
    client.on("message", function (topic, message) {
      if (topic === "success") {
        console.log("Received message:", message.toString());
      }
    });
    //发布消息
    setInterval(() => {
      client.publish("myMsg", '123123');
    }, 2000);
    
  • 打开另个一个终端,在终端输入node mqtt.js启动连接

  • 启动app.js的终端打印出

    订阅myMsg成功
    ClientConnect: 1
    Received message: 123123
    

    运行截图

  • 当前终端打印出

    Subscribed to success
    Received message : yes
    

    运行截图 运行截图

  • 到此,mqtt服务启动成功,且能正常的发布和订阅消息

  • 三、stm32f103 代码编写

    mqtt指令可以到安可信的官网查看:安可信mqtt固件

    1.esp8266.h

    #ifndef _ESP8266_H_
    #define _ESP8266_H_
    #include "stm32f10x.h"
    
    #define REV_OK		0	
    #define REV_WAIT	1	
    
    void USART3_Init(u32 baud);
    void ESP8266_Clear(void);
    void ESP8266_MQTT_Init(void);
    void publishMYMSG();
    
    #endif
    

    2.esp8266.c

    #include "esp8266.h"
    #include "delay.h"
    #include "string.h"
    
    //复位
    #define ESP8266_RST                                 "AT+RST\r\n"    
    //检测esp8266是否处于正常工作状态
    #define ESP8266_AT                                  "AT\r\n"
    //设置为客户端模式
    #define ESP8266_CWMODE                              "AT+CWMODE=1" 
    //设置WiFi和密码
    #define ESP8266_WIFI_INFO		                    "AT+CWJAP=\"Robotlab-2.4G\",\"8b107lab\"\r\n"
    //设置客户端信息,第三个参数为客户端id,第四个为用户名,第五个为密码
    #define ESP8266_MQTT_MQTTUSERCFG                    "AT+MQTTUSERCFG=0,1,\"4\",\"admin\",\"123456\",0,0,\"\"\r\n"  
    //开启心跳检测,60s内没收到心跳包视为断开
    #define ESP8266_MQTT_MQTTCONNCFG                    "AT+MQTTCONNCFG=0,60,1,\"\",\"\",0,0"
    //连接mqtt服务器(注意:esp8266连接的WiFi需要跟启动mqtt服务器的电脑连接的是同一个,连接的ip地址我们可以打开cmd,输入ipconfig指令进行查看)
    #define ESP8266_MQTT_MQTTCONN                       "AT+MQTTCONN=0,\"192.168.1.104\",1883,1\r\n"
    //订阅success主题
    #define ESP8266_MQTT_MQTTSUB                        "AT+MQTTSUB=0,\"success\",1\r\n"
    //发布myMsg主题
    #define ESP8266_MQTT_MQTTPUB_MYMSG                   "AT+MQTTPUB=0,\"myMsg\",\"%s\",0,0\r\n"    
    unsigned char esp8266_buf[500];
    unsigned int esp8266_cnt = 0, esp8266_cntPre = 0;
    
    //==========================================================
    //	函数名称:	ESP8266_Clear
    //
    //	函数功能:	清空缓存                    
    //
    //	入口参数:	无
    //
    //	返回参数:	无
    //
    //	说明:		
    //==========================================================
    void ESP8266_Clear(void)
    {
    	memset(esp8266_buf, 0, sizeof(esp8266_buf));
    	esp8266_cnt = 0;
    }
    
    //==========================================================
    //	函数名称:	ESP8266_WaitRecive
    //
    //	函数功能:	等待接收完成
    //
    //	入口参数:	无
    //
    //	返回参数:	REV_OK-接收完成		REV_WAIT-接收超时未完成
    //
    //	说明:		循环调用检测是否接收完成
    //==========================================================
    _Bool ESP8266_WaitRecive(void)
    {
    	if(esp8266_cnt == 0) 							//如果接收计数为0 则说明没有处于接收数据中,所以直接跳出,结束函数
    		return REV_WAIT;	
    	if(esp8266_cnt == esp8266_cntPre)				//如果上一次的值和这次相同,则说明接收完毕
    	{
    		esp8266_cnt = 0;							//清0接收计数		
    		return REV_OK;								//返回接收完成标志
    	}
    	esp8266_cntPre = esp8266_cnt;					//置为相同
    	return REV_WAIT;									//返回接收未完成标志
    }
    
    
    //==========================================================
    //	函数名称:	ESP8266_SendCmd
    //
    //	函数功能:	发送命令
    //
    //	入口参数:	cmd:命令
    //				res:需要检查的返回指令
    //
    //	返回参数:	0-成功	1-失败
    //
    //	说明:		
    //==========================================================
    
    _Bool ESP8266_SendCmd(char *cmd, char *res, u16 time)
    {	
    	char *str;
    	Usart3_SendString((unsigned char *)cmd, strlen((const char *)cmd));
    	while(time--)
    	{
    		if(ESP8266_WaitRecive() == REV_OK)							//如果收到数据
    		{
    			str=strstr((const char *)esp8266_buf, res) ;   //接收返回的数据
    			if(str!= NULL)		//如果检索到关键词
    			{
    				ESP8266_Clear();									//清空缓存
    				return 0;
    			}
    		}
    		delay_ms(10);
    	}
    	return 0;
    }
    
    //==========================================================
    //	函数名称:	ESP8266_MQTT_Init
    //
    //	函数功能:	mqtt初始化
    //
    //	入口参数:	无
    //
    //	返回参数:	无
    //
    //	说明:		
    //==========================================================
    
    void ESP8266_MQTT_Init(void)
    {
        printf("START Init\r\n");
    	ESP8266_Clear();
        while(ESP8266_SendCmd("AT+RST\r\n", "ready", 200)); 
    	printf("AT\r\n");
    	while(ESP8266_SendCmd("AT\r\n", "OK", 200));
    	printf("CWMODE\r\n");
    	while(ESP8266_SendCmd(ESP8266_CWMODE, "OK", 200));
    	printf("CWJAP\r\n");
    	while(ESP8266_SendCmd(ESP8266_WIFI_INFO, "OK", 200));
        printf(ESP8266_MQTT_MQTTUSERCFG);
    	while(ESP8266_SendCmd(ESP8266_MQTT_MQTTUSERCFG, "OK", 200));
    	printf(ESP8266_MQTT_MQTTCONN);
    	while(ESP8266_SendCmd(ESP8266_MQTT_MQTTCONN, "OK", 200));
        printf(ESP8266_MQTT_MQTTSUB);
    	while(ESP8266_SendCmd(ESP8266_MQTT_MQTTSUB, "OK", 200));
    	printf("ESP8266 Init OK\r\n");
    }
    
    //发布myMsg主题
    void publishMYMSG()
    {
        char data[150];
        sprintf(data, ESP8266_MQTT_MQTTPUB_MYMSG,  "123");
        ESP8266_SendCmd(data, "OK", 100);
        memset(data, 0, sizeof(data));
    }
    
    
    //串口初始化
    void USART3_Init(u32 baud)
    {
        USART_InitTypeDef USART_InitStructure;
        NVIC_InitTypeDef NVIC_InitStructure;
        GPIO_InitTypeDef GPIO_InitStructure; // 声明一个结构体变量,用来初始化GPIO
        // 使能串口的RCC时钟
        RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, ENABLE); // 使能UART3所在GPIOB的时钟
        RCC_APB1PeriphClockCmd(RCC_APB1Periph_USART3, ENABLE);
        // 串口使用的GPIO口配置
        //  Configure USART3 Tx (PB.10) as alternate function push-pull
        GPIO_InitStructure.GPIO_Pin = GPIO_Pin_10;
        GPIO_InitStructure.GPIO_Speed = GPIO_Speed_50MHz;
        GPIO_InitStructure.GPIO_Mode = GPIO_Mode_AF_PP;
        GPIO_Init(GPIOB, &GPIO_InitStructure);
        // Configure USART3 Rx (PB.11) as input floating
        GPIO_InitStructure.GPIO_Pin = GPIO_Pin_11;
        GPIO_InitStructure.GPIO_Mode = GPIO_Mode_IN_FLOATING;
        GPIO_Init(GPIOB, &GPIO_InitStructure);
    
        // 串口中断配置
        // Enable the USART3 Interrupt
        NVIC_InitStructure.NVIC_IRQChannel = USART3_IRQn;
        NVIC_InitStructure.NVIC_IRQChannelPreemptionPriority = 6; // 抢占优先级3
        NVIC_InitStructure.NVIC_IRQChannelSubPriority = 0;        // 子优先级2
        NVIC_InitStructure.NVIC_IRQChannelCmd = ENABLE;
        NVIC_Init(&NVIC_InitStructure);
        // Enable USART3 Receive interrupts 使能串口接收中断
        USART_ITConfig(USART3, USART_IT_RXNE, ENABLE);
        // 配置串口
        USART_InitStructure.USART_BaudRate = baud;
        USART_InitStructure.USART_WordLength = USART_WordLength_8b;
        USART_InitStructure.USART_StopBits = USART_StopBits_1;
        USART_InitStructure.USART_Parity = USART_Parity_No;
        USART_InitStructure.USART_HardwareFlowControl = USART_HardwareFlowControl_None;
        USART_InitStructure.USART_Mode = USART_Mode_Rx | USART_Mode_Tx;
        // Configure USART3
        USART_Init(USART3, &USART_InitStructure); // 配置串口3
        // Enable the USART3
        USART_Cmd(USART3, ENABLE); // 使能串口3
    
    }
    //串口发送字符串
    void Usart3_SendString(unsigned char *str, unsigned short len)
    {
    
        unsigned short count = 0;
    
        for (; count < len; count++)
        {
            USART_SendData(USART3, *str++);
            while (USART_GetFlagStatus(USART3, USART_FLAG_TXE) == RESET)
                ;
            while ((USART_GetFlagStatus(USART3, USART_FLAG_TC) == RESET))
                ; // 等待串口发送完毕
        }
    }
    
    
    //==========================================================
    //	函数名称:	USART3_IRQHandler
    //
    //	函数功能:	串口3收发中断
    //
    //	入口参数:	无
    //
    //	返回参数:	无
    //
    //	说明:		
    //==========================================================
    
    void USART3_IRQHandler(void)
    {
    	if(USART_GetFlagStatus(USART3, USART_IT_RXNE))
    	{
    		if(esp8266_cnt >= sizeof(esp8266_buf))	esp8266_cnt = 0; 
    		esp8266_buf[esp8266_cnt++] = USART_ReceiveData(USART3);
            //1.处理函数最好不要写在中断函数里面,如果处理函数时间过长,会造成串口中断触发不及时,数据丢失
            //2.建议最好使用操作系统,将数据处理函数单独写成一个任务,或者使用一个标志位,收到就将标志位置1,然后在main函数中判断处理,就能避免上面的情况
            //3.我们在判断是否收到mqtt消息时需要有一些特殊字段来进行判断,比如我们可以在mqtt服务器发布消息时给每个消息都添加一个 ok 字段,便于我们识别
             if(strstr((const char *)esp8266_buf, "success"))
             {
                printf("收到success主题\r\n");
                ESP8266_Clear(); 
             }
    	}
    
    }
    
    

    3.main.c

    #include "esp8266.h"
    int main()
    {
    	USART3_Init(115200);
        ESP8266_MQTT_Init();
        while(1)
        {
            publishMYMSG();
            delay_ms(2000);
        }
    }
    

    4.运行结果

    串口打印

    运行截图

    mqtt服务器

    运行截图

    四、结语

    如果你有一些nodejs后端知识,你就可以将上述的mqtt服务进行优化扩展,比如可以使用数据库进行数据持久化,可以使用http服务进行前后端交互,在通过mqtt发布主题给esp8266进行响应,等等。

    有什么问题欢迎大家提问,有什么错误也欢迎大家指出来。

    创作不易,留下你宝贵的赞吧!!!

    物联沃分享整理
    物联沃-IOTWORD物联网 » 使用Node.js搭建MQTT服务器,实现STM32和ESP8266的消息订阅和发布

    发表回复