| """ |
| manage legacy pickle tests |
| |
| How to add pickle tests: |
| |
| 1. Install pandas version intended to output the pickle. |
| |
| 2. Execute "generate_legacy_storage_files.py" to create the pickle. |
| $ python generate_legacy_storage_files.py <output_dir> pickle |
| |
| 3. Move the created pickle to "data/legacy_pickle/<version>" directory. |
| """ |
| from __future__ import annotations |
|
|
| from array import array |
| import bz2 |
| import datetime |
| import functools |
| from functools import partial |
| import gzip |
| import io |
| import os |
| from pathlib import Path |
| import pickle |
| import shutil |
| import tarfile |
| from typing import Any |
| import uuid |
| import zipfile |
|
|
| import numpy as np |
| import pytest |
|
|
| from pandas.compat import ( |
| get_lzma_file, |
| is_platform_little_endian, |
| ) |
| from pandas.compat._optional import import_optional_dependency |
| from pandas.compat.compressors import flatten_buffer |
| import pandas.util._test_decorators as td |
|
|
| import pandas as pd |
| from pandas import ( |
| DataFrame, |
| Index, |
| Series, |
| period_range, |
| ) |
| import pandas._testing as tm |
| from pandas.tests.io.generate_legacy_storage_files import create_pickle_data |
|
|
| import pandas.io.common as icom |
| from pandas.tseries.offsets import ( |
| Day, |
| MonthEnd, |
| ) |
|
|
|
|
| |
| |
| |
def compare_element(result, expected, typ):
    """
    Assert that ``result`` matches ``expected`` for the data category ``typ``.

    The comparison used depends on ``expected`` and ``typ``:

    * an ``Index`` ``expected`` is checked with ``tm.assert_index_equal``;
    * a ``typ`` starting with ``"sp_"`` is checked with ``tm.assert_equal``;
    * ``typ == "timestamp"`` uses identity for ``pd.NaT`` and ``==`` otherwise;
    * anything else uses ``tm.assert_<typ>_equal`` when such a helper exists
      and ``tm.assert_almost_equal`` when it does not.
    """
    if isinstance(expected, Index):
        tm.assert_index_equal(expected, result)
    elif typ.startswith("sp_"):
        tm.assert_equal(result, expected)
    elif typ == "timestamp":
        if expected is pd.NaT:
            assert result is pd.NaT
        else:
            assert result == expected
    else:
        # Look the comparison helper up by name, with a generic fallback.
        checker = getattr(tm, f"assert_{typ}_equal", tm.assert_almost_equal)
        checker(result, expected)
|
|
|
|
| |
| |
| |
|
|
|
|
@pytest.mark.parametrize(
    "data",
    [
        b"123",
        b"123456",
        bytearray(b"123"),
        memoryview(b"123"),
        pickle.PickleBuffer(b"123"),
        array("I", [1, 2, 3]),
        memoryview(b"123456").cast("B", (3, 2)),
        memoryview(b"123456").cast("B", (3, 2))[::2],
        np.arange(12).reshape((3, 4), order="C"),
        np.arange(12).reshape((3, 4), order="F"),
        np.arange(12).reshape((3, 4), order="C")[:, ::2],
    ],
)
def test_flatten_buffer(data):
    """
    Check ``flatten_buffer`` against ``memoryview(data).tobytes("A")``.
    """
    flat = flatten_buffer(data)
    raw_bytes = memoryview(data).tobytes("A")
    assert flat == raw_bytes

    if isinstance(data, (bytes, bytearray)):
        # bytes and bytearray inputs must be handed back as the same object.
        assert flat is data
        return

    if isinstance(flat, memoryview):
        # A memoryview result must be a one-dimensional, contiguous view of
        # unsigned bytes whose length equals its byte count.
        assert flat.ndim == 1
        assert flat.format == "B"
        assert flat.contiguous
        assert flat.shape == (flat.nbytes,)
