Register custom message types

Out of the box, rosbags only supports the message types that ship with a default ROS2 distribution. If you want to (de)serialize custom messages, you need to add them to the type system manually.

From rosbag1

"""Example: Register rosbag1 types."""

from __future__ import annotations

from typing import TYPE_CHECKING

from rosbags.rosbag1 import Reader
from rosbags.typesys import get_types_from_msg, register_types

if TYPE_CHECKING:
    from pathlib import Path


def process_bag(src: Path) -> None:
    """Register every message type found in a bag, then process the bag.

    The message definition stored with each connection is parsed and the
    combined result is registered in a single call.

    Args:
        src: Bag to process.

    """
    with Reader(src) as reader:
        collected = {}
        for connection in reader.connections:
            parsed = get_types_from_msg(connection.msgdef, connection.msgtype)
            collected.update(parsed)
        register_types(collected)

        # Now all message types used in the bag are registered
        # for conn, timestamp, data in reader.messages():
        #     ...

From definition string

"""Example: Register type from definition string."""

from rosbags.serde import serialize_cdr
from rosbags.typesys import get_types_from_msg, register_types

# Your custom message definition, written as one "<type> <name>" field
# per line.
STRIDX_MSG = """
string string
uint32 index
"""

# Parse the definition under the given type name and register the result.
register_types(get_types_from_msg(STRIDX_MSG, 'custom_msgs/msg/StrIdx'))

# The type import works only after the register_types call. The class name
# is derived from the msgtype name above, with '/' replaced by '__'.

# pylint: disable=no-name-in-module,wrong-import-position
from rosbags.typesys.types import custom_msgs__msg__StrIdx as StrIdx  # type: ignore  # noqa

# pylint: enable=no-name-in-module,wrong-import-position

# Fields are passed as keyword arguments named after the definition.
message = StrIdx(string='foo', index=42)

# Raw data that can be passed to rosbag2.Writer.write
rawdata = serialize_cdr(message, message.__msgtype__)

From multiple files

"""Example: Register types from msg files."""

from pathlib import Path

from rosbags.typesys import get_types_from_msg, register_types


def guess_msgtype(path: Path) -> str:
    """Guess message type name from path.

    The last three path components are read as ``<package>/msg/<Name>.msg``.
    The file suffix is dropped, and a ``msg`` component is inserted before
    the file name when the remaining components do not already contain one.

    Args:
        path: Location of a message definition file.

    Returns:
        Message type name separated by forward slashes, for example
        ``custom_msgs/msg/Speed``.

    Raises:
        IndexError: If path has fewer than three parents.

    """
    name = path.relative_to(path.parents[2]).with_suffix('')
    if 'msg' not in name.parts:
        name = name.parent / 'msg' / name.name
    # Message type names use forward slashes; str() would emit backslashes
    # for Windows paths, so render the path in POSIX form explicitly.
    return name.as_posix()


# Collects the types parsed from all files so they are registered in one call.
add_types = {}

for pathstr in [
    '/path/to/custom_msgs/msg/Speed.msg',
    '/path/to/custom_msgs/msg/Accel.msg',
]:
    msgpath = Path(pathstr)
    msgdef = msgpath.read_text(encoding='utf-8')
    # The type name is not given explicitly; it is guessed from the file path.
    add_types.update(get_types_from_msg(msgdef, guess_msgtype(msgpath)))

register_types(add_types)

# The type imports work only after the register_types call. The class names
# are derived from the msgtype names above.

# pylint: disable=no-name-in-module,wrong-import-position
from rosbags.typesys.types import custom_msgs__msg__Accel as Accel  # type: ignore  # noqa
from rosbags.typesys.types import custom_msgs__msg__Speed as Speed  # type: ignore  # noqa

# pylint: enable=no-name-in-module,wrong-import-position