roboto.experimental.topics.decode#

Package Contents#

roboto.experimental.topics.decode.CACHED_PARQUET_NAME_PATTERN = '{fs_node_id}.parquet'#

Filename template for locally cached Parquet files; keyed on the stable file id.
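As a minimal sketch of how such a template might be expanded, assuming it is filled with `str.format`: the helper name, the id value, and the cache directory below are made up for illustration and are not part of the package.

```python
from pathlib import Path

# Copied from the documented constant so the snippet runs without roboto installed.
CACHED_PARQUET_NAME_PATTERN = "{fs_node_id}.parquet"


def cached_parquet_path(cache_dir: Path, fs_node_id: str) -> Path:
    """Hypothetical helper: build the local cache path for one file id."""
    return cache_dir / CACHED_PARQUET_NAME_PATTERN.format(fs_node_id=fs_node_id)


# "fn_example123" is an invented id used only for this example.
path = cached_parquet_path(Path("cache"), "fn_example123")
print(path.name)
```

Keying the filename on the file id alone means two scan tasks backed by the same file would resolve to the same cached path.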

class roboto.experimental.topics.decode.DecodedScanTask(*, batches_factory)#

One scan task decoded into RecordBatches.

Decoding (including the network fetch) starts when the iterator returned by batches() is first advanced; consume it once.

Parameters:

batches_factory (Callable[[], collections.abc.Iterator[pyarrow.RecordBatch]])

batches()#

Decode into RecordBatches, each prefixed with a metadata-marked stored-time column.

Rows come out in the scan task’s persisted order. For MCAP, that’s the reader’s native chunk order, which is itself the persisted order. For Parquet, that’s the file’s row order. Every representation of a partition must share one persisted row order.

Batch boundaries themselves carry no meaning.

Return type:

collections.abc.Iterator[pyarrow.RecordBatch]
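The deferred-start, consume-once behaviour described above can be illustrated with a plain generator. This is a sketch only: `LazyTask` is a hypothetical stand-in for DecodedScanTask, lists of integers stand in for RecordBatches, and nothing here reflects the library's actual implementation.

```python
from collections.abc import Callable, Iterator


class LazyTask:
    """Hypothetical stand-in: holds a factory and defers all work to iteration."""

    def __init__(self, *, batches_factory: Callable[[], Iterator[list[int]]]) -> None:
        self._batches_factory = batches_factory

    def batches(self) -> Iterator[list[int]]:
        # Calling a generator function does not run its body yet.
        return self._batches_factory()


events: list[str] = []


def fake_decode() -> Iterator[list[int]]:
    # Stands in for the network fetch plus decode; runs on first next().
    events.append("fetch started")
    yield [1, 2]
    yield [3]


task = LazyTask(batches_factory=fake_decode)
iterator = task.batches()
started_before = list(events)  # snapshot taken before the iterator is advanced

# Flatten the batches: row order matters, batch boundaries do not.
rows = [row for batch in iterator for row in batch]
```

In this sketch the fetch is recorded only once iteration begins, and the flattened rows are the same however the factory happens to split them into batches.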

class roboto.experimental.topics.decode.ScanTaskDecodeParams#

Execution inputs a scan-task decode needs beyond the read plan.

The plan says what to read; these supply how to reach and cache it: a resolver that mints download URLs, plus the local-disk cache policy and directory. Caching applies to Parquet scan tasks only — MCAP always streams.

cache_dir: pathlib.Path#

Directory Parquet files are cached under.

cache_policy: roboto.storage.CachePolicy#

Whether fetched Parquet files are cached to local disk.

signed_url_resolver: SignedUrlResolver#

Mints a signed download URL for a scan task’s backing file.

roboto.experimental.topics.decode.ScanTaskDecoder#

Decodes one scan task into a stream of native-order RecordBatches.

Called with the scan task, its partition (for the timestamp designation), the window translated into the partition’s stored-time domain, and the projection restricted to the scan task’s subtree. Produced timestamps are in the stored-time domain; the executor applies the partition’s time_offset_ns.

roboto.experimental.topics.decode.leaf_most(paths)#

Drop every path that has a strict descendant in the set.

A projection enumerates registered fields at every level (a struct parent and its children both appear). Decoding the parent would read its whole subtree, defeating an exclusion of one child — so decoders read only the leaf-most projected paths, which together cover exactly the projected set.

Parameters:

paths (collections.abc.Sequence[roboto.domain.topics.record.FieldPath])

Return type:

list[roboto.domain.topics.record.FieldPath]
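The rule can be sketched in a few lines. This is an illustration under assumptions, not the package's code: paths are modelled as tuples of strings rather than FieldPath objects, the function is named `leaf_most_sketch` to keep it distinct from the real one, and preserving input order is a choice made here.

```python
from collections.abc import Sequence

# Assumption: a field path is modelled as a tuple of name segments.
Path = tuple[str, ...]


def leaf_most_sketch(paths: Sequence[Path]) -> list[Path]:
    """Keep only the paths that have no strict descendant in the input."""
    present = set(paths)
    # Every proper prefix of a path is a strict ancestor of that path.
    ancestors = {p[:i] for p in present for i in range(1, len(p))}
    return [p for p in paths if p not in ancestors]


projection = [("pose",), ("pose", "x"), ("pose", "y"), ("stamp",)]
leaves = leaf_most_sketch(projection)
print(leaves)
```

Here `("pose",)` is dropped because its children are listed, so a projection that omitted `("pose", "z")` would not read that child back in through the parent.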

roboto.experimental.topics.decode.make_scan_task_decoder(params, schema_fields, projection_paths)#

Bind execution inputs into a ScanTaskDecoder.

The returned decoder decodes one scan task (one file, one format), filtered and projected, dispatching on the scan task’s server-declared format.

Timestamps are keyed off the partition’s timestamp designation and produced in the stored-time domain; the caller applies the partition’s time_offset_ns. Rows are filtered to the decoder’s window (inclusive on both ends) and projected to its projection_paths.

The schema tree needed to decode MCAP is derived from schema_fields and projection_paths, but only when the plan actually contains an MCAP scan task: the first such task builds it once, under a lock, and the decode workers then share it read-only. A pure-Parquet plan never builds one (Parquet decodes column-wise from its own file footer).

Parameters:

params

schema_fields

projection_paths

Returns:

A decoder that, given a scan task, its plan partition, an inclusive time window, and the projected field paths the scan task covers, returns the decoded scan task; consume its batches once.

Raises:

NotImplementedError – Raised by the returned decoder when the scan task’s storage format has no decoder, or when a Parquet scan task is designated an envelope-derived timestamp.

Return type:

ScanTaskDecoder
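The build-once-under-a-lock behaviour described for the MCAP schema tree follows a common lazy-initialization pattern, sketched below. `LazyOnce` and `build_schema_tree` are hypothetical names, the dictionary is a placeholder for whatever the real schema tree is, and the library's implementation may differ.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LazyOnce:
    """Hypothetical helper: build a value on first use, then share it."""

    def __init__(self, build):
        self._build = build
        self._lock = threading.Lock()
        self._built = False
        self._value = None

    def get(self):
        # The lock ensures only the first caller runs the build.
        with self._lock:
            if not self._built:
                self._value = self._build()
                self._built = True
        return self._value


build_calls: list[int] = []


def build_schema_tree() -> dict:
    # Placeholder for deriving the tree from schema fields and projection paths.
    build_calls.append(1)
    return {"pose": {"x": "float64"}}


tree = LazyOnce(build_schema_tree)
unused = LazyOnce(build_schema_tree)  # never asked for, so never built

# Several workers request the tree concurrently, as decode workers might.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda _: tree.get(), range(8)))
```

Every worker receives the same object and the build runs once, while the holder that is never asked for a value never builds one, mirroring the pure-Parquet case.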