|
|
|
|
def test_pickles(datapath):
    """
    Load every file matching ``data/legacy_pickle/*/*.p*kl*`` and check it.

    The test is skipped on platforms that are not little endian.
    """
    if not is_platform_little_endian():
        pytest.skip("known failure on non-little endian")

    def check_ts_series(result, expected):
        # Values, index frequency and an element-wise comparison must agree.
        tm.assert_series_equal(result, expected)
        assert result.index.freq == expected.index.freq
        assert not result.index.freq.normalize
        tm.assert_series_equal(result > 0, expected > 0)

        # The index frequency must still support arithmetic: adding Day(1)
        # gives Day(2), and adding a Timedelta gives back a Timedelta.
        freq = result.index.freq
        assert freq + Day(1) == Day(2)

        for extra in ({"hours": 1}, {"nanoseconds": 1}):
            total = freq + pd.Timedelta(**extra)
            assert isinstance(total, pd.Timedelta)
            assert total == pd.Timedelta(days=1, **extra)

    def check_period_index(result, expected):
        tm.assert_index_equal(result, expected)
        assert isinstance(result.freq, MonthEnd)
        assert result.freq == MonthEnd()
        assert result.freqstr == "M"
        tm.assert_index_equal(result.shift(2), expected.shift(2))

    frame_keys = ("dt_mixed_tzs", "cat_onecol", "cat_and_float")
    pickle_root = Path(__file__).parent

    for legacy_pickle in pickle_root.glob("data/legacy_pickle/*/*.p*kl*"):
        legacy_pickle = datapath(legacy_pickle)
        data = pd.read_pickle(legacy_pickle)

        for typ, dv in data.items():
            for dt, result in dv.items():
                # NOTE(review): ``expected`` is looked up in the same loaded
                # mapping that ``result`` is iterated from, so both names
                # refer to the same entry -- confirm this is intended.
                expected = data[typ][dt]

                if typ == "series" and dt == "ts":
                    check_ts_series(result, expected)
                elif typ == "index" and dt == "period":
                    check_period_index(result, expected)
                elif typ == "series" and dt in ("dt_tz", "cat"):
                    tm.assert_series_equal(result, expected)
                elif typ == "frame" and dt in frame_keys:
                    tm.assert_frame_equal(result, expected)
                else:
                    compare_element(result, expected, typ)
|
|
|
|
def python_pickler(obj, path):
    """Pickle ``obj`` into the file at ``path`` using ``pickle.dump``, protocol -1."""
    with open(path, "wb") as handle:
        pickle.dump(obj, handle, protocol=-1)
|
|
|
|
def python_unpickler(path):
    """Return the object stored in the pickle file at ``path`` via ``pickle.load``."""
    with open(path, "rb") as handle:
        handle.seek(0)
        loaded = pickle.load(handle)
    return loaded
|
|
|
|
def flatten(data: dict) -> list[tuple[str, Any]]:
    """
    Turn a ``{typ: {name: example}}`` mapping into ``(typ, example)`` pairs.

    Pairs keep the iteration order of the outer and inner mappings; the inner
    keys are dropped.
    """
    pairs: list[tuple[str, Any]] = []
    for typ, examples in data.items():
        pairs.extend((typ, example) for example in examples.values())
    return pairs
|
|
|
|
@pytest.mark.parametrize(
    "pickle_writer",
    [
        pytest.param(python_pickler, id="python"),
        pytest.param(pd.to_pickle, id="pandas_proto_default"),
        pytest.param(
            functools.partial(pd.to_pickle, protocol=pickle.HIGHEST_PROTOCOL),
            id="pandas_proto_highest",
        ),
        pytest.param(functools.partial(pd.to_pickle, protocol=4), id="pandas_proto_4"),
        pytest.param(
            functools.partial(pd.to_pickle, protocol=5),
            id="pandas_proto_5",
        ),
    ],
)
@pytest.mark.parametrize("writer", [pd.to_pickle, python_pickler])
@pytest.mark.parametrize("typ, expected", flatten(create_pickle_data()))
def test_round_trip_current(typ, expected, pickle_writer, writer):
    """
    Write ``expected`` with each writer and read it back with each reader.
    """
    with tm.ensure_clean() as path:
        # Write once with the parametrized pickler ...
        pickle_writer(expected, path)

        # ... and read it back with pandas first, then the stdlib unpickler.
        for reader in (pd.read_pickle, python_unpickler):
            compare_element(reader(path), expected, typ)

        # Repeat with open file handles. ``seek`` raises on a closed file, so
        # the ``seek(0)`` calls fail if the handle was closed in between.
        # NOTE(review): ``writer`` receives ``path`` rather than ``handle``,
        # so the write step does not go through the file object -- confirm.
        with open(path, mode="wb") as handle:
            writer(expected, path)
            handle.seek(0)
        with open(path, mode="rb") as handle:
            reloaded = pd.read_pickle(handle)
            handle.seek(0)
        compare_element(reloaded, expected, typ)
