roboto.formats.parquet#

Fetching and decoding topic data stored in Parquet files.

Covers cache-policy-driven opening of remote Parquet files (local cache reuse, atomic download, or HTTP streaming), row-group time filtering, column projection from schema field paths, and timestamp extraction.

Submodules#

Package Contents#

class roboto.formats.parquet.ParquetParser(source, min_required_row_group_size=100000, small_row_group_count_threshold=32)#
Parameters:
  • source (pathlib.Path)

  • min_required_row_group_size (int)

  • small_row_group_count_threshold (int)

property column_count: int#
Return type:

int

extract_timestamp_info(timestamp_column_name=None, timestamp_unit=None)#
Parameters:
Return type:

roboto.formats.parquet.timestamp.TimestampInfo

property fields: Generator[pyarrow.Field, None, None]#
Return type:

Generator[pyarrow.Field, None, None]

find_timestamp_field_by_type()#
Return type:

pyarrow.Field

get_data_for_column(column_name)#
Parameters:

column_name (str)

Return type:

pyarrow.Table

get_timestamp_field_by_name(column_name)#
Parameters:

column_name (str)

Return type:

pyarrow.Field

static is_parquet_file(path)#
Parameters:

path (pathlib.Path)

Return type:

bool

requires_rewrite(timestamp)#
Parameters:

timestamp (roboto.formats.parquet.timestamp.TimestampInfo)

Return type:

bool

rewrite(outfile, timestamp, target_row_group_size_bytes=100 * 1000 * 1000)#
Parameters:
Return type:

None

property row_count: int#
Return type:

int

property row_group_count: int#
Return type:

int

property row_group_size: int#
Return type:

int

class roboto.formats.parquet.Timestamp#

Timestamp signal in a Parquet field. Serves as both a descriptor of that signal and as a utility for projecting it to other time units.

Note

This is not intended as a public API.

field: pyarrow.Field#
to_epoch_nanoseconds(timestamp)#
Parameters:

timestamp (roboto.time.Time)

Return type:

int

unit()#
Return type:

roboto.time.TimeUnit

unit_hint: str | None#

Unit the stored values are recorded in, used when the Arrow type does not carry one.

Sourced from the field’s metadata (old model) or its first-class unit (new model) at the bounded-context boundary. None when the unit is unknown.

roboto.formats.parquet.compute_time_filter_mask(timestamps, start_time=None, end_time=None)#

Compute a boolean mask indicating which rows fall within the specified time range. Returns None if no time filtering is needed (both start_time and end_time are None).

Parameters:
  • timestamps (pyarrow.Array)

  • start_time (Optional[int])

  • end_time (Optional[int])

Return type:

Optional[pyarrow.BooleanArray]

roboto.formats.parquet.extract_timestamp_field(schema, timestamp_field, unit_hint)#

Aggregate timestamp info into a helper utility for handling time-based data operations.

unit_hint carries the recorded unit of the stored values for the case where the Arrow column type does not encode one; callers resolve it from their own field record at the bounded-context boundary.

Parameters:
Return type:

roboto.formats.parquet.timestamp.Timestamp

roboto.formats.parquet.extract_timestamps(table, timestamp)#

Extract timestamps in nanoseconds since Unix epoch from the table’s timestamp column.

Parameters:
Return type:

pyarrow.Int64Array

roboto.formats.parquet.generate_message_path_requests(parser, timestamp, max_depth=10)#

Generate AddMessagePathRequest objects for all fields in a Parquet schema.

Traverses the schema recursively to generate message paths for nested types (structs, lists) in addition to top-level fields.

Parameters:
Yields:

AddMessagePathRequest objects for each field and nested field in the schema.

Return type:

Generator[roboto.domain.topics.operations.AddMessagePathRequest, None, None]

Examples

For a schema with a struct column position: struct<x: float, y: float>: - Yields position (Object) - Yields position.x (Number) - Yields position.y (Number)

For a schema with values: list<float64>: - Yields values (NumberArray)

For a schema with points: list<struct<x: float, y: float>>: - Yields points (Array) - Yields points.x (Number) - Yields points.y (Number)

roboto.formats.parquet.narrow_list_nested_fields(table, schema, fields)#

Prune list-of-struct columns to the projected leaves inside each element.

PyArrow’s prefix-based nested column selection cannot reach through list wrapper nodes, so resolve_columns() reads a list-nested leaf’s whole list ancestor column — every element keeps all of its struct fields. This Arrow-native post-read pass narrows each such element down to the requested leaves, leaving every other read path byte-identical.

