| """Common IO api utilities""" |
| from __future__ import annotations |
|
|
| from abc import ( |
| ABC, |
| abstractmethod, |
| ) |
| import codecs |
| from collections import defaultdict |
| from collections.abc import ( |
| Hashable, |
| Mapping, |
| Sequence, |
| ) |
| import dataclasses |
| import functools |
| import gzip |
| from io import ( |
| BufferedIOBase, |
| BytesIO, |
| RawIOBase, |
| StringIO, |
| TextIOBase, |
| TextIOWrapper, |
| ) |
| import mmap |
| import os |
| from pathlib import Path |
| import re |
| import tarfile |
| from typing import ( |
| IO, |
| TYPE_CHECKING, |
| Any, |
| AnyStr, |
| DefaultDict, |
| Generic, |
| Literal, |
| TypeVar, |
| cast, |
| overload, |
| ) |
| from urllib.parse import ( |
| urljoin, |
| urlparse as parse_url, |
| uses_netloc, |
| uses_params, |
| uses_relative, |
| ) |
| import warnings |
| import zipfile |
|
|
| from pandas._typing import ( |
| BaseBuffer, |
| ReadCsvBuffer, |
| ) |
| from pandas.compat import ( |
| get_bz2_file, |
| get_lzma_file, |
| ) |
| from pandas.compat._optional import import_optional_dependency |
| from pandas.util._decorators import doc |
| from pandas.util._exceptions import find_stack_level |
|
|
| from pandas.core.dtypes.common import ( |
| is_bool, |
| is_file_like, |
| is_integer, |
| is_list_like, |
| ) |
| from pandas.core.dtypes.generic import ABCMultiIndex |
|
|
| from pandas.core.shared_docs import _shared_docs |
|
|
# Schemes urllib knows how to address; the empty string is discarded so that
# scheme-less strings are never treated as URLs.
_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
_VALID_URLS.discard("")
# Matches an RFC 3986 scheme prefix such as "s3://" or "gs://".
_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://")


# TypeVar bound to pandas' buffer protocol; lets helpers return the same
# buffer type they were given.
BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)
|
|
|
|
| if TYPE_CHECKING: |
| from types import TracebackType |
|
|
| from pandas._typing import ( |
| CompressionDict, |
| CompressionOptions, |
| FilePath, |
| ReadBuffer, |
| StorageOptions, |
| WriteBuffer, |
| ) |
|
|
| from pandas import MultiIndex |
|
|
|
|
@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    # Resolved local path or an already-open buffer to hand to get_handle.
    filepath_or_buffer: str | BaseBuffer
    # Text encoding used when opening in text mode.
    encoding: str
    # Mode the buffer was/should be opened with (may have gained a "b").
    mode: str
    # Compression mapping containing a "method" key plus extra arguments.
    compression: CompressionDict
    # Whether get_handle is responsible for closing filepath_or_buffer.
    should_close: bool = False
