python调用C++代码-方法-使用Pybind11库将C++代码与Python绑定

python调用C++代码-方法1-使用Pybind11库将C++代码与Python绑定

目的

这个方法的目的是使用Pybind11库将C++代码与Python绑定,从而在Python中调用高效的C++代码。这种方法适用于需要在Python项目中利用C++的性能优势,尤其是在需要高性能计算或已有C++代码库的情况下。

1. 准备工作

安装必要的软件和库

  1. Python:确保系统中已安装Python(Python 3.x版本)。
  2. C++编译器
  3. Visual Studio:确保安装了包含C++开发工具的Visual Studio(推荐使用)。
  4. MinGW:如果你更熟悉GCC编译器,可以使用MinGW。
  5. Pybind11库:通过pip安装Pybind11:
    pip install pybind11
    
2. 创建项目目录和文件结构

组织你的项目文件

  1. 创建一个项目目录:

    mkdir your_project_directory
    cd your_project_directory
    
  2. 在这个目录下,创建以下文件:

  3. example.cpp: 包含C++源代码。
  4. setup.py: 用于定义编译过程的Python脚本。
  5. test_example.py: 用于测试生成的Python模块。

文件结构示例

/your_project_directory
|-- example.cpp       # C++源代码文件
|-- setup.py          # Python构建脚本
|-- test_example.py   # Python测试脚本
3. 编写C++代码

创建example.cpp文件,并写入以下代码:

#include <pybind11/pybind11.h>

// 定义一个简单的C++函数,计算两个整数的和
int add(int a, int b) {
    return a + b;
}

// 使用Pybind11将C++函数绑定到Python模块
PYBIND11_MODULE(example, m) {
    m.def("add", &add, "A function that adds two numbers");
}

