ROS2基础函数详解:理论与实践结合,探索单片机架构下的应用

博主最近在学ROS2,一起学啊~

目录

Publisher:


话题通讯

Publisher:

"""  
    需求:以某个固定频率发送文本“hello world!”,文本后缀编号,每发送一条消息,编号递增1。
    步骤:
        1.导包;
        2.初始化 ROS2 客户端;
        3.定义节点类;
            3-1.创建发布方;
            3-2.创建定时器;
            3-3.组织消息并发布。
        4.调用spin函数,并传入节点对象;
        5.释放资源。
"""
# 1.导包;
import rclpy
from rclpy.node import Node
#导入字符串类型
from std_msgs.msg import String
 
 
# 3.定义节点类;
class MinimalPublisher(Node):
 
    def __init__(self):
        super().__init__('minimal_publisher_py')
        # 3-1.创建发布方;
        self.publisher_ = self.create_publisher(String, 'topic', 10)
        # 3-2.创建定时器;
        timer_period = 0.5 
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0
 
    # 3-3.组织消息并发布。
    def timer_callback(self):
        msg = String()
        msg.data = 'Hello World(py): %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('发布的消息: "%s"' % msg.data)
        self.i += 1
 
 
def main(args=None):
    # 2.初始化 ROS2 客户端;
    rclpy.init(args=args)
    # 4.调用spin函数,并传入节点对象;
    minimal_publisher = MinimalPublisher()
    rclpy.spin(minimal_publisher)
    # 5.释放资源。
    rclpy.shutdown()
 
 
if __name__ == '__main__':
    main()

super().__init__('minimal_publisher_py'):调用父类的初始化方法,设置节点名称为"minimal_publisher_py"

self.create_publisher() 方法用于创建一个发布者(Publisher),它允许节点将消息发送到指定的主题。这个方法返回一个 Publisher 对象,你可以通过这个对象来发布消息。

self.create_timer() 方法用于创建一个定时器(Timer),它会在指定的时间间隔后触发一个回调函数。这个方法返回一个 Timer 对象,你可以通过这个对象来管理定时器的行为。

rclpy.spin(node) 的主要作用是进入一个循环,等待并处理 ROS2 的事件。

__init__ 中info输出一次是因为__init__ 方法只在创建类实例时被调用一次,rclpy.spin() 的作用是让节点持续运行并处理回调函数,而不是重新创建节点实例。


Publisher 对象提供了以下功能:

  1. 发布消息:通过调用 publish(msg) 方法,将消息发送到指定的主题。

  2. 获取主题名称:通过 topic_name 属性,获取发布者绑定的主题名称。

  3. 获取队列大小:通过 qos_profile 属性,获取发布者的 QoS(Quality of Service)配置,包括队列大小等参数。

