使用华为云IoTDA实现设备接入,实时查看数据
基于华为云 设备接入IoTDA的数据实时查看
文章目录
前言
最近在打比赛,做了一个项目,本人有幸主要负责云端数据部分,在此写一下我的工作流程
一、数据收集
我们做的是驾驶员疲劳检测,此处利用了一个大佬的源代码并做出了一些修改
源代码链接
1 、导入的包和数据定义
在原作者基础上导入了一个包并就“驾驶员疲劳,玩手机,喝水,吸烟”四个状态及进行定义
import csv
#云端数据
fatigue_driving = 0
phone = 0
drink = 0
smoke = 0
2、实时修改数据
修改摄像头类的代码块
# 在函数中引入定义的全局变量
global fatigue_driving,phone,drink,smoke
如果成功检测,添加
#云端数据实时修改
if "phone" in lab:
phone = 1
else:
phone = 0
if "drink" in lab:
drink = 1
else:
drink = 0
if "smoke" in lab:
smoke = 1
else:
smoke = 0
在函数末尾将数据导入预先存好的excle中,filename是你自己的文件名
fatigue_driving | 1 |
---|---|
phone | 0 |
drink | 0 |
smoke | 0 |
# 打开CSV文件
with open(filename, 'r', newline='') as csvfile:
# 读取CSV文件内容
csvreader = csv.reader(csvfile)
# 转换为列表,方便修改数据
data = list(csvreader)
#print(data)
yunshuju = [fatigue_driving, phone, drink, smoke]
# 修改每一行第二列的数字
for k in range(len(yunshuju)):
# 假设第二列是数字类型
data[k][1] =yunshuju[k]
#print(data)
# 保存修改后的数据回到文件
with open(filename, 'w', newline='') as csvfile:
# 创建CSV写入对象
csvwriter = csv.writer(csvfile)
# 写入修改后的数据
csvwriter.writerows(data)
修改之后,就可以实时改变excle中的数据了
二、创建设备
1.进入华为云
华为云
选择产品设备接入IoTDA
2.创建产品
协议类型MQTT,数据格式JSON,其他随意
3.定义产品模型
自定义模型
4.添加属性
添加属性名称,定义他们的性质
最后效果
5.注册设备
注册设备,随便填一个设备标识码和密钥,记住他们,其实不记住也行,因为确定之后华为云会生成一个文件DEVICES-KEY.txt,生成之后开始应该是未激活
三、数据上传代码
1.IoT Device SDK使用指南
IoT Device SDK(Python)提供设备接入华为云IoT物联网平台的Python版本的SDK,提供设备和平台之间通讯能力,以及设备服务、网关服务、OTA等高级服务,并且针对各种场景提供了丰富的demo代码。具体内容可以参考IoT Device SDK(Python)使用指南,我们还需要他里面的包iot_device_sdk_python
2.导入库
from __future__ import absolute_import
from iot_device_sdk_python.client.listener.default_publish_action_listener import DefaultPublishActionListener
from typing import List
import time
import logging
from iot_device_sdk_python.client.client_conf import ClientConf
from iot_device_sdk_python.client.connect_auth_info import ConnectAuthInfo
from iot_device_sdk_python.iot_device import IotDevice
from iot_device_sdk_python.client.listener.property_listener import PropertyListener
from iot_device_sdk_python.client.request.service_property import ServiceProperty
from iot_device_sdk_python.client import iot_result
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(threadName)s - %(filename)s[%(funcName)s] - %(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
3.创建设备,属性上报
属性上报指的是设备将当前属性值上报给平台。属性设置指的是平台设置设备的属性值
创建设备,并且设置无限循环while True,设置循环时间time.sleep(1)
class cscDetector:
_logger = logging.getLogger(__name__)
def __init__(self, server_uri, port, device_id, secret, iot_cert_file):
self.server_uri = server_uri
self.port = port
self.device_id = device_id
self.secret = secret
self.iot_cert_file = iot_cert_file
def start(self):
""" 创建设备 """
connect_auth_info = ConnectAuthInfo()
connect_auth_info.server_uri = self.server_uri
connect_auth_info.port = self.port
connect_auth_info.id = self.device_id
connect_auth_info.secret = self.secret
connect_auth_info.iot_cert_path = self.iot_cert_file
connect_auth_info.bs_mode = ConnectAuthInfo.BS_MODE_DIRECT_CONNECT
client_conf = ClientConf(connect_auth_info)
device = IotDevice(client_conf)
device.get_client().set_properties_listener(IICBListener(PropertyListener))
if device.connect() != 0:
logger.error("init failed")
return
# 10s后上报一次设备的属性
time.sleep(10)
# 按照产品模型设置属性
service_property = ServiceProperty()
service_property.service_id = "csc"
service_property.properties = {"fatigue_driving": 0, "phone": 0, "drink": 0, "smoke": 0}
# 组装成列表的形式
services = [service_property]
# 上报设备属性
device.get_client().report_properties(services)
while True:
service_property = ServiceProperty()
service_property.service_id = "csc"
data_upload = Upload_data_read(filename)
print(data_upload)
service_property.properties = {"fatigue_driving": data_upload.get('fatigue_driving'),
"phone": data_upload.get('phone'),
"drink": data_upload.get('drink'),
"smoke": data_upload.get('smoke')}
# 组装成列表的形式
services = [service_property]
device.get_client().report_properties(services, DefaultPublishActionListener())
time.sleep(1)
附:数据阅读函数
def Upload_data_read(filename):
data_file = open(filename, encoding='utf-8')
data_upload = dict()
for line in data_file.readlines():
curLine = line.split(',')
data_upload[curLine[0]]= int(curLine[1])
return data_upload
4.平台设置设备属性
定义类cscListener,两个方法:on_property_set方法处理写属性,on_property_get方法处理读属性。
class cscListener(PropertyListener):
def __init__(self, iot_device: IotDevice):
""" 传入一个IotDevice实例 """
self.device = iot_device
def on_property_set(self, request_id: str, services: List[ServiceProperty]):
"""
处理写属性
:param request_id: 请求id
:param services: List<ServiceProperty>
"""
""" 遍历service """
for service_property in services:
logger.info("on_property_set, service_id:" + service_property.service_id)
""" 遍历属性 """
for property_name in service_property.properties:
logger.info("set property name:" + property_name)
logger.info("set property value:" + str(service_property.properties[property_name]))
self.device.get_client().respond_properties_set(request_id, iot_result.SUCCESS)
def on_property_get(self, request_id: str, service_id: str):
"""
处理读属性。多数场景下,用户可以直接从平台读设备影子,此接口不用实现。
但如果需要支持从设备实时读属性,则需要实现此接口。
:param request_id: 请求id
:param service_id: 服务id,可选
"""
service_property = ServiceProperty()
service_property.service_id = "csc"
data_upload = Upload_data_read(filename)
service_property.properties = {"fatigue_driving": data_upload.get('fatigue_driving'),
"phone": data_upload.get('phone'),
"drink": data_upload.get('drink'),
"smoke": data_upload.get('smoke')}
services = [service_property]
self.device.get_client().respond_properties_get(request_id, services)
5.主函数
根据自己的信息填写
def main():
server_uri = ""
port = 8883
device_id = ""
secret = ""
iot_ca_cert_path = "./resources/GlobalSignRSAOVSSLCA2018.crt.pem"
IICB_detector = IICBDetector(server_uri=server_uri,
port=port,
device_id=device_id,
secret=secret,
iot_cert_file=iot_ca_cert_path)
IICB_detector.start()
四、效果展示
同时运行两个函数
云平台端
总结
呦呵,收获不错
作者:Control&AutoDrive