roboto.experimental.topics.decode.transpose

Reshape decoded message rows into typed Arrow columns.

Each output column gets one ColumnTransposer, which buffers that column’s value from every decoded message and, at each batch boundary, builds an Arrow array typed according to the topic schema inferred during ingestion.
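
The per-column batching described above can be sketched in plain Python. This is a minimal sketch, not Roboto’s implementation. It assumes decoded messages are dicts keyed by column name, and `ListTransposer` and `transpose_in_batches` are hypothetical names. The stand-in transposer buffers values in a Python list where the real ColumnTransposer builds a typed Arrow array.

```python
from typing import Any, Iterable, Iterator


class ListTransposer:
    """Hypothetical stand-in for ColumnTransposer: buffers one column's values."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._rows: list[Any] = []

    def append_row(self, value: Any) -> None:
        # None stands in for "field absent in this message" (a null).
        self._rows.append(value)

    def column(self) -> tuple[str, list[Any]]:
        # Return a copy so a later reset() does not clear an emitted batch.
        return self.name, list(self._rows)

    def reset(self) -> None:
        self._rows.clear()


def transpose_in_batches(
    messages: Iterable[dict[str, Any]], names: list[str], batch_size: int
) -> Iterator[dict[str, list[Any]]]:
    """Yield column-oriented batches of at most batch_size rows."""
    transposers = [ListTransposer(name) for name in names]
    pending = 0
    for message in messages:
        # Every message contributes exactly one value (possibly None) per column.
        for t in transposers:
            t.append_row(message.get(t.name))
        pending += 1
        if pending == batch_size:  # batch boundary: build, then reset
            yield dict(t.column() for t in transposers)
            for t in transposers:
                t.reset()
            pending = 0
    if pending:  # flush the final partial batch
        yield dict(t.column() for t in transposers)
```

Keeping one transposer per column means every column in a batch has the same number of rows, since a missing field still appends a null.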

Module Contents

exception roboto.experimental.topics.decode.transpose.ColumnBuildError

Bases: Exception

Raised when a topic field’s data cannot be read as the type recorded for that field at ingestion.

Customer-facing: the message names the affected field, describes the shape of its data in plain language, and reports the type recorded for the field. That is enough for a user to see which field is affected and for Roboto to diagnose the schema/data mismatch (e.g. a multi-dimensional array flattened to a single dimension in the stored type). The underlying pyarrow error is kept as the exception’s __cause__ for internal logs and is not surfaced in the message.
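
A minimal sketch of this message-versus-cause split, using only the standard library. `SketchColumnBuildError`, `describe_shape`, and `build_float_column` are hypothetical, and a failing `float()` conversion stands in for the pyarrow error. The point is that `raise ... from err` keeps the low-level error reachable as `__cause__` while the message itself stays in plain language.

```python
from typing import Any, Optional


class SketchColumnBuildError(Exception):
    """Hypothetical stand-in for ColumnBuildError."""


def describe_shape(value: Any) -> str:
    # Plain-language description of the data's shape for the user-facing message.
    if isinstance(value, (list, tuple)):
        return f"a list of {len(value)} values"
    return f"a single {type(value).__name__} value"


def build_float_column(field_name: str, values: list[Any]) -> list[Optional[float]]:
    out: list[Optional[float]] = []
    for value in values:
        if value is None:
            out.append(None)  # absent field -> null
            continue
        try:
            out.append(float(value))
        except (TypeError, ValueError) as err:
            # The message names the field, the data's shape, and the recorded type;
            # the low-level error is attached only as __cause__.
            raise SketchColumnBuildError(
                f"Field {field_name!r} contains {describe_shape(value)}, "
                f"but its recorded type is float64."
            ) from err
    return out
```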

class roboto.experimental.topics.decode.transpose.ColumnTransposer(node, dialect=McapDialect.OTHER)

Buffers one output column’s per-message values and builds its typed Arrow array.

The column’s Arrow type and name are fixed at construction from the schema node and the partition’s dialect. append_row() takes the column’s extracted value for one row (or None when the field is absent from that row), and column() materializes the buffered rows against the fixed type.

Parameters:
  • node

  • dialect (McapDialect)

append_row(value)

Append this column’s value for one row; None records a typed null.

Parameters:

value (Any)

Return type:

None

column()

Return the column’s (field, array) pair, with the array built against the schema-declared type.

Return type:

tuple[pyarrow.Field, pyarrow.Array]

reset()

Drop the buffered rows so the transposer can build the next batch.

Return type:

None

roboto.experimental.topics.decode.transpose.record_batch_from_columns(timestamp_name, timestamps, fields, arrays)

Assemble the timestamp column and the transposed columns into a single RecordBatch.

Parameters:
  • timestamp_name (str)

  • timestamps (list[int])

  • fields (list[pyarrow.Field])

  • arrays (list[pyarrow.Array])

Return type:

pyarrow.RecordBatch