roboto.formats.mcap#
Fetching and decoding topic data stored in MCAP files.
Covers chunk-index-driven prefetching over HTTP range requests and decoding of JSON-, ROS1-, and ROS2-encoded messages with field-path projection.
Submodules#
Package Contents#
- roboto.formats.mcap.Accessor#
Reads one path’s value out of a decoded message and writes it into an Accumulator.
The accessor is compiled once per (fields, getter_type) and reused for every subsequent message in the same read pass. Compilation resolves ROS time-field name remapping and sequence boundaries against a sample message, so per-call work is just attribute access plus accumulator writes.
- roboto.formats.mcap.END_OF_STREAM#
Sentinel returned by McapReader.next_decoded() when the stream is exhausted.
A decoded message value can legitimately be None (a JSON null payload), so exhaustion cannot be signaled with None without making a real null-valued message indistinguishable from end-of-stream. Callers test result is END_OF_STREAM to detect exhaustion and treat every other value – None included – as a delivered message.
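The sentinel contract described above can be sketched as follows. This is a minimal illustration only: the END_OF_STREAM object and FakeReader defined here are hypothetical stand-ins that mimic the documented behavior, not the library's own objects.

```python
# Hypothetical stand-ins: this sentinel and FakeReader only mimic the documented contract.
END_OF_STREAM = object()


class FakeReader:
    """Returns pre-decoded values one at a time, then the sentinel once exhausted."""

    def __init__(self, values):
        self._values = list(values)
        self._pos = 0

    def next_decoded(self):
        if self._pos >= len(self._values):
            return END_OF_STREAM
        value = self._values[self._pos]
        self._pos += 1
        return value


def drain(reader):
    """Collect every delivered message, including None payloads."""
    delivered = []
    while True:
        result = reader.next_decoded()
        if result is END_OF_STREAM:  # identity test against the sentinel, never `is None`
            break
        delivered.append(result)
    return delivered


messages = drain(FakeReader([{"x": 1}, None, {"x": 2}]))
```

Because the loop compares by identity against a dedicated object, the None in the middle of the stream is kept as an ordinary message.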
- class roboto.formats.mcap.McapDialect(*args, **kwds)#
Bases: enum.Enum
The robotics framework dialect of an MCAP file’s messages.
OTHER covers every non-ROS encoding (JSON, omgidl, protobuf, flatbuffer, self-describing) and any absent or unknown encoding.
- OTHER = 'other'#
- ROS1 = 'ros1'#
- ROS2 = 'ros2'#
- class roboto.formats.mcap.McapReader(stream, fields, start_time=None, end_time=None, log_time_order=True)#
Reader for processing MCAP files with field projection.
Provides an iterator interface for reading decoded messages from MCAP files, with support for temporal filtering and field selection. Handles multiple encoding formats including JSON, ROS1, and ROS2.
The reader automatically decodes messages using appropriate decoders and filters the output based on the specified fields and time range.
- Parameters:
stream (IO[bytes])
fields (collections.abc.Sequence[roboto.formats.fields.FieldSelection])
start_time (Optional[int])
end_time (Optional[int])
log_time_order (bool)
- property field_paths: list[str]#
Get the list of fields being projected, as dot-delimited paths.
Returns the dot-delimited names of the fields provided during initialization.
- Returns:
List of field names in dot notation.
- Return type:
list[str]
- property has_next: bool#
Check if there are more messages available to read.
- Returns:
True if there are more messages to read, False otherwise.
- Return type:
bool
- next()#
Read and return the next decoded message.
Advances the reader to the next message and returns it as a DecodedMessage object, or None if no more messages are available.
- Returns:
DecodedMessage containing the next message data, or None if no more messages.
- Return type:
Union[roboto.formats.mcap.decoded_message.DecodedMessage, None]
Examples
>>> while reader.has_next:
...     message = reader.next()
...     if message:
...         data = message.to_dict()
...         print(f"Message at {data.get('log_time')}: {data}")
- next_decoded()#
Read and return the next message’s raw decoded value, advancing the reader.
The raw value is what the format decoder produced – a dict for JSON-encoded messages, a dynamically created class instance for ROS1/ROS2 – with no projection applied. Callers that want projected dictionary output use next() and DecodedMessage.to_dict() instead.
A decoded value of None is a real message (a JSON null payload) and is delivered as such. Exhaustion is signaled with the dedicated END_OF_STREAM sentinel instead, so callers must test result is END_OF_STREAM rather than result is None to detect the end.
- Returns:
The decoded message value, or END_OF_STREAM if no more messages are available.
- Return type:
Any
- property next_envelope_timestamp: McapEnvelopeTimestamp#
Get the envelope timestamps of the next message to be read.
- Returns:
The next message’s log_time and publish_time in nanoseconds, or both math.inf if no more messages.
- Return type:
McapEnvelopeTimestamp
- next_message_is_time_aligned(timestamp)#
Check if the next message has the specified timestamp.
Used for time-aligned reading when merging data from multiple readers.
- Parameters:
timestamp (Union[int, float]) – Timestamp to check against in nanoseconds.
- Returns:
True if the next message has the specified timestamp, False otherwise.
- Return type:
bool
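One way time-aligned merging across several readers might look is sketched below. StubReader, its next_log_time property, and merge_aligned are hypothetical and only imitate the documented behavior (an infinite timestamp once exhausted, and an equality check against the next message's timestamp); the sketch does not use McapReader itself.

```python
import math


class StubReader:
    """Hypothetical reader over (log_time, payload) pairs sorted by log_time."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    @property
    def next_log_time(self):
        # Mirrors the documented convention: infinity once exhausted.
        if self._pos >= len(self._rows):
            return math.inf
        return self._rows[self._pos][0]

    def next_message_is_time_aligned(self, timestamp):
        return self.next_log_time == timestamp

    def next(self):
        row = self._rows[self._pos]
        self._pos += 1
        return row[1]


def merge_aligned(readers):
    """Yield (timestamp, payloads); a reader with no message at that time contributes None."""
    while True:
        timestamp = min(r.next_log_time for r in readers)
        if timestamp == math.inf:
            return  # every reader is exhausted
        yield timestamp, [
            r.next() if r.next_message_is_time_aligned(timestamp) else None
            for r in readers
        ]


rows = list(merge_aligned([
    StubReader([(10, "a0"), (30, "a1")]),
    StubReader([(10, "b0"), (20, "b1")]),
]))
```

Each pass picks the smallest pending timestamp and advances only the readers whose next message carries it, so readers with sparser data stay in step with the others.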
- property schema_encoding: str | None#
The MCAP Schema-record encoding of the next message to be read.
The encoding (e.g. "ros1msg", "ros2msg", "jsonschema") identifies the framework or format that produced the messages; roboto.formats.mcap.dialect_from_schema_encoding() maps it to an McapDialect.
- Returns:
The schema record’s encoding, or None if the reader is exhausted or the next message carries no schema (a schemaless channel).
- Return type:
Optional[str]
- roboto.formats.mcap.Resolution#
A resolved accessor path: a no-op, a simple attribute chain, or a per-element sequence crossing.
Build one with none_resolution(), simple_resolution(), or sequence_resolution(), then compile it with build_accessor().
- roboto.formats.mcap.build_accessor(resolution, getter, is_class_getter)#
Compile a resolution into an Accessor that reads its path into an accumulator.
The resolution carries the (possibly time-remapped) structure; this only selects the matching runtime walk. It does not sample, so a caller that built the resolution from a schema can compile without a message in hand.
- Parameters:
resolution (Resolution)
getter (AttrGetter)
is_class_getter (bool)
- Return type:
Accessor
- roboto.formats.mcap.compile_accessors(fields, sample, getter)#
Compile one accessor per field. Does not cache; callers manage caching.
Returns a tuple of (accessors, fully_resolved). fully_resolved is False if any path traversed an empty sequence in sample and the inner shape past it had to be guessed. Callers maintaining a cross-message cache should not cache speculative compilations, since the next message may need a different shape.
- Parameters:
fields (collections.abc.Sequence[FieldSelection])
sample (Any)
getter (AttrGetter)
- Return type:
tuple[list[Accessor], bool]
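The caching guidance above (keep a compilation only when it was fully resolved) might be applied as in the sketch below. AccessorCache and stub_compile are hypothetical: stub_compile stands in for a compile step that returns (accessors, fully_resolved), and its accessors are placeholder strings rather than real Accessor objects.

```python
class AccessorCache:
    """Caches compiled accessors only when compilation was not speculative."""

    def __init__(self, compile_fn):
        self._compile_fn = compile_fn  # callable returning (accessors, fully_resolved)
        self._cached = None
        self.compile_calls = 0

    def accessors_for(self, sample):
        if self._cached is not None:
            return self._cached
        self.compile_calls += 1
        accessors, fully_resolved = self._compile_fn(sample)
        if fully_resolved:
            # Safe to reuse: no shape had to be guessed past an empty sequence.
            self._cached = accessors
        return accessors


def stub_compile(sample):
    """Hypothetical compiler: the inner shape is a guess while `points` is empty."""
    fully_resolved = len(sample["points"]) > 0
    return ["accessor-for-points.x"], fully_resolved


cache = AccessorCache(stub_compile)
cache.accessors_for({"points": []})          # speculative, not cached
cache.accessors_for({"points": [{"x": 1}]})  # fully resolved, cached
cache.accessors_for({"points": []})          # served from the cache
```

The first call compiles against an empty sequence and is thrown away; the second call recompiles against a populated message and is kept for the rest of the pass.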
- roboto.formats.mcap.dialect_from_schema_encoding(encoding)#
The McapDialect named by an MCAP Schema record’s encoding.
ros1msg -> ROS1; ros2msg / ros2idl -> ROS2; every other encoding (including None, an unknown string, or a non-ROS well-known encoding) -> OTHER. Literals are taken from mcap.well_known.SchemaEncoding.
Examples
>>> dialect_from_schema_encoding("ros1msg")
<McapDialect.ROS1: 'ros1'>
>>> dialect_from_schema_encoding("ros2idl")
<McapDialect.ROS2: 'ros2'>
>>> dialect_from_schema_encoding(None)
<McapDialect.OTHER: 'other'>
- Parameters:
encoding (Optional[str])
- Return type:
McapDialect
- roboto.formats.mcap.getter_for(message)#
The shared attribute getter matching a decoded message’s shape.
- Parameters:
message (Any)
- Return type:
- roboto.formats.mcap.none_resolution()#
A resolution whose accessor is a no-op (the path is absent on a message).
- Return type:
Resolution
- roboto.formats.mcap.open_for_window(signed_url, start_time=None, end_time=None)#
Open a remote MCAP file for reading, prefetching only the chunks in a log-time window.
Reads the file’s summary section to locate its chunk index, identifies the chunks whose message log times intersect [start_time, end_time), and prefetches that byte range in parallel so subsequent sequential reads hit the local buffer instead of issuing many small HTTP range requests. The returned reader is positioned at the start of the file. The caller owns it and must close() it.
The window is matched against the chunk index’s log-time bounds; when row timestamps come from somewhere other than the message log time, pass no bounds (prefetching then covers every chunk) and filter rows after decode.
- Parameters:
signed_url (str) – Resolved download URL of the MCAP file.
start_time (Optional[int]) – Inclusive window lower bound in nanoseconds, or None for unbounded.
end_time (Optional[int]) – Exclusive window upper bound in nanoseconds, or None for unbounded.
- Returns:
An HttpRangeReader over the file, primed with the in-window chunk bytes and positioned at offset 0.
- Return type:
HttpRangeReader
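The window-matching step described for open_for_window() can be sketched as a filter over chunk index entries followed by a covering byte range. This is an illustration under stated assumptions, not the library's implementation: ChunkIndexEntry, chunks_in_window, and covering_byte_range are hypothetical names, the entry fields are modeled on the MCAP Chunk Index record, and the HTTP fetching itself is omitted.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChunkIndexEntry:
    """Hypothetical subset of an MCAP chunk index record."""
    message_start_time: int  # earliest log time in the chunk, nanoseconds
    message_end_time: int    # latest log time in the chunk, nanoseconds
    chunk_start_offset: int  # byte offset of the chunk record
    chunk_length: int        # byte length of the chunk record


def chunks_in_window(entries, start_time: Optional[int], end_time: Optional[int]):
    """Keep chunks whose log-time bounds intersect [start_time, end_time)."""
    selected = []
    for entry in entries:
        if start_time is not None and entry.message_end_time < start_time:
            continue  # chunk ends before the window opens
        if end_time is not None and entry.message_start_time >= end_time:
            continue  # chunk starts at or after the exclusive upper bound
        selected.append(entry)
    return selected


def covering_byte_range(selected):
    """Return the (start, end) byte span covering all selected chunks, or None."""
    if not selected:
        return None
    start = min(e.chunk_start_offset for e in selected)
    end = max(e.chunk_start_offset + e.chunk_length for e in selected)
    return start, end


index = [
    ChunkIndexEntry(0, 99, 100, 50),
    ChunkIndexEntry(100, 199, 150, 60),
    ChunkIndexEntry(200, 299, 210, 40),
]
window = chunks_in_window(index, start_time=100, end_time=200)
byte_range = covering_byte_range(window)
```

Passing None for both bounds selects every chunk, which corresponds to the unbounded case in which rows are filtered after decode.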
- roboto.formats.mcap.remap_time_fields(resolution, sample, getter)#
Substitute ROS runtime time-field names into resolution, observed against sample.
A schema declares a ROS time struct’s leaves by their canonical names (sec, nsec), but a decoded ROS1 message exposes secs / nsecs and a decoded ROS2 message exposes sec / nanosec. This walks sample along the resolution and rewrites the trailing path components past any time value to the runtime names, so the built accessor reads the right attributes.
Returns (remapped, time_resolved). time_resolved is False only when a time-bearing leaf sits past a sequence that is empty in sample: its element cannot be observed, so the runtime names stay a guess and the caller should re-resolve against a later, non-empty message. A resolution with no time component is returned unchanged with True. For JSON messages, which use the canonical names, remapping is always a no-op.
- Parameters:
resolution (Resolution)
sample (Any)
getter (AttrGetter)
- Return type:
tuple[Resolution, bool]
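The name substitution itself can be illustrated with a small lookup keyed by dialect. This is a simplified sketch: remap_time_leaf is a hypothetical helper that takes a dialect string, whereas the documented function observes the runtime names on a sample message. The leaf names are the ones listed in the description above.

```python
# Runtime leaf names per dialect, as listed in the description above.
_RUNTIME_TIME_LEAVES = {
    "ros1": {"sec": "secs", "nsec": "nsecs"},
    "ros2": {"sec": "sec", "nsec": "nanosec"},
}


def remap_time_leaf(path, dialect):
    """Rewrite the last component of a time-field path to its runtime name.

    `path` is a non-empty tuple of attribute names whose final component is a
    canonical time leaf ("sec" or "nsec"). Unknown dialects leave it unchanged.
    """
    mapping = _RUNTIME_TIME_LEAVES.get(dialect, {})
    *prefix, leaf = path
    return (*prefix, mapping.get(leaf, leaf))


ros1_path = remap_time_leaf(("header", "stamp", "nsec"), "ros1")
ros2_path = remap_time_leaf(("header", "stamp", "nsec"), "ros2")
json_path = remap_time_leaf(("header", "stamp", "nsec"), "other")
```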
- roboto.formats.mcap.sequence_resolution(pre_path, sub)#
A resolution that crosses the sequence at pre_path, applying sub per element.
- Parameters:
pre_path (collections.abc.Sequence[str])
sub (Resolution)
- Return type:
Resolution
- roboto.formats.mcap.simple_resolution(path)#
A resolution for a straight attribute chain (no sequence crossing).
- Parameters:
path (collections.abc.Sequence[str])
- Return type:
Resolution
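To make the three resolution kinds concrete, the sketch below models them as small dataclasses and compiles each into a closure that appends values to a list. Everything here is hypothetical (NoneRes, SimpleRes, SequenceRes, walk, build, dict_getter, and the list used in place of an accumulator); it illustrates the idea of compiling a path once and reusing it, not the library's internal representation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Sequence, Union


@dataclass(frozen=True)
class NoneRes:
    """Path is absent on the message: the accessor does nothing."""


@dataclass(frozen=True)
class SimpleRes:
    path: Sequence[str]  # straight attribute chain


@dataclass(frozen=True)
class SequenceRes:
    pre_path: Sequence[str]  # path to the sequence
    sub: "Res"               # resolution applied to each element


Res = Union[NoneRes, SimpleRes, SequenceRes]


def walk(value: Any, path: Sequence[str], getter: Callable[[Any, str], Any]) -> Any:
    """Follow `path` from `value`, one getter call per component."""
    for name in path:
        value = getter(value, name)
    return value


def build(res: Res, getter) -> Callable[[Any, list], None]:
    """Compile a resolution into a function that appends values to `out`."""
    if isinstance(res, NoneRes):
        return lambda message, out: None
    if isinstance(res, SimpleRes):
        return lambda message, out: out.append(walk(message, res.path, getter))
    inner = build(res.sub, getter)  # compiled once, reused for every element

    def cross(message, out):
        for element in walk(message, res.pre_path, getter):
            inner(element, out)

    return cross


def dict_getter(obj, name):
    """Getter for dict-shaped (JSON-like) messages."""
    return obj[name]


accessor = build(SequenceRes(("points",), SimpleRes(("x",))), dict_getter)
values = []
accessor({"points": [{"x": 1}, {"x": 2}]}, values)
```

Swapping dict_getter for the built-in getattr would let the same compiled structure read attribute-style messages, which is the role the getter argument plays in the functions documented above.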