|
|
|
|
@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the full IO interface (e.g. archive wrappers)
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            # detach() leaves the underlying (user-provided) buffer open
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        # reset so that close() is idempotent
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        """Support ``with get_handle(...) as handles:`` usage."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # always close created handles, even when the body raised
        self.close()
|
|
|
|
def is_url(url: object) -> bool:
    """
    Determine whether ``url`` is a string carrying a recognised URL scheme.

    Parameters
    ----------
    url : object
        Candidate URL; anything that is not a ``str`` yields False.

    Returns
    -------
    bool
        True when the parsed scheme is one urllib recognises.
    """
    return isinstance(url, str) and parse_url(url).scheme in _VALID_URLS
|
|
|
|
| @overload |
| def _expand_user(filepath_or_buffer: str) -> str: |
| ... |
|
|
|
|
| @overload |
| def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT: |
| ... |
|
|
|
|
| def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT: |
| """ |
| Return the argument with an initial component of ~ or ~user |
| replaced by that user's home directory. |
| |
| Parameters |
| ---------- |
| filepath_or_buffer : object to be converted if possible |
| |
| Returns |
| ------- |
| expanded_filepath_or_buffer : an expanded filepath or the |
| input if not expandable |
| """ |
| if isinstance(filepath_or_buffer, str): |
| return os.path.expanduser(filepath_or_buffer) |
| return filepath_or_buffer |
|
|
|
|
def validate_header_arg(header: object) -> None:
    """
    Validate a ``header`` argument as accepted by the pandas readers.

    Parameters
    ----------
    header : object
        ``None``, a non-negative integer, or a list-like of non-negative
        integers.

    Raises
    ------
    ValueError
        If ``header`` is a negative integer, a list-like containing
        non-integers or negative integers, or any other unsupported scalar.
    TypeError
        If ``header`` is a boolean.
    """
    if header is None:
        return
    if is_integer(header):
        header = cast(int, header)
        if header < 0:
            # a negative row index can never identify a header row
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        header = cast(Sequence, header)
        if any(not is_integer(item) for item in header):
            raise ValueError("header must be integer or list of integers")
        if any(item < 0 for item in header):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # any remaining scalar type is unsupported
    raise ValueError("header must be integer or list of integers")
|
|
|
|
@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Convert a path-like object to a string, passing buffers through.

    Parameters
    ----------
    filepath_or_buffer : path-like or buffer
        Object to be converted if possible.
    convert_file_like : bool, default False
        When False, file-like objects are returned unchanged even if they
        also implement the fspath protocol.

    Returns
    -------
    str or buffer
        The ``__fspath__()`` result (with ``~`` expanded) for path-likes;
        anything else — bytes, buffers, non-path objects — is returned
        unchanged.
    """
    if is_file_like(filepath_or_buffer) and not convert_file_like:
        # e.g. an open file handle that also happens to be os.PathLike
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        filepath_or_buffer = filepath_or_buffer.__fspath__()
    return _expand_user(filepath_or_buffer)
|
|
|
|
def urlopen(*args, **kwargs):
    """
    Thin wrapper around ``urllib.request.urlopen``.

    The import is deferred to call time because ``urllib.request`` pulls in
    a large part of the stdlib.
    """
    from urllib.request import urlopen as _urlopen

    return _urlopen(*args, **kwargs)
|
|
|
|
def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Whether ``url`` looks like something fsspec can handle: a string with
    an RFC 3986 scheme prefix that is not plain http(s).
    """
    if not isinstance(url, str):
        return False
    if url.startswith(("http://", "https://")):
        # plain http(s) is handled by urllib, not fsspec
        return False
    return _RFC_3986_PATTERN.match(url) is not None
|
|
|
|
@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions | None = None,
    mode: str = "r",
    storage_options: StorageOptions | None = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}


    Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # split compression into a method string and an argument dict,
    # inferring the method from the filename extension when requested
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # compression is only applied to binary handles; warn and drop it when
    # the caller passed a text-mode writable object
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32;
    # warn when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # remote (fsspec/url) handles are always opened in binary mode unless
    # the caller explicitly asked for text
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # plain http(s) URL: download the whole response into memory.
        # NOTE(review): fsspec could also handle HTTP, but it appears to
        # break gzip Content-Encoding detection — kept on urllib.
        storage_options = storage_options or {}

        # lazy import to keep module import cheap (matches urlopen above)
        import urllib.request

        # storage_options are interpreted as HTTP request headers here
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # narrowing for the type checker; guaranteed by is_fsspec_url
        # s3a/s3n are Hadoop-specific spellings of s3; normalize them so
        # fsspec can dispatch to s3fs
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fall back to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # retry anonymously when credentials are missing/rejected
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        # storage_options only make sense for remote handles
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # anything left must at least be readable or writable to be usable
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )
|
|
|
|
def file_path_to_url(path: str) -> str:
    """
    Convert an absolute native path into a ``file:`` URL.

    Parameters
    ----------
    path : str
        A path in native (OS-specific) form.

    Returns
    -------
    str
        A valid FILE URL.
    """
    # lazy import: urllib.request pulls in a big chunk of the stdlib
    from urllib.request import pathname2url

    url_path = pathname2url(path)
    return urljoin("file:", url_path)
|
|
|
|
# Maps recognised filename extensions to compression methods.  Insertion
# order matters: infer_compression iterates this dict, so the compound
# ".tar.*" entries must precede the bare ".gz"/".bz2"/".xz" entries.
extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
# All distinct compression method names accepted by pandas IO.
_supported_compressions = set(extension_to_compression.values())
|
|
|
|
def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Split a compression argument into a method name and extra arguments.

    Parameters
    ----------
    compression : str or mapping
        A method name (or None), or a mapping whose 'method' key names the
        method and whose remaining items are passed through as arguments.

    Returns
    -------
    tuple
        (compression method: str or None, compression arguments: dict)

    Raises
    ------
    ValueError
        When a mapping is passed without a 'method' key.
    """
    if not isinstance(compression, Mapping):
        # plain string (or None): no extra arguments
        return compression, {}
    compression_args = dict(compression)
    try:
        method = compression_args.pop("method")
    except KeyError as err:
        raise ValueError("If mapping, compression must have key 'method'") from err
    return method, compression_args
