roboto.experimental.topics.plan_execution#
Module Contents#
- roboto.experimental.topics.plan_execution.execute_plan(plan, projection_paths, decoder)#
Decode the files named by a read plan and yield the topic’s rows as RecordBatches.
A plan splits the data into partitions, and within a partition the same rows may be stored across several files (one file may hold some columns, another file other columns of the same rows). Per partition, the decoded files are merged back into whole rows, the plan’s declared precedence deciding which file wins a column when two carry it, and the partition’s time offset is added to make timestamps absolute. Partitions are yielded in plan order, which the plan defines by where each file’s data begins (a file’s segments stay contiguous and in segment order); they are decoded concurrently but emitted in that order. Across partitions the rows are simply concatenated end to end: no deduplication, and rows from different partitions are never interleaved (rows within a partition keep their stored order). So the output orders whole partitions, not rows — a consumer needing a strict row-level time order (overlapping partitions, or rows not stored in time order) must sort.
- Parameters:
plan (roboto.experimental.topics.read_plan.ReadPlan) – The read plan resolved by the server.
projection_paths (collections.abc.Sequence[roboto.domain.topics.record.FieldPath]) – The columns to read, as explicit field paths. To read every column, the caller expands the request against the plan’s schema first.
decoder (roboto.experimental.topics.decode.ScanTaskDecoder) – Decodes one scan task, choosing the reader by file format.
- Yields:
RecordBatches. The timestamp column (marked in the schema metadata) holds absolute Unix-epoch nanoseconds. Batch sizes and boundaries are arbitrary.
- Return type:
collections.abc.Generator[pyarrow.RecordBatch, None, None]
- roboto.experimental.topics.plan_execution.projection_for_subtree(projection_paths, subtree)#
Restrict the plan’s projected paths to what one scan task, covering
subtree, can produce.The projection is requested against the whole schema, but a scan task holds only one branch of it. A field path is a tuple naming a location in the nested schema, e.g.
("pose", "position", "x"), so each projected path falls into one of three cases by how it relates to the subtree root:Inside the subtree (
subtreeis a prefix of it): kept as-is.An ancestor of the subtree (it is a prefix of
subtree): it asks for more than this scan task holds, so it clamps to the subtree root; this task contributes only its own branch.In a different branch (neither is a prefix of the other): dropped, since another scan task produces it.
For
subtree = ("pose", "position")and projected paths("pose", "position", "x"),("pose",), and("twist", "linear"), the result is[("pose", "position", "x"), ("pose", "position")]: kept, clamped, and dropped respectively.- Parameters:
projection_paths (collections.abc.Sequence[roboto.domain.topics.record.FieldPath]) – The plan’s projected field paths, against the whole schema.
subtree (Optional[roboto.domain.topics.record.FieldPath]) – The root of the scan task’s branch, or
Nonefor a scan task that covers the whole schema (no restriction).
- Returns:
The deduplicated paths this scan task is responsible for producing.
- Return type:
list[roboto.domain.topics.record.FieldPath]