roboto.experimental.topics.topic
Module Contents
- roboto.experimental.topics.topic.FieldAddressLike
A field-subtree address, as a FieldAddress or explicit path components (("pose", "position") for a nested field, ("angular_velocity",) for a top-level one).
Each component is one path_in_schema element; there is no string delimiter, so a component may itself contain a ".". A bare string is rejected even though it is structurally a Sequence[str]: splitting it on "." would guess at component boundaries, and iterating it would address one field per character. Pass the components explicitly instead.
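The bare-string rule above can be illustrated with a minimal sketch. The helper normalize_components is hypothetical and is not part of roboto; it only shows why a str has to be rejected before any generic sequence handling.

```python
from collections.abc import Sequence


def normalize_components(address: object) -> tuple[str, ...]:
    """Hypothetical helper (not a roboto API): validate explicit path components."""
    # A str is itself a Sequence[str], so reject it before the generic
    # sequence check; otherwise "pose" would be read as one component
    # per character.
    if isinstance(address, str):
        raise TypeError("pass path components explicitly, e.g. ('pose', 'position')")
    if not isinstance(address, Sequence):
        raise TypeError("expected a sequence of path components")
    components = tuple(address)
    if not components or not all(isinstance(c, str) for c in components):
        raise TypeError("every path component must be a non-empty sequence of str")
    return components


nested = normalize_components(("pose", "position"))
top_level = normalize_components(["angular_velocity"])
# A component may contain a "." because nothing is ever split on it.
dotted = normalize_components(("frame.id",))
```

Iterating a string such as "pose" would produce the components "p", "o", "s", "e", which is the failure mode the rejection guards against.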
- class roboto.experimental.topics.topic.SessionContext(/, **data)
Bases: pydantic.BaseModel
The Session a Topic is scoped to: limits topic operations to the Session’s associated files and supplies the Session’s aggregate time window as the default window for those operations.
- Parameters:
data (Any)
- end_time: int | None = None
Latest time covered by the Session (Unix-epoch ns); the default end_time for get_data*. None when the Session includes no files.
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- session_id: str
- start_time: int | None = None
Earliest time covered by the Session (Unix-epoch ns); the default start_time for get_data*. None when the Session includes no files.
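How a Session's bounds might serve as the default read window can be sketched as follows. SessionWindow and resolve_window are hypothetical stand-ins, not roboto APIs, and the exact resolution rules (for example, how a Session with no files is handled) are an assumption based on the descriptions in this module.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SessionWindow:
    """Hypothetical stand-in for SessionContext's two bounds (Unix-epoch ns)."""

    start_time: Optional[int] = None
    end_time: Optional[int] = None


def resolve_window(
    start_time: Optional[int],
    end_time: Optional[int],
    context: Optional[SessionWindow],
) -> tuple[int, int]:
    # Explicit arguments win; a missing bound falls back to the session's.
    if start_time is None and context is not None:
        start_time = context.start_time
    if end_time is None and context is not None:
        end_time = context.end_time
    # Assumption: an unresolved bound (no context, or a session with no
    # files) is reported as a ValueError.
    if start_time is None or end_time is None:
        raise ValueError("time window could not be resolved")
    return start_time, end_time


session = SessionWindow(start_time=1_000, end_time=5_000)
defaulted = resolve_window(None, None, session)
narrowed = resolve_window(2_000, None, session)
```

In this sketch an explicit bound narrows the window while the other bound still defaults to the session's.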
- roboto.experimental.topics.topic.TOPIC_DATA_CACHE_SUBDIR = 'topic-data'
Subdirectory of the client’s cache directory where fetched topic data files are cached.
- class roboto.experimental.topics.topic.Topic(record, roboto_client=None, session_context=None)
A logical stream of robotics data, identified durably across the files that carry it.
Within an organization, topic names are unique; contributions from different files with the same topic name share a single topic identity. By default a Topic reads org-wide and its data-returning methods require an explicit time window.
A Topic carrying a SessionContext (e.g., one yielded by list_topics() or get_topic()) instead scopes topic operations like get_data* to that Session’s files and defaults the window to the Session’s aggregate bounds.
- Parameters:
roboto_client (Optional[roboto.http.RobotoClient])
session_context (Optional[SessionContext])
- property context: SessionContext | None
- Return type:
Optional[SessionContext]
- classmethod from_id(topic_id, owner_org_id=None, roboto_client=None, session_context=None)
Load an existing topic by its id.
- Parameters:
topic_id (str) – Identifier of the topic (ti_*).
owner_org_id (Optional[str]) – Organization that owns the topic. If omitted, defaults to the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses the default if omitted.
session_context (Optional[SessionContext]) – Optional. When provided, scopes topic operations to the session’s files and defaults the read window to the session’s bounds. None reads org-wide.
- Returns:
The loaded topic.
- Raises:
RobotoNotFoundException – No topic with this id exists in the org.
RobotoUnauthorizedException – The caller cannot search topics in the org.
- Return type:
Topic
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> topic.name
'/camera/image_raw'
- classmethod from_record(record, roboto_client=None, session_context=None)
Wrap an already-loaded topic identity record.
- Parameters:
record (roboto.domain.topics.TopicIdentityRecord) – The topic identity record to wrap.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses the default if omitted.
session_context (Optional[SessionContext]) – Optional session scope; see from_id().
- Returns:
A topic backed by record, with no further service calls.
- Return type:
Topic
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_record(record)
>>> topic.topic_id
'ti_abc123'
- get_data(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)
Yield this topic’s data within a time window, as (timestamp, record) pairs.
Convenience over get_data_as_record_batches() that unpacks each Arrow RecordBatch into one (timestamp, record) tuple per row. timestamp is the row’s absolute Unix-epoch nanosecond timestamp (an int); record is a dict of the projected fields, with struct fields as nested dicts and list fields as lists. A field the data omits for a row is absent from (or null within) that row’s dict.
Time windowing, field projection, representation selection, sort order, and error behavior are all as documented on get_data_as_record_batches().
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
end_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – See get_data_as_record_batches().
schema_id (Optional[str]) – See get_data_as_record_batches().
schema_checksum (Optional[str]) – See get_data_as_record_batches().
timeline_source_id (Optional[str]) – See get_data_as_record_batches().
timeline_source_name (Optional[str]) – See get_data_as_record_batches().
cache_policy (roboto.storage.CachePolicy) – See get_data_as_record_batches().
cache_dir (Union[str, pathlib.Path, None]) – See get_data_as_record_batches().
- Yields:
(timestamp, record) tuples for the in-window rows, filtered and projected per the arguments.
- Raises:
See get_data_as_record_batches().
- Return type:
collections.abc.Generator[tuple[roboto.domain.topics.Timestamp, dict[str, Any]], None, None]
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for timestamp, record in topic.get_data(start_time=t0, end_time=t1):
...     print(timestamp, record)
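Because rows arrive partition by partition rather than globally time-sorted (see get_data_as_record_batches()), a caller that needs strict time order has to sort the pairs itself. The sketch below uses hand-written (timestamp, record) tuples as a stand-in for the generator's output; the partition contents are made up for illustration.

```python
# Hypothetical rows as two partitions might deliver them: partitions come in
# start order, but the second begins before the first one ends.
partition_a = [(100, {"x": 1.0}), (300, {"x": 3.0})]
partition_b = [(200, {"x": 2.0}), (400, {"x": 4.0})]
rows = partition_a + partition_b  # arrival order, not time order

# Sort on the timestamp only: the records are dicts and cannot be compared.
# sorted() is stable, so rows with equal timestamps keep their arrival order.
time_ordered = sorted(rows, key=lambda pair: pair[0])
timestamps = [timestamp for timestamp, _ in time_ordered]
```

Sorting this way materializes the whole window in memory, so it suits windows of modest size.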
- get_data_as_df(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, flatten=False, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)
Return this topic’s data within a time window as a pandas DataFrame.
Same pipeline as get_data_as_record_batches(), with the batches packed into a DataFrame whose index is a timezone-aware DatetimeIndex.
Rows are returned ordered by partition (each file’s data in start order), not interleaved across partitions; within a partition rows keep their stored order. Call df.sort_index() for a strict row-level time-ordered view.
A struct field is returned as a single schema-shaped column of dicts unless flatten is set, which expands every struct level into dot-delimited leaf columns (e.g. pose.position.x). List-typed fields are unaffected by flatten.
Read parameters and error behavior are as documented on get_data_as_record_batches().
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
end_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – See get_data_as_record_batches().
schema_id (Optional[str]) – See get_data_as_record_batches().
schema_checksum (Optional[str]) – See get_data_as_record_batches().
timeline_source_id (Optional[str]) – See get_data_as_record_batches().
timeline_source_name (Optional[str]) – See get_data_as_record_batches().
flatten (bool) – Expand struct-typed fields into dot-delimited leaf columns. When False, each struct-typed field is a single object-dtype column of dicts.
cache_policy (roboto.storage.CachePolicy) – See get_data_as_record_batches().
cache_dir (Union[str, pathlib.Path, None]) – See get_data_as_record_batches().
- Returns:
DataFrame of the in-window rows indexed by a timezone-aware DatetimeIndex.
- Raises:
See get_data_as_record_batches().
- Return type:
pandas.DataFrame
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> df = topic.get_data_as_df(start_time=t0, end_time=t1)
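The effect of flatten on column names can be pictured with a plain-Python sketch over one row. The helper flatten_record is hypothetical and only mimics the naming rule described above (struct levels joined with dots, lists left whole); it is not how the library builds the DataFrame.

```python
from typing import Any


def flatten_record(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Hypothetical helper: expand nested dicts into dot-delimited leaf keys."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # A struct level: recurse and keep extending the dotted path.
            flat.update(flatten_record(value, path))
        else:
            # Leaves, including lists, are kept as-is.
            flat[path] = value
    return flat


row = {
    "pose": {"position": {"x": 1.0, "y": 2.0}},
    "ranges": [0.5, 0.7],
}
flat_row = flatten_record(row)
```

Here flat_row has the keys pose.position.x, pose.position.y, and ranges, with the list under ranges unchanged.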
- get_data_as_record_batches(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)
Yield this topic’s data within a time window, as Arrow RecordBatches.
Each batch carries one column per top-level projected field, with nested struct and list types mirroring the topic’s schema, pruned to the projection, plus a dedicated int64 column of Unix-epoch nanosecond timestamps; locate that column with timestamp_column_index(). A field the data omits for a row surfaces as null at the deepest level that represents the omission (a whole absent subtree is a single null).
Batch sizes and boundaries carry no meaning, and a window matching no rows yields no batches. A topic’s data can span several files (“topic partitions”); rows from different partitions are never mixed within a batch or interleaved across batches. Partitions arrive ordered by where each file’s data begins, and within a partition rows keep their stored order. Batches therefore arrive as whole partitions in start order, not as a globally time-sorted row stream; sort downstream if a strict row-level time order is needed.
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – Inclusive window lower bound, as nanoseconds since the Unix epoch or anything convertible via to_epoch_nanoseconds(). None defaults to the session’s lower bound when this topic was obtained from list_topics() or get_topic(); otherwise required (a ValueError is raised when it cannot be resolved).
end_time (Optional[roboto.time.Time]) – Inclusive window upper bound, same forms as start_time; defaults to the session’s upper bound on the same terms.
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to project. None projects every field.
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to drop from the projection. None drops none.
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – Preferred representation per field subtree, selecting which stored variant of a field to read. None applies the default selection everywhere.
schema_id (Optional[str]) – Schema to read under, by id. Required only when the window spans data with more than one schema.
schema_checksum (Optional[str]) – Schema to read under, by checksum. Mutually exclusive with schema_id.
timeline_source_id (Optional[str]) – Timeline source to resolve the window with, by id. None uses each schema’s default source.
timeline_source_name (Optional[str]) – Timeline source by name. Mutually exclusive with timeline_source_id.
cache_policy (roboto.storage.CachePolicy) – Whether fetched Parquet files are cached to local disk. MCAP data always streams.
cache_dir (Union[str, pathlib.Path, None]) – Directory topic data files are cached under. Defaults to a topic-data subdirectory of ROBOTO_CACHE_DIR, or the platform-conventional per-user cache directory when that is unset.
- Yields:
pyarrow.RecordBatch instances holding the in-window rows, filtered and projected per the arguments.
- Raises:
RobotoInvalidRequestException – The window spans multiple schemas and none was chosen with schema_id or schema_checksum, a named schema or timeline source does not match the window’s data, or no stored representation satisfies a representation preference. The error carries an actionable message.
RobotoUnauthorizedException – The caller lacks read access to at least one in-window file backing this topic.
- Return type:
collections.abc.Generator[pyarrow.RecordBatch, None, None]
Examples
Print the row count and column names of each batch in a window:
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for batch in topic.get_data_as_record_batches(start_time=t0, end_time=t1):
...     print(batch.num_rows, batch.schema.names)
Project to one field subtree, dropping one of its children:
>>> for batch in topic.get_data_as_record_batches(
...     start_time=t0,
...     end_time=t1,
...     fields_include=[("angular_velocity",)],
...     fields_exclude=[("angular_velocity", "y")],
... ):
...     print(batch.to_pylist())
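The combined effect of fields_include and fields_exclude can be pictured on a single nested row. The sketch below is a plain-Python illustration under assumptions, not the library's implementation: project is a hypothetical helper, and the rule that an exclusion overrides an enclosing inclusion is inferred from the projection example above.

```python
from typing import Any, Iterable, Optional

Path = tuple[str, ...]


def project(
    record: dict[str, Any],
    include: Optional[Iterable[Path]] = None,
    exclude: Optional[Iterable[Path]] = None,
) -> dict[str, Any]:
    """Hypothetical helper: prune a nested dict to the requested subtrees."""
    include_paths = None if include is None else [tuple(p) for p in include]
    exclude_paths = [] if exclude is None else [tuple(p) for p in exclude]

    def walk(node: dict[str, Any], path: Path) -> dict[str, Any]:
        out: dict[str, Any] = {}
        for key, value in node.items():
            here = path + (key,)
            # Skip anything at or below an excluded subtree.
            if any(here[: len(e)] == e for e in exclude_paths):
                continue
            # Selected when no include list is given, or when this path
            # sits at or below an included subtree.
            selected = include_paths is None or any(
                here[: len(i)] == i for i in include_paths
            )
            if isinstance(value, dict):
                # Also descend when an included subtree lies further down.
                leads_to_include = include_paths is not None and any(
                    i[: len(here)] == here for i in include_paths
                )
                if selected or leads_to_include:
                    child = walk(value, here)
                    if child or selected:
                        out[key] = child
            elif selected:
                out[key] = value
        return out

    return walk(record, ())


row = {
    "angular_velocity": {"x": 0.1, "y": 0.2, "z": 0.3},
    "linear_acceleration": {"x": 9.8},
}
projected = project(
    row,
    include=[("angular_velocity",)],
    exclude=[("angular_velocity", "y")],
)
```

With these inputs, projected keeps only angular_velocity with its x and z children, mirroring the pruned schema the projection example requests.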
- property name: str
Human-readable topic name (e.g. "/camera/image_raw"). Unique within an organization.
- Return type:
str
- property org_id: str
Identifier of the organization that owns this topic.
- Return type:
str
- property record: roboto.domain.topics.TopicIdentityRecord
The underlying topic identity record.
- Return type:
roboto.domain.topics.TopicIdentityRecord
- set_context(session_context)
- Parameters:
session_context (Optional[SessionContext])
- Return type:
None
- property topic_id: str
Durable identifier of this topic (ti_*).
- Return type:
str