roboto.formats.parquet
Fetching and decoding topic data stored in Parquet files.
Covers cache-policy-driven opening of remote Parquet files (local cache reuse, atomic download, or HTTP streaming), row-group time filtering, column projection from schema field paths, and timestamp extraction.
Submodules
Package Contents
- class roboto.formats.parquet.ParquetParser(source, min_required_row_group_size=100000, small_row_group_count_threshold=32)
- Parameters:
source (pathlib.Path)
min_required_row_group_size (int)
small_row_group_count_threshold (int)
- property column_count: int
- Return type:
int
- extract_timestamp_info(timestamp_column_name=None, timestamp_unit=None)
- Parameters:
timestamp_column_name (Optional[str])
timestamp_unit (Optional[Union[str, roboto.time.TimeUnit]])
- Return type:
- property fields: Generator[pyarrow.Field, None, None]
- Return type:
Generator[pyarrow.Field, None, None]
- find_timestamp_field_by_type()
- Return type:
pyarrow.Field
- get_data_for_column(column_name)
- Parameters:
column_name (str)
- Return type:
pyarrow.Table
- get_timestamp_field_by_name(column_name)
- Parameters:
column_name (str)
- Return type:
pyarrow.Field
- static is_parquet_file(path)
- Parameters:
path (pathlib.Path)
- Return type:
bool
- requires_rewrite(timestamp)
- Parameters:
timestamp (roboto.formats.parquet.timestamp.TimestampInfo)
- Return type:
bool
- rewrite(outfile, timestamp, target_row_group_size_bytes=100 * 1000 * 1000)
- Parameters:
outfile (pathlib.Path)
timestamp (roboto.formats.parquet.timestamp.TimestampInfo)
target_row_group_size_bytes (int)
- Return type:
None
- property row_count: int
- Return type:
int
- property row_group_count: int
- Return type:
int
- property row_group_size: int
- Return type:
int
- class roboto.formats.parquet.Timestamp
Timestamp signal in a Parquet field. Serves as both a descriptor of that signal and as a utility for projecting it to other time units.
Note
This is not intended as a public API.
- field: pyarrow.Field
- to_epoch_nanoseconds(timestamp)
- Parameters:
timestamp (roboto.time.Time)
- Return type:
int
- unit()
- Return type:
- unit_hint: str | None
Unit the stored values are recorded in, used when the Arrow type does not carry one.
Sourced from the field’s metadata (old model) or its first-class unit (new model) at the bounded-context boundary.
None when the unit is unknown.
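The unit fallback described above can be sketched in plain Python: prefer the unit carried by the Arrow type, and otherwise use unit_hint. This is an illustration under assumptions, not the library's code. resolve_unit and to_epoch_ns are hypothetical names. The unit vocabulary is assumed to be Arrow's four time units ("s", "ms", "us", "ns"). Raising an error when neither source supplies a unit is a choice made only for this sketch.

```python
from typing import Optional

# Nanoseconds per unit for Arrow's four time units (assumed unit vocabulary).
_NS_PER_UNIT = {"s": 1_000_000_000, "ms": 1_000_000, "us": 1_000, "ns": 1}


def resolve_unit(arrow_unit: Optional[str], unit_hint: Optional[str]) -> str:
    """Prefer the unit encoded in the Arrow type; fall back to the hint.

    Hypothetical helper: the real Timestamp.unit() may behave differently.
    """
    unit = arrow_unit if arrow_unit is not None else unit_hint
    if unit is None:
        # Assumption: an unknown unit is treated as an error in this sketch.
        raise ValueError("timestamp unit is unknown")
    if unit not in _NS_PER_UNIT:
        raise ValueError(f"unsupported unit: {unit!r}")
    return unit


def to_epoch_ns(value: int, unit: str) -> int:
    """Project an integer timestamp recorded in `unit` to epoch nanoseconds."""
    return value * _NS_PER_UNIT[unit]


# An int64 column whose Arrow type has no unit, described as milliseconds by its hint.
unit = resolve_unit(arrow_unit=None, unit_hint="ms")
print(to_epoch_ns(1_700_000_000_123, unit))  # 1700000000123000000
```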
- roboto.formats.parquet.compute_time_filter_mask(timestamps, start_time=None, end_time=None)
Compute a boolean mask indicating which rows fall within the specified time range. Returns None if no time filtering is needed (both start_time and end_time are None).
- Parameters:
timestamps (pyarrow.Array)
start_time (Optional[int])
end_time (Optional[int])
- Return type:
Optional[pyarrow.BooleanArray]
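A minimal sketch of the masking rule follows. It works over a plain list of epoch-nanosecond integers rather than a pyarrow.Array. The name time_filter_mask is hypothetical. Treating both bounds as inclusive is an assumption, because the reference above does not say whether they are. With PyArrow, the same mask could be assembled from pyarrow.compute.greater_equal, pyarrow.compute.less_equal, and pyarrow.compute.and_.

```python
from typing import Optional, Sequence


def time_filter_mask(
    timestamps: Sequence[int],
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> Optional[list[bool]]:
    # No bounds at all: signal "no filtering needed" instead of an all-True mask.
    if start_time is None and end_time is None:
        return None
    mask = []
    for ts in timestamps:
        # Assumption: both bounds are inclusive; a missing bound is open-ended.
        keep = (start_time is None or ts >= start_time) and (
            end_time is None or ts <= end_time
        )
        mask.append(keep)
    return mask


print(time_filter_mask([10, 20, 30, 40], start_time=20, end_time=30))
# [False, True, True, False]
```

Returning None instead of an all-True mask lets a caller skip the filtering step entirely when the query is unbounded.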
- roboto.formats.parquet.extract_timestamp_field(schema, timestamp_field, unit_hint)
Aggregate timestamp information for the selected field into a helper used for time-based data operations.
unit_hint carries the recorded unit of the stored values for the case where the Arrow column type does not encode one; callers resolve it from their own field record at the bounded-context boundary.
- Parameters:
schema (pyarrow.Schema)
timestamp_field (roboto.formats.fields.FieldSelection)
unit_hint (Optional[str])
- Return type:
- roboto.formats.parquet.extract_timestamps(table, timestamp)
Extract timestamps in nanoseconds since Unix epoch from the table’s timestamp column.
- Parameters:
table (pyarrow.Table)
timestamp (roboto.formats.parquet.timestamp.Timestamp)
- Return type:
pyarrow.Int64Array
- roboto.formats.parquet.generate_message_path_requests(parser, timestamp, max_depth=10)
Generate AddMessagePathRequest objects for all fields in a Parquet schema.
Traverses the schema recursively to generate message paths for nested types (structs, lists) in addition to top-level fields.
- Parameters:
parser (roboto.formats.parquet.parquet_parser.ParquetParser) – ParquetParser instance containing the schema and data.
timestamp (roboto.formats.parquet.timestamp.TimestampInfo) – Timestamp information for the topic.
max_depth (int) – Maximum recursion depth for nested types (default: 10).
- Yields:
AddMessagePathRequest objects for each field and nested field in the schema.
- Return type:
Generator[roboto.domain.topics.operations.AddMessagePathRequest, None, None]
Examples
For a schema with a struct column position: struct<x: float, y: float>:
  - Yields position (Object)
  - Yields position.x (Number)
  - Yields position.y (Number)
For a schema with values: list<float64>:
  - Yields values (NumberArray)
For a schema with points: list<struct<x: float, y: float>>:
  - Yields points (Array)
  - Yields points.x (Number)
  - Yields points.y (Number)
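The traversal in these examples can be sketched as a recursive generator. The sketch is hypothetical in several ways:
- It uses a toy tuple encoding of the schema instead of pyarrow types.
- It yields (path, type) pairs instead of AddMessagePathRequest objects.
- It labels non-numeric scalars "String" by assumption.
- Its max_depth cut-off semantics are a guess.

It does reproduce the three examples listed above.

```python
from typing import Iterator, Union

# Hypothetical, minimal stand-in for an Arrow schema:
#   "float" / "float64" / "int64"     -> scalar
#   ("struct", [(name, type), ...])   -> struct with named children
#   ("list", element_type)            -> list of element_type
ArrowType = Union[str, tuple]

_NUMERIC = {"float", "float32", "float64", "int32", "int64"}


def message_paths(
    name: str, typ: ArrowType, depth: int = 0, max_depth: int = 10
) -> Iterator[tuple[str, str]]:
    """Yield (path, canonical type) pairs, recursing into structs and lists."""
    if depth > max_depth:
        return
    if isinstance(typ, str):
        # Assumption: non-numeric scalars are labeled "String" in this sketch.
        yield name, "Number" if typ in _NUMERIC else "String"
    elif typ[0] == "struct":
        yield name, "Object"
        for child_name, child_type in typ[1]:
            yield from message_paths(f"{name}.{child_name}", child_type, depth + 1, max_depth)
    elif typ[0] == "list":
        element = typ[1]
        if isinstance(element, str) and element in _NUMERIC:
            # A list of numbers is a single NumberArray path with no children.
            yield name, "NumberArray"
        else:
            yield name, "Array"
            # Children of list<struct> are addressed through the list, e.g. points.x.
            if not isinstance(element, str) and element[0] == "struct":
                for child_name, child_type in element[1]:
                    yield from message_paths(
                        f"{name}.{child_name}", child_type, depth + 1, max_depth
                    )


schema = [
    ("position", ("struct", [("x", "float"), ("y", "float")])),
    ("values", ("list", "float64")),
    ("points", ("list", ("struct", [("x", "float"), ("y", "float")]))),
]
for col_name, col_type in schema:
    for path, kind in message_paths(col_name, col_type):
        print(path, kind)
```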
- roboto.formats.parquet.narrow_list_nested_fields(table, schema, fields)
Prune list-of-struct columns to the projected leaves inside each element.
PyArrow’s prefix-based nested column selection cannot reach through list wrapper nodes, so resolve_columns() reads a list-nested leaf’s whole list ancestor column: every element keeps all of its struct fields. This Arrow-native post-read pass narrows each such element down to the requested leaves, leaving every other read path byte-identical.
A top-level root is narrowed if and only if at least one of its projected paths has a list ancestor; otherwise the table is returned unchanged (pure struct, scalar, and scalar-list reads never enter the rebuild). For each root, a trie is built from its paths with the root component stripped, so non-list-nested siblings that the projection also keeps are preserved.
- Parameters:
table (pyarrow.Table)
schema (pyarrow.Schema)
fields (collections.abc.Iterable[roboto.formats.fields.FieldSelection])
- Return type:
pyarrow.Table
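The trie-based narrowing can be illustrated on plain Python values, where a list<struct> cell is a list of dicts. This sketches the idea only. The real function works natively on Arrow arrays, and build_trie and prune are hypothetical names. An empty trie node marks a requested leaf whose subtree is kept whole.

```python
from typing import Any, Iterable


def build_trie(paths: Iterable[str]) -> dict:
    """Build a nested-dict trie from dot-separated paths (root already stripped)."""
    trie: dict = {}
    for path in paths:
        node = trie
        for part in path.split("."):
            node = node.setdefault(part, {})
    return trie


def prune(value: Any, trie: dict) -> Any:
    """Keep only the branches of `value` named in `trie`.

    Lists are transparent: each element is pruned with the same trie, which
    mirrors how the elements of a list<struct> column are narrowed.
    """
    if not trie or value is None:
        # Requested leaf (empty trie node) or a null: keep the value as-is.
        return value
    if isinstance(value, list):
        return [prune(item, trie) for item in value]
    if isinstance(value, dict):
        return {key: prune(value[key], sub) for key, sub in trie.items() if key in value}
    return value


# Projection on root "points": only the x leaf was requested.
projected = ["points.x"]
trie = build_trie(p.split(".", 1)[1] for p in projected)  # root component stripped
row = [{"x": 1.0, "y": 2.0, "label": "a"}, {"x": 3.0, "y": 4.0, "label": "b"}]
print(prune(row, trie))  # [{'x': 1.0}, {'x': 3.0}]
```

A sibling that is not list-nested survives because its path is in the same trie. For example, the paths items.x and id under one struct root keep both the narrowed list and the id scalar.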
- roboto.formats.parquet.open_parquet_file(url_provider, cache_outfile, policy, estimated_column_count, size_bytes=None)
Open a remote Parquet file under a cache policy, from the cheapest available source.
Dispatches on choose_fetch_mode(): an already-cached copy is reused, a download is performed (concurrency-safe, atomic) when the policy calls for one, and otherwise the file is streamed over HTTP range requests without touching disk.
- Parameters:
url_provider (Callable[[], str]) – Resolves the file’s signed download URL. Called at most once, and only when the chosen mode actually needs the URL.
cache_outfile (Optional[pathlib.Path]) – The file’s stable local cache path, or None when no cache location is configured (forces streaming).
policy (roboto.storage.cache.CachePolicy) – The caller’s cache policy.
estimated_column_count (int) – How many columns the read is expected to project; informs the ADAPTIVE download-vs-stream choice.
size_bytes (Optional[int]) – The backing object’s size in bytes when the server reports it; None when unknown. It gates cache reuse: a cached file whose size does not match is treated as stale and re-fetched. On the STREAM path it lets a known-large file skip the whole-file head probe; on the DOWNLOAD path it verifies that the downloaded file is complete before it is promoted to the cache.
- Returns:
An open pyarrow.parquet.ParquetFile.
- Return type:
pyarrow.parquet.ParquetFile
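This sketch covers only the cache-reuse check and the atomic DOWNLOAD branch. The mode selection in choose_fetch_mode() and the ADAPTIVE heuristic are not reproduced. The helper name cached_or_download and its download callback are hypothetical. A common way to make a download atomic is to write it to a unique temporary file in the cache directory and then promote it with os.replace(). On POSIX filesystems, readers then see either no file or a complete one, never a partial write.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable, Optional


def cached_or_download(
    cache_path: Path,
    download: Callable[[Path], None],
    size_bytes: Optional[int] = None,
) -> Path:
    """Return a complete local copy at cache_path, downloading only if needed.

    Hypothetical helper: `download` writes the remote object to the given path.
    """
    # Reuse a cached copy unless its size contradicts the server-reported size.
    if cache_path.exists() and (
        size_bytes is None or cache_path.stat().st_size == size_bytes
    ):
        return cache_path

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    # Download to a unique temp file in the same directory, so the final
    # os.replace() is an atomic rename within one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=cache_path.parent, suffix=".part")
    os.close(fd)
    tmp_path = Path(tmp_name)
    try:
        download(tmp_path)
        # Refuse to promote a truncated download into the cache.
        actual = tmp_path.stat().st_size
        if size_bytes is not None and actual != size_bytes:
            raise OSError(f"incomplete download: {actual} of {size_bytes} bytes")
        os.replace(tmp_path, cache_path)
    finally:
        # On success the temp file was renamed away; on failure, clean it up.
        tmp_path.unlink(missing_ok=True)
    return cache_path
```

If the size check fails, the previously cached copy is left untouched, so a failed re-fetch never corrupts the cache.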
- roboto.formats.parquet.parquet_file_from_url(signed_url, size_bytes=None)
Open a Parquet file over HTTP via a signed URL (no local download).
A single ranged GET over fsspec’s shared HTTP session probes the first _STREAM_WHOLE_FILE_PROBE_BYTES of the file. A file smaller than the probe arrives whole in that one request and is read from an in-memory buffer; a larger file (or a failed probe) falls back to HTTP range-request streaming through pyarrow’s filesystem layer.
When size_bytes is known and at least _STREAM_WHOLE_FILE_PROBE_BYTES, the file is known to be large up front. The whole-file probe could never win (the in-memory BufferReader path is taken only for files below the threshold), so it is skipped and the file is range-streamed directly, avoiding a wasted 16 MiB GET. A size_bytes of None (an older server omits the size) preserves the probe-then-decide behavior.
- Parameters:
signed_url (str) – The file’s signed download URL.
size_bytes (Optional[int]) – The backing object’s size in bytes when the server reports it; None when unknown.
- Raises:
ValueError – The probe succeeds but the object is empty (0 bytes), which is not a readable Parquet file.
- Return type:
pyarrow.parquet.ParquetFile
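The probe-then-decide logic can be sketched as a pure decision function. The name choose_read_strategy is hypothetical. So is the probe callback, which stands in for the ranged GET and returns None on failure. PROBE_BYTES is set to 16 MiB only because the description above mentions a 16 MiB GET. The real function then opens either an in-memory buffer or a range-streamed file; this sketch only reports which path applies.

```python
from typing import Callable, Optional

PROBE_BYTES = 16 * 1024 * 1024  # assumed from the "16 MiB GET" mentioned above


def choose_read_strategy(
    size_bytes: Optional[int],
    probe: Callable[[int], Optional[bytes]],
) -> tuple[str, Optional[bytes]]:
    """Return ("buffer", data) to read from memory, or ("stream", None) to range-stream.

    `probe(n)` is a hypothetical ranged GET for the first n bytes; it returns
    None when the request fails.
    """
    # Known-large file: the probe could never return the whole file, so skip it.
    if size_bytes is not None and size_bytes >= PROBE_BYTES:
        return "stream", None
    data = probe(PROBE_BYTES)
    if data is None:
        # Failed probe: fall back to range-request streaming.
        return "stream", None
    if len(data) == 0:
        raise ValueError("object is empty (0 bytes); not a readable Parquet file")
    if len(data) < PROBE_BYTES:
        # Fewer bytes than requested means the whole file arrived in one request.
        return "buffer", data
    # A full-size probe is ambiguous (the file may be larger), so stream it.
    return "stream", None
```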
- roboto.formats.parquet.resolve_columns(schema, fields)
Build a deduplicated list of column names safe for read_row_group(columns=...).
Children of list-type columns are replaced by their list ancestor’s column name, because PyArrow’s prefix-based nested column selection does not work through list wrapper nodes in the physical Parquet schema. Selecting the parent list column already returns its full nested structure.
This matters because the projected fields contain only leaf paths. For a column like points: list<struct<x, y>>, only points.x and points.y are selected; the parent points field is absent. This function derives the correct parent column name from the child’s path_in_schema.
Children of struct-type columns are preserved, because PyArrow can resolve them via dot-separated prefix matching (e.g. "position.x" selects the x child of the position struct).
- Parameters:
schema (pyarrow.Schema)
fields (collections.abc.Iterable[roboto.formats.fields.FieldSelection])
- Return type:
list[str]
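A simplified sketch of the resolution rule follows. Instead of the Parquet path_in_schema, which also contains physical wrapper components such as list and element, it uses logical dot paths plus a set of list-typed paths. The names are hypothetical. Choosing the outermost list ancestor for nested lists is an assumption.

```python
from typing import Iterable


def resolve_columns_sketch(leaf_paths: Iterable[str], list_paths: set[str]) -> list[str]:
    """Map projected leaf paths to column names usable for row-group reads.

    `list_paths` (hypothetical input) holds the dot paths whose type is a list.
    A leaf under a list is replaced by its outermost list ancestor; struct
    children keep their full dotted path. Order is preserved, duplicates dropped.
    """
    resolved: list[str] = []
    seen: set[str] = set()
    for path in leaf_paths:
        parts = path.split(".")
        column = path
        # Walk proper prefixes from the root down; stop at the first list ancestor.
        for i in range(1, len(parts)):
            prefix = ".".join(parts[:i])
            if prefix in list_paths:
                column = prefix
                break
        if column not in seen:
            seen.add(column)
            resolved.append(column)
    return resolved


print(resolve_columns_sketch(
    ["points.x", "points.y", "position.x", "values"],
    list_paths={"points", "values"},
))
# ['points', 'position.x', 'values']
```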
- roboto.formats.parquet.should_narrow_list_nested_fields(schema, fields)
Return whether narrow_list_nested_fields() would change the table.
True if and only if at least one projected field addresses a leaf inside a list (its path has a list ancestor). When False, every projected field resolves through structs and scalars alone, so PyArrow’s column selection already returns the narrowed shape and the post-read prune is a no-op; callers can skip it.
The check is cheap enough to evaluate once per file, which hoists the per-row-group narrowing decision out of the decode loop.
- Parameters:
schema (pyarrow.Schema)
fields (collections.abc.Iterable[roboto.formats.fields.FieldSelection])
- Return type:
bool
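This sketch uses a simplified path model: dot paths plus a set of list-typed paths, both hypothetical inputs. Under that model, the check reduces to asking whether any projected leaf has a list-typed proper prefix. The function names are hypothetical.

```python
from typing import Iterable


def has_list_ancestor(path: str, list_paths: set[str]) -> bool:
    """True if some proper prefix of the dot path is a list-typed path."""
    parts = path.split(".")
    return any(".".join(parts[:i]) in list_paths for i in range(1, len(parts)))


def should_narrow_sketch(leaf_paths: Iterable[str], list_paths: set[str]) -> bool:
    """True iff at least one projected leaf sits inside a list."""
    return any(has_list_ancestor(p, list_paths) for p in leaf_paths)
```

The result depends only on the schema and the projection. A caller can therefore compute it once before iterating over row groups and consult the stored flag inside the loop.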
- roboto.formats.parquet.should_read_row_group(row_group_metadata, timestamp, start_time=None, end_time=None)
Determine whether a Parquet row group contains data within the requested time range. Used to skip requesting column chunks from a row group that holds no relevant data.
- Parameters:
row_group_metadata (pyarrow.parquet.RowGroupMetaData)
timestamp (roboto.formats.parquet.timestamp.Timestamp)
start_time (Optional[int])
end_time (Optional[int])
- Return type:
bool
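Row-group pruning of this kind typically compares the timestamp column's min/max statistics with the requested window. The sketch assumes the statistics have already been converted to epoch nanoseconds and that both bounds are inclusive. row_group_overlaps is a hypothetical name. In PyArrow, per-column statistics are exposed through row_group_metadata.column(i).statistics (has_min_max, min, max). The description above does not say whether this function reads them that way.

```python
from typing import Optional


def row_group_overlaps(
    rg_min_ns: Optional[int],
    rg_max_ns: Optional[int],
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> bool:
    """Decide from a row group's timestamp min/max whether it may hold matching rows."""
    # Without statistics nothing can be ruled out, so the row group must be read.
    if rg_min_ns is None or rg_max_ns is None:
        return True
    # Entirely before the window (assumes an inclusive start bound).
    if start_time is not None and rg_max_ns < start_time:
        return False
    # Entirely after the window (assumes an inclusive end bound).
    if end_time is not None and rg_min_ns > end_time:
        return False
    return True
```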