ros2bag_py的api小结、丢帧问题对策
ros2bag_py的api小结、丢帧问题对策
- 零、原因
- 一、对于使用原生命令,丢帧问题的努力以及最终没有解决
- 二、总结ros2bag_py的api与用法
- 三、源码下载
零、原因
生产中遇到ros2bag record原生代码丢帧问题,因此需要自行调用api。然而官方没有相关的api文档,需要自己挨个确认。
链接: link.
一、对于使用原生命令,丢帧问题的努力以及最终没有解决
1、发现录制video的数据,丢帧严重。经中间件负责人确认,是record没有进行多线程录制。dds.shm只有一个线程在收数据。可能是性能不足导致失败。
验证:分别ros2 bag record只录制1、2、4个topic,查看丢帧情况。
结果:1个topic不丢帧。录制的topic越多,丢帧越严重。
2、对策:使用qos提高通信质量。
发布端增加qos的配置,同时订阅端:
ros2 bag record -s mcap -a --qos-profile-overrides-path q0s_overrides.yaml
结果:无法解决丢帧问题。性能仍不足。
3、设置datasharing开关
export RMW_FASTRTPS_USE_QOS_FROM_XML =1
结果:可以启动多个线程dds.shm进行录制。但是通过top命令查看,TIME+没有变化,说明新增的线程没运行。
二、总结ros2bag_py的api与用法
-
bag_rewrite
作用:重写或转换 ROS 2 bag 文件 的工具,主要用途是对已录制的 bag 文件进行二次处理,例如修改内容、筛选数据、转换格式或修复问题,生成新的 bag 文件。
核心功能与典型场景:
bag_rewrite 的核心是读取原始 bag 文件,根据指定规则处理数据,再写入新的 bag 文件。常见用途包括:1-1、筛选话题或消息
从原始 bag 中提取特定话题(例如只保留 /lidar 话题,过滤掉无关的 /camera 话题),或按时间范围截取消息(例如只保留第 10-20 秒的内容)。
1-2、转换存储格式
将 bag 文件从一种存储格式(如 sqlite3)转换为另一种(如 mcap),或修改压缩方式(如从无压缩转为 zstd 压缩)。
1-3、修复损坏的 bag 文件
对于因异常终止录制导致的不完整 bag 文件,bag_rewrite 可尝试读取有效部分并生成修复后的新文件。
1-4、修改消息内容
(需结合自定义逻辑)对消息字段进行修改(如坐标转换、单位调整),再写入新 bag。
合并或拆分 bag 文件
将多个 bag 文件按时间顺序合并为一个,或按话题 / 时间拆分原始 bag 为多个小文件
from rosbag2_py import bag_rewrite, StorageOptions, ConverterOptions# 配置输入源(原始bag文件)
input_storage = StorageOptions(uri="original_bag", storage_id="sqlite3")# 配置输出目标(新bag文件)
output_storage = StorageOptions(uri="processed_bag", storage_id="mcap") # 转换为mcap格式# 配置转换规则(如筛选话题)
converter_options = ConverterOptions(input_serialization_format="cdr",output_serialization_format="cdr"
)# 定义筛选规则:只保留特定话题
filter = {"topics": ["/lidar/points", "/imu/data"]}# 执行重写
bag_rewrite(input_storages=[input_storage], # 可传入多个输入bagoutput_storage=output_storage,converter_options=converter_options,storage_filter=filter # 应用筛选规则
)
- QoSProfile
用途:QoS(服务质量)配置转换
其核心作用是将 C++ 层面的 rclcpp QoS 配置转换为 Python 层面的 rclpy QoS 配置,实现两种语言环境下 QoS 参数的兼容互通。
具体用途与背景:
当需要在 Python 代码中处理来自 C++ 模块的 QoS 配置(例如 rosbag2 内部 C++ 逻辑传递给 Python 接口的 QoS 参数)时,就需要通过 convert_rclcpp_qos_to_rclpy_qos 进行格式转换,确保 Python 代码能正确解析和使用这些 QoS 配置
import rclpy
from rclpy.qos import QoSProfile
from rosbag2_py import convert_rclcpp_qos_to_rclpy_qos# 假设从C++层获取了rclcpp的QoS对象(实际中可能通过rosbag2内部接口获取)
# 这里用伪代码表示C++ QoS配置:可靠性为可靠,历史记录为保留最后10条
rclcpp_qos = ... # 来自C++层的rclcpp::QoS对象# 转换为Python的QoSProfile
rclpy_qos = convert_rclcpp_qos_to_rclpy_qos(rclcpp_qos)# 验证转换后的QoS参数
print(f"可靠性: {rclpy_qos.reliability}") # 输出:ReliabilityPolicy.RELIABLE
print(f"历史记录深度: {rclpy_qos.depth}") # 输出:10
- CompressionMode
ros2bag_py 中的 CompressionMode 是一个枚举类,用于指定 ROS 2 bag 文件录制或重写时的压缩模式,控制消息数据的压缩策略,以平衡存储占用和性能开销
NONE:
不启用压缩,消息数据直接以原始格式写入 bag 文件。
适用场景:对录制性能要求极高(如高频实时数据),或磁盘空间充足时。
BEST_EFFORT:
采用 “尽力而为” 的压缩策略:仅对当前有剩余 CPU 资源时进行压缩,避免因压缩耗时导致丢帧。
适用场景:希望减少存储占用,但又不想因压缩过度影响实时录制(如避免丢帧)。
ALWAYS:
强制对所有消息进行压缩,无论当前系统负载如何。
适用场景:优先保证存储效率,可接受一定的 CPU 开销和潜在的录制延迟(如离线录制非实时数据)
import rosbag2_py
from rosbag2_py import CompressionMode# 配置录制参数
record_options = rosbag2_py.RecordOptions()
record_options.topics = ["/camera/image_raw", "/lidar/points"] # 大容量数据话题
record_options.output_file = "compressed_bag"# 设置压缩模式为 ALWAYS(强制压缩)
record_options.compression_mode = CompressionMode.ALWAYS
# 可选:指定压缩算法(如 zstd、lz4 等,需系统支持)
record_options.compression_options.compression_format = "zstd"# 启动录制
recorder = rosbag2_py.Recorder()
recorder.open(record_options)
recorder.start()
- CompressionOptions
用于精细化设置 ROS 2 bag 文件的压缩参数,配合 CompressionMode 共同控制数据压缩的具体方式(如压缩算法、压缩级别等),实现对压缩效果和性能的精细调控。
核心作用
当需要对 bag 文件进行压缩时(通过 CompressionMode 指定压缩模式),CompressionOptions 用于进一步补充压缩的具体细节,解决 “如何压缩” 的问题,例如:
选择哪种压缩算法(如 zstd、lz4、bz2 等)。
配置压缩级别(权衡压缩率和速度)。
定义压缩数据的分片大小(针对大文件优化)。
1、compression_format
类型:字符串(如 “zstd”、“lz4”、“none”)。
作用:指定压缩算法。不同算法在压缩率、速度、CPU 开销上有显著差异:
lz4:速度快、压缩率中等,适合实时场景。
zstd:压缩率高、速度较快,平衡性能和空间。
bz2:压缩率高但速度慢,适合离线存储。
2、compression_level
类型:整数(通常 0-9 或 0-19,因算法而异)。
作用:控制压缩强度。数值越高,压缩率越高,但消耗 CPU 更多、速度更慢。
示例:zstd 算法的 compression_level=3 表示中等压缩强度。
3、chunk_size
类型:整数(字节数)。
作用:指定压缩数据的分片大小。大文件拆分后可并行压缩,也便于后续随机访问
import rosbag2_py
from rosbag2_py import CompressionMode, CompressionOptions# 1. 配置压缩参数
compression_opts = CompressionOptions()
compression_opts.compression_format = "zstd" # 选择zstd算法
compression_opts.compression_level = 5 # 中等压缩强度
compression_opts.chunk_size = 1048576 # 1MB分片大小# 2. 配置录制参数,关联压缩模式和选项
record_options = rosbag2_py.RecordOptions()
record_options.topics = ["/lidar/points"] # 大容量点云话题
record_options.compression_mode = CompressionMode.ALWAYS # 强制压缩
record_options.compression_options = compression_opts # 应用压缩细节# 3. 启动录制
recorder = rosbag2_py.Recorder()
recorder.open(record_options)
recorder.start()
- compression_mode_from_string
compression_mode_from_string 是 ros2bag_py 中的一个工具函数,用于将字符串形式的压缩模式名称转换为 CompressionMode 枚举类型的对应值。它是字符串与枚举类型之间的转换桥梁,方便在配置文件、命令行参数或用户输入中指定压缩模式后,在代码中正确解析为 CompressionMode 枚举值。
场景:
解析命令行参数
当通过命令行工具(如 ros2 bag record)指定压缩模式时(例如 --compression-mode always),底层代码会通过 compression_mode_from_string 将字符串 “always” 转换为 CompressionMode.ALWAYS 枚举值。
读取配置文件
若从配置文件(如 YAML)中读取压缩模式配置(如 compression_mode: “best_effort”),需用该函数将字符串转换为枚举值后,再传给 RecordOptions 等配置类。
处理用户输入
在自定义 Python 脚本中,若允许用户通过字符串输入指定压缩模式(如 input_mode = “none”),可通过该函数转换为枚举类型,避免手动判断字符串与枚举的映射关系。
from rosbag2_py import compression_mode_from_string, CompressionMode# 将字符串转换为CompressionMode枚举
mode1 = compression_mode_from_string("always")
print(mode1 == CompressionMode.ALWAYS) # 输出:Truemode2 = compression_mode_from_string("best_effort")
print(mode2 == CompressionMode.BEST_EFFORT) # 输出:Truemode3 = compression_mode_from_string("none")
print(mode3 == CompressionMode.NONE) # 输出:True# 不区分大小写
mode4 = compression_mode_from_string("ALWAYS")
print(mode4 == CompressionMode.ALWAYS) # 输出:True
- compression_mode_from_string
其作用与 compression_mode_from_string 相反,用于将 CompressionMode 枚举类型的值转换为对应的字符串表示形式。它主要用于将代码中使用的枚举类型转换为人类可读的字符串,方便日志输出、配置展示或用户交互
from rosbag2_py import CompressionMode, compression_mode_to_stringcurrent_mode = CompressionMode.BEST_EFFORT
print(f"当前压缩模式: {compression_mode_to_string(current_mode}") # 输出:当前压缩模式: best_effort
- ConverterOptions 【重要】
用于指定 ROS 2 bag 文件读写或转换时的序列化 / 反序列化格式,控制消息数据在存储和传输过程中的编码方式,确保不同系统或模块之间能够正确解析消息内容
class ConverterOptions:input_serialization_format: stroutput_serialization_format: strdef __init__(self, input_serialization_format: str = ..., output_serialization_format: str = ...) -> None: ...
定义输入序列化格式(从 bag 文件读取消息时使用的编码格式)。
定义输出序列化格式(写入 bag 文件时使用的编码格式)。
用法:
1、input_serialization_format
类型:字符串(如 "cdr"、"json")。
作用:指定从 bag 文件读取消息时,消息数据的序列化格式。
常见值:"cdr"(ROS 2 默认的序列化格式,基于 CDR 协议)、"json"(JSON 格式,可读性强但体积大)等。
2、output_serialization_format
类型:字符串(如 "cdr"、"json")。
作用:指定将消息写入 bag 文件时,使用的序列化格式。
与输入格式配合,可实现格式转换(如 input="cdr", output="json" 表示将 CDR 格式转为 JSON 格式存储)。
import rosbag2_py
from rosbag2_py import StorageOptions, ConverterOptions# 配置读写格式:以CDR格式读取,以CDR格式写入(默认常用配置)
converter_options = ConverterOptions(input_serialization_format="cdr",output_serialization_format="cdr"
)# 读取bag文件(需指定格式以正确解析)
reader = rosbag2_py.SequentialReader()
reader.open(StorageOptions(uri="input_bag", storage_id="sqlite3"),converter_options
)# 写入新bag文件(指定输出格式)
writer = rosbag2_py.SequentialWriter()
writer.open(StorageOptions(uri="output_bag", storage_id="sqlite3"),converter_options
)
- FileInformation
代码
class FileInformation:duration: datetime.timedeltamessage_count: intpath: strstarting_time: objectdef __init__(self, path: str, starting_time: object, duration: object, message_count: int) -> None: ...
用法(存疑):
from rosbag2_py import MetadataIo# 读取bag文件的元数据
metadata = MetadataIo().read_metadata("my_bag")# 遍历所有片段文件的信息
for file_info in metadata.file_information:print(f"文件路径: {file_info.path}")print(f"时间范围: {file_info.starting_time} - {file_info.ending_time}")print(f"消息总数: {file_info.message_count}")print(f"包含话题: {file_info.topics_with_message_count}\n")
-
get_default_storage_id
get_default_storage_id 是 ros2bag_py 中的一个工具函数,用于获取 ROS 2 系统默认的 bag 文件存储格式标识符(storage ID)。它的核心作用是提供当前环境下默认使用的 bag 文件存储类型,方便在不手动指定存储格式时,使用系统默认配置进行 bag 文件的录制、读取或处理。sqlite3:基于 SQLite 数据库的存储格式,是早期 ROS 2 版本的默认选择。
mcap:一种专为机器人系统设计的高效存储格式,逐渐成为新的默认选择(如 ROS 2 Iron 及之后版本)。
from rosbag2_py import get_default_storage_id, SequentialWriter, StorageOptions# 获取默认存储格式
default_storage = get_default_storage_id()
print(f"系统默认存储格式: {default_storage}") # 例如输出:mcap# 使用默认格式创建writer
writer = SequentialWriter()
writer.open(StorageOptions(uri="my_bag", storage_id=default_storage), # 应用默认格式# ... 其他配置 ...
)
- get_default_storage_id
用于获取当前系统中已注册的所有 ROS 2 bag 文件读取器(reader)的标识符列表。这些标识符对应着系统支持的 bag 文件存储格式,帮助用户了解当前环境中可以读取哪些类型的 bag 文件。
from rosbag2_py import get_registered_readers# 获取所有支持的读取器(即支持的存储格式)
supported_readers = get_registered_readers()
print(f"系统支持读取的bag格式: {supported_readers}")
# 示例输出:['sqlite3', 'mcap']
- get_default_storage_id
用于返回当前系统中已注册的所有 bag 文件写入器(writer)的标识符列表。这些标识符对应系统支持的 bag 文件存储格式(如 sqlite3、mcap 等),表示可以将消息数据写入哪种格式的 bag 文件。
from rosbag2_py import get_registered_writers# 获取所有支持的写入格式
supported_writers = get_registered_writers()
print(f"系统支持写入的bag格式: {supported_writers}") # 示例:['sqlite3', 'mcap']
- get_registered_serializers
用于返回当前系统中已注册的消息序列化器(serializer)的标识符列表。序列化器是将 ROS 2 消息(如 Image、PointCloud2)转换为二进制流(用于存储或传输)的组件,其标识符对应支持的序列化协议(如 cdr、json 等)
from rosbag2_py import get_registered_serializers# 获取所有支持的序列化协议
supported_serializers = get_registered_serializers()
print(f"系统支持的消息序列化协议: {supported_serializers}") # 示例:['cdr', 'json']
- to_rclcpp_qos_vector
主要用于将 Python 层面的 QoS 配置列表 转换为 C++ 层面的 rclcpp QoS 配置向量(vector),以支持跨语言(Python 到 C++)的 QoS 配置传递,确保 ROS 2 中 Python 代码与 C++ 代码之间 QoS 参数的兼容互通。
def to_rclcpp_qos_vector(arg0: str, arg1: int) -> List[QoS]: ...
import rclpy
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from rosbag2_py import to_rclcpp_qos_vector, Recorder, RecordOptions# 初始化rclpy(需初始化才能创建QoS配置)
rclpy.init()# 定义多个话题的QoS配置(Python列表)
qos_profiles = [# 话题1:高可靠性配置QoSProfile(reliability=ReliabilityPolicy.RELIABLE,history=HistoryPolicy.KEEP_LAST,depth=10),# 话题2:尽力而为配置(适合高频数据)QoSProfile(reliability=ReliabilityPolicy.BEST_EFFORT,history=HistoryPolicy.KEEP_LAST,depth=100)
]# 转换为C++层面的rclcpp QoS向量
rclcpp_qos_vector = to_rclcpp_qos_vector(qos_profiles)# 配置录制参数,应用转换后的QoS
record_options = RecordOptions()
record_options
# 假设录制的话题序列与QoS配置一一对应
record_options.topics = ["/camera/image", "/lidar/points"]
record_options.qos_profile_overrides = rclcpp_qos_vector # 传递C++兼容的QoS向量
# 其他配置...# 启动录制器(底层C++逻辑将使用转换后的QoS配置订阅话题)
recorder = Recorder()
recorder.open(record_options)
recorder.start()
-
ReadOrder
-
用于指定从 ROS 2 bag 文件中读取消息时的顺序规则。它决定了消息被解析和处理的时序逻辑,核心作用是解决 “以何种顺序读取 bag 中存储的多话题消息” 的问题
1、TIMESTAMP(按时间戳排序)含义:读取消息时,按消息自带的时间戳(通常是发布时间)进行全局排序,确保消息严格按照时间先后顺序被处理。
适用场景:需要还原真实物理时间线的场景,例如:
回放传感器数据时保持时间同步(如激光雷达与相机数据的时序对齐)。
离线分析多设备协同工作时的时间关联性。
2、FILE(按文件存储顺序)
含义:按消息被写入 bag 文件的原始顺序读取(即录制时的存储顺序),不额外进行时间排序。
适用场景:更关注读取效率或原始存储顺序的场景,例如:
快速遍历 bag 文件内容(无需排序开销,性能更高)。
分析录制过程中的数据写入逻辑(如验证录制是否存在异常顺序)
import rosbag2_py
from rosbag2_py import StorageOptions, ConverterOptions, ReadOrder# 配置 bag 文件路径和格式
storage_options = StorageOptions(uri="my_recording", storage_id="mcap")
converter_options = ConverterOptions("cdr", "cdr")# 创建读取器
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)# 设置读取顺序为按时间戳排序(还原真实时序)
reader.set_read_order(ReadOrder.TIMESTAMP)# 按配置的顺序读取消息
while reader.has_next():topic, msg, timestamp = reader.read_next()print(f"[{timestamp}] {topic}: {msg}")
- ReadOrderSortBy
- Reindexer
- SequentialCompressionReader
- SequentialCompressionWriter
- SequentialReader
- SequentialWriter 【重要】
源码:
class SequentialWriter:def __init__(self) -> None: ...def close(self) -> None: ...def create_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...def open(self, arg0: rosbag2_py._storage.StorageOptions, arg1: rosbag2_py._storage.ConverterOptions) -> None: ...def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...def split_bagfile(self) -> None: ...def take_snapshot(self) -> bool: ...@overloaddef write(self, arg0: str, arg1: str, arg2: int) -> None: ...@overloaddef write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ...
一般先执行初始化,然后open,指定写入位置以及文件类型;然后create_topic。最后执行write(topic名字, 序列化msg内容,时间戳)
其中:
write 方法是 SequentialWriter 的核心功能,用于将 ROS 2 消息(已序列化)写入到 bag 文件中,同时记录消息的话题、时间戳等元信息
topic_name:字符串,消息所属的话题名称(如 "/lidar/points")。
serialized_message:序列化后的消息数据(通常是 rclpy.serialization.serialize_message 处理后的二进制数据)。
timestamp:整数(纳秒级),消息的时间戳(通常是消息发布时间或录制时间)import rclpy
from rclpy.serialization import serialize_message
from rosbag2_py import SequentialWriter, StorageOptions, ConverterOptions# 初始化写入器
writer = SequentialWriter()
writer.open(StorageOptions(uri="my_bag", storage_id="mcap"),ConverterOptions("cdr", "cdr")
)# 注册话题(需先注册才能写入消息)
writer.create_topic(TopicMetadata(name="/camera/image",type="sensor_msgs/msg/Image",serialization_format="cdr"
))# 假设接收到一个Image消息msg
msg = ... # 从话题订阅中获取的消息
serialized_msg = serialize_message(msg) # 序列化消息
timestamp = ... # 消息的时间戳(纳秒)# 写入消息到bag文件
writer.write("/camera/image", serialized_msg, timestamp)
remove_topic:动态管理录制的话题,允许在不停止整个录制过程的情况下,移除特定话题的写入权限
- StorageFilter
用于在读取或处理 ROS 2 bag 文件时筛选数据,即按照指定规则过滤出需要的消息,忽略不需要的内容,从而提高数据处理效率。 - StorageOptions 【重要】
核心配置类,用于定义 ROS 2 bag 文件的存储相关参数,包括文件路径、存储格式、分片策略等,是控制 bag 文件如何被创建、读取或写入的基础配置。
源码:
class StorageOptions:custom_data: Dict[str, str]end_time_ns: intmax_bagfile_duration: intmax_bagfile_size: intmax_cache_size: intsnapshot_mode: boolstart_time_ns: intstorage_config_uri: strstorage_id: strstorage_preset_profile: struri: strdef __init__(self, uri: str, storage_id: str = ..., max_bagfile_size: int = ..., max_bagfile_duration: int = ..., max_cache_size: int = ..., storage_preset_profile: str = ..., storage_config_uri: str = ..., snapshot_mode: bool = ..., start_time_ns: int = ..., end_time_ns: int = ..., custom_data: Dict[str, str] = ...) -> None: ...
1、uri
类型:字符串
含义:bag 文件的路径或存储目录。
示例:“~/rosbags/my_recording” 表示 bag 文件将存储在该目录下(具体文件会按存储格式自动生成,如 my_recording_0.mcap)。
2、storage_id
类型:字符串
含义:指定 bag 文件的存储格式标识符(如 “sqlite3”、“mcap”),需与系统中已注册的读写器(get_registered_readers/get_registered_writers 返回值)匹配。
示例:storage_id=“mcap” 表示使用 MCAP 格式存储 bag 文件。
3、max_bagfile_size
类型:整数(字节数)
含义:单个 bag 分片文件的最大大小。当文件达到该大小后,会自动创建新的分片文件(如 my_recording_0.mcap、my_recording_1.mcap)。
默认值:通常为 0(表示不限制单个文件大小,仅生成一个文件)。
4、max_bagfile_duration
类型:整数(纳秒)
含义:单个 bag 分片文件的最大录制时长。达到该时长后,自动创建新的分片文件。
示例:max_bagfile_duration=3600e9 表示每小时生成一个新的分片文件。
- TopicMetadata 【重要】
用于存储 ROS 2 话题的元信息(元数据),主要描述话题的基本属性,确保 bag 文件在写入或读取时能正确识别和解析话题数据。在 ROS 2 中,每个话题都有其独特的标识和数据类型(如 /camera/image 话题对应 sensor_msgs/msg/Image 类型)。当录制话题数据到 bag 文件时,需要将这些话题的基本信息(名称、类型、序列化格式等)一并存储,否则后续读取时无法正确解析消息内容。
TopicMetadata 的核心作用就是封装这些必要的话题元信息,作为写入器(如 SequentialWriter)注册话题时的参数,确保 bag 文件中不仅包含消息数据,还包含足够的元信息来解释这些数据的类型和结构。源码:
class TopicMetadata:id: intname: stroffered_qos_profiles: List[QoS]serialization_format: strtype: strtype_description_hash: strdef __init__(self, id: int, name: str, type: str, serialization_format: str, offered_qos_profiles: List[QoS] = ..., type_description_hash: str = ...) -> None: ...def equals(self, arg0: TopicMetadata) -> bool: ...
实例:
import rosbag2_py
from rosbag2_py import TopicMetadata, SequentialWriter, StorageOptions, ConverterOptions# 创建话题元信息:定义一个名为"/camera/image"的话题,消息类型为sensor_msgs/msg/Image,使用cdr序列化
image_topic_meta = TopicMetadata(name="/camera/image",type="sensor_msgs/msg/Image",serialization_format="cdr"
)# 初始化写入器
writer = SequentialWriter()
writer.open(StorageOptions(uri="my_bag", storage_id="mcap"),ConverterOptions("cdr", "cdr")
)# 注册话题(必须在写入消息前完成)
writer.create_topic(image_topic_meta)# 之后才能写入该话题的消息...
- TopicInformation
TopicInformation 是 ros2bag_py 中的一个数据类,用于存储 ROS 2 bag 文件中某个话题的统计信息和元数据汇总,主要用于描述一个话题在 bag 文件中的整体情况(如消息数量、数据范围等),方便快速快速了解该话题的内容规模和特征。
from rosbag2_py import MetadataIo# 读取bag文件的元数据
metadata = MetadataIo().read_metadata("my_bag")# 遍历所有话题的信息
for topic_info in metadata.topics_with_message_count:# 获取话题基础元数据topic_name = topic_info.topic_metadata.nametopic_type = topic_info.topic_metadata.type# 获取统计信息msg_count = topic_info.message_countstart_time = topic_info.starting_timeend_time = topic_info.ending_timeprint(f"话题: {topic_name}")print(f" 消息类型: {topic_type}")print(f" 消息数量: {msg_count}")print(f" 时间范围: {start_time} - {end_time}\n")
-
BagMetadata
-
MessageDefinition
-
MetadataIo
-
Info
-
Player
-
PlayOptions
-
ServiceRequestsSource
-
Recorder
-
RecordOptions
-
LocalMessageDefinitionSource
总结:实现写入mcap直接相关的,只有
SequentialWriter
StorageOptions
TopicMetadata
ConverterOptions
这4个方法
三、源码下载
# 创建工作空间
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws# 克隆ROS 2代码库(以Humble版本为例)
git clone https://github.com/ros2/ros2.git src/ros2
cd src/ros2
git checkout humble# 同步子模块(包含rosbag2相关代码)
vcs import ../ < ros2.repos
vcs pull ../