roboto.experimental.topics.batch_transforms
Representation conversion for topic-data RecordBatches.
Topic data moves through the read path as Arrow RecordBatches in its public shape:
one column per top-level projected field, with struct/list types mirroring the schema tree,
plus one dedicated timestamp column of absolute Unix-epoch nanoseconds (int64) marked by field metadata
(TIMESTAMP_FIELD_METADATA_KEY).
This module owns the conversions into and out of that shape:
- Decoded message rows -> a nested RecordBatch (rows_to_batch()).
- A nested table -> dot-delimited leaf columns (flatten_table()), the DataFrame packing shape behind Topic.get_data_as_df(flatten=True). A null at any struct level propagates to nulls in every leaf column beneath it.
It also exposes the helpers that locate and construct the timestamp column (timestamp_column_index(), timestamp_field()), which the decode path uses to mark and find that column by metadata rather than by name.
Module Contents
- roboto.experimental.topics.batch_transforms.TIMESTAMP_FIELD_METADATA_KEY = b'roboto.topic_data.timestamp'
Arrow field-metadata key marking the per-row timestamp column of a topic-data batch.
- roboto.experimental.topics.batch_transforms.TIMESTAMP_FIELD_NAME = '_index'
Name of the emitted per-row timestamp column.
Source-neutral by design: the column always carries the resolved timeline’s absolute Unix-epoch nanoseconds, whatever its source (message log time, publish time, or a schema field), so the name asserts no particular origin. It matches the name of the index (_index) that Topic.get_data_as_df() labels its rows with. The column’s real identity is its metadata marker (TIMESTAMP_FIELD_METADATA_KEY), never this name, which is uniquified by suffixing when a projected field already claims it.
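The page states only that the name is uniquified by suffixing. A minimal sketch of one such scheme follows. The helper name uniquify and the "_<n>" suffix format are hypothetical; the module's actual suffix format is not documented here.

```python
def uniquify(name: str, taken: set[str]) -> str:
    """Return name if it is free, else name plus the smallest free "_<n>" suffix.

    Hypothetical scheme: the module's real suffix format is not documented.
    """
    if name not in taken:
        return name
    n = 1
    # Probe _1, _2, ... until a suffixed name is not claimed by a projected field.
    while f"{name}_{n}" in taken:
        n += 1
    return f"{name}_{n}"
```

Whatever the suffix turns out to be, consumers should find the column through its metadata marker, because the emitted name is not guaranteed to be _index.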
- roboto.experimental.topics.batch_transforms.flatten_table(table)
Expand struct columns into dot-delimited leaf columns, recursively.
A null at any struct level propagates to nulls in every leaf column beneath it. List-typed columns stay whole. This is the DataFrame packing shape: dotted leaf columns over the projected tree.
- Raises:
RobotoInvalidRequestException – Two columns resolve to the same dotted name, e.g. a top-level field literally named pose.x alongside a struct pose with child x. A plain dict would silently drop one (last write wins); the ambiguity is rejected instead. Rename the offending field, or disable flatten=True, to recover the column.
- Parameters:
table (pyarrow.Table)
- Return type:
pyarrow.Table
- roboto.experimental.topics.batch_transforms.rows_to_batch(rows)
Encode decoded (timestamp, row) pairs as one nested RecordBatch.
Column types are inferred from the rows’ values. A row that lacks a top-level key contributes a null to that column (a whole absent subtree is a single struct-level null); a row that carries an empty dict contributes a valid struct whose children are all null. That is a different value, and one that decoded messages do produce.
- Parameters:
rows (collections.abc.Sequence[tuple[int, dict[str, Any]]])
- Return type:
pyarrow.RecordBatch
- roboto.experimental.topics.batch_transforms.timestamp_column_index(schema)
Locate the timestamp column by its metadata marker.
The column is identified by metadata, never by name: a projected root field can legitimately carry any name, including the timestamp column’s conventional one.
- Raises:
RobotoInternalException – The schema does not contain exactly one marked column.
- Parameters:
schema (pyarrow.Schema)
- Return type:
int
- roboto.experimental.topics.batch_transforms.timestamp_field(name=TIMESTAMP_FIELD_NAME)
The timestamp column’s Arrow field: int64 epoch nanoseconds, metadata-marked.
- Parameters:
name (str)
- Return type:
pyarrow.Field