roboto.experimental.topics#
Topics APIs in active refinement; see roboto.experimental for the stability contract.
Submodules#
Package Contents#
- class roboto.experimental.topics.FieldAddress(/, **data)#
Bases: pydantic.BaseModel
Addresses a schema field, and the subtree nested under it, by exactly one of two forms.
A path names the field by its path_in_schema components directly (no string delimiter, so a component may itself contain a "."); a field_id names it opaquely and resolves server-side to the same path. Either form designates the field and every field nested under it.
- Parameters:
data (Any)
- field_id: str | None = None#
The field’s opaque id (sf_*).
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- path: tuple[str, Ellipsis] | None = None#
The field’s path_in_schema components; () addresses the schema root.
- roboto.experimental.topics.FieldAddressLike#
A field-subtree address, as a FieldAddress or explicit path components (("pose", "position") for a nested field, ("angular_velocity",) for a top-level one).
Each component is one path_in_schema element; there is no string delimiter, so a component may itself contain a ".". A bare string is rejected even though it is structurally a Sequence[str]: splitting it on "." would guess at component boundaries, and iterating it would address one field per character. Pass the components explicitly instead.
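The bare-string rejection described above can be sketched as follows. This is a minimal illustration under assumptions, not the library's implementation: normalize_path is a hypothetical helper name, the exception type is a guess, and only the explicit-components form is handled (not FieldAddress instances).

```python
from collections.abc import Sequence


def normalize_path(address: Sequence[str]) -> tuple[str, ...]:
    """Hypothetical helper: turn explicit path components into a tuple."""
    # A bare string is itself a Sequence[str], so it must be rejected
    # explicitly; iterating it would yield one "component" per character.
    if isinstance(address, str):
        raise TypeError(
            "pass path components explicitly, e.g. ('pose', 'position')"
        )
    components = tuple(address)
    if not all(isinstance(c, str) for c in components):
        raise TypeError("every path component must be a str")
    return components


print(normalize_path(("pose", "position")))
# A component may itself contain ".", because nothing is ever split on it.
print(normalize_path(["sensor.v2", "x"]))
```

Rejecting the string up front keeps the caller from silently addressing the fields "p", "o", "s", "e" when they meant ("pose",).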
- roboto.experimental.topics.PLAN_VERSION: int = 1#
Contract version stamped on every plan. ReadPlan validation refuses a plan whose version it does not recognize, so a consumer on an older contract fails at parse time instead of misreading a newer plan.
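As a rough sketch of what failing at parse time could look like on the consumer side, assuming the plan arrives as a plain dict. The names SUPPORTED_PLAN_VERSIONS and check_plan_version are hypothetical; the real check lives inside ReadPlan validation.

```python
# Versions this (hypothetical) consumer knows how to interpret.
SUPPORTED_PLAN_VERSIONS = frozenset({1})


def check_plan_version(raw_plan: dict) -> int:
    """Refuse a plan whose contract version is not recognized."""
    # The documented default for plan_version is 1.
    version = raw_plan.get("plan_version", 1)
    if version not in SUPPORTED_PLAN_VERSIONS:
        raise ValueError(f"unrecognized plan_version: {version!r}")
    return version


print(check_plan_version({"plan_version": 1, "topic_id": "ti_abc123"}))
```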
- class roboto.experimental.topics.ReadPlan(/, **data)#
Bases: pydantic.BaseModel
Resolves a read of one topic over a time window into the files to fetch and how to interpret them.
- Parameters:
data (Any)
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- partitions: tuple[ReadPlanPartition, Ellipsis] = ()#
One entry per partition in the window, each its own fetch-and-interpret plan.
Ordered by where each file’s data begins. This orders whole partitions, not rows.
- plan_version: int = 1#
Contract version of this plan. Validation refuses a version this model does not recognize.
- projection: ReadPlanProjection#
The output fields a consumer projects decoded rows to.
- schema_: ReadPlanSchemaRef | None = None#
The resolved schema on a non-empty plan. Serializes as schema. None exactly when the plan is empty: the window contains no partitions, or a schema_id/schema_checksum matches no in-window partition (data may exist in the window under a different schema).
- topic_id: str#
The topic this plan reads.
- window: TimeWindow#
The time window the plan resolves over.
- class roboto.experimental.topics.ReadPlanExtent(/, **data)#
Bases: pydantic.BaseModel
A partition’s time bounds, clipped to the plan window.
- Parameters:
data (Any)
- max: int#
Inclusive upper bound, in absolute Unix-epoch nanoseconds.
- min: int#
Inclusive lower bound, in absolute Unix-epoch nanoseconds.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class roboto.experimental.topics.ReadPlanFieldRef(/, **data)#
Bases: pydantic.BaseModel
A schema field named by both its id and its path components in the schema.
- Parameters:
data (Any)
- field_id: str#
Identifier of the schema field.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- path: roboto.domain.topics.record.FieldPath#
The field’s path components within the schema, from the root to the field.
- class roboto.experimental.topics.ReadPlanObjectRef(/, **data)#
Bases: pydantic.BaseModel
Points to the file backing a scan task. A consumer fetches the file’s bytes from it.
- Parameters:
data (Any)
- fs_node_id: str#
Identifier of the backing file. This id is stable, so a consumer can cache on it.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- size_bytes: int | None = None#
The source object’s size in bytes.
- class roboto.experimental.topics.ReadPlanPartition(/, **data)#
Bases: pydantic.BaseModel
Everything needed to fetch and interpret one in-window partition’s bytes.
- Parameters:
data (Any)
- extent: ReadPlanExtent#
The partition’s time bounds, clipped to the window.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- scan_tasks: tuple[ReadPlanScanTask, Ellipsis] = ()#
The files to read for this partition; empty when the partition has no readable data.
A partition’s rows may be shredded across several scan tasks (record-shredding style), each owning a subtree of the schema; the read path reassembles each row per leaf, the highest-precedence scan task winning where subtrees overlap. The common case is a single scan task covering the whole schema.
- time_offset_ns: int#
Offset a consumer adds to each decoded row timestamp; the same for every row in the partition.
- timestamp: ReadPlanTimestamp#
Where this partition’s row timestamps come from.
- topic_part_id: str#
Identifier of the partition.
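A minimal sketch of how a consumer might turn a stored timestamp into an absolute one, combining time_offset_ns with the unit handling documented on ReadPlanTimestamp.unit. The unit spellings other than "ms" and the helper name to_absolute_ns are assumptions, as is the order of operations (scale to nanoseconds first, then add the offset, since the offset is expressed in nanoseconds).

```python
from typing import Optional

# Assumed unit spellings; only "ms" is named in this reference.
NANOS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def to_absolute_ns(stored: int, unit: Optional[str], time_offset_ns: int) -> int:
    """Scale a stored timestamp to nanoseconds, then apply the partition offset."""
    # unit=None: the schema records no unit, so the value is taken as nanoseconds.
    factor = 1 if unit is None else NANOS_PER_UNIT[unit]
    return stored * factor + time_offset_ns


# 1500 ms stored, partition shifted by one second.
print(to_absolute_ns(1500, "ms", 1_000_000_000))
```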
- class roboto.experimental.topics.ReadPlanProjection(/, **data)#
Bases: pydantic.BaseModel
The output fields the plan resolves rows to.
The projection takes exactly one of two forms: either every field in the schema (all is true, and the field list is left implicit so the plan need not enumerate a large schema) or an explicit fields list.
- Parameters:
data (Any)
- all: bool = False#
True when the projection covers every field in the schema.
- classmethod all_fields()#
Return a projection covering every field in the schema.
- Return type:
ReadPlanProjection
- fields: tuple[ReadPlanFieldRef, Ellipsis] | None = None#
The resolved field set when the read is narrowed; None when all is true.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod narrowed(fields)#
Return a projection narrowed to an explicit field set.
- Parameters:
fields (Iterable[ReadPlanFieldRef])
- Return type:
ReadPlanProjection
- class roboto.experimental.topics.ReadPlanRequest(/, **data)#
Bases: pydantic.BaseModel
The body of a read-plan request: the logical read question to resolve into a physical plan.
- Parameters:
data (Any)
- end_time: int#
Inclusive window upper bound, absolute Unix-epoch nanoseconds.
- fields_exclude: tuple[FieldAddress, Ellipsis] | None = None#
Field subtrees to drop from the projection; None drops none.
- fields_include: tuple[FieldAddress, Ellipsis] | None = None#
Field subtrees to project; None projects every field.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- prefer: RepresentationPreference | None = None#
Per-subtree representation preference; None applies default selection everywhere.
- schema_checksum: str | None = None#
Schema to use, by checksum, or None.
- schema_id: str | None = None#
Schema to use, by id, or None to default to the sole in-window schema.
- session_id: str | None = None#
Restrict partition enumeration to this session’s contributions; None reads org-wide.
- start_time: int#
Inclusive window lower bound, absolute Unix-epoch nanoseconds.
- timeline_source_id: str | None = None#
Timeline source to resolve partition extents with, by id, or None.
- timeline_source_name: str | None = None#
Timeline source to resolve partition extents with, by name, or None.
- class roboto.experimental.topics.ReadPlanScanTask(/, **data)#
Bases: pydantic.BaseModel
One file to open, with the format and transformations needed to interpret it.
Which of the representations satisfying the governing selector backs a scan task is service policy and may change between releases; only the selector’s hard-filter matching rule is contract.
- Parameters:
data (Any)
- format: roboto.domain.topics.RepresentationStorageFormat#
The format the bytes are stored in; selects the decoder a consumer applies.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- object: ReadPlanObjectRef#
The single file this scan task resolves to.
- precedence: int#
Where two scan tasks’ subtrees overlap, the one with the higher precedence wins.
- subtree: ReadPlanFieldRef | None = None#
The field subtree this scan task covers; None covers the whole schema.
- transformations: tuple[str, Ellipsis] = ()#
Transformations applied to produce this variant, in order; empty on the original.
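The precedence rule can be illustrated with a small sketch that picks, for one leaf field, the scan task that supplies it. This is an illustration only: scan tasks are modelled as plain dicts whose "subtree" is a path tuple (the real subtree is a ReadPlanFieldRef), and winning_task and the "name" key are hypothetical.

```python
from typing import Optional

Path = tuple[str, ...]


def winning_task(leaf: Path, tasks: list[dict]) -> Optional[dict]:
    """Pick the scan task that supplies `leaf`, or None if no task covers it."""

    def covers(task: dict) -> bool:
        subtree = task["subtree"]
        # None covers the whole schema; otherwise the subtree path must
        # be a prefix of the leaf's path.
        return subtree is None or leaf[: len(subtree)] == subtree

    candidates = [t for t in tasks if covers(t)]
    # Where subtrees overlap, the higher precedence wins.
    return max(candidates, key=lambda t: t["precedence"], default=None)


tasks = [
    {"name": "original", "subtree": None, "precedence": 0},
    {"name": "image-jpeg", "subtree": ("image",), "precedence": 1},
]
print(winning_task(("image", "data"), tasks)["name"])
print(winning_task(("pose", "x"), tasks)["name"])
```

In the common case of a single scan task covering the whole schema, every leaf resolves to that task.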
- class roboto.experimental.topics.ReadPlanSchemaRef(/, **data)#
Bases: pydantic.BaseModel
Identifies the single topic schema the plan uses.
- Parameters:
data (Any)
- checksum: str#
Checksum of the schema’s content. A consumer can cache the schema by this value.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- schema_id: str#
Identifier of the resolved topic schema.
- class roboto.experimental.topics.ReadPlanTimestamp(/, **data)#
Bases: pydantic.BaseModel
Where a partition’s row timestamps come from.
Timestamps are either read out of a schema field (kind is "schema_field", and field names which one) or taken from the storage envelope (message log or publish time), in which case no schema field is involved and field is None.
- Parameters:
data (Any)
- field: ReadPlanFieldRef | None = None#
The schema field timestamps are read from; set exactly when kind is "schema_field".
- kind: roboto.domain.topics.TimelineSourceKind#
How timestamps are sourced: from a schema field, or from the storage envelope.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- unit: str | None = None#
Time unit of the designated field’s stored values (a TimeUnit value, e.g. "ms").
Only meaningful for a "schema_field" source, and only set when the schema declares the field’s unit. None when the schema does not record one; a consumer then treats non-self-describing values as nanoseconds, matching how the plan’s extents are recorded. Envelope-derived timestamps (message log/publish time) are always assumed to be in nanoseconds.
- class roboto.experimental.topics.RepresentationOverride(/, **data)#
Bases: pydantic.BaseModel
Applies a representation selector to one field subtree, overriding the request default.
- Parameters:
data (Any)
- field: FieldAddress#
The subtree this override covers.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- selector: roboto.experimental.topics.record.RepresentationSelector#
The selector to apply within that subtree.
- class roboto.experimental.topics.RepresentationPreference(/, **data)#
Bases: pydantic.BaseModel
Selects which stored variant of each field to read, per subtree.
A default selector applies to every field unless a more specific override covers it. Where several overrides cover a field, the one whose addressed subtree is the longest prefix of the field’s path wins; selector_for() implements this rule.
The governing selector and its matching rule are contract: a selector never substitutes a non-matching variant, and a read fails when a selector that sets any criterion is satisfied by no stored representation for a requested field. The plan never silently omits a field an explicit requirement covers. Which of the representations that satisfy the selector the service ultimately schedules is service policy and may change between releases.
- Parameters:
- Parameters:
data (Any)
- default: roboto.experimental.topics.record.RepresentationSelector#
The selector applied to any field no override covers; matches anything when unset.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- overrides: tuple[RepresentationOverride, Ellipsis] = ()#
Per-subtree selector overrides, resolved longest-matching-prefix wins.
- selector_for(field_path)#
Resolve the selector that governs the field at field_path, longest-matching-prefix wins.
An override applies when its addressed subtree path is a prefix of field_path; among applicable overrides the deepest subtree wins, and a field no override covers gets default.
- Parameters:
field_path (tuple[str, Ellipsis]) – The path_in_schema components of the field whose selector is being resolved.
- Returns:
The governing selector.
- Raises:
ValueError – An override addresses its subtree by field_id. Resolving a field_id to a path takes the schema, which this value object does not hold; resolve every override address to its path form first.
- Return type:
RepresentationSelector
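The longest-matching-prefix rule can be sketched as below. This is a simplified stand-in, not the method's source: overrides are modelled as (path, selector) pairs already in path form, selectors are arbitrary values, and resolve_selector is a hypothetical name.

```python
from typing import Any

Path = tuple[str, ...]


def resolve_selector(
    field_path: Path, default: Any, overrides: list[tuple[Path, Any]]
) -> Any:
    """Return the selector of the deepest override whose path prefixes field_path."""
    best_path = None
    best_selector = default
    for subtree, selector in overrides:
        # An override applies when its subtree path is a prefix of field_path.
        is_prefix = field_path[: len(subtree)] == subtree
        if is_prefix and (best_path is None or len(subtree) > len(best_path)):
            best_path, best_selector = subtree, selector
    return best_selector


overrides = [
    (("image",), "jpeg-selector"),
    (("image", "thumbnail"), "png-selector"),
]
print(resolve_selector(("image", "thumbnail", "data"), "default", overrides))
print(resolve_selector(("image", "data"), "default", overrides))
print(resolve_selector(("pose", "x"), "default", overrides))
```

An override addressing the schema root, (), is a prefix of every path, so it acts as a lower-priority fallback beneath any deeper override.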
- class roboto.experimental.topics.RepresentationRecord(/, **data)#
Bases: pydantic.BaseModel
One stored variant of a topic partition’s data, optionally narrowed to a subset of its fields.
A representation pairs a stored file with the data of a single topic partition. field_id narrows it to one field and the fields nested under it; None covers every field in the partition.
The same partition can have several representations that differ in storage_format, content_format, and transformations. A consumer picks the one whose attributes suit it: a viewer of image data, for example, may prefer a JPEG- or PNG-encoded variant over the untransformed original.
- Parameters:
data (Any)
- content_format: str | None = None#
The format of the data inside the stored file. For image data, this may be the image encoding (e.g. "jpeg", "png") on a transformed variant. None when unspecified.
- created: datetime.datetime | None = None#
- created_by: str#
Identity of the user who created this representation.
- field_id: str | None = None#
The field this representation is narrowed to, covering that field and the fields nested under it. None when the representation covers every field in the partition.
- fs_node_id: str#
Identifier of the file backing this representation.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- modified: datetime.datetime | None = None#
- modified_by: str#
- org_id: str#
Identifier of the org that owns this representation.
- representation_id: str#
Unique identifier of this representation.
- size_bytes: int | None = None#
Size in bytes of the file backing this representation, when known; None when the size is unavailable.
Populated on read-plan resolution so the plan can carry the backing file’s size onto its object refs. Write paths that upsert a representation leave it None.
- storage_format: roboto.domain.topics.RepresentationStorageFormat#
Container the representation data is stored in (e.g. MCAP, Parquet).
- topic_part_id: str#
Identifier of the topic partition this representation belongs to.
- transformations: list[str] = None#
The transformations applied to the source data to produce this variant, in the order applied. Empty on the untransformed original.
Each entry is a "<kind>:<param>" string whose <kind> is a TransformationKind member, e.g. ["downsample:0.5", "encode:jpeg"].
- class roboto.experimental.topics.RepresentationSelector(/, **data)#
Bases: pydantic.BaseModel
Selects which stored variant of a field to read when several are available.
A selector has three optional criteria (storage_format, content_format, and transformations), one for each attribute on which stored variants of the same field can differ (see RepresentationRecord). A criterion that is set is a requirement a variant must meet to be selected; a criterion left None places no requirement, and any value is acceptable.
A selector never falls back to a variant other than the one it describes. If any criterion is set and no stored variant of a requested field meets every requirement (whether the variants that exist all fall short, or the field has no stored variant at all), the read fails with an error rather than quietly leaving out the field. Only under a selector with no criteria set is a field with no stored variant simply absent from the result; such a selector requires nothing, so nothing requested is missing.
Successor to roboto.domain.topics.RepresentationSelector, used by get_data().
- Parameters:
data (Any)
- content_format: str | None = None#
Required content encoding (e.g. "jpeg") by scalar equality; None does not constrain it.
There is no legacy carve-out: a representation whose content_format is None does not satisfy an explicit request.
- matches(representation)#
Return whether representation satisfies every criterion this selector sets.
- Parameters:
representation (RepresentationRecord)
- Return type:
bool
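A hedged sketch of the matching rule, following the per-criterion rules documented on the storage_format, content_format, and transformations attributes of this class. Variant and Selector are simplified stand-ins defined here (plain strings replace RepresentationStorageFormat), and treating any token containing ":" as a full descriptor is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Variant:
    """Stand-in for the attributes of one stored representation."""
    storage_format: str
    content_format: Optional[str] = None
    transformations: tuple[str, ...] = ()


@dataclass(frozen=True)
class Selector:
    """Stand-in selector; a criterion left as None places no requirement."""
    storage_format: Optional[str] = None
    content_format: Optional[str] = None
    transformations: Optional[tuple[str, ...]] = None


def token_satisfied(token: str, descriptors: tuple[str, ...]) -> bool:
    if ":" in token:
        # Full "<kind>:<param>" descriptor: exact match only.
        return token in descriptors
    # Bare kind: any descriptor whose kind prefix equals it.
    return any(d.split(":", 1)[0] == token for d in descriptors)


def matches(selector: Selector, variant: Variant) -> bool:
    if (selector.storage_format is not None
            and selector.storage_format != variant.storage_format):
        return False
    # Scalar equality: a variant whose content_format is None does not
    # satisfy an explicit request.
    if (selector.content_format is not None
            and selector.content_format != variant.content_format):
        return False
    required = selector.transformations
    if required is None:
        return True
    if not required:
        # () requires the untransformed original.
        return not variant.transformations
    # All-of: the variant may carry additional transformations.
    return all(token_satisfied(t, variant.transformations) for t in required)


jpeg = Variant("parquet", "jpeg", ("downsample:0.5", "encode:jpeg"))
print(matches(Selector(transformations=("downsample",)), jpeg))
print(matches(Selector(transformations=()), jpeg))
```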
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod raw()#
Select the untransformed original (a representation with no transformations).
- Return type:
RepresentationSelector
- storage_format: roboto.domain.topics.RepresentationStorageFormat | None = None#
Required container (e.g. MCAP, Parquet) by scalar equality; None does not constrain it.
- transformations: tuple[str, Ellipsis] | None = None#
Required transformations; None does not constrain, () requires the untransformed original.
A non-empty tuple is all-of: every token must be satisfied by some descriptor on the representation, which may carry additional transformations. A token is either a bare kind (e.g. "downsample"), satisfied by any descriptor whose kind prefix equals it, or a full "<kind>:<param>" descriptor (e.g. "encode:jpeg"), satisfied only by an exact match. The grammar is open string matching; the recognized vocabulary (TransformationKind) is enforced by the service at the request boundary, which rejects a token naming an unrecognized kind.
- class roboto.experimental.topics.SessionContext(/, **data)#
Bases: pydantic.BaseModel
The Session a Topic is scoped to: limits topic operations to the Session’s associated files and supplies the Session’s aggregate time window as the default window for those operations.
- Parameters:
data (Any)
- end_time: int | None = None#
Latest time covered by the Session (Unix-epoch ns); the default end_time for get_data*. None when the Session includes no files.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- session_id: str#
- start_time: int | None = None#
Earliest time covered by the Session (Unix-epoch ns); the default start_time for get_data*. None when the Session includes no files.
- roboto.experimental.topics.TIMESTAMP_FIELD_METADATA_KEY = b'roboto.topic_data.timestamp'#
Arrow field-metadata key marking the per-row timestamp column of a topic-data batch.
- class roboto.experimental.topics.TimeWindow(/, **data)#
Bases: pydantic.BaseModel
A closed time window in nanoseconds since the Unix epoch; both bounds inclusive.
The same shape serves any window the read path carries: the absolute window a plan resolves over, and a partition’s stored-time window once time_offset_ns is applied. The bounds’ time domain is fixed by the context that holds the window, not by this type.
- Parameters:
data (Any)
- end: int#
Inclusive upper bound, in nanoseconds.
- model_config#
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- start: int#
Inclusive lower bound, in nanoseconds.
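Because both bounds are inclusive, clipping a partition’s extent to a window (as ReadPlanExtent is described as doing) needs care at the edges. A minimal sketch with plain integers; clip is a hypothetical helper, not part of this module.

```python
from typing import Optional


def clip(lo: int, hi: int, start: int, end: int) -> Optional[tuple[int, int]]:
    """Clip the closed range [lo, hi] to the closed window [start, end]."""
    clipped_lo, clipped_hi = max(lo, start), min(hi, end)
    # Both bounds are inclusive, so ranges that touch at a single
    # instant still overlap; only lo > hi means no overlap.
    if clipped_lo > clipped_hi:
        return None
    return clipped_lo, clipped_hi


print(clip(0, 10, 5, 20))
print(clip(0, 5, 5, 9))
print(clip(0, 4, 5, 9))
```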
- class roboto.experimental.topics.Topic(record, roboto_client=None, session_context=None)#
A logical stream of robotics data, identified durably across the files that carry it.
Within an organization, topic names are unique; contributions from different files with the same topic name share a single topic identity. By default a Topic reads org-wide and its data-returning methods require an explicit time window.
A Topic carrying a SessionContext (e.g., one yielded by list_topics() or get_topic()) instead scopes topic operations like get_data* to that Session’s files and defaults the window to the Session’s aggregate bounds.
- Parameters:
roboto_client (Optional[roboto.http.RobotoClient])
session_context (Optional[SessionContext])
- property context: SessionContext | None#
- Return type:
Optional[SessionContext]
- classmethod from_id(topic_id, owner_org_id=None, roboto_client=None, session_context=None)#
Load an existing topic by its id.
- Parameters:
topic_id (str) – Identifier of the topic (ti_*).
owner_org_id (Optional[str]) – Organization that owns the topic. If omitted, defaults to the caller’s organization.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses the default if omitted.
session_context (Optional[SessionContext]) – Optional. When provided, scopes topic operations to the session’s files and defaults the read window to the session’s bounds. None reads org-wide.
- Returns:
The loaded topic.
- Raises:
RobotoNotFoundException – No topic with this id exists in the org.
RobotoUnauthorizedException – The caller cannot search topics in the org.
- Return type:
Topic
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> topic.name
'/camera/image_raw'
- classmethod from_record(record, roboto_client=None, session_context=None)#
Wrap an already-loaded topic identity record.
- Parameters:
record (roboto.domain.topics.TopicIdentityRecord) – The topic identity record to wrap.
roboto_client (Optional[roboto.http.RobotoClient]) – Roboto client instance. Uses the default if omitted.
session_context (Optional[SessionContext]) – Optional session scope; see from_id().
- Returns:
A topic backed by record, with no further service calls.
- Return type:
Topic
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_record(record)
>>> topic.topic_id
'ti_abc123'
- get_data(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#
Yield this topic’s data within a time window, as (timestamp, record) pairs.
Convenience over get_data_as_record_batches() that unpacks each Arrow RecordBatch into one (timestamp, record) tuple per row. timestamp is the row’s absolute Unix-epoch nanosecond timestamp (an int); record is a dict of the projected fields, with struct fields as nested dicts and list fields as lists. A field the data omits for a row is absent from (or null within) that row’s dict.
Time windowing, field projection, representation selection, sort order, and error behavior are all as documented on get_data_as_record_batches().
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
end_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – See get_data_as_record_batches().
schema_id (Optional[str]) – See get_data_as_record_batches().
schema_checksum (Optional[str]) – See get_data_as_record_batches().
timeline_source_id (Optional[str]) – See get_data_as_record_batches().
timeline_source_name (Optional[str]) – See get_data_as_record_batches().
cache_policy (roboto.storage.CachePolicy) – See get_data_as_record_batches().
cache_dir (Union[str, pathlib.Path, None]) – See get_data_as_record_batches().
- Yields:
(timestamp, record) tuples for the in-window rows, filtered and projected per the arguments.
- Raises:
As documented on get_data_as_record_batches().
- Return type:
collections.abc.Generator[tuple[roboto.domain.topics.Timestamp, dict[str, Any]], None, None]
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for timestamp, record in topic.get_data(start_time=t0, end_time=t1):
...     print(timestamp, record)
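Since rows arrive as whole partitions in start order rather than globally time-sorted, a consumer that needs strict row-level time order has to sort downstream. A minimal sketch on stand-in (timestamp, record) pairs; the sample rows are invented for illustration, and time_ordered is a hypothetical helper that materializes every row in memory.

```python
from typing import Any, Iterable


def time_ordered(pairs: Iterable[tuple[int, dict[str, Any]]]) -> list[tuple[int, dict[str, Any]]]:
    """Sort (timestamp, record) pairs by timestamp.

    sorted() is stable, so rows with equal timestamps keep arrival order.
    """
    return sorted(pairs, key=lambda pair: pair[0])


# Two partitions whose time ranges overlap: partition order != row order.
rows = [
    (100, {"x": 1.0}), (300, {"x": 3.0}),  # partition A
    (200, {"x": 2.0}), (400, {"x": 4.0}),  # partition B
]
for timestamp, record in time_ordered(rows):
    print(timestamp, record)
```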
- get_data_as_df(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, flatten=False, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#
Return this topic’s data within a time window as a pandas DataFrame.
Same pipeline as get_data_as_record_batches(), with the batches packed into a DataFrame whose index is a timezone-aware DatetimeIndex.
Rows return ordered by partition (each file’s data in start order), not interleaved across partitions; within a partition rows keep their stored order. Call df.sort_index() for a strict row-level time-ordered view.
A struct field is returned as a single schema-shaped column of dicts unless flatten is set, which expands every struct level into dot-delimited leaf columns (e.g. pose.position.x). List-typed fields are unaffected by flatten.
Read parameters and error behavior are as documented on get_data_as_record_batches().
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
end_time (Optional[roboto.time.Time]) – See get_data_as_record_batches().
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – See get_data_as_record_batches().
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – See get_data_as_record_batches().
schema_id (Optional[str]) – See get_data_as_record_batches().
schema_checksum (Optional[str]) – See get_data_as_record_batches().
timeline_source_id (Optional[str]) – See get_data_as_record_batches().
timeline_source_name (Optional[str]) – See get_data_as_record_batches().
flatten (bool) – Expand struct-typed fields into dot-delimited leaf columns. When False, each struct-typed field is a single object-dtype column of dicts.
cache_policy (roboto.storage.CachePolicy) – See get_data_as_record_batches().
cache_dir (Union[str, pathlib.Path, None]) – See get_data_as_record_batches().
- Returns:
DataFrame of the in-window rows indexed by a timezone-aware DatetimeIndex.
- Raises:
As documented on get_data_as_record_batches().
- Return type:
pandas.DataFrame
Examples
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> df = topic.get_data_as_df(start_time=t0, end_time=t1)
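The effect of flatten on column names can be pictured with a plain-Python sketch that expands nested dicts into dot-delimited keys and leaves lists alone. This only illustrates the naming scheme; flatten_record is a hypothetical helper and says nothing about how the DataFrame is actually built.

```python
from typing import Any


def flatten_record(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Expand every struct (dict) level into dot-delimited leaf keys."""
    flat: dict[str, Any] = {}
    for name, value in record.items():
        key = f"{prefix}.{name}" if prefix else name
        if isinstance(value, dict):
            flat.update(flatten_record(value, key))
        else:
            # Lists and scalars are kept as-is.
            flat[key] = value
    return flat


row = {"pose": {"position": {"x": 1.0, "y": 2.0}}, "ranges": [0.5, 0.7]}
print(flatten_record(row))
```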
- get_data_as_record_batches(start_time=None, end_time=None, fields_include=None, fields_exclude=None, prefer=None, schema_id=None, schema_checksum=None, timeline_source_id=None, timeline_source_name=None, cache_policy=CachePolicy.ADAPTIVE, cache_dir=None)#
Yield this topic’s data within a time window, as Arrow RecordBatches.
Each batch carries one column per top-level projected field, with nested struct and list types mirroring the topic’s schema, pruned to the projection, plus a dedicated int64 column of Unix-epoch nanosecond timestamps; locate that column with timestamp_column_index(). A field the data omits for a row surfaces as null at the deepest level that represents the omission (a whole absent subtree is a single null).
Batch sizes and boundaries carry no meaning, and a window matching no rows yields no batches. A topic’s data can span several files (“topic partitions”); rows from different partitions are never mixed within a batch. Partitions arrive ordered by where each file’s data begins. Within a partition, rows keep their stored order, and rows from different partitions are never interleaved. So batches arrive as whole partitions in start order, not as a globally time-sorted row stream. Sort downstream if a strict row-level time order is needed.
Requires the roboto[analytics] extra.
- Parameters:
start_time (Optional[roboto.time.Time]) – Inclusive window lower bound, as nanoseconds since the Unix epoch or anything convertible via to_epoch_nanoseconds(). None defaults to the session’s lower bound when this topic was obtained from list_topics() or get_topic(); otherwise required (a ValueError is raised when it cannot be resolved).
end_time (Optional[roboto.time.Time]) – Inclusive window upper bound, same forms as start_time; defaults to the session’s upper bound on the same terms.
fields_include (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to project. None projects every field.
fields_exclude (Optional[collections.abc.Iterable[FieldAddressLike]]) – Field subtrees to drop from the projection. None drops none.
prefer (Optional[roboto.experimental.topics.operations.RepresentationPreference]) – Preferred representation per field subtree, selecting which stored variant of a field to read. None applies the default selection everywhere.
schema_id (Optional[str]) – Schema to read under, by id. Required only when the window spans data with more than one schema.
schema_checksum (Optional[str]) – Schema to read under, by checksum. Mutually exclusive with schema_id.
timeline_source_id (Optional[str]) – Timeline source to resolve the window with, by id. None uses each schema’s default source.
timeline_source_name (Optional[str]) – Timeline source by name. Mutually exclusive with timeline_source_id.
cache_policy (roboto.storage.CachePolicy) – Whether fetched Parquet files are cached to local disk. MCAP data always streams.
cache_dir (Union[str, pathlib.Path, None]) – Directory topic data files are cached under. Defaults to a topic-data subdirectory of ROBOTO_CACHE_DIR, or the platform-conventional per-user cache directory when that is unset.
- Yields:
pyarrow.RecordBatch instances holding the in-window rows, filtered and projected per the arguments.
- Raises:
RobotoInvalidRequestException – The window spans multiple schemas and none was chosen with schema_id or schema_checksum, a named schema or timeline source does not match the window’s data, or no stored representation satisfies a representation preference. The error carries an actionable message.
RobotoUnauthorizedException – The caller lacks read access to at least one in-window file backing this topic.
- Return type:
collections.abc.Generator[pyarrow.RecordBatch, None, None]
Examples
Print each batch’s row count and column names for a window:
>>> from roboto.experimental.topics import Topic
>>> topic = Topic.from_id("ti_abc123")
>>> for batch in topic.get_data_as_record_batches(start_time=t0, end_time=t1):
...     print(batch.num_rows, batch.schema.names)
Project to one field subtree, dropping one of its children:
>>> for batch in topic.get_data_as_record_batches(
...     start_time=t0,
...     end_time=t1,
...     fields_include=[("angular_velocity",)],
...     fields_exclude=[("angular_velocity", "y")],
... ):
...     print(batch.to_pylist())
- property name: str#
Human-readable topic name (e.g. "/camera/image_raw"). Unique within an organization.
- Return type:
str
- property org_id: str#
Identifier of the organization that owns this topic.
- Return type:
str
- property record: roboto.domain.topics.TopicIdentityRecord#
The underlying topic identity record.
- Return type:
roboto.domain.topics.TopicIdentityRecord
- set_context(session_context)#
- Parameters:
session_context (Optional[SessionContext])
- Return type:
None
- property topic_id: str#
Durable identifier of this topic (ti_*).
- Return type:
str
- roboto.experimental.topics.timestamp_column_index(schema)#
Locate the timestamp column by its metadata marker.
The column is identified by metadata, never by name: a projected root field can legitimately carry any name, including the timestamp column’s conventional one.
- Raises:
RobotoInternalException – The schema does not contain exactly one marked column.
- Parameters:
schema (pyarrow.Schema)
- Return type:
int