roboto.experimental.topics.overlay

Overlay a partition’s scan-task streams into one nested RecordBatch.

A partition resolves to one or more scan tasks, each a representation layer decoded into the public nested shape: a RecordBatch with a metadata-marked _index timestamp column. A row’s leaf fields can be shredded across these layers (record-shredding style), each layer owning a subtree; this module reassembles each row by gathering every leaf from its owning layer and merging the layers into one batch.

The model is a per-leaf positional join:

  • Streams are aligned by row position, not by index value. Every representation of a partition is persisted in the same row order, so row i is the same logical message in every stream. No sort is needed and none is applied: each decoder emits its native (persisted) order and the merge pairs rows positionally. This is what lets duplicate _index timestamps overlay correctly, because equal-timestamp rows keep their shared persisted order instead of being reshuffled by a per-stream sort. Every stream must carry the same rows: identical length and, position by position, identical _index values, which the merge verifies as a corruption guard. A stream that is shorter, longer, or whose _index values diverge at any position is misaligned and raises; there is no fallback.

  • The output leaf set and nesting come from the plan’s projection. The server resolves the projection against the topic schema, so leaf_paths_per_stream already enumerates the fine leaves each stream owns by subtree; the merge never infers a shape out of decoded batches.

  • Each leaf is taken whole from its highest-precedence owning stream. Streams are ordered lowest-precedence first, so a narrower-subtree override wins its leaf while the base keeps the siblings. Struct sub-property override falls out of per-leaf selection; there is no per-struct merge and no cross-stream type unification (a higher-precedence string simply wins over a base int).

  • null is a real value. A higher-precedence stream that carries a leaf as null wins with that null, exactly as it would with any other value.
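The four rules above can be illustrated with a small pure-Python toy model. This is only a sketch, not the module's implementation: it uses plain dicts and lists instead of pyarrow, the names LeafPath, overlay_columns, and nest_rows are hypothetical, and it assumes the shared _index column has already been verified and is passed in separately.

```python
from typing import Any

# Hypothetical stand-ins: a leaf path is a tuple of field names, and a toy
# "stream" maps each leaf path to one whole column of values.
LeafPath = tuple[str, ...]
Stream = dict[LeafPath, list[Any]]


def overlay_columns(
    streams: list[Stream],
    leaf_paths_per_stream: list[list[LeafPath]],
) -> dict[LeafPath, list[Any]]:
    """Pick each leaf column whole from its highest-precedence owner."""
    merged: dict[LeafPath, list[Any]] = {}
    # Streams are ordered lowest-precedence first, so a later owner
    # overwrites an earlier one (last writer wins per leaf).
    for columns, owned in zip(streams, leaf_paths_per_stream):
        for leaf in owned:
            merged[leaf] = columns[leaf]  # whole column, None values included
    return merged


def nest_rows(merged: dict[LeafPath, list[Any]], index: list[int]) -> list[dict]:
    """Rebuild one nested dict per row position from the flat leaf columns."""
    rows = []
    for i, timestamp in enumerate(index):
        row: dict[str, Any] = {"_index": timestamp}
        for path, column in merged.items():
            node = row
            for key in path[:-1]:
                node = node.setdefault(key, {})
            node[path[-1]] = column[i]
        rows.append(row)
    return rows


# Two rows share timestamp 100; they are paired by position, never sorted.
index = [100, 100, 200]
base = {("pose", "x"): [1, 2, 3], ("pose", "y"): [10, 20, 30]}
override = {("pose", "x"): ["a", None, "c"]}  # string and null beat the base int

merged = overlay_columns(
    [base, override],
    [[("pose", "x"), ("pose", "y")], [("pose", "x")]],
)
rows = nest_rows(merged, index)
```

In this toy run the override stream wins pose.x in every row, including the row where it carries None, while the base stream keeps the sibling pose.y.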

Module Contents

roboto.experimental.topics.overlay.overlay_streams(streams, leaf_paths_per_stream)

Merge position-aligned scan-task streams into one nested RecordBatch, last-writer-wins per leaf.

Parameters:
  • streams (collections.abc.Sequence[collections.abc.Sequence[pyarrow.RecordBatch]]) – One sequence of public-shape RecordBatches per scan task, ordered lowest-precedence first. Each stream must already be in persisted row order (its decoder’s native order); the merge pairs rows positionally and never sorts.

  • leaf_paths_per_stream (collections.abc.Sequence[collections.abc.Sequence[roboto.domain.topics.record.FieldPath]]) – Parallel to streams; each entry is the leaf-most projection that stream owns by subtree. A stream owns leaf L iff L appears in its entry, and the highest-precedence owner wins L.
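The ownership rule for leaf_paths_per_stream can be sketched as an explicit leaf-to-stream map. This is an illustrative assumption about how one might compute it, not the library's code: resolve_owners is a hypothetical helper, and leaf paths are modelled as plain tuples of field names rather than FieldPath objects.

```python
def resolve_owners(leaf_paths_per_stream):
    """Map each leaf path to the index of its highest-precedence owning stream."""
    owners = {}
    # Entries are parallel to the streams, lowest-precedence first, so a
    # later stream that lists the same leaf replaces the earlier owner.
    for stream_idx, owned in enumerate(leaf_paths_per_stream):
        for leaf in owned:
            owners[leaf] = stream_idx
    return owners


leaf_paths_per_stream = [
    [("pose", "x"), ("pose", "y"), ("status",)],  # stream 0: base layer
    [("pose", "x")],                              # stream 1: narrower override
]
owners = resolve_owners(leaf_paths_per_stream)
```

Here stream 1 owns only pose.x, so it wins that leaf while stream 0 remains the owner of pose.y and status.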

Returns:

The merged partition as one RecordBatch in the public nested shape with a metadata-marked stored-time index column, or None when every stream is empty (the partition emits no rows).

Raises:

RobotoInternalException – The streams do not share an index: their row counts differ, or their _index values differ at some row position.
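The condition that raises can be sketched as a positional comparison of each stream's _index column against the first. This is a minimal illustration under stated assumptions: MisalignedStreamsError is a hypothetical stand-in for RobotoInternalException, verify_shared_index is not part of the library, and index columns are modelled as plain lists.

```python
class MisalignedStreamsError(Exception):
    """Hypothetical stand-in for RobotoInternalException in this sketch."""


def verify_shared_index(index_columns):
    """Return the shared index, or raise if any stream diverges from the first."""
    if not index_columns:
        return []
    reference = index_columns[0]
    for stream_idx, candidate in enumerate(index_columns[1:], start=1):
        # A shorter or longer stream is a misalignment.
        if len(candidate) != len(reference):
            raise MisalignedStreamsError(
                f"stream {stream_idx} has {len(candidate)} rows, expected {len(reference)}"
            )
        # Values are compared position by position; nothing is sorted or matched by value.
        for pos, (expected, actual) in enumerate(zip(reference, candidate)):
            if expected != actual:
                raise MisalignedStreamsError(
                    f"stream {stream_idx} diverges at row {pos}: {actual!r} != {expected!r}"
                )
    return reference


shared = verify_shared_index([[100, 100, 200], [100, 100, 200]])
```

Note that two streams holding the same timestamps in a different order still fail this check, which matches the positional (not value-based) alignment described above.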

Return type:

Optional[pyarrow.RecordBatch]