Register custom message types
Out of the box, rosbags only supports the message types that ship with a default ROS2 distribution. If you want to (de)serialize custom messages, you need to add them to the type system manually.
From rosbag1
"""Example: Register rosbag1 types."""
from __future__ import annotations
from typing import TYPE_CHECKING
from rosbags.rosbag1 import Reader
from rosbags.typesys import get_types_from_msg, register_types
if TYPE_CHECKING:
from pathlib import Path
def process_bag(src: Path) -> None:
    """Register all message types contained in a bag before processing it.

    The message definition of every connection in the bag is parsed, and the
    combined result is added to the type system in a single call.

    Args:
        src: Bag to process.

    """
    collected = {}
    with Reader(src) as bag:
        for connection in bag.connections:
            parsed = get_types_from_msg(connection.msgdef, connection.msgtype)
            collected.update(parsed)
        register_types(collected)

        # Every message type used in the bag is registered at this point,
        # so the messages can be read:
        # for conn, timestamp, data in bag.messages():
        #     ...
From definition string
"""Example: Register type from definition string."""
from rosbags.serde import serialize_cdr
from rosbags.typesys import get_types_from_msg, register_types
# Your custom message definition: one field per line, written as
# "<type> <name>" (the first field is of type string and is also named string)
STRIDX_MSG = """
string string
uint32 index
"""
register_types(get_types_from_msg(STRIDX_MSG, 'custom_msgs/msg/StrIdx'))
# Type import works only after the register_types call,
# the classname is derived from the msgtype name above
# (here 'custom_msgs/msg/StrIdx' is imported as custom_msgs__msg__StrIdx)
# pylint: disable=no-name-in-module,wrong-import-position
from rosbags.typesys.types import custom_msgs__msg__StrIdx as StrIdx # type: ignore # noqa
# pylint: enable=no-name-in-module,wrong-import-position
# Build a message instance; the keyword arguments match the field names
# of the definition above
message = StrIdx(string='foo', index=42)
# Rawdata that can be passed to rosbag2.Writer.write
rawdata = serialize_cdr(message, message.__msgtype__)
From multiple files
"""Example: Register types from msg files."""
from pathlib import Path
from rosbags.typesys import get_types_from_msg, register_types
def guess_msgtype(path: Path) -> str:
    """Guess message type name from path.

    The name is built from the last three components of ``path`` with the
    file suffix removed, e.g. ``/path/to/custom_msgs/msg/Speed.msg`` yields
    ``custom_msgs/msg/Speed``. If none of those components is ``msg``, a
    ``msg`` component is inserted in front of the final one.

    Args:
        path: Location of the message definition file.

    Returns:
        Message type name, using ``/`` as separator on every platform.

    Raises:
        IndexError: If ``path`` has fewer than three ancestors, i.e.
            ``path.parents[2]`` does not exist.

    """
    name = path.relative_to(path.parents[2]).with_suffix('')
    if 'msg' not in name.parts:
        name = name.parent / 'msg' / name.name
    # Type names always use forward slashes; str() would render Windows
    # paths with backslashes.
    return name.as_posix()
# Gather the types of all message files first, then register them together.
add_types = {}

for pathstr in (
    '/path/to/custom_msgs/msg/Speed.msg',
    '/path/to/custom_msgs/msg/Accel.msg',
):
    msgpath = Path(pathstr)
    msgdef = msgpath.read_text(encoding='utf-8')
    msgtype = guess_msgtype(msgpath)
    add_types.update(get_types_from_msg(msgdef, msgtype))

register_types(add_types)

# The generated classes can only be imported once register_types has run;
# their names are derived from the msgtype names guessed above.
# pylint: disable=no-name-in-module,wrong-import-position
from rosbags.typesys.types import custom_msgs__msg__Accel as Accel  # type: ignore  # noqa
from rosbags.typesys.types import custom_msgs__msg__Speed as Speed  # type: ignore  # noqa
# pylint: enable=no-name-in-module,wrong-import-position