ros2笔记-3.2python 话题订阅与发布

3.1 话题通信介绍

话题发布订阅模型,有4个关键点:发布者、订阅者、话题名称、话题类型

bohu@bohu-TM1701:~$ ros2 node info /turtlesim 
There are 2 nodes in the graph with the exact name "/turtlesim". You are seeing information about only one of them.
/turtlesim
  Subscribers:
    /parameter_events: rcl_interfaces/msg/ParameterEvent
    /turtle1/cmd_vel: geometry_msgs/msg/Twist
  Publishers:
    /parameter_events: rcl_interfaces/msg/ParameterEvent
    /rosout: rcl_interfaces/msg/Log
    /turtle1/color_sensor: turtlesim/msg/Color
    /turtle1/pose: turtlesim/msg/Pose

在小海龟模拟器节点启动后,可以查看节点信息,就有Publishers发布者、Subscribers接受者话题。

其中:ros2 topic echo /turtle1/pose 可以查看小海龟的位姿信息。

x: 2.833137035369873
y: 3.6268489360809326
theta: -2.7248146533966064
linear_velocity: 0.0
angular_velocity: 0.0

通过话题而不是之前的键盘也能控制小海龟移动。

ros2 topic pub /turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0, y: 0.0} , angular: {z: -1.0}}"

这个可以让小海龟转圈。

3.2 python话题订阅与发布

需求背景:先创建一个节点,用来下载和发布novel话题,然后再创建一个小说阅读节点。用于订阅novel话题,合成语音并进行播放。

3.2.1 通过话题发布小说

准备工作:

上一节那样打开终端,启动本地web服务器python3 -m http.server

编写业务代码,在3/topic_ws/src文件夹下,打开终端,创建demo_python_topic功能包

bohu@bohu-TM1701:~/3/topic_ws/src$ ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
going to create a new package
package name: demo_python_topic
destination directory: /home/bohu/3/topic_ws/src
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['bohu <13508678+bohu83@user.noreply.gitee.com>']
licenses: ['Apache-2.0']
build type: ament_python
dependencies: ['rclpy', 'example_interfaces']
creating folder ./demo_python_topic
creating ./demo_python_topic/package.xml
creating source folder
creating folder ./demo_python_topic/demo_python_topic
creating ./demo_python_topic/setup.py
creating ./demo_python_topic/setup.cfg
creating folder ./demo_python_topic/resource
creating ./demo_python_topic/resource/demo_python_topic
creating ./demo_python_topic/demo_python_topic/__init__.py
creating folder ./demo_python_topic/test
creating ./demo_python_topic/test/test_copyright.py
creating ./demo_python_topic/test/test_flake8.py
creating ./demo_python_topic/test/test_pep257.py

注意增加了2个依赖。在创建完成后再package.xml会发现。

创建完功能包之后,在src/demo_python_topic/demo_python_topic下创建novel_pub_node.py文件,代码如下

import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue

class NovelPubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.get_logger().info(f'{node_name},启动')
        self.novels_queue_ = Queue() #创建队列
        self.novel_publisher_ = self.create_publisher(String,'novel',10)#发布者
        self.timer_ = self.create_timer(5,self.timer_callback)
       

    def timer_callback(self):
        if self.novels_queue_.qsize()>0:
           line = self.novels_queue_.get()
           msg = String();#组装消息
           msg.data = line;
           self.novel_publisher_.publish(msg)
           self.get_logger().info(f'发布了:{msg}')   

    def download(self,url):
        reponse = requests.get(url)
        reponse.encoding = 'utf-8'
        text = reponse.text
        self.get_logger().info(f'下载完成‘{url},{len(text)}’')
        for line in text.splitlines():
            self.novels_queue_.put(line)
        
     

def main():
    rclpy.init()
    node = NovelPubNode('novel_pub')
    node.download('http://0.0.0.0:8000/novel1.txt')
    rclpy.spin(node)
    rclpy.shutdown()    

