roboto.formats.mcap#

Fetching and decoding topic data stored in MCAP files.

Covers chunk-index-driven prefetching over HTTP range requests and decoding of JSON-, ROS1-, and ROS2-encoded messages with field-path projection.

Package Contents#

roboto.formats.mcap.Accessor#

Reads one path’s value out of a decoded message and writes it into an Accumulator.

The accessor is compiled once per (fields, getter_type) and reused for every subsequent message in the same read pass. Compilation resolves ROS time-field name remapping and sequence boundaries against a sample message, so per-call work is just attribute access plus accumulator writes.

roboto.formats.mcap.END_OF_STREAM#

Sentinel returned by McapReader.next_decoded() when the stream is exhausted.

A decoded message value can legitimately be None (a JSON null payload), so None cannot signal exhaustion: a real null-valued message would be indistinguishable from end-of-stream. Callers detect exhaustion with the identity check result is END_OF_STREAM and treat every other value, None included, as a delivered message.
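The sentinel contract can be illustrated with a minimal sketch. FakeReader, drain, and the local END_OF_STREAM object below are hypothetical stand-ins written for this example; they mimic only the documented next_decoded() behavior and are not part of the package.

```python
from typing import Any, Iterable

# Stand-in sentinel for this sketch; real code would import
# roboto.formats.mcap.END_OF_STREAM instead.
END_OF_STREAM = object()


class FakeReader:
    """Hypothetical reader that mimics only the next_decoded() contract."""

    def __init__(self, payloads: Iterable[Any]) -> None:
        self._payloads = iter(payloads)

    def next_decoded(self) -> Any:
        # Return the next payload, or the sentinel once the iterator is drained.
        return next(self._payloads, END_OF_STREAM)


def drain(reader: FakeReader) -> list[Any]:
    """Collect every delivered message, keeping None payloads."""
    delivered = []
    while True:
        result = reader.next_decoded()
        if result is END_OF_STREAM:  # identity check, not `result is None`
            break
        delivered.append(result)
    return delivered


# The None in the middle is a real (JSON null) message and must survive.
messages = drain(FakeReader([{"x": 1}, None, {"x": 2}]))
```

A loop written as `while (m := reader.next_decoded()) is not None` would stop at the null payload and silently drop the final message.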

class roboto.formats.mcap.McapDialect(*args, **kwds)#

Bases: enum.Enum

The robotics framework dialect of an MCAP file’s messages.

OTHER covers every non-ROS encoding (JSON, omgidl, protobuf, flatbuffer, self-describing) and any absent or unknown encoding.

OTHER = 'other'#
ROS1 = 'ros1'#
ROS2 = 'ros2'#

class roboto.formats.mcap.McapReader(stream, fields, start_time=None, end_time=None, log_time_order=True)#

Reader for processing MCAP files with field projection.

Provides an iterator interface for reading decoded messages from MCAP files, with support for temporal filtering and field selection. Handles multiple encoding formats including JSON, ROS1, and ROS2.

The reader automatically decodes messages using appropriate decoders and filters the output based on the specified fields and time range.

property field_paths: list[str]#

The projected fields, as the dot-delimited paths supplied at initialization.

Returns:

List of field names in dot notation.

Return type:

list[str]

property has_next: bool#

Check if there are more messages available to read.

Returns:

True if there are more messages to read, False otherwise.

Return type:

bool

next()#

Read and return the next decoded message.

Advances the reader to the next message and returns it as a DecodedMessage object, or None if no more messages are available.

Returns:

DecodedMessage containing the next message data, or None if no more messages.

Return type:

Union[roboto.formats.mcap.decoded_message.DecodedMessage, None]

Examples

>>> while reader.has_next:
...     message = reader.next()
...     if message is not None:
...         data = message.to_dict()
...         print(f"Message at {data.get('log_time')}: {data}")

next_decoded()#

Read and return the next message’s raw decoded value, advancing the reader.

The raw value is what the format decoder produced – a dict for JSON-encoded messages, a dynamically created class instance for ROS1/ROS2 – with no projection applied. Callers that want projected dictionary output use next() and DecodedMessage.to_dict() instead.

A decoded value of None is a real message (a JSON null payload) and is delivered as such. Exhaustion is signaled with the dedicated END_OF_STREAM sentinel instead, so callers must test result is END_OF_STREAM rather than result is None to detect the end.

Returns:

The decoded message value, or END_OF_STREAM if no more messages are available.

Return type:

Any

property next_envelope_timestamp: McapEnvelopeTimestamp#

Get the envelope timestamps of the next message to be read.

Returns:

The next message’s log_time and publish_time in nanoseconds, or math.inf for both if no messages remain.

Return type:

McapEnvelopeTimestamp

next_message_is_time_aligned(timestamp)#

Check if the next message has the specified timestamp.

Used for time-aligned reading when merging data from multiple readers.

Parameters:

timestamp (Union[int, float]) – Timestamp to check against in nanoseconds.

Returns:

True if the next message has the specified timestamp, False otherwise.

Return type:

bool
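The time-aligned merge mentioned above can be sketched as follows. FakeReader is a hypothetical stand-in exposing a reduced interface (next_log_time replaces the documented next_envelope_timestamp property, and next() returns a bare payload), and merge_aligned is an illustrative helper, not a function of this package.

```python
import math
from dataclasses import dataclass, field


@dataclass
class FakeReader:
    """Hypothetical stand-in with a reduced version of the reader interface."""

    name: str
    messages: list[tuple[int, str]]  # (log_time in nanoseconds, payload)
    _pos: int = field(default=0, repr=False)

    @property
    def has_next(self) -> bool:
        return self._pos < len(self.messages)

    @property
    def next_log_time(self) -> float:
        # Report math.inf once exhausted, as the documented property does.
        return self.messages[self._pos][0] if self.has_next else math.inf

    def next_message_is_time_aligned(self, timestamp: float) -> bool:
        return self.next_log_time == timestamp

    def next(self) -> str:
        payload = self.messages[self._pos][1]
        self._pos += 1
        return payload


def merge_aligned(readers: list[FakeReader]) -> list[tuple[float, dict[str, str]]]:
    """Emit one row per step, holding a value from every aligned reader."""
    rows = []
    while any(r.has_next for r in readers):
        # The earliest pending timestamp across all readers drives the row.
        timestamp = min(r.next_log_time for r in readers)
        row = {}
        for r in readers:
            if r.next_message_is_time_aligned(timestamp):
                row[r.name] = r.next()
        rows.append((timestamp, row))
    return rows


imu = FakeReader("imu", [(10, "a"), (20, "b")])
gps = FakeReader("gps", [(20, "c"), (30, "d")])
rows = merge_aligned([imu, gps])
```

Because an exhausted reader reports math.inf, it never wins the min() and never matches a finite timestamp, so no separate exhaustion branch is needed inside the loop.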

property schema_encoding: str | None#

The MCAP Schema-record encoding of the next message to be read.

The encoding (e.g. "ros1msg", "ros2msg", "jsonschema") identifies the framework or format that produced the messages; roboto.formats.mcap.dialect_from_schema_encoding() maps it to an McapDialect.

Returns:

The schema record’s encoding, or None if the reader is exhausted or the next message carries no schema (a schemaless channel).

Return type:

Optional[str]

roboto.formats.mcap.Resolution#

A resolved accessor path: a no-op, a simple attribute chain, or a per-element sequence crossing.

Build one with none_resolution(), simple_resolution(), or sequence_resolution(), then compile it with build_accessor().

roboto.formats.mcap.build_accessor(resolution, getter, is_class_getter)#

Compile a resolution into an Accessor that reads its path into an accumulator.

The resolution carries the (possibly time-remapped) structure; this function only selects the matching runtime walk. It does not inspect a sample message, so a caller that built the resolution from a schema can compile without a message in hand.

Parameters:
  • resolution (Resolution)

  • getter (AttrGetter)

  • is_class_getter (bool)

Return type:

Accessor
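To make the three resolution shapes concrete, here is a minimal sketch of how a resolution might be compiled into a reusable closure. Everything in it is an assumption made for illustration: the NoneRes, SimpleRes, and SequenceRes classes, the build function, the two-argument getter, and the use of a plain list as the accumulator are hypothetical and do not reflect the package's actual types.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass(frozen=True)
class NoneRes:
    """The path is absent on the message; the accessor does nothing."""


@dataclass(frozen=True)
class SimpleRes:
    path: tuple[str, ...]  # straight attribute chain


@dataclass(frozen=True)
class SequenceRes:
    pre_path: tuple[str, ...]  # path leading to the sequence
    sub: "Res"                 # resolution applied to each element


Res = Union[NoneRes, SimpleRes, SequenceRes]
Getter = Callable[[Any, str], Any]
Accessor = Callable[[Any, list], None]  # (message, accumulator) -> None


def walk(obj: Any, path: tuple[str, ...], getter: Getter) -> Any:
    for name in path:
        obj = getter(obj, name)
    return obj


def build(res: Res, getter: Getter) -> Accessor:
    """Pick the runtime walk matching the resolution; no sample is needed."""
    if isinstance(res, NoneRes):
        return lambda message, out: None
    if isinstance(res, SimpleRes):
        return lambda message, out: out.append(walk(message, res.path, getter))
    inner = build(res.sub, getter)  # compiled once, reused for every element

    def read_sequence(message: Any, out: list) -> None:
        collected: list = []
        for element in walk(message, res.pre_path, getter):
            inner(element, collected)
        out.append(collected)

    return read_sequence


def dict_getter(obj: Any, name: str) -> Any:
    return obj[name]


message = {"header": {"seq": 7}, "points": [{"x": 1.0}, {"x": 2.0}]}
out: list = []
build(SimpleRes(("header", "seq")), dict_getter)(message, out)
build(SequenceRes(("points",), SimpleRes(("x",))), dict_getter)(message, out)
build(NoneRes(), dict_getter)(message, out)
```

The branch on the resolution type happens once at build time, so the returned closure does only attribute access and accumulator writes per message.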

roboto.formats.mcap.compile_accessors(fields, sample, getter)#

Compile one accessor per field. Does not cache; callers manage caching.

Returns a tuple of (accessors, fully_resolved). fully_resolved is False if any path traversed an empty sequence in sample and the inner shape past it had to be guessed. Callers maintaining a cross-message cache should not cache speculative compilations, since the next message may need a different shape.

Return type:

tuple[list[Accessor], bool]
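The caching advice above can be sketched as a small wrapper that stores a compilation only when it was fully resolved. compile_stub is a hypothetical stand-in for compile_accessors that works on dict messages and takes no getter; AccessorCache is likewise an illustrative name, not part of the package.

```python
from typing import Any, Callable

Accessors = list[Callable[[Any], Any]]


def compile_stub(fields: tuple[str, ...], sample: dict) -> tuple[Accessors, bool]:
    """Hypothetical stand-in for compile_accessors over dict messages."""
    fully_resolved = True
    accessors: Accessors = []
    for field_path in fields:
        head = field_path.split(".")[0]
        # An empty list in the sample hides the shape of its elements,
        # so anything compiled past it is a guess.
        if sample.get(head) == []:
            fully_resolved = False
        accessors.append(lambda message, key=head: message.get(key))
    return accessors, fully_resolved


class AccessorCache:
    """Cross-message cache that refuses to store speculative compilations."""

    def __init__(self) -> None:
        self._cache: dict[tuple[str, ...], Accessors] = {}
        self.compilations = 0

    def get(self, fields: tuple[str, ...], sample: dict) -> Accessors:
        cached = self._cache.get(fields)
        if cached is not None:
            return cached
        accessors, fully_resolved = compile_stub(fields, sample)
        self.compilations += 1
        if fully_resolved:
            self._cache[fields] = accessors  # safe to reuse for later messages
        return accessors


cache = AccessorCache()
fields = ("points.x",)
cache.get(fields, {"points": []})          # speculative: compiled, not cached
cache.get(fields, {"points": [{"x": 1}]})  # resolved: compiled and cached
cache.get(fields, {"points": []})          # served from the cache
```

The speculative result is still returned and used for the current message; it is only kept out of the cache so the next message triggers a fresh compilation.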

roboto.formats.mcap.dialect_from_schema_encoding(encoding)#

The McapDialect named by an MCAP Schema record’s encoding.

ros1msg -> ROS1; ros2msg/ros2idl -> ROS2; every other encoding (including None, an unknown string, or a non-ROS well-known encoding) -> OTHER. Literals are taken from mcap.well_known.SchemaEncoding.

Examples

>>> dialect_from_schema_encoding("ros1msg")
<McapDialect.ROS1: 'ros1'>
>>> dialect_from_schema_encoding("ros2idl")
<McapDialect.ROS2: 'ros2'>
>>> dialect_from_schema_encoding(None)
<McapDialect.OTHER: 'other'>

Parameters:

encoding (Optional[str])

Return type:

McapDialect

roboto.formats.mcap.getter_for(message)#

The shared attribute getter matching a decoded message’s shape.

Parameters:

message (Any)

Return type:

AttrGetter

roboto.formats.mcap.none_resolution()#

A resolution whose accessor is a no-op (the path is absent on a message).

Return type:

Resolution

roboto.formats.mcap.open_for_window(signed_url, start_time=None, end_time=None)#

Open a remote MCAP file for reading, prefetching only the chunks in a log-time window.

Reads the file’s summary section to locate its chunk index, identifies the chunks whose message log times intersect [start_time, end_time), and prefetches that byte range in parallel so subsequent sequential reads hit the local buffer instead of issuing many small HTTP range requests. The returned reader is positioned at the start of the file. The caller owns it and must close() it.

The window is matched against the chunk index’s log-time bounds; when row timestamps come from somewhere other than the message log time, pass no bounds (prefetching then covers every chunk) and filter rows after decode.

Parameters:
  • signed_url (str) – Resolved download URL of the MCAP file.

  • start_time (Optional[int]) – Inclusive window lower bound in nanoseconds, or None for unbounded.

  • end_time (Optional[int]) – Exclusive window upper bound in nanoseconds, or None for unbounded.

Returns:

An HttpRangeReader over the file, primed with the in-window chunk bytes and positioned at offset 0.

Return type:

roboto.storage.HttpRangeReader
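The window test against the chunk index can be sketched as follows. ChunkIndexEntry is a reduced stand-in that keeps four fields from the MCAP Chunk Index record; chunks_in_window and covering_byte_range are hypothetical helpers, and treating the prefetch as one covering byte range is an assumption of this sketch rather than a statement about the implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChunkIndexEntry:
    """Reduced stand-in for an MCAP Chunk Index record (times in nanoseconds)."""

    message_start_time: int  # earliest message log time in the chunk
    message_end_time: int    # latest message log time in the chunk (inclusive)
    chunk_start_offset: int  # byte offset of the chunk record in the file
    chunk_length: int        # byte length of the chunk record


def chunks_in_window(
    entries: list[ChunkIndexEntry],
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> list[ChunkIndexEntry]:
    """Keep chunks whose log-time span intersects [start_time, end_time)."""
    selected = []
    for entry in entries:
        if start_time is not None and entry.message_end_time < start_time:
            continue  # chunk ends before the inclusive lower bound
        if end_time is not None and entry.message_start_time >= end_time:
            continue  # chunk starts at or after the exclusive upper bound
        selected.append(entry)
    return selected


def covering_byte_range(entries: list[ChunkIndexEntry]) -> Optional[tuple[int, int]]:
    """Smallest [first, last) byte span containing every selected chunk."""
    if not entries:
        return None
    first = min(e.chunk_start_offset for e in entries)
    last = max(e.chunk_start_offset + e.chunk_length for e in entries)
    return first, last


index = [
    ChunkIndexEntry(0, 99, 1000, 500),
    ChunkIndexEntry(100, 199, 1500, 400),
    ChunkIndexEntry(200, 299, 1900, 600),
]
windowed = covering_byte_range(chunks_in_window(index, start_time=150, end_time=200))
unbounded = covering_byte_range(chunks_in_window(index))
```

With no bounds every chunk is selected, which matches the documented advice to pass no window when row timestamps do not come from the message log time.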

roboto.formats.mcap.remap_time_fields(resolution, sample, getter)#

Substitute ROS runtime time-field names into resolution, observed against sample.

A schema declares a ROS time struct’s leaves by their canonical names (sec, nsec), but a decoded ROS1 message exposes secs/nsecs and a ROS2 message sec/nanosec. This walks sample along the resolution and rewrites the trailing path components past any time value to the runtime names, so the built accessor reads the right attributes.

Returns (remapped, time_resolved). time_resolved is False only when a time-bearing leaf sits past a sequence that is empty in sample: its element cannot be observed, so the runtime names stay a guess and the caller should re-resolve against a later, non-empty message. A resolution with no time component is returned unchanged with True. For JSON messages, which already use the canonical names, the call is always a no-op.

Parameters:
  • resolution (Resolution)

  • sample (Any)

  • getter (AttrGetter)

Return type:

tuple[Resolution, bool]
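The renaming described above can be sketched on a plain attribute path. remap_leaf and RUNTIME_NAMES are hypothetical names for this example; the sketch rewrites only the final path component, does not check that the parent is actually a time struct, and ignores sequences, so it is much simpler than what the function documents.

```python
from types import SimpleNamespace
from typing import Any

# Canonical leaf name -> runtime spellings named in the description above
# (ROS1 exposes secs/nsecs, ROS2 exposes sec/nanosec).
RUNTIME_NAMES = {
    "sec": ("sec", "secs"),
    "nsec": ("nsec", "nsecs", "nanosec"),
}


def remap_leaf(path: tuple[str, ...], sample: Any) -> tuple[str, ...]:
    """Rewrite the last component of path to the name the sample exposes."""
    *parents, leaf = path
    obj = sample
    for name in parents:
        obj = getattr(obj, name)
    for candidate in RUNTIME_NAMES.get(leaf, (leaf,)):
        if hasattr(obj, candidate):
            return (*parents, candidate)
    return tuple(path)  # nothing matched; leave the path unchanged


# Hypothetical decoded messages standing in for ROS1 and ROS2 shapes.
ros1 = SimpleNamespace(header=SimpleNamespace(stamp=SimpleNamespace(secs=5, nsecs=7)))
ros2 = SimpleNamespace(header=SimpleNamespace(stamp=SimpleNamespace(sec=5, nanosec=7)))

ros1_path = remap_leaf(("header", "stamp", "nsec"), ros1)
ros2_path = remap_leaf(("header", "stamp", "nsec"), ros2)
```

Observing the sample, rather than branching on a dialect flag, is what lets the same code handle both ROS variants.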

roboto.formats.mcap.sequence_resolution(pre_path, sub)#

A resolution that crosses the sequence at pre_path, applying sub per element.

Parameters:
  • pre_path (collections.abc.Sequence[str])

  • sub (Resolution)

Return type:

Resolution

roboto.formats.mcap.simple_resolution(path)#

A resolution for a straight attribute chain (no sequence crossing).

Parameters:

path (collections.abc.Sequence[str])

Return type:

Resolution