Python MQTT实践指南1:利用阿里云物联网平台实现图像传输
最近实验有需要用到MQTT传输图像的需求,本人也不太会写除Python以外的代码,恰好今天读到嵌入式圈内的大佬@DS小龙哥的文章:基于阿里云物联网平台设计的实时图传系统 _ 采用 MQTT 协议传输图像
受大佬启发,今天利用Python写一个简单的MQTT图传程序。
软件:
Pycharm
MQTTX
Wireshark
手机端APP:IoT MQTT Panel
由于大佬在文章中已经写了关于在阿里云创建产品、设备及消息转发的相关设置,本文将不再赘述,添加设备后如下图所示:
我们利用Python脚本发送随机数检查设备接收信息的情况:
import time
import json
import random
import paho.mqtt.client as mqtt
#username和password
#可直接在设备页面一键复制
client_id = f"client_id"
timestamp = str(int(time.time()))
username = f"username"
password = f"password"
# MQTT连接地址
broker = f"mqtthosturl"
port = 1883
# 回调函数
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(f"/sys/${ProductKey}/${deviceName}/thing/event/property/post")
#注意把自己的设备信息更换
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
# 创建客户端
client = mqtt.Client(client_id=client_id)
client.username_pw_set(username, password)
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到阿里云物联网平台
client.connect(broker, port, 60)
# 启动MQTT客户端
client.loop_start()
# 发送随机数的函数
def publish_random_number():
topic = f"/sys/${ProductKey}/${deviceName}/thing/event/property/post"
while True:
random_number = random.randint(1000000000, 9999999999)
payload = {
"method": "thing.event.property.post",
"params": {
"image": str(random_number)
}
}
client.publish(topic, json.dumps(payload))
print(f"Published: {json.dumps(payload)}")
time.sleep(1)
# 启动发送随机数的循环
publish_random_number()
运行此脚本:
可以在平台的设备监测里看到:
OK,设备连接正常,下一步我们来搞一下图片传输
由于@DS小龙哥使用的是QT客户端
我们要用Python实现此功能需要 重新编写脚本,原理很简单,就是将图片转为base64编码,然后发送端建立与平台的连接,通过MQTT协议发送,平台中转给接收端,接收端可以解码此图片呈现。
直接上代码:
import os
import time
import json
import base64
import hmac
import hashlib
import paho.mqtt.client as mqtt
# 阿里云物联网平台的参数
product_key = "product_key"
device_name = "video_1"
device_secret = "device_secret"
region_id = "cn-shanghai" # 选择你的阿里云区域
#username和password
#可直接在设备页面一键复制
client_id = f"client_id"
timestamp = str(int(time.time()))
username = f"username"
password = f"password"
# MQTT连接地址
broker = f"mqtthosturl"
port = 1883
# 回调函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected successfully.")
else:
print(f"Connection failed with result code {rc}")
client.subscribe(f"/sys/{product_key}/{device_name}/thing/event/property/post")
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
def on_publish(client, userdata, mid):
print(f"Message {mid} published.")
def on_log(client, userdata, level, buf):
print("log: ", buf)
# 创建客户端
client = mqtt.Client(client_id=client_id)
client.username_pw_set(username, password)
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_log = on_log
# 连接到阿里云物联网平台
print(f"Connecting to broker {broker} on port {port}")
client.connect(broker, port, 60)
# 启动MQTT客户端
client.loop_start()
# 读取图像并转换为 Base64 编码的块
def get_image_base64_chunks(image_path, chunk_size=102400):
with open(image_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode('utf-8')
for i in range(0, len(image_data), chunk_size):
yield image_data[i:i + chunk_size]
# 发送 Base64 图像块的函数
def publish_image_chunk(image_chunk, chunk_index):
topic = f"/{product_key}/{device_name}/user/update"
payload = {
"method": "thing.event.property.post",
"params": {
"image": image_chunk,
"chunk_index": chunk_index
}
}
result = client.publish(topic, json.dumps(payload))
print(f"Publishing chunk {chunk_index}: {json.dumps(payload)} with result: {result}")
# 启动发送 Base64 图像块的循环
def publish_images_from_directory(directory):
while True: # 无限循环
image_files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f)) and f.lower().endswith(('.png', '.jpg', '.jpeg'))]
for image_file in image_files:
image_path = os.path.join(directory, image_file)
chunk_index = 0
for image_chunk in get_image_base64_chunks(image_path):
publish_image_chunk(image_chunk, chunk_index)
chunk_index += 1
time.sleep(1) # 可以根据需要调整发送间隔时间
time.sleep(2) # 每秒发送一张图片
try:
publish_images_from_directory(r"images direct") # 替换为你的图像目录路径
except KeyboardInterrupt:
print("Stopped by User")
client.loop_stop()
client.disconnect()
手机端查看接收的图片:
下载这个APP,安卓:IoT MQTT Panel,iOS:IoT MQTT Panel
网络问题自行解决哈
按照平台一键复制的数据填写
保存后自动连接成功了
添加一个面板
面板设置如下:
让我们启动脚本:
图片以base64编码的形式发送了
手机端也接收到图片了
MQTTX看看收到的消息:
Wireshark抓包
作者:kuraki peaf