import io
import json
import warnings
from typing import Literal

import fsspec

from .core import url_to_fs
from .spec import AbstractBufferedFile
from .utils import merge_offset_ranges

# Parquet-Specific Utilities for fsspec
#
# Most of the functions defined in this module are NOT
# intended for public consumption. The only exceptions
# to this are `open_parquet_file` and `open_parquet_files`,
# which should be used in place of `fs.open()` to open
# parquet-formatted files on remote file systems.


class AlreadyBufferedFile(AbstractBufferedFile):
    """A file-like object whose required contents are already held in
    a "parts" cache, so that no backend fetches are ever needed."""

    def _fetch_range(self, start, end):
        # Every byte range a reader may request is pre-populated in the
        # "parts" cache, so this should never be reached
        raise NotImplementedError
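

# A minimal sketch (with a hypothetical path and data) of how the "parts"
# cache backs an ``AlreadyBufferedFile``: every byte range a reader may
# request is supplied up front, so ``_fetch_range`` is never called.
#
#     f = AlreadyBufferedFile(
#         fs=None,
#         path="example.parquet",
#         mode="rb",
#         cache_type="parts",
#         cache_options={"data": {(0, 4): b"PAR1"}},
#         size=4,
#     )
#     f.read(4)  # -> b"PAR1", served entirely from the cache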


def open_parquet_files(
    path: str | list[str],
    mode: Literal["rb"] = "rb",
    fs: None | fsspec.AbstractFileSystem = None,
    metadata=None,
    columns: None | list[str] = None,
    row_groups: None | list[int] = None,
    storage_options: None | dict = None,
    engine: str = "auto",
    max_gap: int = 64_000,
    max_block: int = 256_000_000,
    footer_sample_size: int = 1_000_000,
    filters: None | list[list[list[str]]] = None,
    **kwargs,
):
| | """ |
| | Return a file-like object for a single Parquet file. |
| | |
| | The specified parquet `engine` will be used to parse the |
| | footer metadata, and determine the required byte ranges |
| | from the file. The target path will then be opened with |
| | the "parts" (`KnownPartsOfAFile`) caching strategy. |
| | |
| | Note that this method is intended for usage with remote |
| | file systems, and is unlikely to improve parquet-read |
| | performance on local file systems. |
| | |
| | Parameters |
| | ---------- |
| | path: str |
| | Target file path. |
| | mode: str, optional |
| | Mode option to be passed through to `fs.open`. Default is "rb". |
| | metadata: Any, optional |
| | Parquet metadata object. Object type must be supported |
| | by the backend parquet engine. For now, only the "fastparquet" |
| | engine supports an explicit `ParquetFile` metadata object. |
| | If a metadata object is supplied, the remote footer metadata |
| | will not need to be transferred into local memory. |
| | fs: AbstractFileSystem, optional |
| | Filesystem object to use for opening the file. If nothing is |
| | specified, an `AbstractFileSystem` object will be inferred. |
| | engine : str, default "auto" |
| | Parquet engine to use for metadata parsing. Allowed options |
| | include "fastparquet", "pyarrow", and "auto". The specified |
| | engine must be installed in the current environment. If |
| | "auto" is specified, and both engines are installed, |
| | "fastparquet" will take precedence over "pyarrow". |
| | columns: list, optional |
| | List of all column names that may be read from the file. |
| | row_groups : list, optional |
| | List of all row-groups that may be read from the file. This |
| | may be a list of row-group indices (integers), or it may be |
| | a list of `RowGroup` metadata objects (if the "fastparquet" |
| | engine is used). |
| | storage_options : dict, optional |
| | Used to generate an `AbstractFileSystem` object if `fs` was |
| | not specified. |
| | max_gap : int, optional |
| | Neighboring byte ranges will only be merged when their |
| | inter-range gap is <= `max_gap`. Default is 64KB. |
| | max_block : int, optional |
| | Neighboring byte ranges will only be merged when the size of |
| | the aggregated range is <= `max_block`. Default is 256MB. |
| | footer_sample_size : int, optional |
| | Number of bytes to read from the end of the path to look |
| | for the footer metadata. If the sampled bytes do not contain |
| | the footer, a second read request will be required, and |
| | performance will suffer. Default is 1MB. |
| | filters : list[list], optional |
| | List of filters to apply to prevent reading row groups, of the |
| | same format as accepted by the loading engines. Ignored if |
| | ``row_groups`` is specified. |
| | **kwargs : |
| | Optional key-word arguments to pass to `fs.open` |
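
    Examples
    --------
    A minimal sketch (the "s3://..." path and column names here are
    hypothetical; reads should request only the selected columns):

    >>> import pandas as pd  # doctest: +SKIP
    >>> files = open_parquet_files(  # doctest: +SKIP
    ...     "s3://bucket/dataset/*.parquet",
    ...     columns=["x", "y"],
    ... )
    >>> dfs = [pd.read_parquet(f, columns=["x", "y"]) for f in files]  # doctest: +SKIP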
| | """ |
| |
|
| | |
| | |
| | if fs is None: |
| | path0 = path |
| | if isinstance(path, (list, tuple)): |
| | path = path[0] |
| | fs, path = url_to_fs(path, **(storage_options or {})) |
| | else: |
| | path0 = path |
| |
|
| | |
| | |
| | if columns is not None and len(columns) == 0: |
| | columns = None |
| |
|
| | |
| | engine = _set_engine(engine) |
| |
|
| | if isinstance(path0, (list, tuple)): |
| | paths = path0 |
| | elif "*" in path: |
| | paths = fs.glob(path) |
| | elif path0.endswith("/"): |
| | paths = [ |
| | _ |
| | for _ in fs.find(path, withdirs=False, detail=False) |
| | if _.endswith((".parquet", ".parq")) |
| | ] |
| | else: |
| | paths = [path] |
| |
|
| | data = _get_parquet_byte_ranges( |
| | paths, |
| | fs, |
| | metadata=metadata, |
| | columns=columns, |
| | row_groups=row_groups, |
| | engine=engine, |
| | max_gap=max_gap, |
| | max_block=max_block, |
| | footer_sample_size=footer_sample_size, |
| | filters=filters, |
| | ) |
| |
|
| | |
| | options = kwargs.pop("cache_options", {}).copy() |
| | return [ |
| | AlreadyBufferedFile( |
| | fs=None, |
| | path=fn, |
| | mode=mode, |
| | cache_type="parts", |
| | cache_options={ |
| | **options, |
| | "data": data.get(fn, {}), |
| | }, |
| | size=max(_[1] for _ in data.get(fn, {})), |
| | **kwargs, |
| | ) |
| | for fn in data |
| | ] |


def open_parquet_file(*args, **kwargs):
    """Create a file tailored to reading specific parts of a parquet file.

    Please see ``open_parquet_files`` for details of the arguments. The
    difference is that this function always returns a single
    ``AlreadyBufferedFile``, whereas ``open_parquet_files`` always returns
    a list of files, even if there are one or zero matching parquet files
    (so this function raises ``IndexError`` when nothing matches).
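
    Examples
    --------
    A minimal sketch (the remote path here is hypothetical):

    >>> import pandas as pd  # doctest: +SKIP
    >>> with open_parquet_file(  # doctest: +SKIP
    ...     "s3://bucket/dataset/part.0.parquet",
    ...     columns=["x"],
    ... ) as f:
    ...     df = pd.read_parquet(f, columns=["x"])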
| | """ |
| | return open_parquet_files(*args, **kwargs)[0] |
| |
|
| |
|


def _get_parquet_byte_ranges(
    paths,
    fs,
    metadata=None,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    engine="auto",
    filters=None,
):
    """Get a dictionary of the known byte ranges needed
    to read a specific column/row-group selection from a
    Parquet dataset. Each value in the output dictionary
    is intended for use as the `data` argument for the
    `KnownPartsOfAFile` caching strategy of a single path.
    """

    # Set the engine
    if isinstance(engine, str):
        engine = _set_engine(engine)

    # Pass to specialized function if metadata is defined
    if metadata is not None:
        # Use the provided parquet metadata object
        # to avoid transferring/parsing footer metadata
        return _get_parquet_byte_ranges_from_metadata(
            metadata,
            fs,
            engine,
            columns=columns,
            row_groups=row_groups,
            max_gap=max_gap,
            max_block=max_block,
            filters=filters,
        )

    # Get the file size of each path
    file_sizes = fs.sizes(paths)

    # Populate global paths, starts, & ends
    result = {}
    data_paths = []
    data_starts = []
    data_ends = []
    add_header_magic = True
    if columns is None and row_groups is None and filters is None:
        # We are NOT selecting specific columns or row-groups.
        #
        # We can avoid sampling the footers, and just
        # transfer each file in full
        for i, path in enumerate(paths):
            result[path] = {}
            data_paths.append(path)
            data_starts.append(0)
            data_ends.append(file_sizes[i])
        add_header_magic = False  # "Magic" should already be included
    else:
        # We ARE selecting specific columns or row-groups.
        #
        # Gather file footers. We just take the last
        # `footer_sample_size` bytes of each file (or the
        # entire file if it is smaller than that)
        footer_starts = []
        footer_ends = []
        for i, path in enumerate(paths):
            footer_ends.append(file_sizes[i])
            footer_start = max(0, file_sizes[i] - footer_sample_size)
            footer_starts.append(footer_start)
        footer_samples = fs.cat_ranges(paths, footer_starts, footer_ends)
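
        # Parquet layout note: the last 8 bytes of a file are a 4-byte
        # little-endian footer length followed by the b"PAR1" magic, so
        # the footer metadata occupies (footer_size + 8) bytes at the
        # tail. For example (hypothetical bytes), a file ending in
        # b"\x10\x27\x00\x00PAR1" has a 0x2710 = 10_000-byte footer.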

        # Check our footer samples and re-sample if necessary
        missing_footer_starts = footer_starts.copy()
        large_footer = 0
        for i, path in enumerate(paths):
            footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
            real_footer_start = file_sizes[i] - (footer_size + 8)
            if real_footer_start < footer_starts[i]:
                missing_footer_starts[i] = real_footer_start
                large_footer = max(large_footer, (footer_size + 8))
        if large_footer:
            warnings.warn(
                f"Not enough data was used to sample the parquet footer. "
                f"Try setting footer_sample_size >= {large_footer}."
            )
            for i, block in enumerate(
                fs.cat_ranges(
                    paths,
                    missing_footer_starts,
                    footer_starts,
                )
            ):
                footer_samples[i] = block + footer_samples[i]
                footer_starts[i] = missing_footer_starts[i]

        # Calculate the required byte ranges for each path
        for i, path in enumerate(paths):
            # Use "engine" to collect data byte ranges
            path_data_starts, path_data_ends = engine._parquet_byte_ranges(
                columns,
                row_groups=row_groups,
                footer=footer_samples[i],
                footer_start=footer_starts[i],
                filters=filters,
            )

            data_paths += [path] * len(path_data_starts)
            data_starts += path_data_starts
            data_ends += path_data_ends

        # Merge adjacent offset ranges
        data_paths, data_starts, data_ends = merge_offset_ranges(
            data_paths,
            data_starts,
            data_ends,
            max_gap=max_gap,
            max_block=max_block,
            sort=False,  # Should already be sorted
        )
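
        # For intuition (hypothetical numbers): with max_gap=64_000, the
        # ranges (0, 10_000) and (12_000, 20_000) on the same path merge
        # into a single (0, 20_000) request, because the 2_000-byte gap
        # is below the threshold; with max_block=256_000_000, merging
        # stops once an aggregated range would exceed that size.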

        # Populate `result` with the footer samples, so the footer
        # metadata need not be fetched again at read time
        for i, path in enumerate(paths):
            result[path] = {(footer_starts[i], footer_ends[i]): footer_samples[i]}

    # Transfer the data byte ranges into local memory
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to the header if necessary
    if add_header_magic:
        _add_header_magic(result)

    return result


def _get_parquet_byte_ranges_from_metadata(
    metadata,
    fs,
    engine,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    filters=None,
):
    """Simplified version of `_get_parquet_byte_ranges` for
    the case that an engine-specific `metadata` object is
    provided, and the remote footer metadata does not need to
    be transferred before calculating the required byte ranges.
    """

    # Use "engine" to collect data byte ranges
    data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
        columns, row_groups=row_groups, metadata=metadata, filters=filters
    )

    # Merge adjacent offset ranges
    data_paths, data_starts, data_ends = merge_offset_ranges(
        data_paths,
        data_starts,
        data_ends,
        max_gap=max_gap,
        max_block=max_block,
        sort=False,  # Should already be sorted
    )

    # Transfer the data byte ranges into local memory
    result = {fn: {} for fn in set(data_paths)}
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to all headers
    _add_header_magic(result)

    return result


def _transfer_ranges(fs, blocks, paths, starts, ends):
    # Use cat_ranges to gather the data byte ranges
    ranges = (paths, starts, ends)
    for path, start, stop, data in zip(*ranges, fs.cat_ranges(*ranges)):
        blocks[path][(start, stop)] = data


def _add_header_magic(data):
    # Add b"PAR1" to file headers that are not already
    # covered by a cached byte range
    for path in list(data.keys()):
        add_magic = True
        for k in data[path]:
            if k[0] == 0 and k[1] >= 4:
                add_magic = False
                break
        if add_magic:
            data[path][(0, 4)] = b"PAR1"
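

# _add_header_magic illustration (hypothetical values): a cache mapping of
# {"f.parquet": {(4_000, 9_000): b"..."}} gains the entry (0, 4): b"PAR1",
# while {"g.parquet": {(0, 2_000): b"..."}} is left unchanged, because its
# first range already covers the 4-byte header magic.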


def _set_engine(engine_str):
    # Define a list of parquet engines to try
    if engine_str == "auto":
        try_engines = ("fastparquet", "pyarrow")
    elif not isinstance(engine_str, str):
        raise ValueError(
            "Failed to set parquet engine! "
            "Please pass 'fastparquet', 'pyarrow', or 'auto'"
        )
    elif engine_str not in ("fastparquet", "pyarrow"):
        raise ValueError(f"{engine_str} engine not supported by `fsspec.parquet`")
    else:
        try_engines = [engine_str]

    # Try importing the engines in `try_engines`,
    # and choose the first one that succeeds
    for engine in try_engines:
        try:
            if engine == "fastparquet":
                return FastparquetEngine()
            elif engine == "pyarrow":
                return PyarrowEngine()
        except ImportError:
            pass

    # Raise an error if no supported parquet engine
    # was found
    raise ImportError(
        f"The following parquet engines are not installed "
        f"in your python environment: {try_engines}. "
        f"Please install 'fastparquet' or 'pyarrow' to "
        f"utilize the `fsspec.parquet` module."
    )
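

# For illustration: with both engines installed, `_set_engine("auto")`
# returns a FastparquetEngine; with only pyarrow installed, it returns a
# PyarrowEngine; with neither, the ImportError above is raised.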


class FastparquetEngine:
    # The purpose of the FastparquetEngine class is
    # to check if fastparquet can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to fastparquet.

    def __init__(self):
        import fastparquet as fp

        self.fp = fp

    def _row_group_filename(self, row_group, pf):
        return pf.row_group_filename(row_group)

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
        filters=None,
    ):
        # Initialize offset ranges and define the ParquetFile metadata
        pf = metadata
        data_paths, data_starts, data_ends = [], [], []
        if filters and row_groups:
            raise ValueError("filters and row_groups cannot be used together")
        if pf is None:
            pf = self.fp.ParquetFile(io.BytesIO(footer))

        # Convert columns to a set of top-level field names, and add
        # any index columns specified in the pandas metadata (just in case)
        column_set = None if columns is None else {c.split(".", 1)[0] for c in columns}
        if column_set is not None and hasattr(pf, "pandas_metadata"):
            md_index = [
                ind
                for ind in pf.pandas_metadata.get("index_columns", [])
                # Ignore RangeIndex information
                if not isinstance(ind, dict)
            ]
            column_set |= set(md_index)

        # Determine the row-groups to scan. They may be derived from
        # `filters`, passed in directly as metadata objects, or passed
        # in as integer indices
        if filters:
            from fastparquet.api import filter_row_groups

            row_group_indices = None
            row_groups = filter_row_groups(pf, filters)
        elif row_groups and not isinstance(row_groups[0], int):
            # Input row_groups contains row-group metadata
            row_group_indices = None
        else:
            # Input row_groups contains row-group indices
            row_group_indices = row_groups
            row_groups = pf.row_groups

        # Loop through column chunks to add required byte ranges
        for r, row_group in enumerate(row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_group_indices is None or r in row_group_indices:
                # Find the target parquet-file path for the
                # `row_group` fragment
                fn = self._row_group_filename(row_group, pf)

                for column in row_group.columns:
                    name = column.meta_data.path_in_schema[0]
                    # Skip this column if we are targeting
                    # specific columns
                    if column_set is None or name in column_set:
                        file_offset0 = column.meta_data.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.meta_data.data_page_offset
                        num_bytes = column.meta_data.total_compressed_size
                        if footer_start is None or file_offset0 < footer_start:
                            data_paths.append(fn)
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(
                                    file_offset0 + num_bytes,
                                    footer_start or (file_offset0 + num_bytes),
                                )
                            )

        if metadata:
            # The metadata in this call may map to multiple
            # file paths. Need to include `data_paths`
            return data_paths, data_starts, data_ends
        return data_starts, data_ends


class PyarrowEngine:
    # The purpose of the PyarrowEngine class is
    # to check if pyarrow can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to pyarrow.

    def __init__(self):
        import pyarrow.parquet as pq

        self.pq = pq

    def _row_group_filename(self, row_group, metadata):
        # Mapping row-groups to file paths requires an engine-specific
        # metadata object, which the pyarrow engine does not support
        raise NotImplementedError

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
        filters=None,
    ):
        if metadata is not None:
            raise ValueError("metadata input not supported for PyarrowEngine")
        if filters:
            raise NotImplementedError("filters not supported for PyarrowEngine")

        data_starts, data_ends = [], []
        md = self.pq.ParquetFile(io.BytesIO(footer)).metadata

        # Convert columns to a set, and add any index columns
        # specified in the pandas metadata (just in case)
        column_set = None if columns is None else set(columns)
        if column_set is not None:
            schema = md.schema.to_arrow_schema()
            has_pandas_metadata = (
                schema.metadata is not None and b"pandas" in schema.metadata
            )
            if has_pandas_metadata:
                md_index = [
                    ind
                    for ind in json.loads(
                        schema.metadata[b"pandas"].decode("utf8")
                    ).get("index_columns", [])
                    # Ignore RangeIndex information
                    if not isinstance(ind, dict)
                ]
                column_set |= set(md_index)

        # Loop through column chunks to add required byte ranges
        for r in range(md.num_row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_groups is None or r in row_groups:
                row_group = md.row_group(r)
                for c in range(row_group.num_columns):
                    column = row_group.column(c)
                    name = column.path_in_schema
                    # Skip this column if we are targeting specific
                    # columns (matched by full path or by the
                    # top-level field name)
                    split_name = name.split(".")[0]
                    if (
                        column_set is None
                        or name in column_set
                        or split_name in column_set
                    ):
                        file_offset0 = column.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.data_page_offset
                        num_bytes = column.total_compressed_size
                        if file_offset0 < footer_start:
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(file_offset0 + num_bytes, footer_start)
                            )
        return data_starts, data_ends