ROS2基础函数详解:理论与实践结合,探索单片机架构下的应用
博主最近在学ROS2,一起学啊~
目录
Publisher:
话题通讯
Publisher:
"""
需求:以某个固定频率发送文本“hello world!”,文本后缀编号,每发送一条消息,编号递增1。
步骤:
1.导包;
2.初始化 ROS2 客户端;
3.定义节点类;
3-1.创建发布方;
3-2.创建定时器;
3-3.组织消息并发布。
4.调用spin函数,并传入节点对象;
5.释放资源。
"""
# 1.导包;
import rclpy
from rclpy.node import Node
#导入字符串类型
from std_msgs.msg import String
# 3.定义节点类;
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher_py')
# 3-1.创建发布方;
self.publisher_ = self.create_publisher(String, 'topic', 10)
# 3-2.创建定时器;
timer_period = 0.5
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
# 3-3.组织消息并发布。
def timer_callback(self):
msg = String()
msg.data = 'Hello World(py): %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('发布的消息: "%s"' % msg.data)
self.i += 1
def main(args=None):
# 2.初始化 ROS2 客户端;
rclpy.init(args=args)
# 4.调用spin函数,并传入节点对象;
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# 5.释放资源。
rclpy.shutdown()
if __name__ == '__main__':
main()
super().__init__('minimal_publisher_py'):调用父类的初始化方法,设置节点名称为"minimal_publisher_py"
self.create_publisher() 方法用于创建一个发布者(Publisher),它允许节点将消息发送到指定的主题。这个方法返回一个 Publisher 对象,你可以通过这个对象来发布消息。
self.create_timer() 方法用于创建一个定时器(Timer),它会在指定的时间间隔后触发一个回调函数。这个方法返回一个 Timer 对象,你可以通过这个对象来管理定时器的行为。
rclpy.spin(node) 的主要作用是进入一个循环,等待并处理 ROS2 的事件。
__init__
中info输出一次是因为__init__
方法只在创建类实例时被调用一次,rclpy.spin()
的作用是让节点持续运行并处理回调函数,而不是重新创建节点实例。
Publisher 对象提供了以下功能:
-
发布消息:通过调用
publish(msg)
方法,将消息发送到指定的主题。 -
获取主题名称:通过
topic_name
属性,获取发布者绑定的主题名称。 -
获取队列大小:通过
qos_profile
属性,获取发布者的 QoS(Quality of Service)配置,包括队列大小等参数。
对于self.publisher的调用:
-
创建发布者
Python
复制
self.publisher_ = self.create_publisher(String, 'topic', 10)
-
String
:消息类型,表示发布者将发送std_msgs.msg.String
类型的消息。 -
'topic'
:主题名称,表示消息将被发送到名为topic
的主题。 -
10
:队列大小,表示发布者可以缓冲最多 10 条未发送的消息。 -
发布消息
Python
复制
self.publisher_.publish(msg)
-
通过
publish(msg)
方法,将消息msg
发送到指定的主题。 -
获取主题名称
Python
复制
topic_name = self.publisher_.topic_name
-
通过
topic_name
属性,可以获取发布者绑定的主题名称。 -
获取 QoS 配置
Python
复制
qos_profile = self.publisher_.qos_profile
-
通过
qos_profile
属性,可以获取发布者的 QoS 配置,包括队列大小等参数。
Timer 对象提供了以下功能:
-
触发回调函数:定时器会在指定的时间间隔后触发指定的回调函数。
-
获取定时器的周期:通过
timer_period_ns
属性,获取定时器的时间间隔(以纳秒为单位)。 -
取消定时器:通过
cancel()
方法,停止定时器的触发。 -
检查定时器是否已取消:通过
is_canceled()
方法,检查定时器是否已被取消。 -
重新启动定时器:通过
reset()
方法,重新启动已取消的定时器。
timer_period_ns
:定时器的时间间隔,以纳秒为单位。
Python
复制
timer_period_ns = self.timer.timer_period_ns
cancel()
:取消定时器,停止回调函数的触发。
Python
复制
self.timer.cancel()
is_canceled()
:检查定时器是否已被取消。
Python
复制
if self.timer.is_canceled():
self.get_logger().info('Timer is canceled')
reset()
:重新启动已取消的定时器。
Python
复制
self.timer.reset()
我们来类比一下嵌入式:建立节点和发布者就和配置外设一样,定时发送就是调用函数。
一个简单的定时发送demo:(这么看ROS不过是换了种语言和表达逻辑)
uint8_t count=0;
char msg[10];
int main(void)
{
Timer_Init();
UART_Init();
while(1)
{
String_Send(msg);
printf("发布的消失是%s",msg);
}
}
void TIM2_IRQHandler(void)//定时器2中断1s
{
char numStr[10],str[10];
if (TIM_GetITStatus(TIM2, TIM_IT_Update) == SET)
{
//发送消息
// 将数值转换为字符串
sprintf(numStr, "%d", count);
// 初始化目标字符串
strcpy(msg, "helloword");
// 拼接数值字符串
strcat(msg, numStr);
//计数
count++;
TIM_ClearITPendingBit(TIM2, TIM_IT_Update);
}
}
Subscription:
# 1.导包;
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
# 3.定义节点类;
class Listener(Node):
def __init__(self):
super().__init__('listener_node_py')
self.get_logger().info('订阅方创建了py')
# 3-1.创建订阅方;
"""
参数:
消息类型
话题名称
回调函数
Qos队列长度
"""
self.subscription = self.create_subscription(String,"chatter",self.do_callback,10)
self.subscription # 防止未使用的警告
def do_callback(self,msg):
# 3-2.处理订阅到的消息。
self.get_logger().info("订阅的数据:%s" % msg.data)
def main():
rclpy.init()
rclpy.spin(Listener())
rclpy.shutdown()
if __name__ == 'main':
main()
在 ROS 2 中,self.create_subscription()
是节点类(Node
)的一个方法,用于创建一个订阅者(Subscriber),以便节点能够接收来自特定话题(Topic)的消息。有4个参数,见代码中注释,返回的是订阅者对象。
在简单的订阅场景中,可能不需要直接操作 Subscription
对象,但在一些高级场景中,它可能会很有用。以下是一些可能的用途:
-
动态取消订阅: 如果你需要在运行时动态取消订阅,可以通过调用
Subscription
对象的destroy()
方法来实现。例如:Python
复制
self.subscription.destroy()
-
检查订阅状态: 你可以通过
Subscription
对象的属性来检查订阅的状态。例如,self.subscription.topic_name
可以获取订阅的话题名称。 -
调试和日志记录: 在调试过程中,
Subscription
对象可以帮助你更好地理解订阅行为。例如,你可以打印出订阅的话题名称和消息类型self.get_logger().info(f"Subscribed to topic: {self.subscription.topic_name}")
服务通信
服务端
"""
需求:编写服务端,接收客户端发送请求,提取其中两个整型数据,相加后将结果响应回客户端。
步骤:
1.导包;
2.初始化 ROS2 客户端;
3.定义节点类;
3-1.创建服务端;
3-2.处理请求数据并响应结果。
4.调用spin函数,并传入节点对象;
5.释放资源。
"""
# 1.导包;
import rclpy
from rclpy.node import Node
from base_interfaces_demo.srv import AddInts
# 3.定义节点类;
class Service(Node):
def __init__(self):
super().__init__('minimal_service_py')
# 3-1.创建服务端;
self.srv = self.create_service(AddInts, 'add_ints', self.add_two_ints_callback)
self.get_logger().info("服务端启动!")
# 3-2.处理请求数据并响应结果。
def add_two_ints_callback(self, request, response):
response.sum = request.num1 + request.num2
self.get_logger().info('请求数据:(%d,%d),响应结果:%d' % (request.num1, request.num2, response.sum))
return response
def main():
# 2.初始化 ROS2 客户端;
rclpy.init()
# 4.调用spin函数,并传入节点对象;
rclpy.spin(Service())
# 5.释放资源。
rclpy.shutdown()
if __name__ == '__main__':
main()
客户端
"""
需求:编写客户端,发送两个整型变量作为请求数据,并处理响应结果。
步骤:
1.导包;
2.初始化 ROS2 客户端;
3.定义节点类;
3-1.创建客户端;
3-2.等待服务连接;
3-3.组织请求数据并发送;
4.创建对象调用其功能,处理响应结果;
5.释放资源。
"""
# 1.导包;
import sys
import rclpy
from rclpy.logging import get_logger
from rclpy.node import Node
from base_interfaces_demo.srv import AddInts
# 3.定义节点类;
class Client(Node):
def __init__(self):
super().__init__('minimal_client_py')
# 3-1.创建客户端;
self.cli = self.create_client(AddInts, 'add_ints')
# 3-2.等待服务连接;
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务连接中,请稍候...')
self.req = AddInts.Request()
# 3-3.组织请求数据并发送;
def send_request(self):
self.req.num1 = int(sys.argv[1])
self.req.num2 = int(sys.argv[2])
self.future = self.cli.call_async(self.req)
def main():
#校验工作
if len(sys.argv)!= 3:
get_logger("rclpy").info("请提交2个整形参数")
# 2.初始化 ROS2 客户端;
rclpy.init()
# 4.创建对象并调用其功能;
minimal_client = Client()
minimal_client.send_request()
# 处理响应
rclpy.spin_until_future_complete(minimal_client,minimal_client.future)
try:
response = minimal_client.future.result()
except Exception as e:
minimal_client.get_logger().info(
'服务请求失败: %r' % (e,))
else:
minimal_client.get_logger().info(
'响应结果: %d + %d = %d' %
(minimal_client.req.num1, minimal_client.req.num2, response.sum))
# 5.释放资源。
rclpy.shutdown()
if __name__ == '__main__':
main()
作者:自动化小秋葵