代码不长,可是我跟着视频敲了2遍才能跑起来有些不起眼有没提示的错误运行时会报错,下载逻辑跟之前一样,处理逻辑增加了按行处理,把每行放入队列。队列是个先进先出的数据结构,适合此业务场景。回调函数timer_callback 从队列获取数据,组装为String格式。调用发布者发布。

再setup.py注册novel_pub_node节点,并构建,运行novel_pub_node.

bohu@bohu-TM1701:~/3/topic_ws$ ros2 run demo_python_topic novel_pub_node 
[INFO] [1736161295.090347220] [novel_pub]: novel_pub,启动
[INFO] [1736161295.097754661] [novel_pub]: 下载完成‘http://0.0.0.0:8000/novel1.txt,447’
[INFO] [1736161300.097022327] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。1')
[INFO] [1736161305.093794085] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。2')
[INFO] [1736161310.095291773] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。3')
[INFO] [1736161315.094403770] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。4')
[INFO] [1736161320.095770036] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。5')
[INFO] [1736161325.093794607] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。6')
[INFO] [1736161330.093633498] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。7')
[INFO] [1736161335.094141394] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。8')
[INFO] [1736161340.094232710] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。9')
[INFO] [1736161345.093890803] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。10')
[INFO] [1736161350.095869992] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。11')
[INFO] [1736161355.093779808] [novel_pub]: 发布了:example_interfaces.msg.String(data='CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。12')

可以看到小说已经被成功下载和发布。再观察下话题是否发布。

bohu@bohu-TM1701:~$ ros2 topic list
/novel
/parameter_events
/rosout

bohu@bohu-TM1701:~$ ros2 topic echo /novel
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。2
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。3
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。4
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。5
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。6
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。7
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。8
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。9
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。10
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。11
---
data: CQ,CQ,CQ,这里是BH8VYW,这里是BH8VYW,收到请回答。12
---

可以看到程序运行结果正常输出了小说内容。

3.2.2 订阅小说并合成语音

先安装依赖 sudo apt install python3-pip -y

pip3 install espeakng

sudo apt install espeak-ng

在src/demo_python_topic/demo_python_topic下创建novelsub_node.py文件。代码如下所示

import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
import time

class NovelSubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.get_logger().info(f'{node_name},启动')
        self.novels_queue_ = Queue()
        self.novel_subscriber_ = self.create_subscription(String,'novel',self.novel_callback,10)
        self.speaker_thread_ = threading.Thread(target=self.speaker_thread)
        self.speaker_thread_.start()

    def novel_callback(self,msg):
        self.novels_queue_.put(msg.data )

    def speaker_thread(self):
        speaker = espeakng.Speaker()
        speaker.voice = 'zh'
        
        while rclpy.ok(): #检测当前ros上下文是否ok
            if  self.novels_queue_.qsize()>0:
                text = self.novels_queue_.get()
                self.get_logger().info(f'朗读:{text}')
                speaker.say(text) #说
                speaker.wait() # 等他说完
            else:
                # 让当前线程休眠1s
                time.sleep(1)    

def main():
    rclpy.init()
    node = NovelSubNode('novel_sub')
    rclpy.spin(node)
    rclpy.shutdown()    

相对上面的代码,基本结构类似,多了语音合成库espeakng.

因为朗读速度没有接受的快,所以使用队列存储接收到的数据。从话题接收到数据回调novel_callback就是往队列放。而朗读线程对应方法speaker_thread,创建了speaker对象,设置了声音为中文,调用rclpy.ok(): 检测当前ros上下文是否ok,不断循环判断队列是否有数据,有数据调用say进行朗读。没有数据就休眠1秒。

修改setup.py,注册节点,分别编译运行 

ros2 run demo_python_topic novel_sub_node

ros2 run demo_python_topic novel_pub_node

就能听见合成语音了。

作者:bohu83

物联沃分享整理
物联沃-IOTWORD物联网 » ros2笔记-3.2python 话题订阅与发布

发表回复