| from __future__ import annotations |
|
|
| import gzip |
| import io |
| import pathlib |
| import tarfile |
| from typing import ( |
| TYPE_CHECKING, |
| Any, |
| Callable, |
| ) |
| import uuid |
| import zipfile |
|
|
| from pandas.compat import ( |
| get_bz2_file, |
| get_lzma_file, |
| ) |
| from pandas.compat._optional import import_optional_dependency |
|
|
| import pandas as pd |
| from pandas._testing.contexts import ensure_clean |
|
|
| if TYPE_CHECKING: |
| from pandas._typing import ( |
| FilePath, |
| ReadPickleBuffer, |
| ) |
|
|
| from pandas import ( |
| DataFrame, |
| Series, |
| ) |
|
|
| |
| |
|
|
|
|
def round_trip_pickle(
    obj: Any, path: FilePath | ReadPickleBuffer | None = None
) -> DataFrame | Series:
    """
    Serialize an object with pickle and load it straight back.

    Parameters
    ----------
    obj : any object
        Object written with ``pd.to_pickle`` and reloaded with
        ``pd.read_pickle``.
    path : str, path object or file-like object, default None
        Location handed to ``ensure_clean`` for the temporary pickle. When
        omitted, a unique ``__<uuid4>__.pickle`` name is generated.

    Returns
    -------
    pandas object
        Whatever ``pd.read_pickle`` returns for the freshly written pickle.
    """
    # Fall back to a random file name so concurrent callers do not collide.
    target = f"__{uuid.uuid4()}__.pickle" if path is None else path
    with ensure_clean(target) as pickle_path:
        pd.to_pickle(obj, pickle_path)
        return pd.read_pickle(pickle_path)
|
|
|
|
def round_trip_pathlib(writer, reader, path: str | None = None):
    """
    Write through a ``pathlib.Path`` and read the result back.

    Parameters
    ----------
    writer : callable bound to pandas object
        IO writing function (e.g. DataFrame.to_csv ); called with a single
        ``pathlib.Path`` argument.
    reader : callable
        IO reading function (e.g. pd.read_csv ); called with a single
        ``pathlib.Path`` argument.
    path : str, default None
        File name passed to ``ensure_clean``. Defaults to ``"___pathlib___"``.

    Returns
    -------
    pandas object
        Whatever ``reader`` returns for the file produced by ``writer``.
    """
    filename = "___pathlib___" if path is None else path
    with ensure_clean(filename) as tmp_name:
        # Build a fresh Path for each call, exactly one per callable.
        writer(pathlib.Path(tmp_name))
        result = reader(pathlib.Path(tmp_name))
    return result
|
|
|
|
def round_trip_localpath(writer, reader, path: str | None = None):
    """
    Write through a py.path ``LocalPath`` and read the result back.

    Parameters
    ----------
    writer : callable bound to pandas object
        IO writing function (e.g. DataFrame.to_csv ); called with a single
        ``py.path.local`` argument.
    reader : callable
        IO reading function (e.g. pd.read_csv ); called with a single
        ``py.path.local`` argument.
    path : str, default None
        File name passed to ``ensure_clean``. Defaults to
        ``"___localpath___"``.

    Returns
    -------
    pandas object
        Whatever ``reader`` returns for the file produced by ``writer``.
    """
    import pytest

    # importorskip skips the calling test when py.path cannot be imported.
    local_path_cls = pytest.importorskip("py.path").local
    filename = "___localpath___" if path is None else path
    with ensure_clean(filename) as tmp_name:
        writer(local_path_cls(tmp_name))
        result = reader(local_path_cls(tmp_name))
    return result
|
|
|
|
def write_to_compressed(compression, path, data, dest: str = "test") -> None:
    """
    Write data to a compressed file.

    Parameters
    ----------
    compression : {'gzip', 'bz2', 'zip', 'tar', 'xz', 'zstd'}
        The compression type to use.
    path : str
        The file path to write the data.
    data : bytes
        The data to write. The stream formats are opened in binary mode and
        the 'tar' branch wraps the payload in ``io.BytesIO``, so a bytes
        object is expected.
    dest : str, default "test"
        Name of the member created inside the archive (used by 'zip' and
        'tar' only; ignored for the single-stream formats).

    Raises
    ------
    ValueError : An invalid compression value was passed in.
    """
    # Defaults describe a plain single-stream compressor: open in binary
    # mode and call ``f.write(data)``. Archive formats override them below.
    args: tuple[Any, ...] = (data,)
    mode = "wb"
    method = "write"
    compress_method: Callable

    if compression == "zip":
        compress_method = zipfile.ZipFile
        mode = "w"
        args = (dest, data)
        method = "writestr"
    elif compression == "tar":
        compress_method = tarfile.TarFile
        mode = "w"
        tar_info = tarfile.TarInfo(name=dest)
        payload = io.BytesIO(data)
        # addfile copies exactly ``size`` bytes from the file object, so the
        # size must be set explicitly on the TarInfo header.
        tar_info.size = len(data)
        args = (tar_info, payload)
        method = "addfile"
    elif compression == "gzip":
        compress_method = gzip.GzipFile
    elif compression == "bz2":
        compress_method = get_bz2_file()
    elif compression == "zstd":
        compress_method = import_optional_dependency("zstandard").open
    elif compression == "xz":
        compress_method = get_lzma_file()
    else:
        raise ValueError(f"Unrecognized compression type: {compression}")

    with compress_method(path, mode=mode) as f:
        getattr(f, method)(*args)
|
|