介绍

基于Python的串口通信功能主要通过使用pyserial库来实现。pyserial是一个跨平台的Python库,它提供了对串行端口(如RS232、USB转串口等)进行读写操作的支持。下面将详细介绍如何使用pyserial来进行串口通信。

安装PySerial

首先需要安装pyserial库,可以通过pip工具轻松完成:

pip install pyserial

基本概念

  • 波特率(Baud Rate):表示每秒传输的数据位数,是串行通信中的一个重要参数,必须确保发送端和接收端设置相同的波特率。

  • 数据位(Data Bits):通常为7或8位,指每次传输的数据量大小。

  • 停止位(Stop Bits):用于标识一个字节结束的数量,默认情况下为1个停止位。

  • 校验位(Parity Bit):可选参数,用来检测传输错误,有无校验、奇校验和偶校验三种选择。

  • 流控制(Flow Control):硬件流控(RTS/CTS, DTR/DSR)或软件流控(XON/XOFF),用于管理数据流动。

  • 使用PySerial的基本步骤

    1. 导入模块

    2. import serial
    3. from serial.tools import list_ports
    4. 查找可用的串口

    5. 可以使用list_ports.comports()函数列出系统中所有可用的串口设备。
    6. 打开串口连接

    7. 使用serial.Serial()创建一个串口对象,并指定相应的参数(如端口号、波特率等)。
    8. 配置串口参数

    9. 在创建串口对象时可以传递额外的关键字参数来设置波特率、数据位、停止位、校验位等。
    10. 读取与写入数据

    11. 通过调用串口对象的read()write()方法来进行数据的读取和发送。
    12. 关闭串口连接

    13. 当不再需要使用串口时,应该调用close()方法来释放资源。

    串行通信服务实现

    这段代码展示了如何使用 pyserial 库来管理和操作串行端口,包括列出可用的串行端口、打开和关闭端口、发送和接收数据。以下是详细的功能解析:

    环境准备

    确保安装了 pyserial 库,以便能够访问和管理串行端口。

    pip install pyserial

    列出所有串行端口

    list_ports 函数用于扫描系统中的所有串行端口,并返回一个包含这些端口名称的列表。它还记录每个端口的描述信息和硬件ID,方便调试或用户选择正确的端口。

    def list_ports():
        port_names = []
        ports = serial.tools.list_ports.comports()
        for port, desc, hwid in ports:
            log_message(f"Port: {port}, Description: {desc}, HWID: {hwid}")
            port_names.append(port)
        return port_names

    串行通信服务类 (CommService)

    定义了一个用于串行通信的服务类CommService,以及一个列出所有可用串行端口的函数list_ports。它使用了Python的serial库来处理串行通信,并且引入了一些辅助模块来增强功能

    导入模块

    
    

    python

    深色版本

    import serial
    import binascii
    import serial.tools.list_ports
    from utils.const_util import AppConst
  • serial:提供了对串行端口(如RS232、USB转串等)的访问。
  • binascii:包含将二进制数据转换为ASCII字符串的函数。
  • serial.tools.list_ports:提供工具函数来枚举系统上的串行端口。
  • utils.const_util.AppConst:自定义模块中定义的应用常量。
  • 列出所有可用串行端口

    
    

    python

    深色版本

    def list_ports():
        port_names = []
        ports = serial.tools.list_ports.comports()
        for port, desc, hwid in ports:
            print(f"Port: {port}, Description: {desc}, HWID: {hwid}")
            port_names.append(port)
    
        return port_names
  • list_ports函数:遍历所有可用的串行端口,并打印每个端口的名称、描述和硬件ID。
  • 返回一个包含所有端口名称的列表,可用于选择具体的通信端口。
  • 串行通信服务类

    
    

    python

    深色版本

    class CommService:
        def __init__(self, comm_port):
            self.comm_port = comm_port
            self.ser_comm = None
  • 构造方法__init__:初始化时设置串行端口名称,并将实际的串行连接对象初始化为None
  • 设置串行端口
        def set_comm_port(self, comm_port):
            self.comm_port = comm_port
  • set_comm_port方法:允许动态更改串行端口名称。
  • 打开串行连接
        def open_comm(self):
            try:
                self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
                                              stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False,
                                              dsrdtr=False)
            except:
                return None
            return self.ser_comm
  • open_comm方法:尝试打开指定的串行端口,并配置波特率、字节大小、校验位、停止位等参数。
  • 如果打开失败(例如端口不存在或已被占用),捕获异常并返回None;否则返回打开的串行连接对象。
  • 关闭串行连接
        def close_comm(self):
            if self.is_opened:
                self.ser_comm.close()
                self.ser_comm = None
  • close_comm方法:检查串行连接是否打开,如果是,则关闭连接并重置连接对象为None
  • 发送数据
        def send_comm(self, data):
            if self.is_opened:
                try:
                    self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
                except serial.serialutil.SerialTimeoutException:
                    msg = AppConst.COMM_SEND_OVER
                    return False, msg
                return True, ''
            else:
                msg = AppConst.COMM_SEND_FAILED
                return False, msg
  • send_comm方法:如果串行连接是打开状态,尝试发送十六进制格式的数据。
  • 使用bytearray.fromhex()将十六进制字符串转换为字节数组后发送。
  • 捕获可能发生的超时异常,并返回相应的错误信息。
  • 成功发送后返回True及空消息,发送失败则返回False及错误消息。
  • 读取数据
        def read_comm(self):
            if self.is_opened:
                if self.ser_comm.in_waiting > 0:
                    try:
                        data = self.ser_comm.readline()
                    except:
                        return None
                    return binascii.hexlify(data).decode('ascii')
                return None
            else:
                return None
  • read_comm方法:如果串行连接是打开状态并且有数据待读取,尝试读取一行数据。
  • 使用binascii.hexlify()将读取到的数据转换为十六进制表示形式,并解码为ASCII字符串。
  • 如果发生异常或者没有数据可读,则返回None
  • 检查连接状态
        def is_opened(self):
            return self.ser_comm is not None and self.ser_comm.is_open
  • is_opened方法:检查当前是否有有效的串行连接对象,并且该连接是否处于打开状态。
  • 总结

    CommService类封装了与串行设备通信所需的所有基本操作,包括打开/关闭连接、发送/接收数据以及检查连接状态。它还提供了一个静态方法list_ports来列出系统上所有的串行端口,这在需要用户选择具体端口进行通信时非常有用。此外,通过使用try-except语句,代码实现了简单的错误处理逻辑,确保即使在出现问题的情况下也能维持程序的基本运行。最后,利用AppConst中的常量,使得错误消息可以集中管理,便于维护和国际化支持。

    完整代码

    import serial
    import binascii
    import serial.tools.list_ports
    from utils.const_util import AppConst
    
    
    def list_ports():
        port_names = []
        ports = serial.tools.list_ports.comports()
        for port, desc, hwid in ports:
            print(f"Port: {port}, Description: {desc}, HWID: {hwid}")
            port_names.append(port)
    
        return port_names
    
    
    class CommService:
        def __init__(self, comm_port):
            self.comm_port = comm_port
            self.ser_comm = None
    
        def set_comm_port(self, comm_port):
            self.comm_port = comm_port
    
        def open_comm(self):
            try:
                self.ser_comm = serial.Serial(self.comm_port, 9600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
                                              stopbits=serial.STOPBITS_ONE, timeout=1, xonxoff=False, rtscts=False,
                                              dsrdtr=False)
            except:
                return None
            return self.ser_comm
    
        def close_comm(self):
            if self.is_opened:
                self.ser_comm.close()
                self.ser_comm = None
    
        def send_comm(self, data):
            if self.is_opened:
                try:
                    self.ser_comm.write(bytearray.fromhex(data.replace(' ', '')))
                except serial.serialutil.SerialTimeoutException:
                    msg = AppConst.COMM_SEND_OVER
                    return False, msg
                return True, ''
            else:
                msg = AppConst.COMM_SEND_FAILED
                return False, msg
    
        def read_comm(self):
            if self.is_opened:
                if self.ser_comm.in_waiting > 0:
                    try:
                        data = self.ser_comm.readline()
                    except:
                        return None
                    return binascii.hexlify(data).decode('ascii')
    
                return None
            else:
                return None
    
        def is_opened(self):
            return self.ser_comm is not None and self.ser_comm.is_open
    

    使用示例

    下面是一个简单的使用示例,假设已经有一个实例化好的 CommService 对象 comm_service

    # 列出所有可用的串行端口
    available_ports = list_ports()
    
    # 假设选择了第一个可用端口进行通信
    if available_ports:
        comm_service.set_comm_port(available_ports[0])
        ser = comm_service.open_comm()
        if ser:
            # 发送一些数据到串行端口
            success, msg = comm_service.send_comm("55 AA 01 02 03")
            if success:
                print("Data sent successfully.")
            else:
                print(f"Failed to send data: {msg}")
    
            # 尝试读取来自串行端口的数据
            received_data = comm_service.read_comm()
            if received_data:
                print(f"Received data: {received_data}")
    
            # 关闭串行端口
            comm_service.close_comm()
    else:
        print("No available serial ports found.")

    通过这种方式,您可以轻松地与连接到计算机的任何串行设备进行交互,无论是读取传感器数据还是控制外部硬件。

    作者:道友老李

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】串口操作

    发表回复