ROS2学习
前言
本篇文章属于ROS2humble的学习笔记,来源于B站鱼香ROSup主。下面是这位up主的视频链接。本文为个人学习笔记,只能做参考,细节方面建议观看视频,肯定受益匪浅。
《ROS 2机器人开发从入门到实践》课程介绍_哔哩哔哩_bilibili
一、项目一(下载小说,并通过话题间隔5s发送一行)
(1)创建工作空间
在终端中输入:
mkdir -p topic_ws/src
(2)在src目录下创建软件包
处于src目录下,在终端中输入:
ros2 pkg create demo_python_topic --build-type ament_python --dependencies rclpy example_interfaces --license Apache-2.0
回到topic_ws目录下,再输入:
colcon build 构建文件
(3)在有__init__.py文件的目录下创建节点文件novel_pub_node.py,节点代码部分如下
import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue
class NovelNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.get_logger().info(f'{node_name},启动!')
self.novel_queue = Queue() #创建队列
self.novel_publisher = self.create_publisher(String, 'novel', 100)
self.create_timer(5, self.timer_callback)
def timer_callback(self):
if self.novel_queue.qsize()>0:
line = self.novel_queue.get()
msg = String()
msg.data = line
self.novel_publisher.publish(msg)
self.get_logger().info(f'发布了:{msg}')
def download(self, url):
responese = requests.get(url)
responese.encoding = 'utf-8'
text = responese.text
self.get_logger().info(f'下载{url},{len(text)}')
for line in text.splitlines():
self.novel_queue.put(line)
def main():
rclpy.init()
node = NovelNode('novel_pub')
node.download('https://fanqienovel.com/reader/7173216089122439711?enter_from=page')
rclpy.spin(node)
rclpy.shutdown()
(4)执行代码
先找到
entry_points={
'console_scripts': [
],
},
改为
entry_points={
'console_scripts': [
'novel_pub_node=demo_python_topic.novel_pub_node:main'
],
},
等号左边是可执行文件的名字,等号右边是软件包名和节点名
然后回到 topic_ws目录下,再输入:
colcon build 构建文件
再在终端中输入source install/setup.bash
修改一下环境变量
再运行即可:ros2 run demo_python_topic novel_pub_node
二、项目二(订阅小说并合成语音)
import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading
import time
class NovelSubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.get_logger().info(f'{node_name},启动!')
self.novel_queue = Queue()
self.create_subscription(String, 'novel', self.novel_callback, 10)
self.speech_thread_ = threading.Thread(target=self.speaker_thread)
self.speech_thread_.start()
def novel_callback(self, msg):
self.novel_queue.put(msg.data)
pass
def speaker_thread(self):
speaker = espeakng.Speaker()
speaker.voice = 'zh'
while rclpy.ok(): #检测ROS当前上下文是否ok
if self.novel_queue.qsize()>0:
text = self.novel_queue.get()
self.get_logger().info(f'朗读:{text}')
speaker.say(text) #说
speaker.wait() #等说完
else:
#让当前线程休眠1s
time.sleep(1)
def main():
rclpy.init()
node = NovelSubNode('novel_sub')
rclpy.spin(node)
rclpy.shutdown()
espeakng为朗读引入的库
注意当前线程休眠1s的操作很关键,这能降低CPU功耗