roboto.experimental.topics.batch_transforms

Representation conversion for topic-data RecordBatches.

Topic data moves through the read path as Arrow RecordBatches in its public shape: one column per top-level projected field, with struct/list types mirroring the schema tree, plus one dedicated timestamp column of absolute Unix-epoch nanoseconds (int64) marked by field metadata (TIMESTAMP_FIELD_METADATA_KEY).
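The timestamp column holds plain int64 nanoseconds since the Unix epoch, so datetime values have to be converted exactly. The sketch below is a standalone illustration and is not part of this module: to_epoch_ns and from_epoch_ns are hypothetical helper names. It uses integer timedelta arithmetic instead of the float-based datetime.timestamp(). Python datetimes only resolve microseconds, so sub-microsecond digits are lost on the way back.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def to_epoch_ns(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer Unix-epoch nanoseconds (hypothetical helper)."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # timedelta // timedelta yields an exact int, avoiding float rounding.
    ns = ((dt - EPOCH) // timedelta(microseconds=1)) * 1_000
    if not INT64_MIN <= ns <= INT64_MAX:
        raise OverflowError("timestamp does not fit in int64 nanoseconds")
    return ns


def from_epoch_ns(ns: int) -> datetime:
    """Inverse of to_epoch_ns; truncates sub-microsecond digits (hypothetical helper)."""
    return EPOCH + timedelta(microseconds=ns // 1_000)
```

The int64 bound matters in practice: nanoseconds since 1970 overflow int64 shortly after the year 2262.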

This module owns the conversions into and out of that shape:

  • decoded message rows -> a nested RecordBatch (rows_to_batch()).

  • a nested table -> dot-delimited leaf columns (flatten_table()), the DataFrame packing shape behind Topic.get_data_as_df(flatten=True). A null at any struct level propagates to nulls in every leaf column beneath it.

It also exposes the helpers that locate and construct the timestamp column (timestamp_column_index(), timestamp_field()), which the decode path uses to mark and find that column by metadata rather than name.

Module Contents

roboto.experimental.topics.batch_transforms.TIMESTAMP_FIELD_METADATA_KEY = b'roboto.topic_data.timestamp'

Arrow field-metadata key marking the per-row timestamp column of a topic-data batch.

roboto.experimental.topics.batch_transforms.TIMESTAMP_FIELD_NAME = '_index'

Name of the emitted per-row timestamp column.

Source-neutral by design: the column always carries the resolved timeline’s absolute Unix-epoch nanoseconds, whatever the timeline’s source (message log time, publish time, or a schema field), so the name asserts no particular origin. It matches _index, the index that Topic.get_data_as_df() labels its rows with. The column is identified by its metadata marker (TIMESTAMP_FIELD_METADATA_KEY), never by this name; when a projected field already claims the name, the timestamp column’s name is made unique by adding a suffix.
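Uniquifying by suffix can be sketched as follows. Only the fact that a suffix is added is documented here; the "_1", "_2", ... format and the unique_timestamp_name helper are hypothetical, chosen for illustration.

```python
from collections.abc import Iterable

TIMESTAMP_FIELD_NAME = "_index"


def unique_timestamp_name(projected_names: Iterable[str], base: str = TIMESTAMP_FIELD_NAME) -> str:
    """Return base, or base plus a numeric suffix, avoiding every projected name.

    The "_<n>" suffix format is an assumption; the real scheme may differ.
    """
    taken = set(projected_names)
    if base not in taken:
        return base
    n = 1
    while f"{base}_{n}" in taken:
        n += 1
    return f"{base}_{n}"
```

Because the resulting name depends on which fields were projected, consumers cannot rely on it and must locate the column through its metadata marker.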

roboto.experimental.topics.batch_transforms.flatten_table(table)

Expand struct columns into dot-delimited leaf columns, recursively.

A null at any struct level propagates to nulls in every leaf column beneath it. List-typed columns stay whole. This is the DataFrame packing shape: dotted leaf columns over the projected tree.

Raises:

RobotoInvalidRequestException – Two columns resolve to the same dotted name, e.g. a top-level field literally named pose.x alongside a struct pose with child x. A plain dict would silently drop one of them (last write wins), so the ambiguity is rejected instead. To recover the column, rename the offending field or request the data without flatten=True.

Parameters:

table (pyarrow.Table)

Return type:

pyarrow.Table
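The flattening rules can be modeled in plain Python. This is a sketch under stated assumptions, not the module's implementation: flatten_columns is a hypothetical name, a table is represented as a dict of column lists, a column is treated as a struct when all of its non-null values are dicts (the real function reads Arrow types instead), and ValueError stands in for RobotoInvalidRequestException.

```python
from typing import Any


def flatten_columns(columns: dict[str, list[Any]], prefix: str = "") -> dict[str, list[Any]]:
    """Recursively expand struct-like columns into dotted leaf columns.

    A None struct value makes every leaf beneath it None; list values stay whole.
    Raises ValueError when two paths resolve to the same dotted name.
    """
    out: dict[str, list[Any]] = {}

    def emit(name: str, values: list[Any]) -> None:
        # Reject collisions instead of letting the later column overwrite the earlier one.
        if name in out:
            raise ValueError(f"ambiguous flattened column name: {name!r}")
        out[name] = values

    for name, values in columns.items():
        full = f"{prefix}{name}"
        non_null = [v for v in values if v is not None]
        if non_null and all(isinstance(v, dict) for v in non_null):
            # Child names in first-seen order across all rows.
            child_names: list[str] = []
            for v in non_null:
                for key in v:
                    if key not in child_names:
                        child_names.append(key)
            # A null parent propagates None into every child.
            children = {
                key: [None if v is None else v.get(key) for v in values]
                for key in child_names
            }
            for leaf, leaf_values in flatten_columns(children, f"{full}.").items():
                emit(leaf, leaf_values)
        else:
            emit(full, values)
    return out
```

Since struct-ness is inferred from values here, an all-null struct column stays a single column in this sketch; working from the Arrow schema, as the real function does, avoids that limitation.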

roboto.experimental.topics.batch_transforms.rows_to_batch(rows)

Encode decoded (timestamp, row) pairs as one nested RecordBatch.

Column types are inferred from the rows’ values. A row that lacks a top-level key contributes a null to that column (a whole absent subtree is a single struct-level null). A row that carries an empty dict instead contributes a valid struct whose children are all null; this is a distinct value, and decoded messages do produce it.

Parameters:

rows (collections.abc.Sequence[tuple[int, dict[str, Any]]])

Return type:

pyarrow.RecordBatch
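The null semantics above come down to how rows are transposed into columns. The sketch below shows only that step in plain Python: rows_to_columns is a hypothetical helper that returns the timestamps separately plus one list per top-level key, and it skips the Arrow type inference the real function performs.

```python
from collections.abc import Sequence
from typing import Any


def rows_to_columns(
    rows: Sequence[tuple[int, dict[str, Any]]],
) -> tuple[list[int], dict[str, list[Any]]]:
    """Transpose (timestamp, row) pairs into timestamps plus per-field columns."""
    # Top-level field names in first-seen order across all rows.
    names: list[str] = []
    for _, row in rows:
        for key in row:
            if key not in names:
                names.append(key)
    timestamps = [ts for ts, _ in rows]
    # An absent key becomes None (one struct-level null); an empty dict is kept
    # as {} (a valid struct whose children will all be null).
    columns = {name: [row.get(name) for _, row in rows] for name in names}
    return timestamps, columns
```

In the test data, rows 2 and 3 both end up with a null pose.x once flattened, but only row 2 is null at the struct level.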

roboto.experimental.topics.batch_transforms.timestamp_column_index(schema)

Locate the timestamp column by its metadata marker.

The column is identified by metadata, never by name: a projected root field can legitimately carry any name, including the timestamp column’s conventional one.

Raises:

RobotoInternalException – The schema does not contain exactly one marked column.

Parameters:

schema (pyarrow.Schema)

Return type:

int
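A metadata-based lookup can be sketched without Arrow. FieldSketch is a hypothetical stand-in for pyarrow.Field, LookupError stands in for RobotoInternalException, and the marker value b"1" is an assumption, since only the metadata key is documented. The example deliberately gives a projected field the conventional name _index to show that the name plays no part.

```python
from dataclasses import dataclass
from typing import Optional

TIMESTAMP_FIELD_METADATA_KEY = b"roboto.topic_data.timestamp"


@dataclass
class FieldSketch:
    """Hypothetical stand-in for pyarrow.Field: a name, a type label, and byte metadata."""
    name: str
    type: str
    metadata: Optional[dict[bytes, bytes]] = None


def find_timestamp_index(fields: list[FieldSketch]) -> int:
    """Return the index of the single metadata-marked field, ignoring names.

    Raises LookupError unless exactly one field carries the marker key.
    """
    marked = [
        i
        for i, f in enumerate(fields)
        if f.metadata and TIMESTAMP_FIELD_METADATA_KEY in f.metadata
    ]
    if len(marked) != 1:
        raise LookupError(f"expected exactly one timestamp column, found {len(marked)}")
    return marked[0]
```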

roboto.experimental.topics.batch_transforms.timestamp_field(name=TIMESTAMP_FIELD_NAME)

The timestamp column’s Arrow field: int64 epoch nanoseconds, metadata-marked.

Parameters:

name (str)

Return type:

pyarrow.Field