|
|
|
|
def test_pickle_path_pathlib():
    """Round-trip a frame with ``tm.round_trip_pathlib`` and compare it."""
    values = 1.1 * np.arange(120).reshape((30, 4))
    columns = Index(list("ABCD"), dtype=object)
    index = Index([f"i-{i}" for i in range(30)], dtype=object)
    df = DataFrame(values, columns=columns, index=index)

    result = tm.round_trip_pathlib(df.to_pickle, pd.read_pickle)
    tm.assert_frame_equal(df, result)
|
|
|
|
def test_pickle_path_localpath():
    """Round-trip a frame with ``tm.round_trip_localpath`` and compare it."""
    values = 1.1 * np.arange(120).reshape((30, 4))
    columns = Index(list("ABCD"), dtype=object)
    index = Index([f"i-{i}" for i in range(30)], dtype=object)
    df = DataFrame(values, columns=columns, index=index)

    result = tm.round_trip_localpath(df.to_pickle, pd.read_pickle)
    tm.assert_frame_equal(df, result)
|
|
|
|
| |
| |
| |
|
|
|
|
@pytest.fixture
def get_random_path():
    """Return a pickle file name built around a freshly generated UUID4."""
    unique = uuid.uuid4()
    return f"__{unique}__.pickle"
|
|
|
|
class TestCompression:
    """Pickle round trips through compressed files."""

    # Maps a file extension to the compression name it stands for.
    _extension_to_compression = icom.extension_to_compression

    def compress_file(self, src_path, dest_path, compression):
        """
        Write the contents of ``src_path`` to ``dest_path`` using ``compression``.

        Parameters
        ----------
        src_path : str
            File whose bytes are compressed.
        dest_path : str
            File that receives the compressed output.
        compression : {None, "gzip", "bz2", "zip", "tar", "xz", "zstd"}
            ``None`` copies the file unchanged.

        Raises
        ------
        ValueError
            If ``compression`` is not one of the values listed above.
        """
        if compression is None:
            shutil.copyfile(src_path, dest_path)
            return

        # Archive formats add the source file as a member named after its
        # basename and are complete once their context manager exits.
        if compression == "zip":
            with zipfile.ZipFile(
                dest_path, "w", compression=zipfile.ZIP_DEFLATED
            ) as archive:
                archive.write(src_path, os.path.basename(src_path))
            return

        if compression == "tar":
            with open(src_path, "rb") as fh, tarfile.open(dest_path, mode="w") as tar:
                tarinfo = tar.gettarinfo(src_path, os.path.basename(src_path))
                tar.addfile(tarinfo, fh)
            return

        # Stream formats: open a writable compressed handle on dest_path.
        if compression == "gzip":
            f = gzip.open(dest_path, "w")
        elif compression == "bz2":
            f = bz2.BZ2File(dest_path, "w")
        elif compression == "xz":
            f = get_lzma_file()(dest_path, "w")
        elif compression == "zstd":
            f = import_optional_dependency("zstandard").open(dest_path, "wb")
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        # Enter the compressed handle's context before opening the source so
        # the handle is closed even if the source cannot be opened or read.
        with f, open(src_path, "rb") as fh:
            f.write(fh.read())

    def test_write_explicit(self, compression, get_random_path):
        """Write with an explicit ``compression`` and read the raw pickle back."""
        base = get_random_path
        path1 = base + ".compressed"
        path2 = base + ".raw"

        with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2:
            df = DataFrame(
                1.1 * np.arange(120).reshape((30, 4)),
                columns=Index(list("ABCD"), dtype=object),
                index=Index([f"i-{i}" for i in range(30)], dtype=object),
            )

            # Write the compressed pickle.
            df.to_pickle(p1, compression=compression)

            # Decompress it into the second file.
            with tm.decompress_file(p1, compression=compression) as f:
                with open(p2, "wb") as fh:
                    fh.write(f.read())

            # The decompressed bytes must load as a plain pickle.
            df2 = pd.read_pickle(p2, compression=None)

            tm.assert_frame_equal(df, df2)

    @pytest.mark.parametrize("compression", ["", "None", "bad", "7z"])
    def test_write_explicit_bad(self, compression, get_random_path):
        """``to_pickle`` must raise ``ValueError`` for these compression names."""
        with pytest.raises(ValueError, match="Unrecognized compression type"):
            with tm.ensure_clean(get_random_path) as path:
                df = DataFrame(
                    1.1 * np.arange(120).reshape((30, 4)),
                    columns=Index(list("ABCD"), dtype=object),
                    index=Index([f"i-{i}" for i in range(30)], dtype=object),
                )
                df.to_pickle(path, compression=compression)

    def test_write_infer(self, compression_ext, get_random_path):
        """Write without ``compression`` to a path ending in ``compression_ext``."""
        base = get_random_path
        path1 = base + compression_ext
        path2 = base + ".raw"
        # Compression name for this extension; None when it has no entry.
        compression = self._extension_to_compression.get(compression_ext.lower())

        with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2:
            df = DataFrame(
                1.1 * np.arange(120).reshape((30, 4)),
                columns=Index(list("ABCD"), dtype=object),
                index=Index([f"i-{i}" for i in range(30)], dtype=object),
            )

            # Write with no compression argument; only the file name differs.
            df.to_pickle(p1)

            # Decompress with the compression looked up from the extension.
            with tm.decompress_file(p1, compression=compression) as f:
                with open(p2, "wb") as fh:
                    fh.write(f.read())

            # The decompressed bytes must load as a plain pickle.
            df2 = pd.read_pickle(p2, compression=None)

            tm.assert_frame_equal(df, df2)

    def test_read_explicit(self, compression, get_random_path):
        """Read a file made by ``compress_file`` with an explicit ``compression``."""
        base = get_random_path
        path1 = base + ".raw"
        path2 = base + ".compressed"

        with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2:
            df = DataFrame(
                1.1 * np.arange(120).reshape((30, 4)),
                columns=Index(list("ABCD"), dtype=object),
                index=Index([f"i-{i}" for i in range(30)], dtype=object),
            )

            # Write an uncompressed pickle.
            df.to_pickle(p1, compression=None)

            # Compress it without going through pandas.
            self.compress_file(p1, p2, compression=compression)

            # Read the compressed file back.
            df2 = pd.read_pickle(p2, compression=compression)
            tm.assert_frame_equal(df, df2)

    def test_read_infer(self, compression_ext, get_random_path):
        """Read a compressed file without passing ``compression``."""
        base = get_random_path
        path1 = base + ".raw"
        path2 = base + compression_ext
        # Compression name for this extension; None when it has no entry.
        compression = self._extension_to_compression.get(compression_ext.lower())

        with tm.ensure_clean(path1) as p1, tm.ensure_clean(path2) as p2:
            df = DataFrame(
                1.1 * np.arange(120).reshape((30, 4)),
                columns=Index(list("ABCD"), dtype=object),
                index=Index([f"i-{i}" for i in range(30)], dtype=object),
            )

            # Write an uncompressed pickle.
            df.to_pickle(p1, compression=None)

            # Compress it without going through pandas.
            self.compress_file(p1, p2, compression=compression)

            # Read it back with no compression argument.
            df2 = pd.read_pickle(p2)
            tm.assert_frame_equal(df, df2)
