Navigation2 源码阅读 —— map_server 到底干了哈?
文章目录
- 一、总流程梳理
- 1. 测试方法
- 2. 流程梳理
- 2.1 map_server 节点初始化
- 2.2 map_server 节点配置
- 2.2.1 ` loadMapResponseFromYaml` 函数
- 2.2.2 `loadMapResponseFromYaml` 函数
- 2.2.3 `loadMapFromYaml` 函数
- 2.2.4 `loadMapYaml` 函数
- 2.2.5 `loadMapFromFile` 函数 (核心)
- 2.3 map_server 节点激活
- 3. pgm 地图解析
之前自己做导航一直好奇ros 是如何将 pgm图片加载到 ros概率地图作为代价地图的一份子的。今天抽时间阅读了相关源码,梳理了一下ros是如何将pgm作为/map 话题消息发出来的。如梳理有误,还请大佬们指点出来,不吝感激!
一、总流程梳理
1. 测试方法
navigation2 采用 lifecycle 生命周期管理器节点对各模块进行管理,所以需要如下操作来 对map_server节点进行测试。
# 启动 终端1:
# ros2 run nav2_map_server map_server --ros-args -p yaml_filename:=<yaml_path>
ros2 run nav2_map_server map_server --ros-args -p yaml_filename:=/home/lucas/project/code_learn/code_learn_navi/ros2_proj/fishbot_note/src/fishbot_navigation2/maps/fishbot_map.yaml# 另启一终端2 作为生命节点管理 map_server
ros2 lifecycle set /map_server configure # map_server 配置
ros2 lifecycle set /map_server activate # map_server 激活
2. 流程梳理
2.1 map_server 节点初始化
执行 终端1 即可实现map_server 节点初始化,实现功能很简单:
- 继承
nav2_util::LifecycleNode
,将节点名称为"map_server
" 向生命周期管理器注册; - 初始化
map_available_
为false,表示不可用; - 定义
yaml_filename
文件路径; - 定义
topic_name
为map
; - 定义
frame_id
为map
;
MapServer::MapServer(const rclcpp::NodeOptions & options) : nav2_util::LifecycleNode("map_server", "", options), map_available_(false)
{RCLCPP_INFO(get_logger(), "Creating");// Declare the node parametersdeclare_parameter("yaml_filename", rclcpp::PARAMETER_STRING);declare_parameter("topic_name", "map");declare_parameter("frame_id", "map");
}
执行完终端1 命令后,终端1停留在如下界面,表示已初始化,待配置和激活:
2.2 map_server 节点配置
保持终端1的运行,执行完终端2中的 ros2 lifecycle set /map_server configure
命令,可以看到如下打印,表示加载解析pgm地图完成,等待激活 将地图进行发布。
2.2.1 loadMapResponseFromYaml
函数
在对map_server
节点进行配置中,重点关注 loadMapResponseFromYaml
函数 ,该函数实现 从yaml文件读取了配置参数及将pgm转换成nav_msgs::msg::OccupancyGrid 话题消息内容,转换成功后进行创建话题发布方及服务。
nav2_util::CallbackReturn MapServer::on_configure(const rclcpp_lifecycle::State & /*state*/)
{RCLCPP_INFO(get_logger(), "Configuring");// Get the name of the YAML file to use (can be empty if no initial map should be used)std::string yaml_filename = get_parameter("yaml_filename").as_string();std::string topic_name = get_parameter("topic_name").as_string();frame_id_ = get_parameter("frame_id").as_string();// only try to load map if parameter was setif (!yaml_filename.empty()) {// Shared pointer to LoadMap::Response is also should be initialized// in order to avoid null-pointer dereferencestd::shared_ptr<nav2_msgs::srv::LoadMap::Response> rsp = std::make_shared<nav2_msgs::srv::LoadMap::Response>();if (!loadMapResponseFromYaml(yaml_filename, rsp)) {throw std::runtime_error("Failed to load map yaml file: " + yaml_filename);}}else {RCLCPP_INFO(get_logger(), "yaml-filename parameter is empty, set map through '%s'-service",load_map_service_name_.c_str());}// Make name prefix for servicesconst std::string service_prefix = get_name() + std::string("/");// Create a service that provides the occupancy gridocc_service_ = create_service<nav_msgs::srv::GetMap>(service_prefix + std::string(service_name_),std::bind(&MapServer::getMapCallback, this, _1, _2, _3));// Create a publisher using the QoS settings to emulate a ROS1 latched topicocc_pub_ = create_publisher<nav_msgs::msg::OccupancyGrid>(topic_name, rclcpp::QoS(rclcpp::KeepLast(1)).transient_local().reliable());// Create a service that loads the occupancy grid from a fileload_map_service_ = create_service<nav2_msgs::srv::LoadMap>(service_prefix + std::string(load_map_service_name_),std::bind(&MapServer::loadMapCallback, this, _1, _2, _3));return nav2_util::CallbackReturn::SUCCESS;
}
2.2.2 loadMapResponseFromYaml
函数
此函数实现读取解析pgm地图成功后,对话题消息进行更新数据头,和标记地图状态可用等操作,核心操作在 loadMapFromYaml
函数。
bool MapServer::loadMapResponseFromYaml(const std::string & yaml_file, std::shared_ptr<nav2_msgs::srv::LoadMap::Response> response)
{switch (loadMapFromYaml(yaml_file, msg_)) {case MAP_DOES_NOT_EXIST:response->result = nav2_msgs::srv::LoadMap::Response::RESULT_MAP_DOES_NOT_EXIST;return false;case INVALID_MAP_METADATA:response->result = nav2_msgs::srv::LoadMap::Response::RESULT_INVALID_MAP_METADATA;return false;case INVALID_MAP_DATA:response->result = nav2_msgs::srv::LoadMap::Response::RESULT_INVALID_MAP_DATA;return false;case LOAD_MAP_SUCCESS:// Correcting msg_ header when it belongs to specific nodeupdateMsgHeader();map_available_ = true;response->map = msg_;response->result = nav2_msgs::srv::LoadMap::Response::RESULT_SUCCESS;}return true;
}
2.2.3 loadMapFromYaml
函数
此函数源码在此不放了,重点关注两个函数:
loadMapYaml
函数:对 yaml 文件进行解析;loadMapFromFile
函数:对 pgm地图 根据 yaml文件中配置进行解析;
2.2.4 loadMapYaml
函数
实现对 yaml 文件进行解析,如地图路径、地图分辨率、地图长宽等参数;
LoadParameters loadMapYaml(const std::string & yaml_filename)
{YAML::Node doc = YAML::LoadFile(yaml_filename);LoadParameters load_parameters;// 图像路径处理auto image_file_name = yaml_get_value<std::string>(doc, "image");if (image_file_name.empty()) {throw YAML::Exception(doc["image"].Mark(), "The image tag was empty.");}// 将相对路径转换为绝对路径(基于YAML文件所在目录)if (image_file_name[0] != '/') {// dirname takes a mutable char *, so we copy into a vectorstd::vector<char> fname_copy(yaml_filename.begin(), yaml_filename.end());fname_copy.push_back('\0');image_file_name = std::string(dirname(fname_copy.data())) + '/' + image_file_name;}load_parameters.image_file_name = image_file_name;// resolution、origin字段处理load_parameters.resolution = yaml_get_value<double>(doc, "resolution");load_parameters.origin = yaml_get_value<std::vector<double>>(doc, "origin");// origin字段校验if (load_parameters.origin.size() != 3) {throw YAML::Exception(doc["origin"].Mark(), "value of the 'origin' tag should have 3 elements, not " +std::to_string(load_parameters.origin.size()));}load_parameters.free_thresh = yaml_get_value<double>(doc, "free_thresh");load_parameters.occupied_thresh = yaml_get_value<double>(doc, "occupied_thresh");auto map_mode_node = doc["mode"];if (!map_mode_node.IsDefined()) {load_parameters.mode = MapMode::Trinary;} else {load_parameters.mode = map_mode_from_string(map_mode_node.as<std::string>());}try {load_parameters.negate = yaml_get_value<int>(doc, "negate");} catch (YAML::Exception &) {load_parameters.negate = yaml_get_value<bool>(doc, "negate");}std::cout << "[DEBUG] [map_io]: resolution: " << load_parameters.resolution << std::endl;std::cout << "[DEBUG] [map_io]: origin[0]: " << load_parameters.origin[0] << std::endl;std::cout << "[DEBUG] [map_io]: origin[1]: " << load_parameters.origin[1] << std::endl;std::cout << "[DEBUG] [map_io]: origin[2]: " << load_parameters.origin[2] << std::endl;std::cout << "[DEBUG] [map_io]: free_thresh: " << load_parameters.free_thresh << std::endl;std::cout << "[DEBUG] [map_io]: occupied_thresh: " << load_parameters.occupied_thresh <<std::endl;std::cout << "[DEBUG] [map_io]: mode: " << map_mode_to_string(load_parameters.mode) << std::endl;std::cout << "[DEBUG] [map_io]: negate: " << load_parameters.negate << std::endl; //NOLINTreturn load_parameters;
}
2.2.5 loadMapFromFile
函数 (核心)
个人觉得是 map_server
节点的一个核心,单独拎出来进行分析。
2.3 map_server 节点激活
保持终端1的继续运行,执行完终端2中的 ros2 lifecycle set /map_server activate
命令,可以看到如下打印。
在执行终端2中的 ros2 lifecycle set /map_server activate
命令前,可以打开rviz,执行完激活命令后,可以看到rviz进行地图显示。
代码方面,可以看到激活就是将 地图进行发布出来,然后 调用
createBond()
函数,用于与生命周期管理器建立bond连接(用于监控节点状态)
nav2_util::CallbackReturn MapServer::on_activate(const rclcpp_lifecycle::State & /*state*/)
{RCLCPP_INFO(get_logger(), "Activating");// Publish the map using the latched topicocc_pub_->on_activate();if (map_available_) {auto occ_grid = std::make_unique<nav_msgs::msg::OccupancyGrid>(msg_);occ_pub_->publish(std::move(occ_grid));}// create bond connectioncreateBond();return nav2_util::CallbackReturn::SUCCESS;
}
3. pgm 地图解析
该函数首先对 yaml 解析出的参数列表 load_parameters进行初始化各地图参数,如分辨率、地图尺寸等;然后进行双层for循环进行逐像素遍历解析。
void loadMapFromFile(const LoadParameters & load_parameters, nav_msgs::msg::OccupancyGrid & map)
{// Step 1: 图像初始化与加载Magick::InitializeMagick(nullptr); // 初始化ImageMagick库nav_msgs::msg::OccupancyGrid msg;std::cout << "[INFO] [map_io]: Loading image_file: " << load_parameters.image_file_name << std::endl;Magick::Image img(load_parameters.image_file_name); // 加载图像// Copy the image data into the map structuremsg.info.width = img.size().width(); // 地图宽度msg.info.height = img.size().height(); // 地图高度msg.info.resolution = load_parameters.resolution;msg.info.origin.position.x = load_parameters.origin[0];msg.info.origin.position.y = load_parameters.origin[1];msg.info.origin.position.z = 0.0;msg.info.origin.orientation = orientationAroundZAxis(load_parameters.origin[2]); // 设置朝向,origin[2]为偏航角(yaw),通过orientationAroundZAxis转换为四元数// Allocate space to hold the datamsg.data.resize(msg.info.width * msg.info.height);// Copy pixel data into the map structurefor (size_t y = 0; y < msg.info.height; y++) {for (size_t x = 0; x < msg.info.width; x++) {auto pixel = img.pixelColor(x, y); // 获取当前坐标(x,y)处的像素颜色值 pixelColor(x,y)自动处理越界(ImageMagick内部机制)// Step 1: 像素通道处理 将图像像素的RGB(及可选Alpha)通道数据归一化为单一亮度值std::vector<Magick::Quantum> channels = {pixel.redQuantum(), pixel.greenQuantum(), pixel.blueQuantum()}; // 提取像素的RGB通道值,存储在向量中// 蓝色 DEBUG 打印// std::cout << "\033[32m [DEBUG] [map_io]: channels.size:"<< channels.size() <<" pixel[0]:" << channels.at(0) << " pixel[1]:" << channels.at(1) << " pixel[2]:" << channels.at(2) << " img.matte " << img.matte() << "\033[0m"<<std::endl;// Trinary模式且 img.matte()为true 即图像有Alpha通道,将反转后的Alpha值添加到通道列表中(透明=低占用,不透明=高占用)if (load_parameters.mode == MapMode::Trinary && img.matte()) {// To preserve existing behavior, average in alpha with color channels in Trinary mode.// CAREFUL. alpha is inverted from what you might expect. High = transparent, low = opaquechannels.push_back(MaxRGB - pixel.alphaQuantum()); // 一般pgm无Alpha通道,直接会跳过}// 计算所有通道值的总和double sum = 0;for (auto c : channels) {sum += c;}/// on a scale from 0.0 to 1.0 how bright is the pixel?// Step 2: 亮度计算double shade = Magick::ColorGray::scaleQuantumToDouble(sum / channels.size()); // 将通道平均值归一化到[0,1]范围,表示像素亮度// If negate is true, we consider blacker pixels free, and whiter// pixels occupied. Otherwise, it's vice versa./// on a scale from 0.0 to 1.0, how occupied is the map cell (before thresholding)?double occ = (load_parameters.negate ? shade : 1.0 - shade); //! 根据negate参数决定是否反转亮度值(黑→白或白→黑表示占用)// 蓝色 DEBUG 打印// std::cout << "\033[34m [DEBUG] [map_io]: channels.size:"<< channels.size() << " sum:" << sum <<" pixel[0]:" << channels.at(0) << " shade:" << shade << " occ:" << occ// << " pixel[1]:" << channels.at(1) << " pixel[2]:" << channels.at(2) << "\033[0m\n"<<std::endl;// Step 3: 三种地图模式处理int8_t map_cell;switch (load_parameters.mode) {// Trinary模式(三值划分)case MapMode::Trinary:if (load_parameters.occupied_thresh < occ) {map_cell = nav2_util::OCC_GRID_OCCUPIED;} else if (occ < load_parameters.free_thresh) {map_cell = nav2_util::OCC_GRID_FREE;} else {map_cell = nav2_util::OCC_GRID_UNKNOWN;}break;// Scale模式(线性映射)case MapMode::Scale:if (pixel.alphaQuantum() != OpaqueOpacity) {map_cell = nav2_util::OCC_GRID_UNKNOWN;} else if (load_parameters.occupied_thresh < occ) {map_cell = nav2_util::OCC_GRID_OCCUPIED;} else if (occ < load_parameters.free_thresh) {map_cell = nav2_util::OCC_GRID_FREE;} else {map_cell = std::rint((occ - load_parameters.free_thresh) /(load_parameters.occupied_thresh - load_parameters.free_thresh) * 100.0);}break;// Raw模式(原始值转换)case MapMode::Raw: {double occ_percent = std::round(shade * 255);if (nav2_util::OCC_GRID_FREE <= occ_percent && occ_percent <= nav2_util::OCC_GRID_OCCUPIED){map_cell = static_cast<int8_t>(occ_percent);} else {map_cell = nav2_util::OCC_GRID_UNKNOWN;}break;}default:throw std::runtime_error("Invalid map mode");}// 图像坐标系:原点在左上角,Y轴向下 ---> ROS地图坐标系:原点在左下角,Y轴向上// data按行优先存储,翻转Y轴实现坐标系转换msg.data[msg.info.width * (msg.info.height - y - 1) + x] = map_cell; // Y轴翻转}}
在此,我们先关注 yaml文件中的参数:
image: fishbot_map.pgm
mode: trinary
resolution: 0.05
origin: [-3.37, -2.88, 0]
negate: 0
occupied_thresh: 0.65
free_thresh: 0.25
其含义分别是
- image:pgm 地图名称;
- mode:地图格式,有三种模式: 三值划分
Trinary
、线性映射Scale
和 原始值Raw
,自己遇到的都是三值划分Trinary
; - resolution:地图分辨率;
- origin:地图原点(左下角像素)在世界坐标系中的坐标和朝向
- negate:是否反转图像的亮度值 (
0
:不反转;1
:黑白颜色意义互换:原本黑色的像素(表示障碍物)会变成白色(自由),白色变成黑色); - occupied_thresh:占用阈值,三值模式下,若像素的占用值大于占用阈值(如 0.65),则该像素被认为是占用;
- free_thresh:自由阈值,在三值模式下,若像素的占用值小于自由阈值(如 0.25),则该像素被认为是自由的。介于0.25和0.65之间的值被认为是未知区域。
重点分析 origin
怎么理解 origin: 地图原点(左下角像素)在世界坐标系中的坐标和朝向 ?
以 yaml 参数[-3.37, -2.88, 0]
为例,前两个值表示原点的x和y坐标(单位:米),第三个值表示地图的初始朝向(偏航角,单位:弧度)。即,这个原点表示地图的左下角像素在世界坐标系中的位置是(-3.37, -2.88),且地图没有旋转(偏航角为0)。实际上,很多算法假设地图没有旋转(即yaw=0),因为旋转地图会增加计算的复杂性,并且在实际建图时通常会将地图对齐到世界坐标系,即 在实际应用中,第三个值通常为0。
原点的设置使得我们可以将地图中的像素坐标转换为世界坐标。转换公式为:
- world_x = origin_x + (pixel_x * resolution)
- world_y = origin_y + (pixel_y * resolution)
注意:由于图像坐标系(左上角为原点)和世界坐标系(左下角为原点)的差异,通常需要将图像坐标的y轴翻转后再进行转换。
有了地图各参数的理解,现在开始处理pgm各像素。
从代码上可以看到,没有使用opencv进行读取,而是ImageMagick库(所以通道值很大,见下图红框,推测Q16编译,不是自己理解的255以内的数值),读取各通道数值,然后各通道数值相加sum
再除以总通道数channels.size()
然后使用Magick::ColorGray::scaleQuantumToDouble
函数进行像素值归一化,即 得到该像素值归一化亮度shade
,此时shade
取值范围为 [0,1]
,见下图蓝框 。
重点来了!!,现在有了像素的归一化亮度,此时,像素越白(越是空闲自由区域),归一化亮度越接近于1。而地图格式中的 三值划分Trinary
是当前像素的占用值大于占用阈值(如 0.65),则该像素被认为是占用。这中间需要一步换算,将归一化亮度反转作为划定为障碍物的依据occ
:double occ = (load_parameters.negate ? shade : 1.0 - shade);
,见上图绿框。
auto pixel = img.pixelColor(x, y);
std::vector<Magick::Quantum> channels = {pixel.redQuantum(), pixel.greenQuantum(), pixel.blueQuantum()};if (load_parameters.mode == MapMode::Trinary && img.matte()) {// To preserve existing behavior, average in alpha with color channels in Trinary mode.// CAREFUL. alpha is inverted from what you might expect. High = transparent, low = opaquechannels.push_back(MaxRGB - pixel.alphaQuantum()); // 一般pgm无Alpha通道,直接会跳过
}
// 计算所有通道值的总和
double sum = 0;
for (auto c : channels) {sum += c;
}
/// on a scale from 0.0 to 1.0 how bright is the pixel?
// Step 2: 亮度计算
double shade = Magick::ColorGray::scaleQuantumToDouble(sum / channels.size()); // 将通道平均值归一化到[0,1]范围,表示像素亮度// If negate is true, we consider blacker pixels free, and whiter
// pixels occupied. Otherwise, it's vice versa.
/// on a scale from 0.0 to 1.0, how occupied is the map cell (before thresholding)?
double occ = (load_parameters.negate ? shade : 1.0 - shade); //! 根据negate参数决定是否反转亮度值(黑→白或白→黑表示占用)
有了翻转后的归一化亮度occ
,就可以根据三值划分的阈值进行是否是障碍物判断了:
switch (load_parameters.mode) {// Trinary模式(三值划分)case MapMode::Trinary:if (load_parameters.occupied_thresh < occ) {map_cell = nav2_util::OCC_GRID_OCCUPIED;} else if (occ < load_parameters.free_thresh) {map_cell = nav2_util::OCC_GRID_FREE;} else {map_cell = nav2_util::OCC_GRID_UNKNOWN;}break;...}
除此之外,还需要留意此函数最后一行代码:
msg.data[msg.info.width * (msg.info.height - y - 1) + x] = map_cell;
因为之前说明origin
时谈到由于图像坐标系(左上角为原点)和世界坐标系(左下角为原点)的差异,通常需要将图像坐标的y轴翻转后再进行转换,所以图像中的第0行(y=0)被存储到了地图数据的最后一行(即地图坐标系的底部),而图像中的最后一行(y=height-1)被存储到了地图数据的第0行(即地图坐标系的顶部),实现Y轴翻转。