ROS 2机器人开发--话题通信:订阅与发布
1 话题通信介绍
2 C++话题订阅与发布
2.1 发布速度控制海龟画圆
3 话题通信实战----制作一个可视化系统状态监听工具
3.1 准备工作
3.2 自定义通信接口
3.3 系统信息获取与发布
3.4 测试QT
3.5 数据可视化显示
1 话题通信介绍
在ROS 2中,发布和订阅是实现节点间通信的核心机制。节点可以通过发布消息到特定话题来共享信息,而其他节点则可以通过订阅这些话题来接收消息。这种通信模式是异步的,特别适合用于分布式系统。
ROS 2的话题通信机制涉及四个关键要素:发布者、订阅者、话题名称和话题类型。发布者可以类比为信息的生产者,订阅者则是信息的消费者。话题名称类似于信息的标识符,而话题类型则定义了消息的结构和内容格式。
下面以海龟模拟器为例, 进一步讲解话题通信。
按下 ctrl+alt+T 打开终端,输入代码如图,打开海龟模拟器。
ros2 run turtlesim turtlesim_node
打开后,打开一个新的终端,使用命令来查看turtlesim 节点的信息,输入代码:
ros2 node info
命令用于查询节点的详细信息。其中,Subscribers 部分列出了 /turtlesim
节点订阅的所有话题。可以看到,该节点订阅了名为 /turtle1/cmd_vel
的话题,用于接收控制指令,其消息类型为 geometry_msgs/msg/Twist
。
在 Publishers 部分,列出了该节点发布的所有话题。重点关注的是 /turtle1/pose
话题,它用于发布海龟的当前位置和速度信息,其消息类型为 turtlesim/msg/Pose
。至于话题下的服务和动作相关信息,暂时可以忽略。
为了实时接收并输出海龟的位姿信息,可以尝试订阅 /turtle1/pose
话题。在任意终端中输入以下命令:
ros2 topic echo /turtle1/pose
ros2 topic echo
是 ROS 2 中用于输出话题数据的命令。运行该命令后,它会实时显示接收到的话题数据。具体来说,x
和 y
表示海龟的位置坐标;theta
表示海龟的朝向角度;linear_velocity
代表海龟的线速度,即其前进或后退的速度,其中前进为正值,后退为负值;angular_velocity
表示海龟的角速度,即其绕自身旋转的速度,逆时针旋转为正值,顺时针旋转为负值。
通过命令行工具,不仅可以方便地查看话题数据,还可以发布数据。然而,在向某个话题发布数据之前,必须确定其消息接口。消息接口类似于公众号发文时的内容类型,发布前需要明确是要发送图片、视频还是图文信息。使用以下代码可以查看某个话题的详细信息。
ros2 topic info /turtle1/cmd_vel -v
可以看到其消息接口为Topic type:geometry_msgs/msg/Twist,接着使用如下代码来查看该消息接口的详细定义。
ros2 interface show geometry_msgs/msg/Twist
ros2 interface show
命令用于展示接口的定义详情,其具体格式将在后续自定义消息接口时进行详细说明。通过该命令的结果可以发现,geometry_msgs/msg/Twist
接口包含六个变量。其中,linear
代表线速度,其包含的 x
、y
和 z
三个变量分别对应机器人在三个方向上的运动速度,单位为米/秒(m/s)。在 ROS 中,机器人前进的方向被定义为 x 轴方向,因此 linear.x
表示机器人在前进方向上的速度。
angular
代表角速度,其包含的 x
、y
和 z
三个变量分别表示机器人绕 x、y 和 z 三个轴的旋转速度,单位为弧度/秒(rad/s)。由于海龟生活在二维空间中,它仅能绕 z 轴(即垂直向上的方向)进行旋转。明确了消息接口的具体内容后,就可以通过代码实现数据的发布。
使用命令发布线速度:
ros2 topic pub /turtle1/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 1.0}}"
观察到海龟移动了 !
使用命令发布角速度:
ros2 topic pub /turtle1/cmd_vel geometry_msgs/msg/Twist "{angular: {z: 1.0}}"
观察到海龟旋转了 !
虽然可以在命令行里轻松实现话题的发布和订阅,但要想更灵活地使用话题,还需要学习如何在程序中使用。
2 C++话题订阅与发布
在实际的机器人开发项目中做运动控制时,C++ 用得比较多,因为Python是高级语言,运行效率比较低,不适合机器人底层代码开发。本节我们就利用C++ 和海龟模拟器,先通过话题发布速度,控制海龟画圆。
2.1 发布速度控制海龟画圆
当海龟模拟器节点运行起来后,会自动生成 一 只海龟。模拟器节点会订阅名称为/ turtle1/cmd_vel 的话题用于接收控制命令,该话题的接口类型是geometry_msgs/msg/Twist。
订阅的同时,该节点会发布其当前位置,该话题的接口类型是turtlesim/msg/Pose 。 为了能够在代码中订阅和发布对应的话题,在创建功能包时需要加入对geometry_msgs 和 turtlesim 的 依赖。打开终端,进入 chapt3/topic_ws/src 目录,输入代码来创建 demo_ cpp_topic 功能包。
ros2 pkg create demo_cpp_topic --build-type ament_cmake --dependencies rclcpp geometry_mgs turtlesim --license Apache-2.0
创建完功能包后,在src/demo_cpp_topic/src 下创建turtle_circle.cpp文件,在该文件中编写代码。如下:
#include "rclcpp/rclcpp.hpp" // 包含 ROS 2 的核心功能库
#include "geometry_msgs/msg/twist.hpp" // 包含 Twist 消息类型,用于控制乌龟的速度
#include <chrono> // 包含 C++ 标准库中的时间相关功能
using namespace std::chrono_literals; // 使用命名空间简化时间单位的表示
// 定义一个类,继承自 rclcpp::Node
class TurtleCircle : public rclcpp::Node
{
private:
rclcpp::TimerBase::SharedPtr timer_; // 定时器智能指针,用于周期性执行任务
rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr publisher_; // 发布者智能指针,用于发布 Twist 消息
public:
// 构造函数,初始化节点
explicit TurtleCircle(const std::string& node_name) : Node(node_name)
{
// 创建一个发布者,发布到 "/turtle1/cmd_vel" 话题,队列大小为 10
publisher_ = this->create_publisher<geometry_msgs::msg::Twist>("/turtle1/cmd_vel", 10);
// 创建一个定时器,周期为 1 秒(1000 毫秒),回调函数为 timer_callback
timer_ = this->create_wall_timer(1000ms, std::bind(&TurtleCircle::timer_callback, this));
}
private:
// 定时器回调函数
void timer_callback()
{
// 创建一个 Twist 消息
auto msg = geometry_msgs::msg::Twist();
// 设置线速度和角速度
msg.linear.x = 1.0; // 线速度为 1.0 m/s
msg.angular.z = 0.5; // 角速度为 0.5 rad/s
// 发布消息到 "/turtle1/cmd_vel" 话题
publisher_->publish(msg);
}
};
// 主函数
int main(int argc, char *argv[])
{
// 初始化 ROS 2 节点
rclcpp::init(argc, argv);
// 创建一个节点实例
auto node = std::make_shared<TurtleCircle>("turtle_square");
// 运行节点,等待回调函数执行
rclcpp::spin(node);
// 关闭 ROS 2 节点
rclcpp::shutdown();
return 0;
}
在以上代码中,首先包含了ROS 2客户端库rclcpp 和消息接口geometry_msgs/ msg/twist.hpp,然后引入了时间库头文件,并使用using来声明使用时间单位字面量,字面量是C++14 中的新特性,引入后可以直接使用数字加单位(s 或 ms 等)来表示时间,让代码更加直观。
导入头文件后,接着定义了一个TurtleCircle类,为其添加了定时器的共享指针timer_ 和话题发布者的共享指针 publisher_ 两个属性,然后在构造函数中分别对这两个属性进行初始化 。
this->create_publisher方法是从父类继承而来的,用于初始化发布者。◇是C++ 的模板 语法,被包裹的geometry_msgs::msg::Twist 是话题的接口类型,该方法的第一个参数是话题 的名称,和海龟订阅的话题名称要保持一致才能通信;第二个参数是10,和Python 中一样, 与ROS2 的服务质量有关,在第10章中有详细讲解,这里的10表示历史队列长度。
this->create_wall_timer同样来自父类节点,用于初始化定时器。该方法的第一个参数是调用周期,这里设置为1000ms, 表示间隔1s 调用一次;第二个参数是回调函数,这里将成员方法timer_callback 通 过 std::bind 变成可以直接调用的回调函数。
在timer_callback 方法内,首先创建了一个geometry_msgs::msg::Twist 类型的消息对象, 然后为前进方向的线速度x 赋值1m/s, 接着将绕z 轴的旋转角速度设置为0.5rad/s, 此时海龟的转弯半径应该是2m(1m/s÷0.5rad/s)。
代码完成后,在CMakeLists.txt 中添加turtle_circle 节点,并添加依赖,主要添加指令如下所示。
cmake_minimum_required(VERSION 3.8)
project(demo_cpp_topic)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(geometry_mgs REQUIRED)
find_package(turtlesim REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
add_executable(turtle_circle src/turtle_circle.cpp)
ament_target_dependencies(turtle_circle rclcpp geometry_msgs)
install(TARGETS
turtle_circle
DESTINATION lib/${PROJECT_NAME}
)
ament_package()
首先进入chapt3/src,在空白地方右击,进入终端。
依次输入:
构建功能包
colcon build
让ros2能够找到demo_cpp_topic和下面的节点
source install/setup.bash
运行文件夹demo_cpp_topic下面的turtle_circle节点:
ros2 run demo_cpp_topic turtle_circle
这个页面不要动,返回桌面,按下ctrl+ali+T重新开启一个终端,运行海龟节点:
ros2 run turtlesim turtlesim_node
成功画圆!!!!!!!!
3 话题通信实战----制作一个可视化系统状态监听工具
3.1 准备工作
需求分析: 系统监测 、 可视化
首先进入创建一个文件夹,命名为chapt3,进入,右击空白处进入一个终端,输入代码创建一个工作空间
mkdir -p topic_practice_ws/src
3.2 自定义通信接口
进入src,打开终端,或者直接cd topic_practice_ws/src进入
创建一个接口功能包:
ros2 pkg create status_interfaces --build-type ament_cmake --dependencies rosidl_default_generators builtin_interfaces --license Apache-2.0
这个命令的作用是:
-
创建一个名为
status_interfaces
的新 ROS 2 功能包。 -
使用
ament_cmake
作为构建系统。 -
添加
rosidl_default_generators
和builtin_interfaces
作为依赖项。
在status_interfaces下面建msg,再在下面建SystemStatus.msg
完整目录:src/status_interfaces/msg/SystemStatus.msg
在msg里面编辑消息接口定义文件:
builtin_interfaces/Time stamp #记录时间戳
string host_name #系统名称
float32 cpu_percent #CPU使用率
float32 memory_percent #内存使用率
float32 memory_total #内存总量
float32 memory_available #剩余有效内存
float64 net_sent #网络发送数据总量
float64 net_recv #网络接收数据总量
定义好数据接口文件后,需要在CMakeLists.txt 中对该文件进行注册,声明其为消息接口文件,并为其添加builtin_interfaces依赖,添加完成后CMakeLists.txt中的部分代码如图所示。
添加的代码是:
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SystemStatus.msg"
DEPENDENCIES builtin_interfaces
)
还需要在package.xml中添加代码,声明这个功能包是一个消息接口功能包。
<member_of_group>rosidl_interface_packages</member_of_group>
接下来,构建,再查看一下接口是否构建成功。
命令如下:
(记住一定要退到工作空间topic_practice_ws下执行命令哦!)
(一般情况下,构建都是在工作空间下构建!)
colcon build
source install/setup.bash
ros2 interface show status_interfaces/msg/SystemStatus
成功查看消息接口!接下来我们使用接口来传递数据。
3.3 系统信息获取与发布
在src中再创建一个功能包 status_publisher
ros2 pkg create status_publisher --build-type ament_python --dependencies rclpy_status_interfaces --license Apache-2.0
打开status_publisher,新建一个py文件,如图所示:
内容如下:
import rclpy # 导入ROS 2的Python客户端库
from rclpy.node import Node # 导入Node类,用于创建节点
from status_interfaces.msg import SystemStatus # 导入自定义的消息类型SystemStatus
import psutil # 导入psutil库,用于获取系统状态信息
import platform # 导入platform库,用于获取系统信息
class SysStatusPub(Node): # 定义一个继承自Node的类SysStatusPub,用于发布系统状态
def __init__(self, node_name): # 初始化函数
super().__init__(node_name) # 调用父类(Node)的构造函数
self.status_publisher_ = self.create_publisher( # 创建一个发布者
SystemStatus, 'sys_status', 10) # 指定话题名称和队列大小
self.timer = self.create_timer(1, self.timer_callback) # 创建一个定时器,每秒触发一次
def timer_callback(self): # 定时器回调函数
cpu_percent = psutil.cpu_percent() # 获取CPU使用率
memory_info = psutil.virtual_memory() # 获取内存信息
net_io_counters = psutil.net_io_counters() # 获取网络I/O信息
msg = SystemStatus() # 创建一个SystemStatus消息实例
msg.stamp = self.get_clock().now().to_msg() # 获取当前时间戳并转换为ROS 2时间戳
msg.host_name = platform.node() # 获取主机名
msg.cpu_percent = cpu_percent # 设置CPU使用率
msg.memory_percent = memory_info.percent # 设置内存使用率
msg.memory_total = memory_info.total / 1024 / 1024 # 计算总内存量(GB)
msg.memory_available = memory_info.available / 1024 / 1024 # 计算可用内存量(GB)
msg.net_sent = net_io_counters.bytes_sent / 1024 / 1024 # 计算发送的网络数据量(GB)
msg.net_recv = net_io_counters.bytes_recv / 1024 / 1024 # 计算接收的网络数据量(GB)
self.get_logger().info(f'发布:{str(msg)}') # 打印日志信息
self.status_publisher_.publish(msg) # 发布消息
def main(): # 主函数
rclpy.init() # 初始化ROS 2
node = SysStatusPub('sys_status_pub') # 创建SysStatusPub节点实例
rclpy.spin(node) # 保持节点运行,等待回调函数被触发
rclpy.shutdown() # 关闭ROS 2
这段代码创建了一个ROS 2节点,名为SysStatusPub
,用于监控和发布系统的CPU使用率、内存使用情况和网络I/O数据。它首先导入必要的库和消息类型,然后定义了一个继承自Node
的类,该类初始化时创建了一个定时器,每秒触发一次回调函数。在回调函数中,它收集系统状态信息,创建一个消息对象,填充数据,然后发布到sys_status
话题上。最后,主函数初始化ROS 2,启动节点,并保持运行直到关闭。
还需要在setup.py中对节点进行注册,
'sys_status_pub=status_publisher.sys_status_pub:main',
打开终端,退回工作空间,构建功能包:
然后查看接口是否构建成功。
首先让系统找到位置,代码如下:
source install/setup.bash
再运行功能包下面的节点
ros2 run status_publisher sys_status_pub
成功打印!!!
3.4 测试QT
现在使用QT来创建一个简单的页面
老样子,创建一个新的功能包status_display
ros2 pkg create status_display --build-type ament_cmake --dependencies rclcpp status_interfaces --license Apache-2.0
在刚才新建功能包的src目录下建一个cpp文件
编辑代码:
#include <QApplication> // 引入 QApplication 类,用于管理应用程序的控制流程
#include <QLabel> // 引入 QLabel 类,用于显示文本标签
#include <QString> // 引入 QString 类,用于处理字符串
int main(int argc, char* argv[]) { // 主函数,程序的入口点
QApplication app(argc, argv); // 创建 QApplication 实例,管理GUI程序的事件循环和资源
QLabel* label = new QLabel(); // 创建 QLabel 实例,用于显示文本
QString message = QString::fromStdString("Hello Qt!"); // 创建 QString 实例并初始化为 "Hello Qt!"
label->setText(message); // 设置标签文本为 message 字符串
label->show(); // 显示标签窗口
app.exec(); // 进入应用程序的事件循环,等待用户操作
return 0; // 返回 0 表示程序正常退出
}
这段代码是一个简单的Qt应用程序,用于创建一个窗口并在其中显示一条消息。
程序首先引入了Qt的三个核心组件:QApplication
、QLabel
和QString
。QApplication
管理应用程序的资源和事件循环;QLabel
用于在窗口中显示文本;QString
用于处理文本字符串。
在main
函数中,首先创建了一个QApplication
对象,它是每个Qt应用程序必须的。然后创建了一个QLabel
对象,并设置其文本为"Hello Qt!"
。最后,显示标签窗口并进入事件循环,等待用户操作直到应用程序关闭。
还要在CMakeLists.txt里面注册,完整代码如图:
cmake_minimum_required(VERSION 3.8)
project(status_display)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(status_interfaces REQUIRED)
find_package(Qt5 REQUIRED COMPONENTS Widgets)
add_executable(hello_qt src/hello_qt.cpp)
target_link_libraries(hello_qt Qt5::Widgets)
install(TARGETS hello_qt
DESTINATION lib/${PROJECT_NAME}
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()
然后构建,运行这个例子,结果如图所示:
命令照着图片上敲一下,道理都一样,就不贴了
成功!!!!!!!!!!!!
3.5 数据可视化显示
新建文件sys_status_display.cpp,位置如图所示: (千万不要建错位置)
编写代码
#include <QApplication>
#include <QLabel>
#include <QString>
#include "rclcpp/rclcpp.hpp"
#include "status_interfaces/msg/system_status.hpp"
using SystemStatus = status_interfaces::msg::SystemStatus;
class SysStatusDisplay : public rclcpp::Node {
public:
SysStatusDisplay() : Node("sys_status_display") {
subscription_ = this->create_subscription<SystemStatus>(
"sys_status", 10, [&](const SystemStatus::SharedPtr msg) -> void {
label_->setText(get_qstr_from_msg(msg));
});
// 创建一个空的 SystemStatus 对象,转化成 QString 进行显示
label_ = new QLabel(get_qstr_from_msg(std::make_shared<SystemStatus>()));
label_->show();
}
QString get_qstr_from_msg(const SystemStatus::SharedPtr msg) {
std::stringstream show_str;
show_str
<< "===========系统状态可视化显示工具============\n"
<< "数 据 时 间:\t" << msg->stamp.sec << "\ts\n"
<< "用 户 名:\t" << msg->host_name << "\t\n"
<< "CPU使用率:\t" << msg->cpu_percent << "\t%\n"
<< "内存使用率:\t" << msg->memory_percent << "\t%\n"
<< "内存总大小:\t" << msg->memory_total << "\tMB\n"
<< "剩余有效内存:\t" << msg->memory_available << "\tMB\n"
<< "网络发送量:\t" << msg->net_sent << "\tMB\n"
<< "网络接收量:\t" << msg->net_recv << "\tMB\n"
<< "==========================================";
return QString::fromStdString(show_str.str());
}
private:
rclcpp::Subscription<SystemStatus>::SharedPtr subscription_;
QLabel* label_;
};
int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);
QApplication app(argc, argv);
auto node = std::make_shared<SysStatusDisplay>();
std::thread spin_thread([&]() -> void { rclcpp::spin(node); });
spin_thread.detach();
app.exec();
rclcpp::shutdown();
return 0;
}
修改CMakeLists.txt,添加代码,,完整代码如下:
cmake_minimum_required(VERSION 3.8)
project(status_display)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(status_interfaces REQUIRED)
find_package(Qt5 REQUIRED COMPONENTS Widgets)
add_executable(hello_qt src/hello_qt.cpp)
add_executable(sys_status_display src/sys_status_display.cpp)
target_link_libraries(hello_qt Qt5::Widgets)
target_link_libraries(sys_status_display Qt5::Widgets)
ament_target_dependencies(sys_status_display rclcpp status_interfaces)
install(TARGETS hello_qt
sys_status_display
DESTINATION lib/${PROJECT_NAME}
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()
打开终端,构建,运行该节点,命令如图所示:
成功运行!!!!!!观察到QT已经打开了,但是没有数据,因为还没有数据传过来,这个节点只是订阅与可视化数据。我们还需要发布数据。
如下图所示,运行发布者,发布数据,
然后观察QT界面,会发现已经成功接手到数据了。
大功告成!!!!!!!!!!!!