ROS2 话题通信——自定义消息类型(Python)
1、创建工作空间
首先,在根目录下面创建一个新的工作空间test_ws
和新的功能包topic_pkg
,然后新建脚本文件topic_interface_pub.py和topic_interface_sub.py,然后用VScode打开test_ws文件夹。
mkdir -p ./text_ws/src
cd text_ws/src
ros2 pkg create topic_pkg --build-type ament_python --dependencies rclpy std_msgs --node-name topic_interface_pub
touch ./topic_pkg/topic_pkg/topic_interface_sub.py
2、测试一下简单的话题通信
编写发布者节点,复制下面代码到 topic_interface_pub.py
'''
每隔一个固定频率发送helloworld
'''
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # 字符串消息类型
"""
创建一个发布者节点
"""
class PublisherNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.count = 0 #设置计数器
self.pub = self.create_publisher(String, "chatter", 10) # 创建发布者对象(消息类型、话题名、队列长度)
self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
def timer_callback(self): # 创建定时器周期执行的回调函数
msg = String() # 创建一个String类型的消息对象
msg.data = 'Hello World' + str(self.count) # 填充消息对象中的消息数据
self.pub.publish(msg) # 发布话题消息
self.get_logger().info('Publishing: "%s"' % msg.data) # 输出日志信息,提示已经完成话题发布
self.count += 1
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = PublisherNode("topic_helloworld_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
if __name__ == '__main__':
main()
编写订阅者节点,复制下面代码到 topic_interface_sub.py
"""
每隔一个固定频率订阅helloworld
"""
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # ROS2标准定义的String消息
"""
创建一个订阅者节点
"""
class SubscriberNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(\
String, "chatter", self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
def listener_callback(self, msg): # 创建回调函数,执行收到话题消息后对数据的处理
self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = SubscriberNode("topic_helloworld_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
if __name__ == '__main__':
main()
在setup.py中添加订阅者节点的配置
编译测试一下,分别打开两个终端,在text_ws目录下,进行编译和运行对应节点
colcon build
source install/setup.bash
ros2 run topic_pkg topic_interface_pub
colcon build
source install/setup.bash
ros2 run topic_pkg topic_interface_sub
测试成功
3、自定义接口功能包
进入src目录,然后创建接口功能包interface_pkg,然后在 interface_pkg功能包下创建msg文件夹,并且创建一个Student.msg文件,注意该文件名首字母必须大写
cd ./src/
ros2 pkg create interface_pkg --build-type ament_cmake
mkdir -p ./interface_pkg/msg
touch ./interface_pkg/msg/Student.msg
在Student.msg文件下自定义数据格式
string name
int32 age
float64 height
在CMakeLists.txt中添加以下代码
# 编译的依赖包
find_package(rosidl_default_generators REQUIRED)
#为接口文件生成源代码,接口名字首字母必须大写
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Student.msg"
)
在package.xml中添加以下代码
<!--编译依赖-->
<build_depend>rosidl_default_generators</build_depend>
<!--执行依赖-->
<exec_depend>rosidl_default_runtime</exec_depend>
<!--声明当前所属的功能包组-->
<member_of_group>rosidl_interface_packages</member_of_group>
编译一下interface_pkg功能包
cd ..
colcon build --packages-select interface_pkg
打开install/interface_pkg目录,可以看见在include目录下面生成了C++对应的自定义接口头文件,在local目录下生成了Python对应的自定义接口库
重新编写发布者节点
'''
每隔一个固定频率发布学生信息
'''
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from interface_pkg.msg import Student # 自定义的服务接口
class PublisherNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.pub = self.create_publisher(Student, "chatter", 10) # 创建发布者对象(消息类型、话题名、队列长度)
self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
def timer_callback(self): # 创建定时器周期执行的回调函数
stu = Student() # 创建一个String类型的消息对象
stu.name = "小明"
stu.age = 10
stu.height = 1.4
self.pub.publish(stu) # 发布话题消息
self.get_logger().info("学生信息: %s, %d, %.2f" % (stu.name,stu.age,stu.height)) # 输出日志信息,提示已经完成话题发布
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = PublisherNode("topic_student_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
if __name__ == '__main__':
main()
重新编写订阅者节点
"""
订阅学生信息
"""
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from interface_pkg.msg import Student # 自定义的服务接口
"""
创建一个订阅者节点
"""
class SubscriberNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.sub = self.create_subscription(\
Student, "chatter", self.listener_callback, 10) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、队列长度)
def listener_callback(self, stu): # 创建回调函数,执行收到话题消息后对数据的处理
self.get_logger().info("订阅到的学生信息: name = %s, age = %d, height = %.2f" % (stu.name,stu.age,stu.height)) # 输出日志信息,提示订阅收到的话题消息
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = SubscriberNode("topic_student_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
if __name__ == '__main__':
main()
由于自定义的接口库导入后VScode没有代码提示,所以需要后续的配置,找到/install/interface_pkg/local目录,右键在终端打开,然后输入pwd,复制该路径
打开设置,分别搜索 Python autoComplete extraPaths 和 Python analysis extraPaths ,然后在settings.json中编辑,然后添加刚刚复制的路径
分别开两个终端编译测试一下
colcon build --packages-select topic_pkg
source install/setup.bash
ros2 run topic_pkg topic_interface_pub
source install/setup.bash
ros2 run topic_pkg topic_interface_sub
可以看到测试成功,发布者成功发布消息,订阅者成功订阅到消息。
作者:SAW_Anan