Python MQTT实践指南1:利用阿里云物联网平台实现图像传输

最近实验有需要用到MQTT传输图像的需求,本人也不太会写除Python以外的代码,恰好今天读到嵌入式圈内的大佬@DS小龙哥的文章:基于阿里云物联网平台设计的实时图传系统 _ 采用 MQTT 协议传输图像
受大佬启发,今天利用Python写一个简单的MQTT图传程序。

软件:
Pycharm
MQTTX
Wireshark
手机端APP:IoT MQTT Panel

由于大佬在文章中已经写了关于在阿里云创建产品、设备及消息转发的相关设置,本文将不再赘述,添加设备后如下图所示:

我们利用Python脚本发送随机数检查设备接收信息的情况:

import time
import json
import random
import paho.mqtt.client as mqtt

#username和password
#可直接在设备页面一键复制
client_id = f"client_id"
timestamp = str(int(time.time()))
username = f"username"
password = f"password"

# MQTT连接地址
broker = f"mqtthosturl"
port = 1883

# 回调函数
def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe(f"/sys/${ProductKey}/${deviceName}/thing/event/property/post")
    #注意把自己的设备信息更换
def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))
# 创建客户端
client = mqtt.Client(client_id=client_id)
client.username_pw_set(username, password)
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到阿里云物联网平台
client.connect(broker, port, 60)
# 启动MQTT客户端
client.loop_start()
# 发送随机数的函数
def publish_random_number():
    topic = f"/sys/${ProductKey}/${deviceName}/thing/event/property/post"
    while True:
        random_number = random.randint(1000000000, 9999999999)
        payload = {
            "method": "thing.event.property.post",
            "params": {
                "image": str(random_number)
            }
        }
        client.publish(topic, json.dumps(payload))
        print(f"Published: {json.dumps(payload)}")
        time.sleep(1)

# 启动发送随机数的循环
publish_random_number()


运行此脚本:

可以在平台的设备监测里看到:

OK,设备连接正常,下一步我们来搞一下图片传输
由于@DS小龙哥使用的是QT客户端

我们要用Python实现此功能需要 重新编写脚本,原理很简单,就是将图片转为base64编码,然后发送端建立与平台的连接,通过MQTT协议发送,平台中转给接收端,接收端可以解码此图片呈现。

直接上代码:

import os
import time
import json
import base64
import hmac
import hashlib
import paho.mqtt.client as mqtt

# 阿里云物联网平台的参数
product_key = "product_key"
device_name = "video_1"
device_secret = "device_secret"
region_id = "cn-shanghai"  # 选择你的阿里云区域

#username和password
#可直接在设备页面一键复制
client_id = f"client_id"
timestamp = str(int(time.time()))
username = f"username"
password = f"password"

# MQTT连接地址
broker = f"mqtthosturl"
port = 1883

# 回调函数
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected successfully.")
    else:
        print(f"Connection failed with result code {rc}")
    client.subscribe(f"/sys/{product_key}/{device_name}/thing/event/property/post")

def on_message(client, userdata, msg):
    print(msg.topic + " " + str(msg.payload))

def on_publish(client, userdata, mid):
    print(f"Message {mid} published.")

def on_log(client, userdata, level, buf):
    print("log: ", buf)

# 创建客户端
client = mqtt.Client(client_id=client_id)
client.username_pw_set(username, password)

# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_log = on_log

# 连接到阿里云物联网平台
print(f"Connecting to broker {broker} on port {port}")
client.connect(broker, port, 60)

# 启动MQTT客户端
client.loop_start()

# 读取图像并转换为 Base64 编码的块
def get_image_base64_chunks(image_path, chunk_size=102400):
    with open(image_path, "rb") as image_file:
        image_data = base64.b64encode(image_file.read()).decode('utf-8')
    for i in range(0, len(image_data), chunk_size):
        yield image_data[i:i + chunk_size]

# 发送 Base64 图像块的函数
def publish_image_chunk(image_chunk, chunk_index):
    topic = f"/{product_key}/{device_name}/user/update"
    payload = {
        "method": "thing.event.property.post",
        "params": {
            "image": image_chunk,
            "chunk_index": chunk_index
        }
    }
    result = client.publish(topic, json.dumps(payload))
    print(f"Publishing chunk {chunk_index}: {json.dumps(payload)} with result: {result}")

# 启动发送 Base64 图像块的循环
def publish_images_from_directory(directory):
    while True:  # 无限循环
        image_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f)) and f.lower().endswith(('.png', '.jpg', '.jpeg'))]
        for image_file in image_files:
            image_path = os.path.join(directory, image_file)
            chunk_index = 0
            for image_chunk in get_image_base64_chunks(image_path):
                publish_image_chunk(image_chunk, chunk_index)
                chunk_index += 1
                time.sleep(1)  # 可以根据需要调整发送间隔时间
            time.sleep(2)  # 每秒发送一张图片

try:
    publish_images_from_directory(r"images direct")  # 替换为你的图像目录路径
except KeyboardInterrupt:
    print("Stopped by User")
    client.loop_stop()
    client.disconnect()

手机端查看接收的图片:
下载这个APP,安卓:IoT MQTT Panel,iOS:IoT MQTT Panel
网络问题自行解决哈



按照平台一键复制的数据填写


保存后自动连接成功了

添加一个面板
添加一个面板

面板设置如下:


让我们启动脚本:

图片以base64编码的形式发送了

手机端也接收到图片了

MQTTX看看收到的消息:

Wireshark抓包

作者:kuraki peaf

物联沃分享整理
物联沃-IOTWORD物联网 » Python MQTT实践指南1:利用阿里云物联网平台实现图像传输

发表回复