|
|
|
|
| |
| |
| |
|
|
|
|
class TestProtocol:
    """Round trips of a frame written with specific pickle protocols."""

    @pytest.mark.parametrize("protocol", [-1, 0, 1, 2])
    def test_read(self, protocol, get_random_path):
        """Write with ``protocol`` and read back with ``pd.read_pickle``."""
        with tm.ensure_clean(get_random_path) as path:
            values = 1.1 * np.arange(120).reshape((30, 4))
            columns = Index(list("ABCD"), dtype=object)
            index = Index([f"i-{i}" for i in range(30)], dtype=object)
            df = DataFrame(values, columns=columns, index=index)

            df.to_pickle(path, protocol=protocol)
            reloaded = pd.read_pickle(path)
            tm.assert_frame_equal(df, reloaded)
|
|
|
|
@pytest.mark.parametrize(
    ["pickle_file", "excols"],
    [
        ("test_py27.pkl", Index(["a", "b", "c"])),
        (
            "test_mi_py27.pkl",
            pd.MultiIndex.from_arrays([["a", "b", "c"], ["A", "B", "C"]]),
        ),
    ],
)
def test_unicode_decode_error(datapath, pickle_file, excols):
    """
    Load a stored pickle from ``io/data/pickle`` and check its columns.
    """
    pickle_path = datapath("io", "data", "pickle", pickle_file)
    loaded = pd.read_pickle(pickle_path)

    # The columns of the loaded object must equal the expected index.
    tm.assert_index_equal(loaded.columns, excols)
