Use with native stack
Messages read with rosbags are simple dataclasses that mimic the native ROS2 interface. If you want to pass those messages to the native ROS2 stack, you need to convert them into native objects first.
Message instance conversion
"""Example: Message instance conversion."""
from __future__ import annotations
import importlib
from typing import TYPE_CHECKING
import numpy
if TYPE_CHECKING:
from typing import Any
NATIVE_CLASSES: dict[str, Any] = {}
def to_native(msg: Any) -> Any: # noqa: ANN401
"""Convert rosbags message to native message.
Args:
msg: Rosbags message.
Returns:
Native message.
"""
msgtype: str = msg.__msgtype__
if msgtype not in NATIVE_CLASSES:
pkg, name = msgtype.rsplit('/', 1)
NATIVE_CLASSES[msgtype] = getattr(importlib.import_module(pkg.replace('/', '.')), name)
fields = {}
for name, field in msg.__dataclass_fields__.items():
if 'ClassVar' in field.type:
continue
value = getattr(msg, name)
if '__msg__' in field.type:
value = to_native(value)
elif isinstance(value, list):
value = [to_native(x) for x in value]
elif isinstance(value, numpy.ndarray):
value = value.tolist()
fields[name] = value
return NATIVE_CLASSES[msgtype](**fields)
if __name__ == '__main__':
from rosbags.typesys.types import (
builtin_interfaces__msg__Time,
sensor_msgs__msg__Image,
std_msgs__msg__Header,
)
image = sensor_msgs__msg__Image(
std_msgs__msg__Header(
builtin_interfaces__msg__Time(42, 666),
'/frame',
),
4,
4,
'rgb8',
False,
4 * 3,
numpy.zeros(4 * 4 * 3, dtype=numpy.uint8),
)
native_image = to_native(image)
# native_image can now be passed to the ROS stack