Edit message definition
Some ROS messages definitons differ between ROS1 and ROS2, most notably std_msgs/msg/Header
. Even withing the same version of ROS, message definitions can evolve over time. As it is such a common case, rosbags handles std_msgs/msg/Header
automatically when converting between rosbag1 and rosbag2. For all other types it is up to the end user to convert between message definitions afterwards.
Update CameraInfo
"""Example: Edit message definition.
Between ROS1 and ROS2 the CameraInfo message definition has changed.
The D, K, R, and P field names have been changed from upper case to
lower case. This example shows how to downgrade the messages in a
rosbag1 to the proper message definition.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, cast
from build.lib.rosbags.serde.serdes import serialize_ros1
from rosbags.highlevel.anyreader import AnyReader, SimpleTypeStore
from rosbags.rosbag1 import Writer
from rosbags.typesys import get_types_from_msg
from rosbags.typesys.msg import gendefhash
from rosbags.typesys.register import register_types
if TYPE_CHECKING:
from pathlib import Path
from rosbags.interfaces import ConnectionExtRosbag1
from rosbags.typesys.types import sensor_msgs__msg__CameraInfo
# Noetic camera info message definition, taken from:
# http://docs.ros.org/en/noetic/api/sensor_msgs/html/msg/CameraInfo.html
CAMERAINFO_DEFINITION = """
std_msgs/Header header
uint32 height
uint32 width
string distortion_model
float64[] D
float64[9] K
float64[9] R
float64[12] P
uint32 binning_x
uint32 binning_y
sensor_msgs/RegionOfInterest roi
"""
def downgrade_camerainfo_to_rosbag1(src: Path, dst: Path) -> None:
"""Edit message definitions in a rosbag1.
Args:
src: Rosbag1 source path.
dst: Destination path.
topic: Name of topic to remove.
"""
typestore = SimpleTypeStore({})
register_types(
get_types_from_msg(CAMERAINFO_DEFINITION, 'sensor_msgs/msg/CameraInfo'),
typestore,
)
CameraInfo = typestore.sensor_msgs__msg__CameraInfo # type: ignore[attr-defined]
with AnyReader([src]) as reader, Writer(dst) as writer:
conn_map = {}
for conn in reader.connections:
ext = cast('ConnectionExtRosbag1', conn.ext)
# Use updated message definition and md5sum for CameraInfo.
if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
msgdef, md5sum = gendefhash(conn.msgtype, {}, typestore)
else:
msgdef, md5sum = conn.msgdef, conn.digest
conn_map[conn.id] = writer.add_connection(
conn.topic,
conn.msgtype,
msgdef,
md5sum,
ext.callerid,
ext.latching,
)
for conn, timestamp, data in reader.messages():
wconn = conn_map[conn.id]
if conn.msgtype == 'sensor_msgs/msg/CameraInfo':
msg = cast(
'sensor_msgs__msg__CameraInfo',
reader.deserialize(data, conn.msgtype),
)
converted_msg = CameraInfo(
header=msg.header,
height=msg.height,
width=msg.width,
distortion_model=msg.distortion_model,
# Map lower case names to upper case.
D=msg.d,
K=msg.k,
R=msg.r,
P=msg.p,
binning_x=msg.binning_x,
binning_y=msg.binning_y,
roi=msg.roi,
)
data = serialize_ros1(converted_msg, wconn.msgtype, typestore)
writer.write(wconn, timestamp, data)