解释

  • #include <pybind11/pybind11.h>:引入Pybind11的核心头文件。
  • int add(int a, int b):这是一个简单的C++函数,用于计算两个整数的和。
  • PYBIND11_MODULE(example, m):这是一个宏,用于定义Python模块。example是模块名称,m是模块对象。
  • 4. 编写构建脚本

    创建setup.py文件,内容如下:

    from setuptools import setup, Extension
    import pybind11
    
    # 定义一个扩展模块,指定源文件和包含路径
    ext_modules = [
        Extension(
            'example',  # 模块名称
            ['example.cpp'],  # 源文件
            include_dirs=[pybind11.get_include()],  # 包含pybind11头文件的路径
            language='c++'  # 指定使用C++编译
        ),
    ]
    
    # 定义构建过程
    setup(
        name='example',
        ext_modules=ext_modules,
    )
    

    解释

  • setup.py文件使用setuptools来管理构建流程。
  • Extension类用于定义扩展模块的属性,包括模块名称、源文件、包含路径和编译语言。
    我的目录如下:
  • 5. 编译C++代码并生成Python模块

    使用命令行编译生成Python模块

    1. 打开命令行(如pycharm-terminal开发者命令行)。
    2. 导航到项目目录
      cd your_project_directory
      
    3. 运行以下命令进行编译
      python setup.py build_ext --inplace
      

      这条命令会将example.cpp文件编译成一个可直接导入的Python模块(在Windows上通常是example.pyd)。
      进入项目路径进行编译:

      成功生成结果如下:

      此时项目文件如下:

    6. 测试生成的Python模块

    创建并运行测试脚本

    1. 创建test_example.py文件,并写入以下代码:

      import example
      
      result = example.add(10, 20)
      print(result)  # 预期输出: 30
      
    2. 运行测试脚本

      python test_example.py
      

      如果一切正常,你应该会看到输出结果30
      测试成功结果如下:

    总结

    通过这个详细的步骤,你可以将C++代码与Python成功绑定。这种方法不仅适用于简单的函数绑定,也可以扩展到更复杂的C++类和模块,使得Python能够利用C++的强大性能。

    实战测试

    我有一段C++代码的主要功能是通过Modbus TCP协议从远程服务器读取输入寄存器的数据,并将读取到的数据进行解析和显示。
    完整代码如下:

    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include <windows.h>
    #include <stdio.h>
    #include <cstdint>
    #include <ctime>
    
    #pragma comment(lib, "Ws2_32.lib")  // 链接Ws2_32.lib库
    
    // 定义一些常量
    #define IP "192.168.0.150"
    #define PORT 6789
    #define UNIT_ID 3
    #define REGISTER_ADDRESS 1000
    #define NUM_REGISTERS 51200
    #define MAX_REGISTERS_PER_REQUEST 255
    #define MAX_RETRIES 3
    #define TIMEOUT_SEC 5  // 超时时间(秒)
    
    // 特定寄存器的地址和含义映射
    typedef struct {
        uint16_t address;
        const char* description;
    } SpecialRegister;
    
    SpecialRegister special_registers[] = {
        {100, "uhf_db"},
        {101, "reserve"},
        {102, "放电次数"},
        {103, "放电相位"}
    };
    
    void create_read_input_registers_request(uint8_t* request, uint16_t address, uint16_t count) {
        uint16_t transaction_id = htons(1);
        uint16_t protocol_id = htons(0);
        uint16_t length = htons(6);
        uint8_t unit_id = UNIT_ID;
        uint8_t function_code = 4;
        uint16_t addr = htons(address);
        uint16_t cnt = htons(count);
    
        memcpy(request, &transaction_id, 2);
        memcpy(request + 2, &protocol_id, 2);
        memcpy(request + 4, &length, 2);
        memcpy(request + 6, &unit_id, 1);
        memcpy(request + 7, &function_code, 1);
        memcpy(request + 8, &addr, 2);
        memcpy(request + 10, &cnt, 2);
    }
    
    void parse_response(uint8_t* response, int response_size, uint16_t* registers) {
        int data_length = response[8];
        if (response_size < 9 + data_length) {
            printf("错误:响应数据不完整\n");
            return;
        }
    
        for (int i = 0; i < data_length / 2; i++) {
            registers[i] = ntohs(*(uint16_t*)&response[9 + i * 2]);
        }
    }
    
    int read_input_registers(SOCKET sockfd, uint16_t address, uint16_t count, uint16_t* registers) {
        uint8_t request[12];
        create_read_input_registers_request(request, address, count);
    
        if (send(sockfd, (const char*)request, 12, 0) == SOCKET_ERROR) {
            printf("发送请求失败: %ld\n", WSAGetLastError());
            return -1;
        }
    
        fd_set readfds;
        FD_ZERO(&readfds);
        FD_SET(sockfd, &readfds);
    
        struct timeval timeout;
        timeout.tv_sec = TIMEOUT_SEC;
        timeout.tv_usec = 0;
    
        int select_result = select(sockfd + 1, &readfds, NULL, NULL, &timeout);
        if (select_result > 0) {
            uint8_t response[1024];
            int response_size = recv(sockfd, (char*)response, sizeof(response), 0);
            if (response_size == SOCKET_ERROR) {
                printf("接收响应失败: %ld\n", WSAGetLastError());
                return -1;
            }
    
            parse_response(response, response_size, registers);
            return response_size;
        }
        else if (select_result == 0) {
            printf("接收响应超时\n");
            return -1;
        }
        else {
            printf("select 调用失败: %ld\n", WSAGetLastError());
            return -1;
        }
    }
    
    void print_special_registers(uint16_t* special_values) {
        printf("特定寄存器值:\n");
        for (int i = 0; i < sizeof(special_registers) / sizeof(SpecialRegister); i++) {
            uint16_t address = special_registers[i].address;
            const char* description = special_registers[i].description;
            printf("地址 %u (%s): %u\n", address, description, special_values[address - 100]);
        }
    }
    
    int main() {
        WSADATA wsaData;
        SOCKET sockfd;
        struct sockaddr_in server_addr;
        uint16_t* all_registers = (uint16_t*)malloc(NUM_REGISTERS * sizeof(uint16_t));
        uint16_t special_values[4] = { 0 };
    
        if (all_registers == NULL) {
            printf("内存分配失败\n");
            return 1;
        }
    
        printf("正在连接到 Modbus TCP 服务器 %s:%d\n", IP, PORT);
        printf("正在读取 %d 个输入寄存器...\n", NUM_REGISTERS);
    
        // 初始化 WinSock
        if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
            printf("WinSock 初始化失败: %d\n", WSAGetLastError());
            free(all_registers);
            return 1;
        }
    
        // 创建套接字
        if ((sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
            printf("套接字创建失败: %ld\n", WSAGetLastError());
            WSACleanup();
            free(all_registers);
            return 1;
        }
    
        // 设置套接字为非阻塞模式
        u_long mode = 1;
        ioctlsocket(sockfd, FIONBIO, &mode);
    
        // 配置服务器地址
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(PORT);
        inet_pton(AF_INET, IP, &server_addr.sin_addr);
    
        // 连接服务器
        if (connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
            if (WSAGetLastError() != WSAEWOULDBLOCK) {
                printf("连接服务器失败: %ld\n", WSAGetLastError());
                closesocket(sockfd);
                WSACleanup();
                free(all_registers);
                return 1;
            }
    
            // 使用 select 等待连接完成
            fd_set writefds;
            FD_ZERO(&writefds);
            FD_SET(sockfd, &writefds);
            struct timeval timeout;
            timeout.tv_sec = TIMEOUT_SEC;
            timeout.tv_usec = 0;
    
            int select_result = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
            if (select_result <= 0) {
                printf("连接服务器超时或失败\n");
                closesocket(sockfd);
                WSACleanup();
                free(all_registers);
                return 1;
            }
        }
    
        // 计时开始
        clock_t start_time = clock();
    
        // 分批读取寄存器
        for (int i = 0; i < NUM_REGISTERS; i += MAX_REGISTERS_PER_REQUEST) {
            int batch_size = (NUM_REGISTERS - i < MAX_REGISTERS_PER_REQUEST) ? NUM_REGISTERS - i : MAX_REGISTERS_PER_REQUEST;
            if (read_input_registers(sockfd, REGISTER_ADDRESS + i, batch_size, all_registers + i) < 0) {
                printf("读取寄存器失败\n");
                closesocket(sockfd);
                WSACleanup();
                free(all_registers);
                return 1;
            }
        }
    
        // 计算并显示all_registers数组的大小
        size_t array_size_bytes = NUM_REGISTERS * sizeof(uint16_t); // 数组占用的总字节数
        size_t array_size_elements = NUM_REGISTERS; // 数组中的元素数
    
        printf("all_registers数组的大小: %zu 字节\n", array_size_bytes);
        printf("all_registers数组中的元素数量: %zu\n", array_size_elements);
    
        // 读取特定寄存器
        if (read_input_registers(sockfd, 100, 4, special_values) < 0) {
            printf("读取特定寄存器失败\n");
            closesocket(sockfd);
            WSACleanup();
            free(all_registers);
            return 1;
        }
    
        // 计时结束
        clock_t end_time = clock();
        double total_duration = (double)(end_time - start_time) / CLOCKS_PER_SEC;
    
        // 输出结果
        printf("读取到的寄存器值 (前10个): ");
        for (int i = 0; i < 10 && i < NUM_REGISTERS; i++) {
            printf("%u ", all_registers[i]);
        }
    
        // 打印特定寄存器值
        print_special_registers(special_values);
    
        printf("\n总共读取到 %d 个寄存器\n", array_size_elements);
        printf("总耗时: %.4f 秒\n", total_duration);
    
        // 关闭套接字
        closesocket(sockfd);
        // 清理 WinSock
        WSACleanup();
        // 释放内存
        free(all_registers);
    
        return 0;
    }
    
    

    输出结果如下:

    要将这个复杂的C++代码封装并优化以便于在Python中调用,可以按照以下步骤进行:

    1. 分解功能并创建类封装

    首先,将整个功能封装到一个C++类中,使其更加模块化和易于管理。我们将核心的Modbus TCP客户端功能封装在一个类中,并暴露必要的接口供Python调用。

    2. 提供Python接口

    使用Pybind11将C++类和函数绑定到Python,使其可以直接在Python中调用。

    优化后的C++代码

    以下是经过优化并封装的C++代码:

    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include <windows.h>
    #include <stdio.h>
    #include <cstdint>
    #include <ctime>
    #include <vector>
    #include <string>
    #include <iostream>
    #include <stdexcept>
    #include <pybind11/pybind11.h>
    #include <pybind11/stl.h>
    
    #pragma comment(lib, "Ws2_32.lib")
    
    #define TIMEOUT_SEC 5  // 超时时间(秒)
    
    class ModbusTCPClient {
    public:
        ModbusTCPClient(const std::string& ip, uint16_t port, uint8_t unit_id)
            : ip_(ip), port_(port), unit_id_(unit_id), sockfd_(INVALID_SOCKET) {
            if (WSAStartup(MAKEWORD(2, 2), &wsaData_) != 0) {
                throw std::runtime_error("WinSock 初始化失败");
            }
        }
    
        ~ModbusTCPClient() {
            if (sockfd_ != INVALID_SOCKET) {
                closesocket(sockfd_);
            }
            WSACleanup();
        }
    
        void connect_server() {
            sockfd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (sockfd_ == INVALID_SOCKET) {
                throw std::runtime_error("套接字创建失败");
            }
    
            u_long mode = 1;
            ioctlsocket(sockfd_, FIONBIO, &mode);
    
            sockaddr_in server_addr{};
            server_addr.sin_family = AF_INET;
            server_addr.sin_port = htons(port_);
            inet_pton(AF_INET, ip_.c_str(), &server_addr.sin_addr);
    
            if (connect(sockfd_, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
                if (WSAGetLastError() != WSAEWOULDBLOCK) {
                    throw std::runtime_error("连接服务器失败");
                }
    
                fd_set writefds;
                FD_ZERO(&writefds);
                FD_SET(sockfd_, &writefds);
                timeval timeout{TIMEOUT_SEC, 0};
    
                int select_result = select(sockfd_ + 1, nullptr, &writefds, nullptr, &timeout);
                if (select_result <= 0) {
                    throw std::runtime_error("连接服务器超时或失败");
                }
            }
        }
    
        std::vector<uint16_t> read_input_registers(uint16_t address, uint16_t count) {
            std::vector<uint16_t> registers(count, 0);
            uint8_t request[12];
            create_read_input_registers_request(request, address, count);
    
            if (send(sockfd_, reinterpret_cast<const char*>(request), 12, 0) == SOCKET_ERROR) {
                throw std::runtime_error("发送请求失败");
            }
    
            fd_set readfds;
            FD_ZERO(&readfds);
            FD_SET(sockfd_, &readfds);
    
            timeval timeout{TIMEOUT_SEC, 0};
            int select_result = select(sockfd_ + 1, &readfds, nullptr, nullptr, &timeout);
    
            if (select_result > 0) {
                uint8_t response[1024];
                int response_size = recv(sockfd_, reinterpret_cast<char*>(response), sizeof(response), 0);
                if (response_size == SOCKET_ERROR) {
                    throw std::runtime_error("接收响应失败");
                }
                parse_response(response, response_size, registers.data());
            } else if (select_result == 0) {
                throw std::runtime_error("接收响应超时");
            } else {
                throw std::runtime_error("select 调用失败");
            }
    
            return registers;
        }
    
    private:
        std::string ip_;
        uint16_t port_;
        uint8_t unit_id_;
        SOCKET sockfd_;
        WSADATA wsaData_;
    
        void create_read_input_registers_request(uint8_t* request, uint16_t address, uint16_t count) {
            uint16_t transaction_id = htons(1);
            uint16_t protocol_id = htons(0);
            uint16_t length = htons(6);
            uint8_t unit_id = unit_id_;
            uint8_t function_code = 4;
            uint16_t addr = htons(address);
            uint16_t cnt = htons(count);
    
            memcpy(request, &transaction_id, 2);
            memcpy(request + 2, &protocol_id, 2);
            memcpy(request + 4, &length, 2);
            memcpy(request + 6, &unit_id, 1);
            memcpy(request + 7, &function_code, 1);
            memcpy(request + 8, &addr, 2);
            memcpy(request + 10, &cnt, 2);
        }
    
        void parse_response(uint8_t* response, int response_size, uint16_t* registers) {
            int data_length = response[8];
            if (response_size < 9 + data_length) {
                throw std::runtime_error("响应数据不完整");
            }
    
            for (int i = 0; i < data_length / 2; i++) {
                registers[i] = ntohs(*(uint16_t*)&response[9 + i * 2]);
            }
        }
    };
    
    // Pybind11模块定义
    namespace py = pybind11;
    
    PYBIND11_MODULE(modbus_client, m) {
        py::class_<ModbusTCPClient>(m, "ModbusTCPClient")
            .def(py::init<const std::string&, uint16_t, uint8_t>())
            .def("connect_server", &ModbusTCPClient::connect_server)
            .def("read_input_registers", &ModbusTCPClient::read_input_registers);
    }
    

    代码说明

    1. ModbusTCPClient类:封装了Modbus TCP客户端的功能,包括连接服务器和读取寄存器的功能。
    2. connect_server方法:负责连接到Modbus TCP服务器。
    3. read_input_registers方法:读取输入寄存器,返回寄存器值的std::vector
    4. Pybind11模块:使用Pybind11ModbusTCPClient类绑定到Python模块modbus_client中,供Python调用。

    3. 创建Python构建脚本

    创建一个setup.py文件,用于定义构建过程:

    from setuptools import setup, Extension
    import pybind11
    
    ext_modules = [
        Extension(
            'modbus_client',  # 模块名称
            ['modbus_client.cpp'],  # 源文件
            include_dirs=[pybind11.get_include()],
            libraries=["Ws2_32"],  # 链接Ws2_32.lib库
            language='c++',
        ),
    ]
    
    setup(
        name='modbus_client',
        ext_modules=ext_modules,
    )
    

    4. 编译并测试

    1. 编译模块:在项目目录中,使用命令行运行以下命令进行编译:

      python setup.py build_ext --inplace
      
    2. 测试Python代码:编写一个Python脚本来测试生成的模块:

    import modbus_client
    
    client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
    client.connect_server()
    
    # 读取特定寄存器(示例地址100,读取4个寄存器)
    registers = client.read_input_registers(100, 4)
    print(f"读取的寄存器值: {registers}")
    

    通过上述流程步骤,结果如下所示:

    优化测试代码:

    如下所示:

    import modbus_client
    
    client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
    client.connect_server()
    
    # 读取特定寄存器(示例地址100,读取4个寄存器)
    registers = client.read_input_registers(100, 4)
    print(f"读取的寄存器值: {registers}")
    
    registers2 = client.read_input_registers(1000, 127)
    print(f"读取的寄存器值: {registers2}")
    registers3 = client.read_input_registers(1127, 127)
    print(f"读取的寄存器值: {registers3}")
    
    

    运行结果如下:

    我测试最大每次读127,要是多一点就出错,当设置为128时,结果如下:

    我的寄存器地址从1000开始,总共有51200个数值,故要分批读取。
    读取特定寄存器和读取51200个寄存器的数据,并将max_registers_per_request设置为125。这个值符合Modbus协议中典型的最大读取寄存器数量限制。
    完整代码如下:

    import modbus_client
    
    # 初始化Modbus TCP客户端
    client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
    client.connect_server()
    
    # 定义读取参数
    start_address = 1000
    total_registers = 51200
    max_registers_per_request = 125  # 设置为125,符合Modbus标准
    
    # 存储结果的列表
    result = []
    
    # 分批次读取51200个寄存器数据
    for i in range(0, total_registers, max_registers_per_request):
        batch_size = min(max_registers_per_request, total_registers - i)
        current_address = start_address + i
        registers = client.read_input_registers(current_address, batch_size)
        result.extend(registers)
    
    # 读取特定寄存器的值(示例地址100,读取4个寄存器)
    special_registers = client.read_input_registers(100, 4)
    
    # 输出结果
    print(f"读取到的寄存器总数: {len(result)}")
    print(f"后10个寄存器值: {result[:-10]}")
    print(f"特定寄存器(地址100-103)的值: {special_registers}")
    
    

    在现有的代码基础上,我们可以添加计时功能来计算读取寄存器的耗时。我们将使用Python的time模块来测量时间。在读取寄存器之前记录开始时间,完成后记录结束时间,最后计算和输出总耗时。

    优化后的代码(带计时功能)

    import modbus_client
    import time
    
    # 初始化Modbus TCP客户端
    client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
    client.connect_server()
    
    # 定义读取参数
    start_address = 1000
    total_registers = 51200
    max_registers_per_request = 125  # 设置为125,符合Modbus标准
    
    # 存储结果的列表
    result = []
    
    # 开始计时
    start_time = time.time()
    
    # 分批次读取51200个寄存器数据
    for i in range(0, total_registers, max_registers_per_request):
        batch_size = min(max_registers_per_request, total_registers - i)
        current_address = start_address + i
        registers = client.read_input_registers(current_address, batch_size)
        result.extend(registers)
    
    # 读取特定寄存器的值(示例地址100,读取4个寄存器)
    special_registers = client.read_input_registers(100, 4)
    # 结束计时
    end_time = time.time()
    total_duration = end_time - start_time
    
    
    
    # 输出结果
    print(f"读取到的寄存器总数: {len(result)}")
    print(f"后10个寄存器值: {result[-10:]}")
    print(f"特定寄存器(地址100-103)的值: {special_registers}")
    print(f"读取 {total_registers} 个寄存器总耗时: {total_duration:.4f} 秒")
    

    输出结果如下:

    代码说明

    1. 导入time模块:我们使用time模块来进行计时。

    2. 开始计时

    3. 在开始读取寄存器之前,调用start_time = time.time()记录当前时间。
    4. 结束计时并计算总耗时

    5. 在所有寄存器读取完毕后,调用end_time = time.time()记录结束时间。
    6. total_duration表示总耗时,计算方式为end_time - start_time
    7. 输出耗时

    8. 通过print(f"读取 {total_registers} 个寄存器总耗时: {total_duration:.4f} 秒")输出读取51200个寄存器的总耗时,结果精确到小数点后四位。

    优化结果

    通过添加计时功能,你可以精确地知道读取寄存器操作花费的时间。这对于性能优化和评估系统响应时间非常有用。程序在读取完所有寄存器后,会输出耗时信息,帮助你了解通信的效率。

    使用纯python写法读取数据代码如下:

    import socket
    import struct
    import time
    import select
    
    # 定义一些常量
    IP = "192.168.0.150"
    PORT = 6789
    UNIT_ID = 3
    REGISTER_ADDRESS = 1000
    NUM_REGISTERS = 51200
    MAX_REGISTERS_PER_REQUEST = 125
    TIMEOUT_SEC = 5  # 超时时间(秒)
    
    # 特定寄存器的地址和含义映射
    special_registers = [
        (100, "uhf_db"),
        (101, "reserve"),
        (102, "放电次数"),
        (103, "放电相位")
    ]
    
    def create_read_input_registers_request(address, count):
        transaction_id = 1
        protocol_id = 0
        length = 6
        unit_id = UNIT_ID
        function_code = 4
        request = struct.pack('>HHHBBHH', transaction_id, protocol_id, length, unit_id, function_code, address, count)
        return request
    
    def parse_response(response, count):
        data_length = response[8]
        if len(response) < 9 + data_length:
            raise ValueError("响应数据不完整")
    
        registers = struct.unpack('>' + 'H' * (data_length // 2), response[9:9 + data_length])
        return registers
    
    def read_input_registers(sock, address, count):
        request = create_read_input_registers_request(address, count)
        sock.sendall(request)
    
        sock.settimeout(TIMEOUT_SEC)
        response = sock.recv(1024)
        return parse_response(response, count)
    
    def print_special_registers(special_values):
        print("特定寄存器值:")
        for address, description in special_registers:
            print(f"地址 {address} ({description}): {special_values[address - 100]}")
    
    def main():
        all_registers = [0] * NUM_REGISTERS
        special_values = [0] * 4
    
        print(f"正在连接到 Modbus TCP 服务器 {IP}:{PORT}")
        print(f"正在读取 {NUM_REGISTERS} 个输入寄存器...")
    
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(0)
    
        server_address = (IP, PORT)
        sock.connect_ex(server_address)
    
        ready_to_write = [sock]
        _, writable, _ = select.select([], ready_to_write, [], TIMEOUT_SEC)
    
        if not writable:
            print("连接服务器超时或失败")
            sock.close()
            return
    
        start_time = time.time()
    
        try:
            for i in range(0, NUM_REGISTERS, MAX_REGISTERS_PER_REQUEST):
                batch_size = min(NUM_REGISTERS - i, MAX_REGISTERS_PER_REQUEST)
                batch_registers = read_input_registers(sock, REGISTER_ADDRESS + i, batch_size)
                all_registers[i:i + batch_size] = batch_registers
    
            special_values = read_input_registers(sock, 100, 4)
        except Exception as e:
            print(f"读取寄存器失败: {e}")
            sock.close()
            return
    
        end_time = time.time()
        total_duration = end_time - start_time
    
        print("读取到的寄存器值 (后10个):", all_registers[-10:])
        print_special_registers(special_values)
        print(f"\n总共读取到 {len(all_registers)} 个寄存器")
        print(f"总耗时: {total_duration:.4f} 秒")
    
        sock.close()
    
    if __name__ == "__main__":
        main()
    
    

    结果如下:

    很纳闷,python调用C++代码和纯python读取效率差不多。

    总结

    通过封装和优化,复杂的C++代码可以更加模块化和易于维护,同时通过Pybind11轻松暴露给Python调用。这使得在高效处理低层次网络通信的同时,可以在高层次应用中利用Python的灵活性。

    作者:像风一样自由2020

    物联沃分享整理
    物联沃-IOTWORD物联网 » python调用C++代码-方法-使用Pybind11库将C++代码与Python绑定

    发表回复