对于self.publisher的调用:

  1. 创建发布者

    Python

    复制

    self.publisher_ = self.create_publisher(String, 'topic', 10)
  2. String:消息类型,表示发布者将发送 std_msgs.msg.String 类型的消息。

  3. 'topic':主题名称,表示消息将被发送到名为 topic 的主题。

  4. 10:队列大小,表示发布者可以缓冲最多 10 条未发送的消息。

  5. 发布消息

    Python

    复制

    self.publisher_.publish(msg)
  6. 通过 publish(msg) 方法,将消息 msg 发送到指定的主题。

  7. 获取主题名称

    Python

    复制

    topic_name = self.publisher_.topic_name
  8. 通过 topic_name 属性,可以获取发布者绑定的主题名称。

  9. 获取 QoS 配置

    Python

    复制

    qos_profile = self.publisher_.qos_profile
  10. 通过 qos_profile 属性,可以获取发布者的 QoS 配置,包括队列大小等参数。


  Timer 对象提供了以下功能:

  1. 触发回调函数:定时器会在指定的时间间隔后触发指定的回调函数。

  2. 获取定时器的周期:通过 timer_period_ns 属性,获取定时器的时间间隔(以纳秒为单位)。

  3. 取消定时器:通过 cancel() 方法,停止定时器的触发。

  4. 检查定时器是否已取消:通过 is_canceled() 方法,检查定时器是否已被取消。

  5. 重新启动定时器:通过 reset() 方法,重新启动已取消的定时器。

  • timer_period_ns:定时器的时间间隔,以纳秒为单位。

    Python

    复制

    timer_period_ns = self.timer.timer_period_ns
  • cancel():取消定时器,停止回调函数的触发。

    Python

    复制

    self.timer.cancel()
  • is_canceled():检查定时器是否已被取消。

    Python

    复制

    if self.timer.is_canceled():
        self.get_logger().info('Timer is canceled')
  • reset():重新启动已取消的定时器。

    Python

    复制

    self.timer.reset()

  • 我们来类比一下嵌入式:建立节点和发布者就和配置外设一样,定时发送就是调用函数。

    一个简单的定时发送demo:(这么看ROS不过是换了种语言和表达逻辑)

    uint8_t count=0;
    char msg[10];
    
    int main(void)
    {
    	Timer_Init();
    	UART_Init();
        
    	while(1)
    	{
    		String_Send(msg);
            printf("发布的消失是%s",msg);
    	}
    }
    
    void TIM2_IRQHandler(void)//定时器2中断1s
    {
            char numStr[10],str[10];
    
    		if (TIM_GetITStatus(TIM2, TIM_IT_Update) == SET)
    		{
                
                //发送消息
                 // 将数值转换为字符串
                sprintf(numStr, "%d", count);
                // 初始化目标字符串
                strcpy(msg, "helloword");
                // 拼接数值字符串
                strcat(msg, numStr);
                //计数
                count++;
    
    			TIM_ClearITPendingBit(TIM2, TIM_IT_Update);
    		}
    }
    

    Subscription:

    # 1.导包;
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    
    # 3.定义节点类;
    class Listener(Node):
    
        def __init__(self):
            super().__init__('listener_node_py')
            self.get_logger().info('订阅方创建了py')
            # 3-1.创建订阅方;
            """
            参数:
                消息类型
                话题名称
                回调函数
                Qos队列长度
            """
            self.subscription = self.create_subscription(String,"chatter",self.do_callback,10)
    
            self.subscription  # 防止未使用的警告
    
        def do_callback(self,msg):       
            # 3-2.处理订阅到的消息。
            self.get_logger().info("订阅的数据:%s" % msg.data)
            
     
    
    
    def main():
    
        rclpy.init()
    
        rclpy.spin(Listener())
    
        rclpy.shutdown()
    
    
    
    if __name__ == 'main':
        main()

    在 ROS 2 中,self.create_subscription() 是节点类(Node)的一个方法,用于创建一个订阅者(Subscriber),以便节点能够接收来自特定话题(Topic)的消息。有4个参数,见代码中注释,返回的是订阅者对象。

    在简单的订阅场景中,可能不需要直接操作 Subscription 对象,但在一些高级场景中,它可能会很有用。以下是一些可能的用途:

    1. 动态取消订阅: 如果你需要在运行时动态取消订阅,可以通过调用 Subscription 对象的 destroy() 方法来实现。例如:

      Python

      复制

      self.subscription.destroy()
    2. 检查订阅状态: 你可以通过 Subscription 对象的属性来检查订阅的状态。例如,self.subscription.topic_name 可以获取订阅的话题名称。

    3. 调试和日志记录: 在调试过程中,Subscription 对象可以帮助你更好地理解订阅行为。例如,你可以打印出订阅的话题名称和消息类型

      self.get_logger().info(f"Subscribed to topic: {self.subscription.topic_name}")

    服务通信

    服务端

    """  
        需求:编写服务端,接收客户端发送请求,提取其中两个整型数据,相加后将结果响应回客户端。
        步骤:
            1.导包;
            2.初始化 ROS2 客户端;
            3.定义节点类;
                3-1.创建服务端;
                3-2.处理请求数据并响应结果。
            4.调用spin函数,并传入节点对象;
            5.释放资源。
    
    """
    
    # 1.导包;
    import rclpy
    from rclpy.node import Node
    from base_interfaces_demo.srv import AddInts
    
    # 3.定义节点类;
    class Service(Node):
    
        def __init__(self):
            super().__init__('minimal_service_py')
            # 3-1.创建服务端;
            self.srv = self.create_service(AddInts, 'add_ints', self.add_two_ints_callback)
            self.get_logger().info("服务端启动!")
    
        # 3-2.处理请求数据并响应结果。
        def add_two_ints_callback(self, request, response):
            response.sum = request.num1 + request.num2
            self.get_logger().info('请求数据:(%d,%d),响应结果:%d' % (request.num1, request.num2, response.sum))
            return response
    
    
    def main():
        # 2.初始化 ROS2 客户端;
        rclpy.init()
        # 4.调用spin函数,并传入节点对象;
        rclpy.spin(Service())
        # 5.释放资源。
        rclpy.shutdown()
    
    
    if __name__ == '__main__':
        main()

    客户端

    """  
        需求:编写客户端,发送两个整型变量作为请求数据,并处理响应结果。
        步骤:
            1.导包;
            2.初始化 ROS2 客户端;
            3.定义节点类;
                3-1.创建客户端;
                3-2.等待服务连接;
                3-3.组织请求数据并发送;
            4.创建对象调用其功能,处理响应结果;
            5.释放资源。
    
    """
    # 1.导包;
    import sys
    import rclpy
    from rclpy.logging import get_logger
    from rclpy.node import Node
    from base_interfaces_demo.srv import AddInts
    
    # 3.定义节点类;
    class Client(Node):
    
        def __init__(self):
            super().__init__('minimal_client_py')
            # 3-1.创建客户端;
            self.cli = self.create_client(AddInts, 'add_ints')
            # 3-2.等待服务连接;
            while not self.cli.wait_for_service(timeout_sec=1.0):
                self.get_logger().info('服务连接中,请稍候...')
            self.req = AddInts.Request()
    
        # 3-3.组织请求数据并发送;
        def send_request(self):
            self.req.num1 = int(sys.argv[1])
            self.req.num2 = int(sys.argv[2])
            self.future = self.cli.call_async(self.req)
    
    
    def main():
        #校验工作
        if len(sys.argv)!= 3:
            get_logger("rclpy").info("请提交2个整形参数")
        # 2.初始化 ROS2 客户端;
        rclpy.init()
    
        # 4.创建对象并调用其功能;
        minimal_client = Client()
        minimal_client.send_request()
    
        # 处理响应
        rclpy.spin_until_future_complete(minimal_client,minimal_client.future)
        try:
            response = minimal_client.future.result()
        except Exception as e:
            minimal_client.get_logger().info(
                '服务请求失败: %r' % (e,))
        else:
            minimal_client.get_logger().info(
                '响应结果: %d + %d = %d' %
                (minimal_client.req.num1, minimal_client.req.num2, response.sum))
    
        # 5.释放资源。
        rclpy.shutdown()
    
    
    if __name__ == '__main__':
        main()

    作者:自动化小秋葵

    物联沃分享整理
    物联沃-IOTWORD物联网 » ROS2基础函数详解:理论与实践结合,探索单片机架构下的应用

    发表回复