rosbridge_suit、roslibpy 源码阅读与简单测试 —— 图片编解码与传输
Title: rosbridge_suit、roslibpy 源码阅读与简单测试 —— 图片编解码与传输
文章目录
- I. rosbridge_suit
- II. roslibpy
- III. rosbridge_suit 中 PNG compression 编解码
- IV. roslibpy 中 PNG compression 客户端例子
- V. 简单测试实例
- VI. 小节
以下简单记录, 以备忘.
I. rosbridge_suit
rosbridge_suit (https://github.com/RobotWebTools/rosbridge_suite) 是 rosbridge v2 Protocol 的服务器实现. 作用是为第三方非 ROS 应用提供 ROS 的 JSON 接口. 比如可以在非 ROS 应用中订阅 ROS Topics、调用 ROS Services 等. ROS2 版本中利用 WebSocket 实现网络传输, 之前 ROS1 版本还支持 TCP 和 UDP.
安装在运行 ROS 的主机上 (以 ROS2 Jazzy 版本为例)
sudo apt-get install ros-jazzy-rosbridge-server
source /opt/ros/jazzy/setup.bash
II. roslibpy
roslibpy (https://github.com/gramaziokohler/roslibpy) 是 RosBridge 的客户端, 在 WebSocket 传输层上实现与 rosbridge 的通讯. 在没有 ROS 环境甚至非 Linux 系统下, 为 Python 应用提供诸如 publishing、subscribing、service calls、actionlib、TF 等 ROS 基本功能.
安装在非 ROS 主机上
conda install roslibpy
III. rosbridge_suit 中 PNG compression 编解码
rosbridge_suite/ROSBRIDGE_PROTOCOL.md
中的 # rosbridge v2.0 Protocol Specification
描述, rosbridge 将图片、地图等转化/压缩为 PNG-encoded 字节码.
#### 3.1.2 PNG compression ( _png_ ) [experimental]Some messages (such as images and maps) can be extremely large, and for efficiency
reasons we may wish to transfer them as PNG-encoded bytes. The PNG opcode
duplicates the fragmentation logic of the FRG opcode (and it is possible and
reasonable to only have a single fragment), except that the data field consists
of ASCII-encoded PNG bytes.```json
{ "op": "png",(optional) "id": <string>,"data": <string>,(optional) "num": <int>,(optional) "total": <int>
}
```* **id** – only required if the message is fragmented. Identifies thefragments for the fragmented message.* **data** – a fragment of a PNG-encoded message or an entire message.* **num** – only required if the message is fragmented. The index of the fragment.* **total** – only required if the message is fragmented. The total number of fragments.To construct a PNG compressed message, take the JSON string of the original
message and read the bytes of the string into a PNG image. Then, ASCII-encode
the image. This string is now used as the data field. If fragmentation is
necessary, then fragment the data and set the ID, num and total fields to the
appropriate values in the fragments. Otherwise these fields can be left out.
[rosbridge_suit] rosbridge_library/src/rosbridge_library/internal/pngcompression.py
中对 PNG compression 编解码的实现.
from base64 import standard_b64decode, standard_b64encode
from io import BytesIO
from math import ceil, floor, sqrtfrom PIL import Imagedef encode(string):"""PNG-compress the string in a square RGB image padded with '\n', return the b64 encoded bytes"""string_bytes = string.encode("utf-8")length = len(string_bytes)# RGB 3 channels, square RBG imagewidth = floor(sqrt(length / 3.0)) height = ceil((length / 3.0) / width)bytes_needed = int(width * height * 3)# padded with '\n'string_padded = string_bytes + (b"\n" * (bytes_needed - length))i = Image.frombytes("RGB", (int(width), int(height)), string_padded)buff = BytesIO()i.save(buff, "png")# encoding binary data to printable ASCII charactersencoded = standard_b64encode(buff.getvalue())return encodeddef decode(string):"""b64 decode the string, then PNG-decompress and remove the '\n' padding"""decoded = standard_b64decode(string)buff = BytesIO(decoded)i = Image.open(buff, formats=("png",)).convert("RGB")dec_str = i.tobytes().decode("utf-8")dec_str = dec_str.replace("\n", "") # Remove padding from encodingreturn dec_str
IV. roslibpy 中 PNG compression 客户端例子
[roslibpy] docs/examples/05_subscribe_to_images.py
中的图片格式消息订阅
import base64
import logging
import timeimport roslibpy# Configure logging
fmt = '%(asctime)s %(levelname)8s: %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
log = logging.getLogger(__name__)client = roslibpy.Ros(host='127.0.0.1', port=9090)def receive_image(msg):log.info('Received image seq=%d', msg['header']['seq'])base64_bytes = msg['data'].encode('ascii')image_bytes = base64.b64decode(base64_bytes)with open('received-image-{}.{}'.format(msg['header']['seq'], msg['format']) , 'wb') as image_file:image_file.write(image_bytes)subscriber = roslibpy.Topic(client, '/camera/image/compressed', 'sensor_msgs/CompressedImage')
subscriber.subscribe(receive_image)client.run_forever()
[roslibpy] docs/examples/04_publish_image.py
中的图片消息发布
import base64
import logging
import timeimport roslibpy# Configure logging
fmt = '%(asctime)s %(levelname)8s: %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
log = logging.getLogger(__name__)client = roslibpy.Ros(host='127.0.0.1', port=9090)publisher = roslibpy.Topic(client, '/camera/image/compressed', 'sensor_msgs/CompressedImage')
publisher.advertise()def publish_image():with open('robots.jpg', 'rb') as image_file:image_bytes = image_file.read()encoded = base64.b64encode(image_bytes).decode('ascii')publisher.publish(dict(format='jpeg', data=encoded))client.on_ready(publish_image)
client.run_forever()
其中 sensor_msgs/CompressedImage
Message 的结构
# This message contains a compressed imageHeader header # Header timestamp should be acquisition time of image# Header frame_id should be optical frame of camera# origin of frame should be optical center of camera# +x should point to the right in the image# +y should point down in the image# +z should point into to plane of the imagestring format # Specifies the format of the data# Acceptable values differ by the image transport used:# - compressed_image_transport:# ORIG_PIXFMT; CODEC compressed [COMPRESSED_PIXFMT]# where:# - ORIG_PIXFMT is pixel format of the raw image, i.e.# the content of sensor_msgs/Image/encoding with# values from include/sensor_msgs/image_encodings.h# - CODEC is one of [jpeg, png]# - COMPRESSED_PIXFMT is only appended for color images# and is the pixel format used by the compression# algorithm. Valid values for jpeg encoding are:# [bgr8, rgb8]. Valid values for png encoding are:# [bgr8, rgb8, bgr16, rgb16].# If the field is empty or does not correspond to the# above pattern, the image is treated as bgr8 or mono8# jpeg image (depending on the number of channels).# - compressed_depth_image_transport:# ORIG_PIXFMT; compressedDepth CODEC# where:# - ORIG_PIXFMT is pixel format of the raw image, i.e.# the content of sensor_msgs/Image/encoding with# values from include/sensor_msgs/image_encodings.h# It is usually one of [16UC1, 32FC1].# - CODEC is one of [png, rvl]# If the field is empty or does not correspond to the# above pattern, the image is treated as png image.# - Other image transports can store whatever values they# need for successful decoding of the image. Refer to# documentation of the other transports for details.uint8[] data # Compressed image buffer
V. 简单测试实例
- ROS 主机启动各种 ROS 节点, 同时确保启动了 rosbridge_websocket 节点
ros2 launch rosbridge_server rosbridge_websocket_launch.xml
- 非 ROS 环境的 python 应用(简单的测试 demo)
import roslibpy
import time
import cv2
import numpy as np
import base64
import loggingclass RosBridgeTest:def __init__(self, ros_ip='172.17.0.2', ros_port=9090):# 创建ROS客户端连接self.ros = roslibpy.Ros(host=ros_ip, port=ros_port)self.ros.on_ready(self._on_connect)# 创建订阅者topic_name = '/model/robot/odometry'message_type = 'nav_msgs/Odometry'self.listener = roslibpy.Topic(self.ros, topic_name, message_type)# 创建订阅者topic_name_camera_info = '/camera_info'message_type_camera_info = 'sensor_msgs/msg/CameraInfo'self.listener_camera_info = roslibpy.Topic(self.ros, topic_name_camera_info, message_type_camera_info)# 创建订阅者topic_name_image = '/wide_angle_camera'message_type_image = 'sensor_msgs/msg/Image'self.listener_image = roslibpy.Topic(self.ros, topic_name_image, message_type_image, queue_length = 2)# 创建发布者topic_name_cmd_vel = '/model/robot/cmd_vel'message_type_cmd_vel = 'geometry_msgs/msg/Twist'self.publisher_cmd_vel = roslibpy.Topic(self.ros, topic_name_cmd_vel, message_type_cmd_vel)print("Connecting to ROS_BRIDGE: ws://{ros_ip}:{ros_port} ...")def _on_connect(self):"""连接成功的回调"""print("Connected to ROS_BRIDGE_SERVER successfully.")def _publish_cmd_vel(self):if not self.ros.is_connected:print("ROS is not connected. Messages cannot be sent.")returntwist = {'linear': {'x': 0.1, 'y': 0.0, 'z': 0.0},'angular': {'x': 0.0, 'y': 0.0, 'z': 0.0}}count = 0while (count < 3):self.publisher_cmd_vel.publish(roslibpy.Message(twist))count += 1def _subscribe_odometry(self):self.listener.subscribe(self._message_callback)def _subscribe_camera_info(self):self.listener_camera_info.subscribe(self._message_callback_camera_info)def _subscribe_image(self):self.listener_image.subscribe(self._image_callback)# self.listener_image.subscribe(self.receive_image)def _message_callback(self, message):"""处理接收到的消息(/model/robot/odometry)"""print(f"\n[收到消息 /model/robot/odometry]:\n{message}")def _message_callback_camera_info(self, message):"""处理接收到的消息(/camera_info)"""print(f"\n[收到消息 /camera_info]:\n{message}")def _image_callback(self, img_message):"""处理接收到的消息(/wide_angle_camera)"""print(f"\n[收到消息 /wide_angle_camera]:\n{img_message['header']}")# 打印图像信息,确认编码格式print(f"Image encoding: {img_message['encoding']}")print(f"Image height: {img_message['height']}")print(f"Image width: {img_message['width']}")print(f"Data size: {len(img_message['data'])}")# img_data = img_message['data']base64_bytes = img_message['data'].encode('ascii')img_data = base64.b64decode(base64_bytes)# 使用np.frombuffer处理字节数据img = np.frombuffer(img_data, dtype=np.uint8)# 计算通道数total_pixels = img_message['height'] * img_message['width']channels = len(img) // total_pixelsprint(f"channels: {channels}")img = img.reshape(img_message['height'], img_message['width'], channels)# 根据encoding字段转换颜色空间if img_message['encoding'] == 'rgb8':img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)# img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)cv2.imshow("preview", img)cv2.waitKey(1)def _run(self):"""运行主循环"""try:while self.ros.is_connected:time.sleep(0.1)except KeyboardInterrupt:print("Closing the connection with ROS ...")self.listener.unsubscribe()self.listener_camera_info.unsubscribe()self.publisher_cmd_vel.unadvertise()self.ros.terminate()if __name__ == "__main__":# 创建对象example = RosBridgeTest()# 连接到 rosexample.ros.run()# 发布消息example._publish_cmd_vel()# 订阅消息example._subscribe_camera_info()example._subscribe_odometry()example._subscribe_image()# 运行主循环example._run()
能够正常接收到消息, OpenCV 显示窗口也能够更新图片.
[收到消息 /wide_angle_camera]:
{‘stamp’: {‘sec’: 32, ‘nanosec’: 10000000}, ‘frame_id’: ‘robot/base_footprint/wideangle_camera’}
Image encoding: rgb8
Image height: 1024
Image width: 1024
Data size: 4194304
channels: 3[收到消息 /camera_info]:
{‘header’: {‘stamp’: {‘sec’: 32, ‘nanosec’: 406000000}, ‘frame_id’: …[收到消息 /model/robot/odometry]:
{‘header’: {‘stamp’: {‘sec’: 32, ‘nanosec’: 880000000}, ‘frame_id’: ‘odom’}, …
存在的问题是, 消息传输的延时比较大.
ROS2 中的图像已经更新很久了, 通过 rosbridge 传过来的非 ROS 测试程序显示才有更新.
第一次运行时, 还误以为程序有问题, 无法更新图片消息呢.
VI. 小节
在基于 rosbridge_suit 的非 ROS 应用与 ROS 节点之间的通讯中, 文本信息的订阅或者发布相对简单直接.
而图像信息的发布或者接收相对比较复杂, 因为涉及到编解码以及图像的恢复.
版权声明:本文为博主原创文章,遵循 CC 4.0 BY 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/woyaomaishu2/article/details/148177281
本文作者:wzf@robotics_notes