A top-level root is narrowed iff at least one of its projected paths has a list ancestor; otherwise the table is returned unchanged (pure struct, scalar, and scalar-list reads never enter the rebuild). Per root, a trie is built from its paths with the root component stripped so non-list-nested siblings the projection also keeps are preserved.

Parameters:
Return type:

pyarrow.Table

roboto.formats.parquet.open_parquet_file(url_provider, cache_outfile, policy, estimated_column_count, size_bytes=None)#

Open a remote Parquet file under a cache policy, from the cheapest available source.

Dispatches on choose_fetch_mode(): an already-cached copy is reused, a download is performed (concurrency-safe, atomic) when the policy calls for one, and otherwise the file is streamed over HTTP range requests without touching disk.

Parameters:
  • url_provider (Callable[[], str]) – Resolves the file’s signed download URL. Called at most once, and only when the chosen mode actually needs the URL.

  • cache_outfile (Optional[pathlib.Path]) – The file’s stable local cache path, or None when no cache location is configured (forces streaming).

  • policy (roboto.storage.cache.CachePolicy) – The caller’s cache policy.

  • estimated_column_count (int) – How many columns the read is expected to project; informs the ADAPTIVE download-vs-stream choice.

  • size_bytes (Optional[int]) – The backing object’s size in bytes when the server reports it; None when unknown. It gates cache reuse: a present cached file whose size does not match is treated as stale and re-fetched. On the STREAM path it lets a known-large file skip the whole-file head probe; on the DOWNLOAD path it verifies the downloaded file is complete before it is promoted to the cache.

Returns:

An open pyarrow.parquet.ParquetFile.

Return type:

pyarrow.parquet.ParquetFile

roboto.formats.parquet.parquet_file_from_url(signed_url, size_bytes=None)#

Open a Parquet file over HTTP via a signed URL (no local download).

A single ranged GET over fsspec’s shared HTTP session probes the first _STREAM_WHOLE_FILE_PROBE_BYTES of the file. A file smaller than the probe arrives whole in that one request and is read from an in-memory buffer; a larger file (or a failed probe) falls back to HTTP range-request streaming through pyarrow’s filesystem layer.

When size_bytes is known and at least _STREAM_WHOLE_FILE_PROBE_BYTES, the file is known-large up front: the whole-file probe could never win (a BufferReader read is taken only for sub-threshold files), so it is skipped and the file is range-streamed directly, avoiding a wasted 16 MiB GET. size_bytes of None (an older server omits the size) preserves the probe-then-decide behavior.

Parameters:
  • signed_url (str) – The file’s signed download URL.

  • size_bytes (Optional[int]) – The backing object’s size in bytes when the server reports it; None when unknown.

Raises:

ValueError – The probe succeeds but the object is empty (0 bytes), which is not a readable Parquet file.

Return type:

pyarrow.parquet.ParquetFile

roboto.formats.parquet.resolve_columns(schema, fields)#

Build a deduplicated list of column names safe for read_row_group(columns=...).

Children of list-type columns are replaced by their list ancestor’s column name because PyArrow’s prefix-based nested column selection does not work through list wrapper nodes in the physical Parquet schema. Selecting the parent list column already returns its full nested structure.

This is important because the projected fields contain only leaf paths. For a column like points: list<struct<x, y>>, only points.x and points.y are selected — the parent points field is absent. This function derives the correct parent column name from the child’s path_in_schema.

Children of struct-type columns are preserved because PyArrow can resolve them via dot-separated prefix matching (e.g. "position.x" selects the x child of the position struct).

Parameters:
Return type:

list[str]

roboto.formats.parquet.should_narrow_list_nested_fields(schema, fields)#

Return whether narrow_list_nested_fields() would change the table.

True iff at least one projected field addresses a leaf inside a list (its path has a list ancestor). When False, every projected field resolves through structs and scalars alone, so PyArrow’s column selection already returns the narrowed shape and the post-read prune is a no-op — callers can skip it.

Cheap enough to evaluate once per file and hoist the per-row-group narrowing decision out of the decode loop.

Parameters:
Return type:

bool

roboto.formats.parquet.should_read_row_group(row_group_metadata, timestamp, start_time=None, end_time=None)#

Determine whether a Parquet row group contains data within the requested time range. Used to short-circuit requesting column chunks from the given row group if not relevant.

Parameters:
Return type:

bool