|
|
|
|
@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    if compression == "infer":
        # convert path-likes (but keep buffers) so we can inspect the name
        filepath_or_buffer = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(filepath_or_buffer, str):
            # nothing to infer from a buffer
            return None

        # dict order guarantees ".tar.gz" etc. are matched before ".gz"
        lowered = filepath_or_buffer.lower()
        for extension, method in extension_to_compression.items():
            if lowered.endswith(extension):
                return method
        return None

    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
|
|
|
|
def check_parent_directory(path: Path | str) -> None:
    """
    Raise OSError when the parent directory of ``path`` does not exist.

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of
    """
    parent_dir = Path(path).parent
    if parent_dir.is_dir():
        return
    raise OSError(rf"Cannot save file into a non-existent directory: '{parent_dir}'")
|
|
|
|
# These overloads tie get_handle's return type to ``is_text``:
# is_text=False -> IOHandles[bytes]; is_text=True (the default) ->
# IOHandles[str]; a plain bool -> the union of both.
@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...
|
|
|
|
@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions | None = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions | None = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        May be a dict with key 'method' as compression mode
        and other keys as compression options if compression
        mode is 'zip'.

        Passing compression options as keys in dict is
        supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options: StorageOptions = None
        Passed to _get_filepath_or_buffer

    Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8; set it explicitly for consistency
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # add "b" when the underlying handle is binary but the mode doesn't say so
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors early (raises LookupError on bad values)
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # resolve URLs / fsspec paths / path-likes into a path string or buffer
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step; only used by read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # only for write methods: fail early on a missing target directory
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not accept an explicit text mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # a binary handle here
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                handle = gzip.GzipFile(
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # already-open buffers go through fileobj, not filename
                    fileobj=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # get_bz2_file returns the (possibly patched) BZ2File class
            handle = get_bz2_file()(
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # _BytesZipFile buffers writes and flushes a single archive
            # member on close
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args
            )
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Encoding
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # already-open buffers are passed through fileobj
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args
                )
            assert isinstance(handle, _BytesTarFile)
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # get_lzma_file returns the (possibly patched) LZMAFile class
            handle = get_lzma_file()(
                handle, ioargs.mode, **compression_args
            )

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # check whether the filename is to be opened in binary or text mode
        if ioargs.encoding and "b" not in ioargs.mode:
            # text mode
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            handle = _IOWrapper(handle)
        # TextIOWrapper decodes the underlying binary handle
        handle = TextIOWrapper(
            handle,
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided the handle;
        # wrapped handles are detached (not closed) in IOHandles.close
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # handle may be a wrapper that does not implement the full IO
        # interface; IOHandles only relies on close()/flush()/detach()
        handle=handle,
        created_handles=handles,
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )
|
|
|
|
| |
| |
class _BufferedWriter(BytesIO, ABC):
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    # placeholder only; subclasses replace this with the real archive object
    # (TarFile/ZipFile) in their __init__
    buffer = BytesIO()

    @abstractmethod
    def write_to_buffer(self) -> None:
        """Flush the accumulated bytes into ``self.buffer``."""
        ...

    def close(self) -> None:
        """Write any pending bytes into the archive, then close everything."""
        if self.closed:
            # close() may be called multiple times; only act once
            return
        if self.getbuffer().nbytes:
            # rewind so write_to_buffer reads the full accumulated payload
            self.seek(0)
            with self.buffer:
                self.write_to_buffer()
        else:
            # nothing was written; just close the archive
            self.buffer.close()
        super().close()
