【python】根据RS485通讯协议,读写数据–完整代码(嵌入式软件自动化测试)

目录

  • 一、通讯连接工具
  • 二、通讯协议
  • 2.1 数据格式
  • 2.2 协议指令
  • 三、代码实现RS485通讯读写
  • 3.1 完整代码
  • 3.2 串口通讯打开
  • 3.3 发送数据包
  • 3.4 接收数据包
  • 3.5 参数读取
  • 3.6 串口通讯关闭
  • 3.7 运行结果
  • 一、通讯连接工具

    1.USB转RS485通讯转换器1个
    2.PCB线路板1个
    3.转换器的AB口与线路板AB口相连,A连接A,B连接B

    二、通讯协议

    2.1 数据格式


    根据通讯协议文档,可以看出只需要关注【命令/状态】和【返回数据/参数设置】两列,其他列固定。

    2.2 协议指令

    1、举例:查询指令

    (1)根据命令/状态表,查询指令为0x02,即D3位为0x02
    (2)发送查询指令:FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA
    (3)返回数据:D4-D6为当前时间的时分秒,D7D8为用水量,D9D10为用水流速。

    解释高低字节:
    高字节占据高位(左侧),低字节占据低位(右侧)。
    例如:高字节是 b’\xfe’,低字节是 b’\x2e’,它们组合成一个 16 位的数值de2e,转换十进制为56878。

    计算方法
    将高字节左移 8 位: 高字节 << 8。
    将低字节加到高字节的结果中: (高字节 << 8) | 低字节。

    协议指令测试:

    三、代码实现RS485通讯读写

    3.1 完整代码

    先附上完整代码:

    import serial
    import binascii
    # 打开串口COM5, 9600波特率, 8数据位, 0停止位
    ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1,  # 1.根据实际通讯协议要求,实例化串口及各项参数
                     dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)
    
    # 检查串口是否已经打开
    if ser.is_open:
        print('串口已成功打开.')
    
    # 发送的数据包
    data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
    data_to_send1=data_to_send1.replace(' ', '')  # 去除空格
    data_to_send=binascii.a2b_hex(data_to_send1)  # 转换为16进制字符串
    
    # 发送16进制字符串
    write_len=ser.write(data_to_send)
    
    # 读取数据包, 分字节和功能读取, 数据位为2字节
    receive=True
    ASSCII_receive_list=[]
    while receive:
        receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
        if receives == []:
            # 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
            continue
    
        elif receives != []:  # 表示当接收数据不等于空列表[]时,将数据存储在列表
            ASSCII_receive_list.append(receives)
            if len(ASSCII_receive_list)==write_len: #读取结束
                print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}')  # 打印接收到的16进制列表
                hex_receive_list=[]
                for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
                    # hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
                    hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
                print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
                break
    
    #读取列表中参数(十六进制)
    hour=hex_receive_list[4] #读取小时字节
    minute=hex_receive_list[5] #读取分钟字节
    seconds=hex_receive_list[6] #读取秒字节
    
    high_use_water=hex_receive_list[8] #读取用水量高位字节
    low_use_water=hex_receive_list[7] #读取用水量低位字节
    
    high_water_velocity=hex_receive_list[10] #读取水流速高位字节
    low_water_velocity=hex_receive_list[9] #读取水流速低位字节
    
    
    #十六进制字符串转换为十进制
    hour_value=int(hour,16)#将字符串按照基数16解析为十进制
    minute_value=int(minute,16)
    seconds_value=int(seconds,16)
    
    high_use_water_value=int(high_use_water,16)
    low_use_water_value=int(low_use_water,16)
    
    high_water_velocity_value=int(high_water_velocity,16)
    low_water_velocity_value=int(low_water_velocity,16)
    
    #组合高低字节为一个十进制数
    '''
    解释高低字节
    高字节占据高位(左侧),低字节占据低位(右侧)。
    例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。
    
    计算方法
    将高字节左移 8 位: 高字节 << 8。
    将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
    '''
    total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
    water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速
    
    # 计算实际用水量的值(根据通讯协议实际定义计算)
    total_use_water = total_use_water_value / 10.0
    
    #计算实际水流速的值(根据通讯协议实际定义计算)
    if water_velocity_value==0:
        water_velocity=0
    else:
        water_velocity = int(str(water_velocity_value)[1:])/10
    
    
    # 打印接收到的数据
    print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
    print(f'用水量:{total_use_water}')
    print(f'水流速:{water_velocity}')
    
    # 关闭串口
    ser.close()
    

    3.2 串口通讯打开


    通讯协议对接口定义:9600波特率、8个数据位、无校验位、1个停止位。
    代码如下:

    # 打开串口COM5, 9600波特率, 8数据位, 0停止位
    ser = serial.Serial(port='COM5', baudrate=9600, bytesize=8, parity='N', stopbits=1,  # 1.根据实际通讯协议要求,实例化串口及各项参数
                     dsrdtr=False, rtscts=False, xonxoff=False, timeout=2)
    
    # 检查串口是否已经打开
    if ser.is_open:
        print('串口已成功打开.')
    

    3.3 发送数据包

    对发送数据进行处理,转换为16进制字符串发送,代码如下:

    # 发送的数据包
    data_to_send1 = 'FE 0C A5 02 00 00 00 00 00 00 00 00 00 00 DA' #16进制数据
    data_to_send1=data_to_send1.replace(' ', '')  # 去除空格
    data_to_send=binascii.a2b_hex(data_to_send1)  # 转换为16进制字符串
    
    # 发送16进制字符串
    write_len=ser.write(data_to_send)
    

    3.4 接收数据包

    注意ser.read()方法串口接收到数据是byte字节型,转换可以参考 ASCII 与16进制对照表,所以要把byte字节型转换为对应的16进制值
    代码如下:

    # 读取数据包, 分字节和功能读取, 数据位为2字节
    receive=True
    ASSCII_receive_list=[]
    while receive:
        receives=ser.read(1) # serial.read() 方法从串口接收到数据是byte字节型,转换要参考 ASCII 与16进制对照表
        if receives == []:
            # 没接受有效数据时,会一直读取到空列表,等于空列表时,利用continue语句再次重新读取
            continue
    
        elif receives != []:  # 表示当接收数据不等于空列表[]时,将数据存储在列表
            ASSCII_receive_list.append(receives)
            if len(ASSCII_receive_list)==write_len: #读取结束
                print(f'接收到的数据列表(ASSCII):{ASSCII_receive_list}')  # 打印接收到的16进制列表
                hex_receive_list=[]
                for byte_value in ASSCII_receive_list: # byte字节型转换为对应的16进制值
                    # hex_receive_list.append(f"{byte[0]:02x}") #方式一:将每个字节单独转换为 16 进制时
                    hex_receive_list.append(byte_value.hex().zfill(2)) #方式二:去掉 '0x' 并补齐两位
                print(f'接收到的数据列表(16进制):{hex_receive_list}') #打印接收到的16进制列表
                break
    
    

    3.5 参数读取

    参照通讯协议文档,从接收的数据包列表读取对应的参数,计算后输出
    代码如下:

    #读取列表中参数(十六进制)
    hour=hex_receive_list[4] #读取小时字节
    minute=hex_receive_list[5] #读取分钟字节
    seconds=hex_receive_list[6] #读取秒字节
    
    high_use_water=hex_receive_list[8] #读取用水量高位字节
    low_use_water=hex_receive_list[7] #读取用水量低位字节
    
    high_water_velocity=hex_receive_list[10] #读取水流速高位字节
    low_water_velocity=hex_receive_list[9] #读取水流速低位字节
    
    
    #十六进制字符串转换为十进制
    hour_value=int(hour,16)#将字符串按照基数16解析为十进制
    minute_value=int(minute,16)
    seconds_value=int(seconds,16)
    
    high_use_water_value=int(high_use_water,16)
    low_use_water_value=int(low_use_water,16)
    
    high_water_velocity_value=int(high_water_velocity,16)
    low_water_velocity_value=int(low_water_velocity,16)
    
    #组合高低字节为一个十进制数
    '''
    解释高低字节
    高字节占据高位(左侧),低字节占据低位(右侧)。
    例如:高字节是 b'\xfe',低字节是 b'\x2e',它们组合成一个 16 位的数值de2e,转换十进制为56878。
    
    计算方法
    将高字节左移 8 位: 高字节 << 8。
    将低字节加到高字节的结果中: (高字节 << 8) | 低字节。
    '''
    total_use_water_value=(high_use_water_value<<8)|low_use_water_value #总用水量
    water_velocity_value=(high_water_velocity_value<<8)|low_water_velocity_value #水流速
    
    # 计算实际用水量的值(根据通讯协议实际定义计算)
    total_use_water = total_use_water_value / 10.0
    
    #计算实际水流速的值(根据通讯协议实际定义计算)
    if water_velocity_value==0:
        water_velocity=0
    else:
        water_velocity = int(str(water_velocity_value)[1:])/10
    
    # 打印接收到的数据
    print(f'当前时间:{hour_value}:{minute_value}:{seconds_value}')
    print(f'用水量:{total_use_water}')
    print(f'水流速:{water_velocity}')
    
    

    3.6 串口通讯关闭

    代码如下:

    ser.close()
    

    3.7 运行结果


    ASSCII转16进制在线转换:链接

    作者:测试笔记(自看)

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【python】根据RS485通讯协议,读写数据–完整代码(嵌入式软件自动化测试)

    发表回复