当前位置: 首页 > news >正文

ROS2-话题学习

强烈推荐教程:

《ROS 2机器人开发从入门到实践》3.2.2订阅小说并合成语音_哔哩哔哩_bilibili

构建功能包

# create package demo_python_pkg

ros2 pkg create --build-type ament_python --license Apache-2.0 demo_python_pkg

 

自己写的代码放在./demo_python_pkg/demo_python_pkg目录下

发布者

import rclpy
from rclpy.node import Node
import requests
from example_interfaces.msg import String
from queue import Queue


class NovelPubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.novels_queue = Queue()
        self.get_logger().info("NovelPubNode has been started")
        self.novel_pub = self.create_publisher(String, "novel_topic", 10)

        self.timer = self.create_timer(5,self.timer_callback)


    def timer_callback(self):
        if not self.novels_queue.empty():
            line = self.novels_queue.get()
            msg = String()
            msg.data = line
            self.novel_pub.publish(msg)
            self.get_logger().info(f"Published novel: {line}")       

    def download_novel(self, url):
        response = requests.get(url)
        response.encoding = 'utf-8'
        text = response.text
        for text_line in text.splitlines():
            self.novels_queue.put(text_line)

        self.get_logger().info(f"Downloaded novel: {text}")



def main():
    rclpy.init()
    node = NovelPubNode("novel_pub_node")
    node.download_novel("http://0.0.0.0:8000/novel_1.txt")
    rclpy.spin(node)
    node.shutdown()

 

订阅者

import time
import espeakng
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String
from queue import Queue
import threading


class NovelSubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.get_logger().info("NovelSubNode has been created!")
        self.novels_queue = Queue()
        self.create_subscription(String, "novel_topic", self.novel_callback, 10)
        self.say_thread = threading.Thread(target=self.say)
        self.say_thread.start()
        


    def novel_callback(self, msg):
        self.novels_queue.put(msg.data)
        


    def say(self):
        engine = espeakng.Speaker()
        engine.voice = "zh"
        while rclpy.ok():
            if not self.novels_queue.empty():
                novel = self.novels_queue.get()
                engine.say(novel)
                print("Said: " + novel)
                engine.wait()
            else:
                time.sleep(1)



def main():
    rclpy.init()
    node = NovelSubNode("novel_sub_node")
    rclpy.spin(node)
    rclpy.shutdown()
    

代码完成后,配置setup.py文件

格式为:"名字 = 包名.文件名:函数名"

在根目录运行以下终端命令

# build package demo_python_pkg

colcon build

# source setup.bash

source install/setup.bash

运行以上命令后,得到build、install、log文件夹

可执行的节点文件在以下文件夹

./install/demo_python_pkg/lib/demo_python_pkg

运行节点命令

ros2 run demo_python_pkg python_pub_node

ros2 run demo_python_pkg python_sub_node

 

其他常用命令

# check if node is running

ros2 node list

# check if topic is published

ros2 topic list

# check topic content

ros2 topic echo /novel_topic

# check topic speed

ros2 topic hz /novel_topic

# check if service is available

ros2 service list

相关文章:

  • RabbitMQ高级特性--消息确认机制
  • [网络爬虫] 动态网页抓取 — Selenium 入门操作
  • 搞定python之一----开发环境配置
  • AtCoder Beginner Contest 396(ABCDEF)
  • 【LLM】大模型推理、微调显卡挑选一览表
  • 【论文解读】《LIMO: Less is More for Reasoning》
  • PHP的Workerman 和 Java 常驻内存的相似性
  • Java【网络原理】(3)网络编程续
  • 如何避免项目后期盲目加人赶工
  • 机试准备第13天
  • 知识蒸馏综述Knowledge Distillation: A Survey解读
  • 国产算力助力工业智能新范式
  • PyTorch 学习路线
  • 探秘 Netty 通信中的 SslHandler 类:保障网络通信安全的基石
  • 【MySQL】发展起源与核心架构组件详细介绍
  • STL之list的使用(超详解)
  • 【时间序列】Patch:到底是什么?
  • 【极光 Orbit•STC8A-8H】03. 小刀初试:点亮你的LED灯
  • 数字信号处理之信号功率谱计算welch方法(分段加窗平均周期图)、Bartlett方法(周期图)(Python)
  • 以网络形式启动Linux系统后,通过挂载(mount)的方式,将eMMC存储器中旧的根文件系统所在逻辑2分区格式化,并解压新的根文件系统到逻辑2分区
  • 东莞网站建设设/网站搜索优化公司
  • 做网站卖掉/公司推广
  • php动态网站开发案例教程/百度软件下载中心官方网站
  • 做机械毕业设计哪个网站好/百度做广告费用
  • 专业建站公司报价/实时积分榜
  • 全国最大的网站建设公司排名/深圳百度总部