|
|
|
|
class _BytesTarFile(_BufferedWriter):
    """
    Wrapper around ``tarfile.TarFile`` that buffers writes in memory.

    TAR members cannot be written incrementally, so the payload accumulates
    in the BytesIO base class and is added to the archive as a single
    member on close (see _BufferedWriter).
    """

    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        self.buffer: tarfile.TarFile = tarfile.TarFile.open(
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        """
        Append the compression suffix (e.g. ``"w:gz"``) to a write mode when
        the target filename implies a compressed tarball.
        """
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        # Path.suffix only ever holds the final extension (".gz" for
        # "foo.tar.gz"), so the compound extensions have to be matched
        # against the full name; the previous comparison of ``suffix`` with
        # ".tar.gz"/".tar.bz2"/".tar.xz" could never be true.
        if filename.name.endswith((".tar.gz", ".tar.bz2", ".tar.xz")):
            # strip both the compression suffix and the ".tar" suffix
            return filename.with_suffix("").with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string; fall back to "tar" as last resort
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)
|
|
|
|
class _BytesZipFile(_BufferedWriter):
    """
    Wrapper around ``zipfile.ZipFile`` that buffers writes in memory.

    ZIP members cannot be written incrementally, so the payload accumulates
    in the BytesIO base class and is written as a single member on close
    (see _BufferedWriter).
    """

    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # ZipFile does not understand the "b" flag used by binary file modes
        self.buffer: zipfile.ZipFile = zipfile.ZipFile(
            file, mode.replace("b", ""), **kwargs
        )

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        fname = self.buffer.filename
        if not isinstance(fname, (os.PathLike, str)):
            return None
        path = Path(fname)
        return path.with_suffix("").name if path.suffix == ".zip" else path.name

    def write_to_buffer(self) -> None:
        # fall back to "zip" when neither archive_name nor a filename exists
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())
|
|
|
|
| class _IOWrapper: |
| |
| |
| |
| |
| |
| |
| def __init__(self, buffer: BaseBuffer) -> None: |
| self.buffer = buffer |
|
|
| def __getattr__(self, name: str): |
| return getattr(self.buffer, name) |
|
|
| def readable(self) -> bool: |
| if hasattr(self.buffer, "readable"): |
| return self.buffer.readable() |
| return True |
|
|
| def seekable(self) -> bool: |
| if hasattr(self.buffer, "seekable"): |
| return self.buffer.seekable() |
| return True |
|
|
| def writable(self) -> bool: |
| if hasattr(self.buffer, "writable"): |
| return self.buffer.writable() |
| return True |
|
|
|
|
| class _BytesIOWrapper: |
| |
| |
| def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None: |
| self.buffer = buffer |
| self.encoding = encoding |
| |
| |
| |
| |
| self.overflow = b"" |
|
|
| def __getattr__(self, attr: str): |
| return getattr(self.buffer, attr) |
|
|
| def read(self, n: int | None = -1) -> bytes: |
| assert self.buffer is not None |
| bytestring = self.buffer.read(n).encode(self.encoding) |
| |
| combined_bytestring = self.overflow + bytestring |
| if n is None or n < 0 or n >= len(combined_bytestring): |
| self.overflow = b"" |
| return combined_bytestring |
| else: |
| to_return = combined_bytestring[:n] |
| self.overflow = combined_bytestring[n:] |
| return to_return |
|
|
|
|
def _maybe_memory_map(
    handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
    """Try to memory map file/buffer."""
    handles: list[BaseBuffer] = []
    # memory mapping needs a real file descriptor (or a path we can open)
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # narrowing for the type checker: handle is file-like or a path string
    handle = cast(ReadCsvBuffer, handle)

    # need an open file to obtain a file descriptor for mmap
    if isinstance(handle, str):
        handle = open(handle, "rb")
        handles.append(handle)

    try:
        # wrap the mmap so downstream code sees readable()/seekable() etc.
        wrapped = _IOWrapper(
            mmap.mmap(
                handle.fileno(), 0, access=mmap.ACCESS_READ
            )
        )
    finally:
        for handle in reversed(handles):
            # the mmap holds its own reference to the file, so the file
            # handle we opened can (and must) be closed to avoid a leak --
            # even when mmap.mmap itself raised
            handle.close()

    # only the mmap wrapper remains to be closed by the caller
    return wrapped, memory_map, [wrapped]
|
|
|
|
def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """
    Return True when ``filepath_or_buffer`` names an existing file.

    Buffers (anything that is not a string after ``stringify_path``) and
    malformed paths yield False rather than raising.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)
    if not isinstance(filepath_or_buffer, str):
        return False
    try:
        return os.path.exists(filepath_or_buffer)
    except (TypeError, ValueError):
        # e.g. embedded NUL bytes or an overlong path
        return False
|
|
|
|
def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
    """Whether the handle is opened in binary mode"""
    # an explicit "t" or "b" in the requested mode settles the question
    if "t" in mode or "b" in mode:
        return "b" in mode

    # codecs stream objects exchange str even though they wrap byte streams
    text_stream_classes = (
        codecs.StreamWriter,
        codecs.StreamReader,
        codecs.StreamReaderWriter,
    )
    if isinstance(handle, text_stream_classes):
        return False

    # fall back to known binary IO classes, then the handle's own mode flag
    if isinstance(handle, _get_binary_io_classes()):
        return True
    return "b" in getattr(handle, "mode", mode)
|
|
|
|
| @functools.lru_cache |
| def _get_binary_io_classes() -> tuple[type, ...]: |
| """IO classes that that expect bytes""" |
| binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase) |
|
|
| |
| |
| |
| |
| |
| zstd = import_optional_dependency("zstandard", errors="ignore") |
| if zstd is not None: |
| with zstd.ZstdDecompressor().stream_reader(b"") as reader: |
| binary_classes += (type(reader),) |
|
|
| return binary_classes |
|
|
|
|
def is_potential_multi_index(
    columns: Sequence[Hashable] | MultiIndex,
    index_col: bool | Sequence[int] | None = None,
) -> bool:
    """
    Whether ``columns`` could be converted into a MultiIndex.

    Parameters
    ----------
    columns : array-like
        Candidate column labels.
    index_col : None, bool or list, optional
        Column(s) used as the (possibly hierarchical) index; such labels
        are excluded from the tuple check.

    Returns
    -------
    bool
        True when ``columns`` is non-empty, is not already a MultiIndex,
        and every non-index label is a tuple.
    """
    # bool/None both mean "no explicit index columns"
    index_cols = [] if index_col is None or isinstance(index_col, bool) else index_col

    if not len(columns) or isinstance(columns, ABCMultiIndex):
        return False
    ignored = list(index_cols)
    return all(isinstance(label, tuple) for label in columns if label not in ignored)
|
|
|
|
def dedup_names(
    names: Sequence[Hashable], is_potential_multiindex: bool
) -> Sequence[Hashable]:
    """
    Rename column names if duplicates exist.

    Currently the renaming is done by appending a period and an autonumeric,
    but a custom pattern may be supported in the future.

    Examples
    --------
    >>> dedup_names(["x", "y", "x", "x"], is_potential_multiindex=False)
    ['x', 'y', 'x.1', 'x.2']
    """
    renamed = list(names)
    seen: DefaultDict[Hashable, int] = defaultdict(int)

    for idx, name in enumerate(renamed):
        count = seen[name]

        # keep appending ".<n>" until the candidate name is itself unseen
        while count > 0:
            seen[name] = count + 1

            if is_potential_multiindex:
                # for tuple labels only the last level gets the suffix
                assert isinstance(name, tuple)
                name = name[:-1] + (f"{name[-1]}.{count}",)
            else:
                name = f"{name}.{count}"
            count = seen[name]

        renamed[idx] = name
        seen[name] = count + 1

    return renamed
|
|