python调用C++代码-方法-使用Pybind11库将C++代码与Python绑定
python调用C++代码-方法1-使用Pybind11库将C++代码与Python绑定
目的
这个方法的目的是使用Pybind11
库将C++代码与Python绑定,从而在Python中调用高效的C++代码。这种方法适用于需要在Python项目中利用C++的性能优势,尤其是在需要高性能计算或已有C++代码库的情况下。
1. 准备工作
安装必要的软件和库:
- Python:确保系统中已安装Python(Python 3.x版本)。
- C++编译器:
- Visual Studio:确保安装了包含C++开发工具的Visual Studio(推荐使用)。
- MinGW:如果你更熟悉GCC编译器,可以使用MinGW。
- Pybind11库:通过
pip
安装Pybind11:pip install pybind11
2. 创建项目目录和文件结构
组织你的项目文件:
-
创建一个项目目录:
mkdir your_project_directory cd your_project_directory
-
在这个目录下,创建以下文件:
example.cpp
: 包含C++源代码。setup.py
: 用于定义编译过程的Python脚本。test_example.py
: 用于测试生成的Python模块。
文件结构示例:
/your_project_directory
|-- example.cpp # C++源代码文件
|-- setup.py # Python构建脚本
|-- test_example.py # Python测试脚本
3. 编写C++代码
创建example.cpp
文件,并写入以下代码:
#include <pybind11/pybind11.h>
// 定义一个简单的C++函数,计算两个整数的和
int add(int a, int b) {
return a + b;
}
// 使用Pybind11将C++函数绑定到Python模块
PYBIND11_MODULE(example, m) {
m.def("add", &add, "A function that adds two numbers");
}
解释:
#include <pybind11/pybind11.h>
:引入Pybind11
的核心头文件。int add(int a, int b)
:这是一个简单的C++函数,用于计算两个整数的和。PYBIND11_MODULE(example, m)
:这是一个宏,用于定义Python模块。example
是模块名称,m
是模块对象。4. 编写构建脚本
创建setup.py
文件,内容如下:
from setuptools import setup, Extension
import pybind11
# 定义一个扩展模块,指定源文件和包含路径
ext_modules = [
Extension(
'example', # 模块名称
['example.cpp'], # 源文件
include_dirs=[pybind11.get_include()], # 包含pybind11头文件的路径
language='c++' # 指定使用C++编译
),
]
# 定义构建过程
setup(
name='example',
ext_modules=ext_modules,
)
解释:
setup.py
文件使用setuptools
来管理构建流程。Extension
类用于定义扩展模块的属性,包括模块名称、源文件、包含路径和编译语言。我的目录如下:

