roboto.formats.mcap.accessor#

Module Contents#

roboto.formats.mcap.accessor.Accessor#

Reads one path’s value out of a decoded message and writes it into an Accumulator.

The accessor is compiled once per (fields, getter_type) and reused for every subsequent message in the same read pass. Compilation resolves ROS time-field name remapping and sequence boundaries against a sample message, so per-call work is just attribute access plus accumulator writes.

class roboto.formats.mcap.accessor.AccessorCache#

Holds compiled Accessor callables for the lifetime of a single read pass.

The cache lives on the reader rather than at module scope so that two readers consuming topics that happen to share a path_in_schema tuple but resolve through different schemas (e.g., ROS1 vs. ROS2 timestamps) cannot pollute each other.

get_or_compile(fields, sample, getter)#

Return accessors for these fields, compiling against sample on first call.

Compilations that hit an empty sequence in the sample are speculative — the inner shape past the empty point can’t be observed — and are returned without caching so a later message with a non-empty sequence triggers a fresh, complete compile.

Parameters:
Return type:

list[Accessor]

roboto.formats.mcap.accessor.Accumulator#

Output dict the accessors write into. Nested keys materialize as nested dicts.

class roboto.formats.mcap.accessor.AttrGetter#

Bases: abc.ABC

Abstract base class for accessing attributes from decoded messages.

Provides a unified interface for extracting attributes from different types of decoded message data, whether they are dictionaries (JSON) or dynamically created classes (ROS1/ROS2). Used by message decoders to handle various message encoding formats in a consistent way.

static get_attribute(value, attribute)#
Abstractmethod:

Return type:

Any

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

Return type:

Any

static get_attribute_names(value)#
Abstractmethod:

Return type:

collections.abc.Sequence[str]

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

Return type:

collections.abc.Sequence[str]

static has_attribute(value, attribute)#
Abstractmethod:

Parameters:

attribute (str)

Return type:

bool

Check if the given value has a specific attribute.

Parameters:
  • value – The decoded message value to inspect.

  • attribute (str) – Name of the attribute to check for.

Returns:

True if the value has the specified attribute, False otherwise.

Return type:

bool

static has_sub_attributes(value)#
Abstractmethod:

Return type:

bool

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.

Return type:

bool

class roboto.formats.mcap.accessor.ClassAttrGetter#

Bases: AttrGetter

Attribute getter for class-based decoded data.

Handles access to attributes from decoded messages that are represented as dynamically created classes with __slots__ at runtime. This includes ROS1, ROS2, and other message formats that use class-based representations.

static get_attribute(value, attribute)#

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

static get_attribute_names(value)#

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

static has_attribute(value, attribute)#

Check if the given value has a specific attribute.

Parameters:
  • value – The decoded message value to inspect.

  • attribute (str) – Name of the attribute to check for.

Returns:

True if the value has the specified attribute, False otherwise.

Return type:

bool

static has_sub_attributes(value)#

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.

class roboto.formats.mcap.accessor.DictAttrGetter#

Bases: AttrGetter

Attribute getter for JSON decoded data.

Handles access to attributes from JSON decoded messages, which are represented as standard Python dictionaries.

static get_attribute(value, attribute)#

Get the value of a specific attribute from the given value.

Parameters:
  • value – The decoded message value to access.

  • attribute – Name of the attribute to retrieve.

Returns:

The value of the specified attribute.

static get_attribute_names(value)#

Get the names of all attributes available in the given value.

Parameters:

value – The decoded message value to inspect.

Returns:

Sequence of attribute names available in the value.

static has_attribute(value, attribute)#

Check if the given value has a specific attribute.

Parameters:
  • value – The decoded message value to inspect.

  • attribute (str) – Name of the attribute to check for.

Returns:

True if the value has the specified attribute, False otherwise.

Return type:

bool

static has_sub_attributes(value)#

Check if the given value has nested attributes that can be accessed.

Parameters:

value – The decoded message value to inspect.

Returns:

True if the value has nested attributes, False otherwise.

roboto.formats.mcap.accessor.PathInSchema#

One field’s path through a message schema, e.g. ("header", "stamp", "sec").

roboto.formats.mcap.accessor.Resolution#

a no-op, a simple attribute chain, or a per-element sequence crossing.

Build one with none_resolution(), simple_resolution(), or sequence_resolution(), then compile it with build_accessor().

Type:

A resolved accessor path

roboto.formats.mcap.accessor.build_accessor(resolution, getter, is_class_getter)#

Compile a resolution into an Accessor that reads its path into an accumulator.

The resolution carries the (possibly time-remapped) structure; this only selects the matching runtime walk. It does not sample, so a caller that built the resolution from a schema can compile without a message in hand.

Parameters:
  • resolution (Resolution)

  • getter (AttrGetter)

  • is_class_getter (bool)

Return type:

Accessor

roboto.formats.mcap.accessor.compile_accessors(fields, sample, getter)#

Compile one accessor per field. Does not cache; callers manage caching.

Returns a tuple of (accessors, fully_resolved). fully_resolved is False if any path traversed an empty sequence in sample and the inner shape past it had to be guessed. Callers maintaining a cross-message cache should not cache speculative compilations, since the next message may need a different shape.

Parameters:
Return type:

tuple[list[Accessor], bool]

roboto.formats.mcap.accessor.getter_for(message)#

The shared attribute getter matching a decoded message’s shape.

Parameters:

message (Any)

Return type:

AttrGetter

roboto.formats.mcap.accessor.is_ros1_time_value(val)#
Parameters:

val (Any)

Return type:

bool

roboto.formats.mcap.accessor.is_ros2_time_value(val)#

Structural type checking because Time and Duration classes are dynamically generated

Parameters:

val (Any)

Return type:

bool

roboto.formats.mcap.accessor.none_resolution()#

A resolution whose accessor is a no-op (the path is absent on a message).

Return type:

Resolution

roboto.formats.mcap.accessor.remap_time_fields(resolution, sample, getter)#

Substitute ROS runtime time-field names into resolution, observed against sample.

A schema declares a ROS time struct’s leaves by their canonical names (sec, nsec), but a decoded ROS1 message exposes secs/nsecs and a ROS2 message sec/nanosec. This walks sample along the resolution and rewrites the trailing path components past any time value to the runtime names, so the built accessor reads the right attributes.

Returns (remapped, time_resolved). time_resolved is False only when a time-bearing leaf sits past a sequence that is empty in sample — its element cannot be observed, so the runtime names stay a guess and the caller should re-resolve against a later, non-empty message. A resolution with no time component is returned unchanged with True. JSON messages (canonical names) are always a no-op.

Parameters:
  • resolution (Resolution)

  • sample (Any)

  • getter (AttrGetter)

Return type:

tuple[Resolution, bool]

roboto.formats.mcap.accessor.sequence_resolution(pre_path, sub)#

A resolution that crosses the sequence at pre_path, applying sub per element.

Parameters:
  • pre_path (collections.abc.Sequence[str])

  • sub (Resolution)

Return type:

Resolution

roboto.formats.mcap.accessor.simple_resolution(path)#

A resolution for a straight attribute chain (no sequence crossing).

Parameters:

path (collections.abc.Sequence[str])

Return type:

Resolution