|
|
|
|
| |
| |
| |
|
|
|
|
def test_pickle_buffer_roundtrip():
    """Write to and read from open binary file handles instead of paths."""
    with tm.ensure_clean() as path:
        values = 1.1 * np.arange(120).reshape((30, 4))
        columns = Index(list("ABCD"), dtype=object)
        index = Index([f"i-{i}" for i in range(30)], dtype=object)
        df = DataFrame(values, columns=columns, index=index)

        with open(path, "wb") as out_handle:
            df.to_pickle(out_handle)
        with open(path, "rb") as in_handle:
            result = pd.read_pickle(in_handle)

        tm.assert_frame_equal(df, result)
|
|
|
|
| |
| |
| |
|
|
|
|
@pytest.mark.parametrize(
    "mockurl", ["http://url.com", "ftp://test.com", "http://gzip.com"]
)
def test_pickle_generalurl_read(monkeypatch, mockurl):
    """
    Read a pickle via ``pd.read_pickle(mockurl)`` with ``urlopen`` patched.
    """

    class MockReadResponse:
        # Stand-in for the object returned by ``urllib.request.urlopen``; it
        # serves the bytes of a local file.
        def __init__(self, path) -> None:
            self.file = open(path, "rb")
            # NOTE(review): the "gzip" check looks at the local file path
            # passed in, not at ``mockurl`` -- confirm that is intended.
            encoding = "gzip" if "gzip" in path else ""
            self.headers = {"Content-Encoding": encoding}

        def __enter__(self):
            return self

        def __exit__(self, *args):
            self.close()

        def read(self):
            return self.file.read()

        def close(self):
            return self.file.close()

    with tm.ensure_clean() as path:

        def mock_urlopen_read(*args, **kwargs):
            return MockReadResponse(path)

        values = 1.1 * np.arange(120).reshape((30, 4))
        columns = Index(list("ABCD"), dtype=object)
        index = Index([f"i-{i}" for i in range(30)], dtype=object)
        df = DataFrame(values, columns=columns, index=index)

        # Store the frame with the stdlib pickler so the mock can serve it.
        with open(path, "wb") as fh:
            pickle.dump(df, fh, protocol=-1)

        monkeypatch.setattr("urllib.request.urlopen", mock_urlopen_read)
        result = pd.read_pickle(mockurl)
        tm.assert_frame_equal(df, result)
|
|
|
|
def test_pickle_fsspec_roundtrip():
    """Round-trip a frame through the ``memory://mockfile`` URL."""
    # Skip when fsspec cannot be imported.
    pytest.importorskip("fsspec")
    with tm.ensure_clean():
        mockurl = "memory://mockfile"
        values = 1.1 * np.arange(120).reshape((30, 4))
        columns = Index(list("ABCD"), dtype=object)
        index = Index([f"i-{i}" for i in range(30)], dtype=object)
        df = DataFrame(values, columns=columns, index=index)

        df.to_pickle(mockurl)
        result = pd.read_pickle(mockurl)
        tm.assert_frame_equal(df, result)
|
|
|
|
class MyTz(datetime.tzinfo):
    """``datetime.tzinfo`` subclass whose constructor takes no arguments."""

    def __init__(self) -> None:
        """Accept no arguments and set no attributes."""
|
|
|
|
def test_read_pickle_with_subclass():
    """Round-trip a tuple holding an empty object Series and a ``MyTz``."""
    original = (Series(dtype=object), MyTz())
    restored = tm.round_trip_pickle(original)

    tm.assert_series_equal(restored[0], original[0])
    # The second element must come back as the tzinfo subclass.
    assert isinstance(restored[1], MyTz)
|
|
|
|
def test_pickle_binary_object_compression(compression):
    """
    Read/write from binary file-objects w/wo compression.

    GH 26237, GH 29054, and GH 29570
    """
    values = 1.1 * np.arange(120).reshape((30, 4))
    columns = Index(list("ABCD"), dtype=object)
    index = Index([f"i-{i}" for i in range(30)], dtype=object)
    df = DataFrame(values, columns=columns, index=index)

    # Reference bytes: the same frame written to a file path.
    with tm.ensure_clean() as path:
        df.to_pickle(path, compression=compression)
        reference = Path(path).read_bytes()

    # Write the frame into an in-memory binary buffer.
    buffer = io.BytesIO()
    df.to_pickle(buffer, compression=compression)
    buffer.seek(0)

    # The buffer must match the reference byte for byte, except for gzip,
    # zip and tar (presumably their output embeds path-dependent metadata
    # -- TODO confirm).
    if compression not in ("gzip", "zip", "tar"):
        assert buffer.getvalue() == reference

    # Read back from the buffer; the seek afterwards raises if the buffer
    # was closed by the read.
    read_df = pd.read_pickle(buffer, compression=compression)
    buffer.seek(0)
    tm.assert_frame_equal(df, read_df)