5. 编译C++代码并生成Python模块
使用命令行编译生成Python模块:
- 打开命令行(如pycharm-terminal开发者命令行)。
- 导航到项目目录:
cd your_project_directory
- 运行以下命令进行编译:
python setup.py build_ext --inplace
这条命令会将
example.cpp
文件编译成一个可直接导入的Python模块(在Windows上通常是example.pyd
)。
进入项目路径进行编译:
成功生成结果如下:
此时项目文件如下:
6. 测试生成的Python模块
创建并运行测试脚本:
-
创建
test_example.py
文件,并写入以下代码:import example result = example.add(10, 20) print(result) # 预期输出: 30
-
运行测试脚本:
python test_example.py
如果一切正常,你应该会看到输出结果
30
。
测试成功结果如下:
总结
通过这个详细的步骤,你可以将C++代码与Python成功绑定。这种方法不仅适用于简单的函数绑定,也可以扩展到更复杂的C++类和模块,使得Python能够利用C++的强大性能。
实战测试
我有一段C++代码的主要功能是通过Modbus TCP协议从远程服务器读取输入寄存器的数据,并将读取到的数据进行解析和显示。
完整代码如下:
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <stdio.h>
#include <cstdint>
#include <ctime>
#pragma comment(lib, "Ws2_32.lib") // 链接Ws2_32.lib库
// 定义一些常量
#define IP "192.168.0.150"
#define PORT 6789
#define UNIT_ID 3
#define REGISTER_ADDRESS 1000
#define NUM_REGISTERS 51200
#define MAX_REGISTERS_PER_REQUEST 255
#define MAX_RETRIES 3
#define TIMEOUT_SEC 5 // 超时时间(秒)
// 特定寄存器的地址和含义映射
typedef struct {
uint16_t address;
const char* description;
} SpecialRegister;
SpecialRegister special_registers[] = {
{100, "uhf_db"},
{101, "reserve"},
{102, "放电次数"},
{103, "放电相位"}
};
void create_read_input_registers_request(uint8_t* request, uint16_t address, uint16_t count) {
uint16_t transaction_id = htons(1);
uint16_t protocol_id = htons(0);
uint16_t length = htons(6);
uint8_t unit_id = UNIT_ID;
uint8_t function_code = 4;
uint16_t addr = htons(address);
uint16_t cnt = htons(count);
memcpy(request, &transaction_id, 2);
memcpy(request + 2, &protocol_id, 2);
memcpy(request + 4, &length, 2);
memcpy(request + 6, &unit_id, 1);
memcpy(request + 7, &function_code, 1);
memcpy(request + 8, &addr, 2);
memcpy(request + 10, &cnt, 2);
}
void parse_response(uint8_t* response, int response_size, uint16_t* registers) {
int data_length = response[8];
if (response_size < 9 + data_length) {
printf("错误:响应数据不完整\n");
return;
}
for (int i = 0; i < data_length / 2; i++) {
registers[i] = ntohs(*(uint16_t*)&response[9 + i * 2]);
}
}
int read_input_registers(SOCKET sockfd, uint16_t address, uint16_t count, uint16_t* registers) {
uint8_t request[12];
create_read_input_registers_request(request, address, count);
if (send(sockfd, (const char*)request, 12, 0) == SOCKET_ERROR) {
printf("发送请求失败: %ld\n", WSAGetLastError());
return -1;
}
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd, &readfds);
struct timeval timeout;
timeout.tv_sec = TIMEOUT_SEC;
timeout.tv_usec = 0;
int select_result = select(sockfd + 1, &readfds, NULL, NULL, &timeout);
if (select_result > 0) {
uint8_t response[1024];
int response_size = recv(sockfd, (char*)response, sizeof(response), 0);
if (response_size == SOCKET_ERROR) {
printf("接收响应失败: %ld\n", WSAGetLastError());
return -1;
}
parse_response(response, response_size, registers);
return response_size;
}
else if (select_result == 0) {
printf("接收响应超时\n");
return -1;
}
else {
printf("select 调用失败: %ld\n", WSAGetLastError());
return -1;
}
}
void print_special_registers(uint16_t* special_values) {
printf("特定寄存器值:\n");
for (int i = 0; i < sizeof(special_registers) / sizeof(SpecialRegister); i++) {
uint16_t address = special_registers[i].address;
const char* description = special_registers[i].description;
printf("地址 %u (%s): %u\n", address, description, special_values[address - 100]);
}
}
int main() {
WSADATA wsaData;
SOCKET sockfd;
struct sockaddr_in server_addr;
uint16_t* all_registers = (uint16_t*)malloc(NUM_REGISTERS * sizeof(uint16_t));
uint16_t special_values[4] = { 0 };
if (all_registers == NULL) {
printf("内存分配失败\n");
return 1;
}
printf("正在连接到 Modbus TCP 服务器 %s:%d\n", IP, PORT);
printf("正在读取 %d 个输入寄存器...\n", NUM_REGISTERS);
// 初始化 WinSock
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
printf("WinSock 初始化失败: %d\n", WSAGetLastError());
free(all_registers);
return 1;
}
// 创建套接字
if ((sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
printf("套接字创建失败: %ld\n", WSAGetLastError());
WSACleanup();
free(all_registers);
return 1;
}
// 设置套接字为非阻塞模式
u_long mode = 1;
ioctlsocket(sockfd, FIONBIO, &mode);
// 配置服务器地址
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &server_addr.sin_addr);
// 连接服务器
if (connect(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
if (WSAGetLastError() != WSAEWOULDBLOCK) {
printf("连接服务器失败: %ld\n", WSAGetLastError());
closesocket(sockfd);
WSACleanup();
free(all_registers);
return 1;
}
// 使用 select 等待连接完成
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(sockfd, &writefds);
struct timeval timeout;
timeout.tv_sec = TIMEOUT_SEC;
timeout.tv_usec = 0;
int select_result = select(sockfd + 1, NULL, &writefds, NULL, &timeout);
if (select_result <= 0) {
printf("连接服务器超时或失败\n");
closesocket(sockfd);
WSACleanup();
free(all_registers);
return 1;
}
}
// 计时开始
clock_t start_time = clock();
// 分批读取寄存器
for (int i = 0; i < NUM_REGISTERS; i += MAX_REGISTERS_PER_REQUEST) {
int batch_size = (NUM_REGISTERS - i < MAX_REGISTERS_PER_REQUEST) ? NUM_REGISTERS - i : MAX_REGISTERS_PER_REQUEST;
if (read_input_registers(sockfd, REGISTER_ADDRESS + i, batch_size, all_registers + i) < 0) {
printf("读取寄存器失败\n");
closesocket(sockfd);
WSACleanup();
free(all_registers);
return 1;
}
}
// 计算并显示all_registers数组的大小
size_t array_size_bytes = NUM_REGISTERS * sizeof(uint16_t); // 数组占用的总字节数
size_t array_size_elements = NUM_REGISTERS; // 数组中的元素数
printf("all_registers数组的大小: %zu 字节\n", array_size_bytes);
printf("all_registers数组中的元素数量: %zu\n", array_size_elements);
// 读取特定寄存器
if (read_input_registers(sockfd, 100, 4, special_values) < 0) {
printf("读取特定寄存器失败\n");
closesocket(sockfd);
WSACleanup();
free(all_registers);
return 1;
}
// 计时结束
clock_t end_time = clock();
double total_duration = (double)(end_time - start_time) / CLOCKS_PER_SEC;
// 输出结果
printf("读取到的寄存器值 (前10个): ");
for (int i = 0; i < 10 && i < NUM_REGISTERS; i++) {
printf("%u ", all_registers[i]);
}
// 打印特定寄存器值
print_special_registers(special_values);
printf("\n总共读取到 %d 个寄存器\n", array_size_elements);
printf("总耗时: %.4f 秒\n", total_duration);
// 关闭套接字
closesocket(sockfd);
// 清理 WinSock
WSACleanup();
// 释放内存
free(all_registers);
return 0;
}
输出结果如下:
要将这个复杂的C++代码封装并优化以便于在Python中调用,可以按照以下步骤进行:
1. 分解功能并创建类封装
首先,将整个功能封装到一个C++类中,使其更加模块化和易于管理。我们将核心的Modbus TCP客户端功能封装在一个类中,并暴露必要的接口供Python调用。
2. 提供Python接口
使用Pybind11
将C++类和函数绑定到Python,使其可以直接在Python中调用。
优化后的C++代码
以下是经过优化并封装的C++代码:
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <stdio.h>
#include <cstdint>
#include <ctime>
#include <vector>
#include <string>
#include <iostream>
#include <stdexcept>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#pragma comment(lib, "Ws2_32.lib")
#define TIMEOUT_SEC 5 // 超时时间(秒)
class ModbusTCPClient {
public:
ModbusTCPClient(const std::string& ip, uint16_t port, uint8_t unit_id)
: ip_(ip), port_(port), unit_id_(unit_id), sockfd_(INVALID_SOCKET) {
if (WSAStartup(MAKEWORD(2, 2), &wsaData_) != 0) {
throw std::runtime_error("WinSock 初始化失败");
}
}
~ModbusTCPClient() {
if (sockfd_ != INVALID_SOCKET) {
closesocket(sockfd_);
}
WSACleanup();
}
void connect_server() {
sockfd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd_ == INVALID_SOCKET) {
throw std::runtime_error("套接字创建失败");
}
u_long mode = 1;
ioctlsocket(sockfd_, FIONBIO, &mode);
sockaddr_in server_addr{};
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port_);
inet_pton(AF_INET, ip_.c_str(), &server_addr.sin_addr);
if (connect(sockfd_, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
if (WSAGetLastError() != WSAEWOULDBLOCK) {
throw std::runtime_error("连接服务器失败");
}
fd_set writefds;
FD_ZERO(&writefds);
FD_SET(sockfd_, &writefds);
timeval timeout{TIMEOUT_SEC, 0};
int select_result = select(sockfd_ + 1, nullptr, &writefds, nullptr, &timeout);
if (select_result <= 0) {
throw std::runtime_error("连接服务器超时或失败");
}
}
}
std::vector<uint16_t> read_input_registers(uint16_t address, uint16_t count) {
std::vector<uint16_t> registers(count, 0);
uint8_t request[12];
create_read_input_registers_request(request, address, count);
if (send(sockfd_, reinterpret_cast<const char*>(request), 12, 0) == SOCKET_ERROR) {
throw std::runtime_error("发送请求失败");
}
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd_, &readfds);
timeval timeout{TIMEOUT_SEC, 0};
int select_result = select(sockfd_ + 1, &readfds, nullptr, nullptr, &timeout);
if (select_result > 0) {
uint8_t response[1024];
int response_size = recv(sockfd_, reinterpret_cast<char*>(response), sizeof(response), 0);
if (response_size == SOCKET_ERROR) {
throw std::runtime_error("接收响应失败");
}
parse_response(response, response_size, registers.data());
} else if (select_result == 0) {
throw std::runtime_error("接收响应超时");
} else {
throw std::runtime_error("select 调用失败");
}
return registers;
}
private:
std::string ip_;
uint16_t port_;
uint8_t unit_id_;
SOCKET sockfd_;
WSADATA wsaData_;
void create_read_input_registers_request(uint8_t* request, uint16_t address, uint16_t count) {
uint16_t transaction_id = htons(1);
uint16_t protocol_id = htons(0);
uint16_t length = htons(6);
uint8_t unit_id = unit_id_;
uint8_t function_code = 4;
uint16_t addr = htons(address);
uint16_t cnt = htons(count);
memcpy(request, &transaction_id, 2);
memcpy(request + 2, &protocol_id, 2);
memcpy(request + 4, &length, 2);
memcpy(request + 6, &unit_id, 1);
memcpy(request + 7, &function_code, 1);
memcpy(request + 8, &addr, 2);
memcpy(request + 10, &cnt, 2);
}
void parse_response(uint8_t* response, int response_size, uint16_t* registers) {
int data_length = response[8];
if (response_size < 9 + data_length) {
throw std::runtime_error("响应数据不完整");
}
for (int i = 0; i < data_length / 2; i++) {
registers[i] = ntohs(*(uint16_t*)&response[9 + i * 2]);
}
}
};
// Pybind11模块定义
namespace py = pybind11;
PYBIND11_MODULE(modbus_client, m) {
py::class_<ModbusTCPClient>(m, "ModbusTCPClient")
.def(py::init<const std::string&, uint16_t, uint8_t>())
.def("connect_server", &ModbusTCPClient::connect_server)
.def("read_input_registers", &ModbusTCPClient::read_input_registers);
}
代码说明
- ModbusTCPClient类:封装了Modbus TCP客户端的功能,包括连接服务器和读取寄存器的功能。
- connect_server方法:负责连接到Modbus TCP服务器。
- read_input_registers方法:读取输入寄存器,返回寄存器值的
std::vector
。 - Pybind11模块:使用
Pybind11
将ModbusTCPClient
类绑定到Python模块modbus_client
中,供Python调用。
3. 创建Python构建脚本
创建一个setup.py
文件,用于定义构建过程:
from setuptools import setup, Extension
import pybind11
ext_modules = [
Extension(
'modbus_client', # 模块名称
['modbus_client.cpp'], # 源文件
include_dirs=[pybind11.get_include()],
libraries=["Ws2_32"], # 链接Ws2_32.lib库
language='c++',
),
]
setup(
name='modbus_client',
ext_modules=ext_modules,
)
4. 编译并测试
-
编译模块:在项目目录中,使用命令行运行以下命令进行编译:
python setup.py build_ext --inplace
-
测试Python代码:编写一个Python脚本来测试生成的模块:
import modbus_client
client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
client.connect_server()
# 读取特定寄存器(示例地址100,读取4个寄存器)
registers = client.read_input_registers(100, 4)
print(f"读取的寄存器值: {registers}")
通过上述流程步骤,结果如下所示:
优化测试代码:
如下所示:
import modbus_client
client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
client.connect_server()
# 读取特定寄存器(示例地址100,读取4个寄存器)
registers = client.read_input_registers(100, 4)
print(f"读取的寄存器值: {registers}")
registers2 = client.read_input_registers(1000, 127)
print(f"读取的寄存器值: {registers2}")
registers3 = client.read_input_registers(1127, 127)
print(f"读取的寄存器值: {registers3}")
运行结果如下:
我测试最大每次读127,要是多一点就出错,当设置为128时,结果如下:
我的寄存器地址从1000开始,总共有51200个数值,故要分批读取。
读取特定寄存器和读取51200个寄存器的数据,并将max_registers_per_request设置为125。这个值符合Modbus协议中典型的最大读取寄存器数量限制。
完整代码如下:
import modbus_client
# 初始化Modbus TCP客户端
client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
client.connect_server()
# 定义读取参数
start_address = 1000
total_registers = 51200
max_registers_per_request = 125 # 设置为125,符合Modbus标准
# 存储结果的列表
result = []
# 分批次读取51200个寄存器数据
for i in range(0, total_registers, max_registers_per_request):
batch_size = min(max_registers_per_request, total_registers - i)
current_address = start_address + i
registers = client.read_input_registers(current_address, batch_size)
result.extend(registers)
# 读取特定寄存器的值(示例地址100,读取4个寄存器)
special_registers = client.read_input_registers(100, 4)
# 输出结果
print(f"读取到的寄存器总数: {len(result)}")
print(f"后10个寄存器值: {result[:-10]}")
print(f"特定寄存器(地址100-103)的值: {special_registers}")
在现有的代码基础上,我们可以添加计时功能来计算读取寄存器的耗时。我们将使用Python的time
模块来测量时间。在读取寄存器之前记录开始时间,完成后记录结束时间,最后计算和输出总耗时。
优化后的代码(带计时功能)
import modbus_client
import time
# 初始化Modbus TCP客户端
client = modbus_client.ModbusTCPClient("192.168.0.150", 6789, 3)
client.connect_server()
# 定义读取参数
start_address = 1000
total_registers = 51200
max_registers_per_request = 125 # 设置为125,符合Modbus标准
# 存储结果的列表
result = []
# 开始计时
start_time = time.time()
# 分批次读取51200个寄存器数据
for i in range(0, total_registers, max_registers_per_request):
batch_size = min(max_registers_per_request, total_registers - i)
current_address = start_address + i
registers = client.read_input_registers(current_address, batch_size)
result.extend(registers)
# 读取特定寄存器的值(示例地址100,读取4个寄存器)
special_registers = client.read_input_registers(100, 4)
# 结束计时
end_time = time.time()
total_duration = end_time - start_time
# 输出结果
print(f"读取到的寄存器总数: {len(result)}")
print(f"后10个寄存器值: {result[-10:]}")
print(f"特定寄存器(地址100-103)的值: {special_registers}")
print(f"读取 {total_registers} 个寄存器总耗时: {total_duration:.4f} 秒")
输出结果如下:
代码说明
-
导入
time
模块:我们使用time
模块来进行计时。 -
开始计时:
- 在开始读取寄存器之前,调用
start_time = time.time()
记录当前时间。 -
结束计时并计算总耗时:
- 在所有寄存器读取完毕后,调用
end_time = time.time()
记录结束时间。 total_duration
表示总耗时,计算方式为end_time - start_time
。-
输出耗时:
- 通过
print(f"读取 {total_registers} 个寄存器总耗时: {total_duration:.4f} 秒")
输出读取51200个寄存器的总耗时,结果精确到小数点后四位。
优化结果
通过添加计时功能,你可以精确地知道读取寄存器操作花费的时间。这对于性能优化和评估系统响应时间非常有用。程序在读取完所有寄存器后,会输出耗时信息,帮助你了解通信的效率。
使用纯python写法读取数据代码如下:
import socket
import struct
import time
import select
# 定义一些常量
IP = "192.168.0.150"
PORT = 6789
UNIT_ID = 3
REGISTER_ADDRESS = 1000
NUM_REGISTERS = 51200
MAX_REGISTERS_PER_REQUEST = 125
TIMEOUT_SEC = 5 # 超时时间(秒)
# 特定寄存器的地址和含义映射
special_registers = [
(100, "uhf_db"),
(101, "reserve"),
(102, "放电次数"),
(103, "放电相位")
]
def create_read_input_registers_request(address, count):
transaction_id = 1
protocol_id = 0
length = 6
unit_id = UNIT_ID
function_code = 4
request = struct.pack('>HHHBBHH', transaction_id, protocol_id, length, unit_id, function_code, address, count)
return request
def parse_response(response, count):
data_length = response[8]
if len(response) < 9 + data_length:
raise ValueError("响应数据不完整")
registers = struct.unpack('>' + 'H' * (data_length // 2), response[9:9 + data_length])
return registers
def read_input_registers(sock, address, count):
request = create_read_input_registers_request(address, count)
sock.sendall(request)
sock.settimeout(TIMEOUT_SEC)
response = sock.recv(1024)
return parse_response(response, count)
def print_special_registers(special_values):
print("特定寄存器值:")
for address, description in special_registers:
print(f"地址 {address} ({description}): {special_values[address - 100]}")
def main():
all_registers = [0] * NUM_REGISTERS
special_values = [0] * 4
print(f"正在连接到 Modbus TCP 服务器 {IP}:{PORT}")
print(f"正在读取 {NUM_REGISTERS} 个输入寄存器...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
server_address = (IP, PORT)
sock.connect_ex(server_address)
ready_to_write = [sock]
_, writable, _ = select.select([], ready_to_write, [], TIMEOUT_SEC)
if not writable:
print("连接服务器超时或失败")
sock.close()
return
start_time = time.time()
try:
for i in range(0, NUM_REGISTERS, MAX_REGISTERS_PER_REQUEST):
batch_size = min(NUM_REGISTERS - i, MAX_REGISTERS_PER_REQUEST)
batch_registers = read_input_registers(sock, REGISTER_ADDRESS + i, batch_size)
all_registers[i:i + batch_size] = batch_registers
special_values = read_input_registers(sock, 100, 4)
except Exception as e:
print(f"读取寄存器失败: {e}")
sock.close()
return
end_time = time.time()
total_duration = end_time - start_time
print("读取到的寄存器值 (后10个):", all_registers[-10:])
print_special_registers(special_values)
print(f"\n总共读取到 {len(all_registers)} 个寄存器")
print(f"总耗时: {total_duration:.4f} 秒")
sock.close()
if __name__ == "__main__":
main()
结果如下:
很纳闷,python调用C++代码和纯python读取效率差不多。
总结
通过封装和优化,复杂的C++代码可以更加模块化和易于维护,同时通过Pybind11
轻松暴露给Python调用。这使得在高效处理低层次网络通信的同时,可以在高层次应用中利用Python的灵活性。
作者:像风一样自由2020