Save images as rosbag

The following examples show how to create new ROS bags from images.

Save rosbag1

"""Example: Save images as rosbag1."""

import numpy

from rosbags.rosbag1 import Writer
from rosbags.serde import serialize_ros1
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
from rosbags.typesys.types import sensor_msgs__msg__CompressedImage as CompressedImage
from rosbags.typesys.types import std_msgs__msg__Header as Header

TOPIC = '/camera'
FRAMEID = 'map'

# Contains filenames and their timestamps
IMAGES = [
    ('homer.jpg', 42),
    ('marge.jpg', 43),
]


def save_images() -> None:
    """Iterate over IMAGES and save to output bag."""
    with Writer('output.bag') as writer:
        conn = writer.add_connection(TOPIC, CompressedImage.__msgtype__)

        for path, timestamp in IMAGES:
            message = CompressedImage(
                Header(
                    stamp=Time(
                        sec=int(timestamp // 10**9),
                        nanosec=int(timestamp % 10**9),
                    ),
                    frame_id=FRAMEID,
                ),
                format='jpeg',  # could also be 'png'
                data=numpy.fromfile(path, dtype=numpy.uint8),
            )

            writer.write(
                conn,
                timestamp,
                serialize_ros1(message, CompressedImage.__msgtype__),
            )

Save rosbag2

"""Save multiple images in rosbag2."""

import numpy

from rosbags.rosbag2 import Writer
from rosbags.serde import serialize_cdr
from rosbags.typesys.types import builtin_interfaces__msg__Time as Time
from rosbags.typesys.types import sensor_msgs__msg__CompressedImage as CompressedImage
from rosbags.typesys.types import std_msgs__msg__Header as Header

TOPIC = '/camera'
FRAMEID = 'map'

# Contains filenames and their timestamps
IMAGES = [
    ('homer.jpg', 42),
    ('marge.jpg', 43),
]


def save_images() -> None:
    """Iterate over IMAGES and save to output bag."""
    with Writer('output') as writer:
        conn = writer.add_connection(TOPIC, CompressedImage.__msgtype__, 'cdr', '')

        for path, timestamp in IMAGES:
            message = CompressedImage(
                Header(
                    stamp=Time(
                        sec=int(timestamp // 10**9),
                        nanosec=int(timestamp % 10**9),
                    ),
                    frame_id=FRAMEID,
                ),
                format='jpeg',  # could also be 'png'
                data=numpy.fromfile(path, dtype=numpy.uint8),
            )

            writer.write(
                conn,
                timestamp,
                serialize_cdr(message, message.__msgtype__),
            )