模拟can信号实现通信
实车上算法一般通过ros进行通信,车辆和控制器之间则通过can通信实现,今天来学习一下如何模拟这个can。
can信号的发送和接收一般是需要载体的,我们一般都有can0和can1设备可以使用,在电脑上创建这个设备:
加载vcan内核模块:
sudo modprobe vcan
创建虚拟CAN接口:
sudo ip link add dev vcan0 type vcan
将虚拟CAN接口处于在线状态:
sudo ip link set up vcan0
然后就能进行测试了:
cansend vcan0 260##00000000000000000
可以去另一个终端打印相关的信息:
candump vcan0
详细信息(比如判断是can帧还是can-fd帧)通过:
candump -L vcan0
来打印
说明:
1.基本用法
cansend 需要两个参数: 和 <can_frame>。
是 CAN 设备的名称,如 vcan0。
<can_frame> 是要发送的 CAN 帧的格式。
2.CAN 帧格式
<can_id>#{R|data}:用于标准的 CAN 2.0 帧。
<can_id> 是 CAN 帧的标识符,可以是 3 个(标准帧)或 8 个(扩展帧)十六进制字符。
{R} 表示远程传输请求。
{data} 是数据字段,包含 0 到 8 个十六进制值(对于 CAN FD 是 0 到 64)。
<can_id>##{data}:用于 CAN FD 帧。
是一个十六进制值(0 到 F),定义了 CAN FD 帧的标志。
3.示例
5A1#11.2233.44556677.88:发送一个标准 CAN 帧,ID 为 5A1,数据为 11 22 33 44 55 66 77 88。
123#DEADBEEF:发送一个标准 CAN 帧,ID 为 123,数据为 DE AD BE EF。
5AA#:发送一个扩展 CAN 帧,ID 为 5AA,没有数据。
123##1:发送一个 CAN FD 帧,ID 为 123,标志为 1,没有数据。
213##311:发送一个 CAN FD 帧,ID 为 213,标志为 311,没有数据。
1F334455#1122334455667788:发送一个扩展 CAN 帧,ID 为 1F334455,数据为 11 22 33 44 55 66 77 88。
123#R:发送一个远程传输请求,ID 为 123。
程序示例:
新建一个功能包创建新的CanData.msg:
int32 drive_mode
int32 gear_position
int32 emergency_status
int32 reset_key
int32 remote_key
float32 current_angle
float32 vehicle_speed
int32 throttle_feedback
int32 fault_code1
int32 fault_code2
int32 fault_code3
int32 fault_code4
int32 total_mileage
float32 brake_pressure
int32 soc
int32 vehicle_status1
int32 vehicle_status2
int32 left_turn
int32 right_turn
int32 horn
int32 high_beam
int32 brake_sensor
int32 low_beam
int32 fog_light
int32 seat_indicator
int32 height_sensor
float32 pitch_sensor
int32 run_mode
int32 handbrake
float32 run_speed
int32 main_version
int32 sub_version
int32 rev_version
int32 date
int32 total_fault
int32 vcu_status
int32 vehicle_start_status
int32 vcu_voltage
附带的cmakelist和package:
cmake_minimum_required(VERSION 3.0.2)
project(wirecontrol)
find_package(catkin REQUIRED COMPONENTS
rospy
std_msgs
message_generation
)
add_message_files(
FILES
CanData.msg
)
generate_messages(
DEPENDENCIES
std_msgs
)
catkin_package(
CATKIN_DEPENDS message_runtime
)
<?xml version="1.0"?>
<package format="2">
<name>wirecontrol</name>
<version>0.0.0</version>
<description>The wirecontrol package</description>
<maintainer email="cyun@todo.todo">cyun</maintainer>
<license>TODO</license>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>message_generation</build_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_export_depend>rospy</build_export_depend>
<build_export_depend>std_msgs</build_export_depend>
<exec_depend>message_runtime</exec_depend>
<exec_depend>rospy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<export>
</export>
</package>
函数编写,主要是通过dbc文件来发送can消息,比较方便。
#!/usr/bin/env python
'''
Author: CYUN && cyun@tju.enu.cn
Date: 2024-07-26 16:58:02
LastEditors: CYUN && cyun@tju.enu.cn
LastEditTime: 2024-07-26 18:09:29
FilePath: /undefined/home/cyun/forklift_sim_ws/src/wirecontrol/scripts/wirecontrol.py
Description: send can messages from vcan0 to controller
Copyright (c) 2024 by Tianjin University, All Rights Reserved.
'''
import os
import time
import can
import rospy
import cantools
from car_interfaces.msg import CanData # 替换为你的包名
# 获取DBC文件路径
script_directory = os.path.dirname(os.path.abspath(__file__))
relative_path = "../config/IPC_VCU_ZRD.dbc"
dbc_file_path = os.path.join(script_directory, relative_path)
# 加载DBC文件
dbc = cantools.db.load_file(dbc_file_path)
# 创建与 'vcan0' 接口的 CAN 总线连接
# can_bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=500000)
can_bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=500000, fd=True)
# 初始化全局变量
data = CanData()
previous_send_time = {}
def can_data_callback(msg):
global data
data = msg
def send_can_message(bus, message):
global previous_send_time
current_time = time.time()
# 发布can/canfd消息
# can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False, is_fd=True)
# can_msg = can.Message(arbitration_id=message.arbitration_id, data=message.data, is_extended_id=message.is_extended_id, is_fd=fd)
bus.send(message)
if message.arbitration_id in previous_send_time:
time_diff = current_time - previous_send_time[message.arbitration_id]
frequency = 1.0 / time_diff if time_diff > 0 else 0
print(f"Sent CAN message: ID=0x{message.arbitration_id:X}, Data={''.join(format(x, '02x') for x in message.data)}")
print(f"send sequence: {frequency:.2f} Hz")
else:
print(f"Sent CAN message: ID=0x{message.arbitration_id:X}, Data={''.join(format(x, '02x') for x in message.data)}")
print("send sequence: First message")
previous_send_time[message.arbitration_id] = current_time
def main():
global can_bus
rospy.init_node('can_publisher', anonymous=True)
rospy.Subscriber('can_data_topic', CanData, can_data_callback) # 订阅自定义消息
rate = rospy.Rate(50) # 50 Hz
try:
while not rospy.is_shutdown():
for msg_id in [0x201, 0x202, 0x203, 0x204, 0x205, 0x261, 0x260]:
if msg_id == 0x201:
message = dbc.get_message_by_frame_id(0x201)
data_dict = {
'Drive_Mode': data.drive_mode,
'Gear': data.gear_position,
'EPOSts': data.emergency_status,
'YK_H': data.reset_key,
'YK_F': data.remote_key,
'Angle': int(data.current_angle * 100), # 根据协议可能需要调整
'Car_Speed': int(data.vehicle_speed * 1000), # 根据协议可能需要调整
'Motor_Torque': data.throttle_feedback
}
elif msg_id == 0x202:
message = dbc.get_message_by_frame_id(0x202)
data_dict = {
'Fault1': data.fault_code1,
'Fault2': data.fault_code2,
'Fault3': data.fault_code3,
'Fault4': data.fault_code4,
'Mileage': data.total_mileage
}
elif msg_id == 0x203:
message = dbc.get_message_by_frame_id(0x203)
data_dict = {
'Brake_Pressure': int(data.brake_pressure * 20), # 根据协议可能需要调整
'SOC': data.soc,
'CarSts1': data.vehicle_status1,
'CarSts2': data.vehicle_status2,
'LR_WheelSpeed': data.left_turn,
'RR_WheelSpeed': data.right_turn
}
elif msg_id == 0x204:
message = dbc.get_message_by_frame_id(0x204)
data_dict = {
'Pitching': data.height_sensor,
'LX_Hight': int(data.pitch_sensor * 10), # 根据协议可能需要调整
}
elif msg_id == 0x261:
message = dbc.get_message_by_frame_id(0x261)
data_dict = {
'LV_Sun': data.main_version,
'LV_zCode': data.sub_version,
'LV_Main': data.rev_version,
'Time': data.date
}
elif msg_id == 0x260:
message = dbc.get_message_by_frame_id(0x260)
data_dict = {
'Fault': data.total_fault,
'VCU_Sts': data.vcu_status,
'VCU_Service_Voltage': data.vehicle_start_status,
'CarStartState': data.vcu_voltage
}
encoded_message = message.encode(data_dict)
# 发布CAN消息
# can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False)
# 发布CANFD消息
can_msg = can.Message(arbitration_id=msg_id, data=encoded_message, is_extended_id=False, is_fd=True)
send_can_message(can_bus, can_msg)
rate.sleep()
except rospy.ROSInterruptException:
pass
except Exception as e:
print(f"An error occurred: {e}")
finally:
can_bus.shutdown()
if __name__ == '__main__':
main()
运行需要安装一些can工具:
sudo apt install can-utils
pip install can cantools
这里面通过设置fd的bool值来确定发送can还是canfd消息。
将车辆模型的相关信息通过ros topic发送到can处理接口后,就能够实现通过控制can信号的值来控制我们的车辆了。很不错。
另外,还是查看所有can口的命令:
ip addr | grep "can"
我这里有两个虚拟can口
可以通过
sudo ip link delete vcan0
来删除can口
作者:白云千载尽