roboto.experimental.topics.topic#

Module Contents#

roboto.experimental.topics.topic.FieldAddressLike#

A field-subtree address, as a FieldAddress or explicit path components (("pose", "position") for a nested field, ("angular_velocity",) for a top-level one).

Each component is one path_in_schema element; there is no string delimiter, so a component may itself contain a "." character. A bare string is rejected even though it is structurally a Sequence[str]: splitting it on "." would guess at component boundaries, and iterating it would address one field per character. Pass the components explicitly instead.
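The bare-string rule can be illustrated with a minimal sketch. The helper name normalize_field_address is hypothetical and not part of roboto; it ignores the FieldAddress form and only mirrors the check described above for explicit path components.

```python
from collections.abc import Sequence


def normalize_field_address(address: Sequence[str]) -> tuple[str, ...]:
    """Hypothetical helper (not part of roboto): validate explicit path components."""
    # A str is itself a Sequence[str], so it has to be rejected explicitly.
    if isinstance(address, str):
        raise TypeError(
            f"pass path components explicitly, e.g. ({address!r},), not a bare string"
        )
    components = tuple(address)
    if not components or not all(isinstance(c, str) for c in components):
        raise TypeError("a field address is a non-empty sequence of str components")
    return components


nested = normalize_field_address(("pose", "position"))
top_level = normalize_field_address(["angular_velocity"])
# A component may contain "." because nothing is ever split on it.
dotted = normalize_field_address(("sensor.front", "range"))
```

Passing "pose.position" to this sketch raises TypeError rather than guessing whether one or two components were meant.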

class roboto.experimental.topics.topic.SessionContext(/, **data)#

Bases: pydantic.BaseModel

The Session a Topic is scoped to: limits topic operations to the Session’s associated files and supplies the Session’s aggregate time window as the default window for those operations.

Parameters:

data (Any)

end_time: int | None = None#

Latest time covered by the Session (Unix-epoch ns); the default end_time for get_data*. None when the Session includes no files.

model_config#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

session_id: str#
start_time: int | None = None#

Earliest time covered by the Session (Unix-epoch ns); the default start_time for get_data*. None when the Session includes no files.
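How the Session's bounds act as defaults can be sketched as follows. SessionWindow and resolve_window are hypothetical stand-ins written for illustration (a plain dataclass rather than the pydantic model); they are not roboto's implementation, and the sketch assumes an unresolved bound surfaces as a ValueError.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SessionWindow:
    """Stand-in for the start_time/end_time fields of SessionContext."""

    start_time: Optional[int] = None  # Unix-epoch ns
    end_time: Optional[int] = None  # Unix-epoch ns


def resolve_window(start_time, end_time, context=None):
    """Hypothetical illustration of the defaulting rule; not roboto's code."""
    # Explicit arguments win; the session bounds only fill in what is missing.
    if start_time is None and context is not None:
        start_time = context.start_time
    if end_time is None and context is not None:
        end_time = context.end_time
    if start_time is None or end_time is None:
        raise ValueError("start_time and end_time could not be resolved")
    return start_time, end_time


session = SessionWindow(start_time=1_000, end_time=9_000)
defaulted = resolve_window(None, None, session)
narrowed = resolve_window(2_000, None, session)
```

A Session with no files has both bounds set to None, so under these assumptions an explicit window would still be needed.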

roboto.experimental.topics.topic.TOPIC_DATA_CACHE_SUBDIR = 'topic-data'#

Subdirectory of the client’s cache directory where fetched topic data files are cached.

class roboto.experimental.topics.topic.Topic(record, roboto_client=None, session_context=None)#

A logical stream of robotics data, identified durably across the files that carry it.

Within an organization, topic names are unique; contributions from different files with the same topic name share a single topic identity. By default a Topic reads org-wide and its data-returning methods require an explicit time window.

A Topic carrying a SessionContext (e.g., one obtained from list_topics() or get_topic()) instead scopes topic operations such as get_data* to that Session’s files and defaults the window to the Session’s aggregate bounds.

Parameters:
property context: SessionContext | None#
Return type:

Optional[SessionContext]

classmethod from_id(topic_id, owner_org_id=None, roboto_client=None, session_context=None)#

Load an existing topic by its id.

Parameters:
  • topic_id (str) – Identifier of the topic (ti_*).

  • owner_org_id (Optional[str]) – Organization that owns the topic. If omitted, defaults to the caller’s organization.

  • roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses the default if omitted.

  • session_context (Optional[SessionContext]) – Optional. When provided, scopes topic operations to the session’s files and defaults the read window to the session’s bounds. None reads org-wide.

Returns:

The loaded topic.

Raises:
Return type:

Topic

Examples

>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> topic.name
'/camera/image_raw'

classmethod from_record(record, roboto_client=None, session_context=None)#

Wrap an already-loaded topic identity record.

Parameters:
Returns:

A topic backed by record, with no further service calls.

Return type:

Topic

Examples

>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_record(record)
>>> topic.topic_id
'ti_abc123'

get_data(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#

Yield this topic’s data within a time window, as (timestamp, record) pairs.

Convenience over get_data_as_record_batches() that unpacks each Arrow RecordBatch into one (timestamp, record) tuple per row. timestamp is the row’s absolute Unix-epoch nanosecond timestamp (an int); record is a dict of the projected fields, with struct fields as nested dicts and list fields as lists. A field the data omits for a row is absent from (or null within) that row’s dict.
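Because an omitted field may be either missing from the row's dict or present as None, consumers may want one accessor that treats both cases alike. The sketch below uses hand-written (timestamp, record) pairs as stand-ins for get_data() output; read_field is a hypothetical helper, not part of roboto.

```python
from typing import Any, Sequence


def read_field(record: dict, path: Sequence[str], default: Any = None) -> Any:
    """Hypothetical helper: walk nested dicts, treating absent and null alike."""
    current: Any = record
    for component in path:
        # Stop as soon as a level is missing, null, or not a struct (dict).
        if not isinstance(current, dict) or current.get(component) is None:
            return default
        current = current[component]
    return current


# Stand-in rows shaped like get_data() output: (epoch-ns timestamp, record dict).
rows = [
    (1_000, {"pose": {"position": {"x": 1.5}}}),
    (2_000, {"pose": None}),  # subtree present but null
    (3_000, {}),  # subtree absent
]
xs = [read_field(record, ("pose", "position", "x")) for _, record in rows]
```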

Time windowing, field projection, representation selection, sort order, and error behavior are all as documented on get_data_as_record_batches().

Requires the roboto[analytics] extra.

Parameters:
Yields:

(timestamp, record) tuples for the in-window rows, filtered and projected per the arguments.

Raises:
Return type:

collections.abc.Generator[tuple[roboto.domain.topics.Timestamp, dict[str, Any]], None, None]

Examples

>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for timestamp, record in topic.get_data(start_time=t0, end_time=t1):
...     print(timestamp, record)

get_data_as_df(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, flatten=False, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#

Return this topic’s data within a time window as a pandas DataFrame.

Same pipeline as get_data_as_record_batches(), with the batches packed into a DataFrame whose index is a timezone-aware DatetimeIndex.

Rows are returned ordered by partition (each file’s data in start order), not interleaved across partitions; within a partition, rows keep their stored order. Call df.sort_index() for a strict row-level time-ordered view.

A struct field is returned as a single schema-shaped column of dicts unless flatten is set, which expands every struct level into dot-delimited leaf columns (e.g. pose.position.x). List-typed fields are unaffected by flatten.
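The naming scheme that flatten produces can be illustrated on a single record. This is a sketch of the dot-delimited expansion only, written against plain dicts; flatten_record is a hypothetical helper and not how roboto builds the DataFrame.

```python
from typing import Any


def flatten_record(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Hypothetical helper: expand nested dicts into dot-delimited leaf keys."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Struct level: recurse so every leaf gets its full path as a name.
            flat.update(flatten_record(value, name))
        else:
            # Scalars and lists are leaves; lists are left untouched.
            flat[name] = value
    return flat


row = {"pose": {"position": {"x": 1.0, "y": 2.0}}, "ranges": [0.5, 0.7]}
flat_row = flatten_record(row)
```

Here flat_row has the keys pose.position.x, pose.position.y and ranges, with the list value unchanged.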

Read parameters and error behavior are as documented on get_data_as_record_batches().

Requires the roboto[analytics] extra.

Parameters:
Returns:

DataFrame of the in-window rows indexed by a timezone-aware DatetimeIndex.

Raises:
Return type:

pandas.DataFrame

Examples

>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> df = topic.get_data_as_df(start_time=t0, end_time=t1)

get_data_as_record_batches(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#

Yield this topic’s data within a time window, as Arrow RecordBatches.

Each batch carries one column per top-level projected field, with nested struct and list types mirroring the topic’s schema, pruned to the projection, plus a dedicated int64 column of Unix-epoch nanosecond timestamps; locate that column with timestamp_column_index(). A field the data omits for a row surfaces as null at the deepest level that represents the omission (a whole absent subtree is a single null).

Batch sizes and boundaries carry no meaning, and a window matching no rows yields no batches. A topic’s data can span several files (“topic partitions”). Partitions arrive ordered by where each file’s data begins; within a partition, rows keep their stored order, and rows from different partitions are never mixed within a batch or interleaved. Batches therefore arrive as whole partitions in start order, not as a globally time-sorted row stream; sort downstream if a strict row-level time order is needed.
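The difference between partition order and row-level time order can be shown with a small stand-in stream. The pairs below are invented sample data shaped like (timestamp, record) tuples; the sketch assumes the whole window fits in memory, since a full sort has to materialize every row.

```python
# Stand-in stream of (timestamp, record) pairs in arrival order.
# Partition A starts at t=100 and partition B at t=150, so A arrives first even
# though some of its rows are later than rows in B.
partition_a = [(100, {"v": 1}), (300, {"v": 3}), (200, {"v": 2})]
partition_b = [(150, {"v": 10}), (250, {"v": 20})]
stream = partition_a + partition_b

# sorted() is stable, so rows with equal timestamps keep their arrival order.
time_ordered = sorted(stream, key=lambda pair: pair[0])
timestamps = [timestamp for timestamp, _ in time_ordered]
```

A streaming merge would not be safe here, because rows inside a partition keep their stored order and are not guaranteed to be time-sorted.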

Requires the roboto[analytics] extra.

Parameters:
  • start_time (Optional[roboto.time.Time]) – Inclusive window lower bound, as nanoseconds since the Unix epoch or anything convertible via to_epoch_nanoseconds(). None defaults to the session’s lower bound when this topic was obtained from list_topics() or get_topic(); otherwise required (a ValueError is raised when it cannot be resolved).

  • end_time (Optional[roboto.time.Time]) – Inclusive window upper bound, same forms as start_time; defaults to the session’s upper bound on the same terms.

  • fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to project. None projects every field.

  • fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to drop from the projection. None drops none.

  • prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – Preferred representation per field subtree, selecting which stored variant of a field to read. None applies the default selection everywhere.

  • schema_id (Optional[str]) – Schema to read under, by id. Required only when the window spans data with more than one schema.

  • schema_checksum (Optional[str]) – Schema to read under, by checksum. Mutually exclusive with schema_id.

  • timeline_source_id (Optional[str]) – Timeline source to resolve the window with, by id. None uses each schema’s default source.

  • timeline_source_name (Optional[str]) – Timeline source by name. Mutually exclusive with timeline_source_id.

  • cache_policy (roboto.storage.CachePolicy) – Whether fetched Parquet files are cached to local disk. MCAP data always streams.

  • cache_dir (Union[str, pathlib.Path, None]) – Directory topic data files are cached under. Defaults to a topic-data subdirectory of ROBOTO_CACHE_DIR, or the platform-conventional per-user cache directory when that is unset.
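The doctest examples in this section pass t0 and t1 without defining them. One way to build such bounds as integer nanoseconds is sketched below using only the standard library; datetime_to_epoch_ns is a hypothetical helper, not roboto's to_epoch_nanoseconds(), whose accepted inputs are not shown here.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_epoch_ns(moment: datetime) -> int:
    """Hypothetical helper (not part of roboto): aware datetime -> Unix-epoch ns."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Integer division keeps the result exact; float seconds would lose precision.
    microseconds = (moment - EPOCH) // timedelta(microseconds=1)
    return microseconds * 1_000


t0 = datetime_to_epoch_ns(datetime(2024, 1, 1, tzinfo=timezone.utc))
t1 = t0 + 60 * 1_000_000_000  # one minute later; both bounds are inclusive
```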

Yields:

pyarrow.RecordBatch instances holding the in-window rows, filtered and projected per the arguments.

Raises:
  • RobotoInvalidRequestException – The window spans multiple schemas and none was chosen with schema_id or schema_checksum, a named schema or timeline source does not match the window’s data, or no stored representation satisfies a representation preference. The error carries an actionable message.

  • RobotoUnauthorizedException – The caller lacks read access to at least one in-window file backing this topic.

Return type:

collections.abc.Generator[pyarrow.RecordBatch, None, None]

Examples

Print the row count and column names of every batch in a window:

>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for batch in topic.get_data_as_record_batches(start_time=t0, end_time=t1):
...     print(batch.num_rows, batch.schema.names)

Project to one field subtree, dropping one of its children:

>>> for batch in topic.get_data_as_record_batches(
...     start_time=t0,
...     end_time=t1,
...     fields_include=[("angular_velocity",)],
...     fields_exclude=[("angular_velocity", "y")],
... ):
...     print(batch.to_pylist())

property name: str#

Human-readable topic name (e.g. "/camera/image_raw"). Unique within an organization.

Return type:

str

property org_id: str#

Identifier of the organization that owns this topic.

Return type:

str

property record: roboto.domain.topics.TopicIdentityRecord#

The underlying topic identity record.

Return type:

roboto.domain.topics.TopicIdentityRecord

set_context(session_context)#
Parameters:

session_context (Optional[SessionContext])

Return type:

None

property topic_id: str#

Durable identifier of this topic (ti_*).

Return type:

str