roboto.experimental.topics.overlay#
Overlay a partition’s scan-task streams into one nested RecordBatch.
A partition resolves to one or more scan tasks, each a representation layer
decoded into the public nested shape: a RecordBatch with a metadata-marked
_index timestamp column. A row’s leaf fields can be shredded across these
layers (record-shredding style), each layer owning a subtree; this module
reassembles each row by gathering every leaf from its owning layer and merging
the layers into one batch.
The model is a per-leaf positional join:
- Streams are aligned by row position, not by index value. Every representation of a partition is persisted in the same row order, so row i is the same logical message in every stream. No sort is needed and none is applied: each decoder emits its native (persisted) order and the merge pairs rows positionally. This is what lets duplicate _index timestamps overlay correctly: equal-timestamp rows keep their shared persisted order instead of being reshuffled by a per-stream sort. Every stream must carry the same rows, meaning identical length and, by position, element-wise identical _index values, which the merge verifies as a corruption guard. A stream that is shorter, longer, or whose _index values diverge by position is a misalignment and raises; there is no fallback.
- The output leaf set and nesting come from the plan’s projection. The server resolves the projection against the topic schema, so leaf_paths_per_stream already enumerates the fine leaves each stream owns by subtree; the merge never infers a shape out of decoded batches.
- Each leaf is taken whole from its highest-precedence owning stream. Streams are ordered lowest-precedence first, so a narrower-subtree override wins its leaf while the base keeps the siblings. Struct sub-property override falls out of per-leaf selection; there is no per-struct merge and no cross-stream type unification (a higher-precedence string simply wins over a base int).
- null is a real value. A higher-precedence stream that carries a leaf as null wins with that null, exactly as it would with any other value.
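The positional alignment rule described above can be illustrated with a small sketch. This is a toy model in plain Python, not the module's implementation: `shared_index` and `MisalignedStreamsError` are hypothetical names, and plain lists of integers stand in for the `_index` columns of decoded RecordBatches.

```python
from typing import List, Optional, Sequence


class MisalignedStreamsError(Exception):
    """Hypothetical stand-in for the exception the real merge raises."""


def shared_index(index_columns: Sequence[Sequence[int]]) -> Optional[List[int]]:
    """Return the index all streams share, or None when every stream is empty.

    Each entry is one stream's `_index` column, in persisted row order.
    Nothing is sorted: rows are compared strictly by position.
    """
    if all(len(column) == 0 for column in index_columns):
        return None
    reference = list(index_columns[0])
    for stream_no, column in enumerate(index_columns[1:], start=1):
        if len(column) != len(reference):
            raise MisalignedStreamsError(
                f"stream {stream_no} has {len(column)} rows, expected {len(reference)}"
            )
        for row, (expected, actual) in enumerate(zip(reference, column)):
            if expected != actual:
                raise MisalignedStreamsError(
                    f"stream {stream_no} row {row}: _index {actual} != {expected}"
                )
    return reference


# Two rows share timestamp 20; position, not value, tells them apart.
base_index = [10, 20, 20, 30]
base_values = ["a", "b", "c", "d"]
override_index = [10, 20, 20, 30]
override_values = ["A", "B", "C", "D"]

index = shared_index([base_index, override_index])
# Pair rows positionally, exactly in persisted order.
paired = list(zip(index, base_values, override_values))
```

Because the pairing is positional, the two rows at timestamp 20 stay matched as ("b", "B") and ("c", "C"). A join keyed on the timestamp value could not distinguish them.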
Module Contents#
- roboto.experimental.topics.overlay.overlay_streams(streams, leaf_paths_per_stream)#
Merge position-aligned scan-task streams into one nested RecordBatch, last-writer-wins per leaf.
- Parameters:
streams (collections.abc.Sequence[collections.abc.Sequence[pyarrow.RecordBatch]]) – One sequence of public-shape RecordBatches per scan task, ordered lowest-precedence first. Each stream must already be in persisted row order (its decoder’s native order); the merge pairs rows positionally and never sorts.
leaf_paths_per_stream (collections.abc.Sequence[collections.abc.Sequence[roboto.domain.topics.record.FieldPath]]) – Parallel to streams; each entry is the leaf-most projection that stream owns by subtree. A stream owns leaf L iff L appears in its entry, and the highest-precedence owner wins L.
- Returns:
The merged partition as one RecordBatch in the public nested shape with a metadata-marked stored-time index column, or None when every stream is empty (the partition emits no rows).
- Raises:
RobotoInternalException – The streams do not share an index: their row counts differ, or their _index values differ element-wise by position.
- Return type:
Optional[pyarrow.RecordBatch]
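The ownership rule for leaf_paths_per_stream (the last stream listing a leaf wins it) can be sketched in plain Python. This is an illustrative model under stated assumptions, not the library's code: `LeafPath`, `resolve_owners`, and `gather_row` are hypothetical names, tuples of strings stand in for FieldPath, and each stream's row is a flat dict keyed by leaf path and assumed to be already position-aligned.

```python
from typing import Any, Dict, Sequence, Tuple

# Hypothetical stand-in for FieldPath: a tuple of field names, root first.
LeafPath = Tuple[str, ...]


def resolve_owners(
    leaf_paths_per_stream: Sequence[Sequence[LeafPath]],
) -> Dict[LeafPath, int]:
    """Map each leaf to the position of its highest-precedence owning stream."""
    owners: Dict[LeafPath, int] = {}
    for stream_no, leaves in enumerate(leaf_paths_per_stream):
        for leaf in leaves:
            # Streams are ordered lowest-precedence first, so later entries overwrite.
            owners[leaf] = stream_no
    return owners


def gather_row(
    rows: Sequence[Dict[LeafPath, Any]], owners: Dict[LeafPath, int]
) -> Dict[str, Any]:
    """Build one nested row, taking each leaf whole from its owning stream."""
    nested: Dict[str, Any] = {}
    for leaf, stream_no in owners.items():
        node = nested
        for part in leaf[:-1]:
            node = node.setdefault(part, {})
        # No null-skipping: a None carried by the owner is the merged value.
        node[leaf[-1]] = rows[stream_no][leaf]
    return nested


# Base stream owns three leaves; the override owns only pose.y.
leaf_paths = [
    [("pose", "x"), ("pose", "y"), ("status",)],
    [("pose", "y")],
]
base_row = {("pose", "x"): 1, ("pose", "y"): 2, ("status",): "ok"}
override_row = {("pose", "y"): None}

owners = resolve_owners(leaf_paths)
merged = gather_row([base_row, override_row], owners)
```

Here the override wins pose.y with its null while the base keeps pose.x and status, so the struct override is just per-leaf selection with no struct-level merge.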