|
|
|
|
def test_pickle_dataframe_with_multilevel_index(
    multiindex_year_month_day_dataframe_random_data,
    multiindex_dataframe_random_data,
):
    """Round-trip both fixture frames and their transposes through pickle."""
    ymd = multiindex_year_month_day_dataframe_random_data
    frame = multiindex_dataframe_random_data

    for candidate in (frame, frame.T, ymd, ymd.T):
        unpickled = tm.round_trip_pickle(candidate)
        tm.assert_frame_equal(candidate, unpickled)
|
|
|
|
def test_pickle_timeseries_periodindex():
    """A monthly period index must keep ``freqstr == "M"`` after pickling."""
    prng = period_range("1/1/2011", "1/1/2012", freq="M")
    values = np.random.default_rng(2).standard_normal(len(prng))
    ts = Series(values, prng)

    restored = tm.round_trip_pickle(ts)
    assert restored.index.freqstr == "M"
|
|
|
|
@pytest.mark.parametrize(
    "name", [777, 777.0, "name", datetime.datetime(2001, 11, 11), (1, 2)]
)
def test_pickle_preserve_name(name):
    """The Series ``name`` must survive a pickle round trip."""
    ser = Series(np.arange(10, dtype=np.float64), name=name)
    unpickled = tm.round_trip_pickle(ser)
    assert unpickled.name == name
|
|
|
|
def test_pickle_datetimes(datetime_series):
    """The ``datetime_series`` fixture must equal its pickle round trip."""
    restored = tm.round_trip_pickle(datetime_series)
    tm.assert_series_equal(restored, datetime_series)
|
|
|
|
def test_pickle_strings(string_series):
    """The ``string_series`` fixture must equal its pickle round trip."""
    restored = tm.round_trip_pickle(string_series)
    tm.assert_series_equal(restored, string_series)
|
|
|
|
@td.skip_array_manager_invalid_test
def test_pickle_preserves_block_ndim():
    """A one-row categorical Series keeps a 1-D block after pickling."""
    ser = Series(list("abc")).astype("category").iloc[[0]]
    res = tm.round_trip_pickle(ser)

    first_block = res._mgr.blocks[0]
    assert first_block.ndim == 1
    assert first_block.shape == (1,)

    # Boolean-mask indexing on the restored Series must still work.
    tm.assert_series_equal(res[[True]], ser)
|
|
|
|
@pytest.mark.parametrize("protocol", [pickle.DEFAULT_PROTOCOL, pickle.HIGHEST_PROTOCOL])
def test_pickle_big_dataframe_compression(protocol, compression):
    """Round-trip a 100000-row frame for each protocol and compression."""
    df = DataFrame(range(100000))

    # Bind protocol and compression up front; the round-trip helper receives
    # these callables as its writer and reader.
    write = partial(df.to_pickle, protocol=protocol, compression=compression)
    read = partial(pd.read_pickle, compression=compression)

    result = tm.round_trip_pathlib(write, read)
    tm.assert_frame_equal(df, result)
|
|
|
|
def test_pickle_frame_v124_unpickle_130(datapath):
    """The stored ``1.2.4`` empty-frame pickle must load as an empty frame."""
    parts = (
        "data",
        "legacy_pickle",
        "1.2.4",
        "empty_frame_v1_2_4-GH#42345.pkl",
    )
    path = datapath(Path(__file__).parent, *parts)

    # Load with the stdlib unpickler directly rather than pd.read_pickle.
    with open(path, "rb") as fd:
        df = pickle.load(fd)

    expected = DataFrame(index=[], columns=[])
    tm.assert_frame_equal(df, expected)
|
|
|
|
def test_pickle_pos_args_deprecation():
    """Passing ``compression`` positionally must emit a ``FutureWarning``."""
    df = DataFrame({"a": [1, 2, 3]})
    msg = (
        r"Starting with pandas version 3.0 all arguments of to_pickle except for the "
        r"argument 'path' will be keyword-only."
    )
    buffer = io.BytesIO()
    with tm.assert_produces_warning(FutureWarning, match=msg):
        # "infer" is passed positionally on purpose to trigger the warning.
        df.to_pickle(buffer, "infer")
|
|