| | from __future__ import annotations |
| |
|
| | import os |
| | import sys |
| | import tempfile |
| | from collections.abc import Iterable |
| | from io import BytesIO, TextIOWrapper |
| | from types import TracebackType |
| | from typing import ( |
| | TYPE_CHECKING, |
| | Any, |
| | AnyStr, |
| | Generic, |
| | overload, |
| | ) |
| |
|
| | from .. import to_thread |
| | from .._core._fileio import AsyncFile |
| | from ..lowlevel import checkpoint_if_cancelled |
| |
|
| | if TYPE_CHECKING: |
| | from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer |
| |
|
| |
|
| | class TemporaryFile(Generic[AnyStr]): |
| | """ |
| | An asynchronous temporary file that is automatically created and cleaned up. |
| | |
| | This class provides an asynchronous context manager interface to a temporary file. |
| | The file is created using Python's standard `tempfile.TemporaryFile` function in a |
| | background thread, and is wrapped as an asynchronous file using `AsyncFile`. |
| | |
| | :param mode: The mode in which the file is opened. Defaults to "w+b". |
| | :param buffering: The buffering policy (-1 means the default buffering). |
| | :param encoding: The encoding used to decode or encode the file. Only applicable in |
| | text mode. |
| | :param newline: Controls how universal newlines mode works (only applicable in text |
| | mode). |
| | :param suffix: The suffix for the temporary file name. |
| | :param prefix: The prefix for the temporary file name. |
| | :param dir: The directory in which the temporary file is created. |
| | :param errors: The error handling scheme used for encoding/decoding errors. |
| | """ |
| |
|
| | _async_file: AsyncFile[AnyStr] |
| |
|
| | @overload |
| | def __init__( |
| | self: TemporaryFile[bytes], |
| | mode: OpenBinaryMode = ..., |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | *, |
| | errors: str | None = ..., |
| | ): ... |
| | @overload |
| | def __init__( |
| | self: TemporaryFile[str], |
| | mode: OpenTextMode, |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | *, |
| | errors: str | None = ..., |
| | ): ... |
| |
|
| | def __init__( |
| | self, |
| | mode: OpenTextMode | OpenBinaryMode = "w+b", |
| | buffering: int = -1, |
| | encoding: str | None = None, |
| | newline: str | None = None, |
| | suffix: str | None = None, |
| | prefix: str | None = None, |
| | dir: str | None = None, |
| | *, |
| | errors: str | None = None, |
| | ) -> None: |
| | self.mode = mode |
| | self.buffering = buffering |
| | self.encoding = encoding |
| | self.newline = newline |
| | self.suffix: str | None = suffix |
| | self.prefix: str | None = prefix |
| | self.dir: str | None = dir |
| | self.errors = errors |
| |
|
| | async def __aenter__(self) -> AsyncFile[AnyStr]: |
| | fp = await to_thread.run_sync( |
| | lambda: tempfile.TemporaryFile( |
| | self.mode, |
| | self.buffering, |
| | self.encoding, |
| | self.newline, |
| | self.suffix, |
| | self.prefix, |
| | self.dir, |
| | errors=self.errors, |
| | ) |
| | ) |
| | self._async_file = AsyncFile(fp) |
| | return self._async_file |
| |
|
| | async def __aexit__( |
| | self, |
| | exc_type: type[BaseException] | None, |
| | exc_value: BaseException | None, |
| | traceback: TracebackType | None, |
| | ) -> None: |
| | await self._async_file.aclose() |
| |
|
| |
|
| | class NamedTemporaryFile(Generic[AnyStr]): |
| | """ |
| | An asynchronous named temporary file that is automatically created and cleaned up. |
| | |
| | This class provides an asynchronous context manager for a temporary file with a |
| | visible name in the file system. It uses Python's standard |
| | :func:`~tempfile.NamedTemporaryFile` function and wraps the file object with |
| | :class:`AsyncFile` for asynchronous operations. |
| | |
| | :param mode: The mode in which the file is opened. Defaults to "w+b". |
| | :param buffering: The buffering policy (-1 means the default buffering). |
| | :param encoding: The encoding used to decode or encode the file. Only applicable in |
| | text mode. |
| | :param newline: Controls how universal newlines mode works (only applicable in text |
| | mode). |
| | :param suffix: The suffix for the temporary file name. |
| | :param prefix: The prefix for the temporary file name. |
| | :param dir: The directory in which the temporary file is created. |
| | :param delete: Whether to delete the file when it is closed. |
| | :param errors: The error handling scheme used for encoding/decoding errors. |
| | :param delete_on_close: (Python 3.12+) Whether to delete the file on close. |
| | """ |
| |
|
| | _async_file: AsyncFile[AnyStr] |
| |
|
| | @overload |
| | def __init__( |
| | self: NamedTemporaryFile[bytes], |
| | mode: OpenBinaryMode = ..., |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | delete: bool = ..., |
| | *, |
| | errors: str | None = ..., |
| | delete_on_close: bool = ..., |
| | ): ... |
| | @overload |
| | def __init__( |
| | self: NamedTemporaryFile[str], |
| | mode: OpenTextMode, |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | delete: bool = ..., |
| | *, |
| | errors: str | None = ..., |
| | delete_on_close: bool = ..., |
| | ): ... |
| |
|
| | def __init__( |
| | self, |
| | mode: OpenBinaryMode | OpenTextMode = "w+b", |
| | buffering: int = -1, |
| | encoding: str | None = None, |
| | newline: str | None = None, |
| | suffix: str | None = None, |
| | prefix: str | None = None, |
| | dir: str | None = None, |
| | delete: bool = True, |
| | *, |
| | errors: str | None = None, |
| | delete_on_close: bool = True, |
| | ) -> None: |
| | self._params: dict[str, Any] = { |
| | "mode": mode, |
| | "buffering": buffering, |
| | "encoding": encoding, |
| | "newline": newline, |
| | "suffix": suffix, |
| | "prefix": prefix, |
| | "dir": dir, |
| | "delete": delete, |
| | "errors": errors, |
| | } |
| | if sys.version_info >= (3, 12): |
| | self._params["delete_on_close"] = delete_on_close |
| |
|
| | async def __aenter__(self) -> AsyncFile[AnyStr]: |
| | fp = await to_thread.run_sync( |
| | lambda: tempfile.NamedTemporaryFile(**self._params) |
| | ) |
| | self._async_file = AsyncFile(fp) |
| | return self._async_file |
| |
|
| | async def __aexit__( |
| | self, |
| | exc_type: type[BaseException] | None, |
| | exc_value: BaseException | None, |
| | traceback: TracebackType | None, |
| | ) -> None: |
| | await self._async_file.aclose() |
| |
|
| |
|
| | class SpooledTemporaryFile(AsyncFile[AnyStr]): |
| | """ |
| | An asynchronous spooled temporary file that starts in memory and is spooled to disk. |
| | |
| | This class provides an asynchronous interface to a spooled temporary file, much like |
| | Python's standard :class:`~tempfile.SpooledTemporaryFile`. It supports asynchronous |
| | write operations and provides a method to force a rollover to disk. |
| | |
| | :param max_size: Maximum size in bytes before the file is rolled over to disk. |
| | :param mode: The mode in which the file is opened. Defaults to "w+b". |
| | :param buffering: The buffering policy (-1 means the default buffering). |
| | :param encoding: The encoding used to decode or encode the file (text mode only). |
| | :param newline: Controls how universal newlines mode works (text mode only). |
| | :param suffix: The suffix for the temporary file name. |
| | :param prefix: The prefix for the temporary file name. |
| | :param dir: The directory in which the temporary file is created. |
| | :param errors: The error handling scheme used for encoding/decoding errors. |
| | """ |
| |
|
| | _rolled: bool = False |
| |
|
| | @overload |
| | def __init__( |
| | self: SpooledTemporaryFile[bytes], |
| | max_size: int = ..., |
| | mode: OpenBinaryMode = ..., |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | *, |
| | errors: str | None = ..., |
| | ): ... |
| | @overload |
| | def __init__( |
| | self: SpooledTemporaryFile[str], |
| | max_size: int = ..., |
| | mode: OpenTextMode = ..., |
| | buffering: int = ..., |
| | encoding: str | None = ..., |
| | newline: str | None = ..., |
| | suffix: str | None = ..., |
| | prefix: str | None = ..., |
| | dir: str | None = ..., |
| | *, |
| | errors: str | None = ..., |
| | ): ... |
| |
|
| | def __init__( |
| | self, |
| | max_size: int = 0, |
| | mode: OpenBinaryMode | OpenTextMode = "w+b", |
| | buffering: int = -1, |
| | encoding: str | None = None, |
| | newline: str | None = None, |
| | suffix: str | None = None, |
| | prefix: str | None = None, |
| | dir: str | None = None, |
| | *, |
| | errors: str | None = None, |
| | ) -> None: |
| | self._tempfile_params: dict[str, Any] = { |
| | "mode": mode, |
| | "buffering": buffering, |
| | "encoding": encoding, |
| | "newline": newline, |
| | "suffix": suffix, |
| | "prefix": prefix, |
| | "dir": dir, |
| | "errors": errors, |
| | } |
| | self._max_size = max_size |
| | if "b" in mode: |
| | super().__init__(BytesIO()) |
| | else: |
| | super().__init__( |
| | TextIOWrapper( |
| | BytesIO(), |
| | encoding=encoding, |
| | errors=errors, |
| | newline=newline, |
| | write_through=True, |
| | ) |
| | ) |
| |
|
| | async def aclose(self) -> None: |
| | if not self._rolled: |
| | self._fp.close() |
| | return |
| |
|
| | await super().aclose() |
| |
|
| | async def _check(self) -> None: |
| | if self._rolled or self._fp.tell() <= self._max_size: |
| | return |
| |
|
| | await self.rollover() |
| |
|
| | async def rollover(self) -> None: |
| | if self._rolled: |
| | return |
| |
|
| | self._rolled = True |
| | buffer = self._fp |
| | buffer.seek(0) |
| | self._fp = await to_thread.run_sync( |
| | lambda: tempfile.TemporaryFile(**self._tempfile_params) |
| | ) |
| | await self.write(buffer.read()) |
| | buffer.close() |
| |
|
| | @property |
| | def closed(self) -> bool: |
| | return self._fp.closed |
| |
|
| | async def read(self, size: int = -1) -> AnyStr: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.read(size) |
| |
|
| | return await super().read(size) |
| |
|
| | async def read1(self: SpooledTemporaryFile[bytes], size: int = -1) -> bytes: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.read1(size) |
| |
|
| | return await super().read1(size) |
| |
|
| | async def readline(self) -> AnyStr: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.readline() |
| |
|
| | return await super().readline() |
| |
|
| | async def readlines(self) -> list[AnyStr]: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.readlines() |
| |
|
| | return await super().readlines() |
| |
|
| | async def readinto(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | self._fp.readinto(b) |
| |
|
| | return await super().readinto(b) |
| |
|
| | async def readinto1(self: SpooledTemporaryFile[bytes], b: WriteableBuffer) -> int: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | self._fp.readinto(b) |
| |
|
| | return await super().readinto1(b) |
| |
|
| | async def seek(self, offset: int, whence: int | None = os.SEEK_SET) -> int: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.seek(offset, whence) |
| |
|
| | return await super().seek(offset, whence) |
| |
|
| | async def tell(self) -> int: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.tell() |
| |
|
| | return await super().tell() |
| |
|
| | async def truncate(self, size: int | None = None) -> int: |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | return self._fp.truncate(size) |
| |
|
| | return await super().truncate(size) |
| |
|
| | @overload |
| | async def write(self: SpooledTemporaryFile[bytes], b: ReadableBuffer) -> int: ... |
| | @overload |
| | async def write(self: SpooledTemporaryFile[str], b: str) -> int: ... |
| |
|
| | async def write(self, b: ReadableBuffer | str) -> int: |
| | """ |
| | Asynchronously write data to the spooled temporary file. |
| | |
| | If the file has not yet been rolled over, the data is written synchronously, |
| | and a rollover is triggered if the size exceeds the maximum size. |
| | |
| | :param s: The data to write. |
| | :return: The number of bytes written. |
| | :raises RuntimeError: If the underlying file is not initialized. |
| | |
| | """ |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | result = self._fp.write(b) |
| | await self._check() |
| | return result |
| |
|
| | return await super().write(b) |
| |
|
| | @overload |
| | async def writelines( |
| | self: SpooledTemporaryFile[bytes], lines: Iterable[ReadableBuffer] |
| | ) -> None: ... |
| | @overload |
| | async def writelines( |
| | self: SpooledTemporaryFile[str], lines: Iterable[str] |
| | ) -> None: ... |
| |
|
| | async def writelines(self, lines: Iterable[str] | Iterable[ReadableBuffer]) -> None: |
| | """ |
| | Asynchronously write a list of lines to the spooled temporary file. |
| | |
| | If the file has not yet been rolled over, the lines are written synchronously, |
| | and a rollover is triggered if the size exceeds the maximum size. |
| | |
| | :param lines: An iterable of lines to write. |
| | :raises RuntimeError: If the underlying file is not initialized. |
| | |
| | """ |
| | if not self._rolled: |
| | await checkpoint_if_cancelled() |
| | result = self._fp.writelines(lines) |
| | await self._check() |
| | return result |
| |
|
| | return await super().writelines(lines) |
| |
|
| |
|
| | class TemporaryDirectory(Generic[AnyStr]): |
| | """ |
| | An asynchronous temporary directory that is created and cleaned up automatically. |
| | |
| | This class provides an asynchronous context manager for creating a temporary |
| | directory. It wraps Python's standard :class:`~tempfile.TemporaryDirectory` to |
| | perform directory creation and cleanup operations in a background thread. |
| | |
| | :param suffix: Suffix to be added to the temporary directory name. |
| | :param prefix: Prefix to be added to the temporary directory name. |
| | :param dir: The parent directory where the temporary directory is created. |
| | :param ignore_cleanup_errors: Whether to ignore errors during cleanup |
| | (Python 3.10+). |
| | :param delete: Whether to delete the directory upon closing (Python 3.12+). |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | suffix: AnyStr | None = None, |
| | prefix: AnyStr | None = None, |
| | dir: AnyStr | None = None, |
| | *, |
| | ignore_cleanup_errors: bool = False, |
| | delete: bool = True, |
| | ) -> None: |
| | self.suffix: AnyStr | None = suffix |
| | self.prefix: AnyStr | None = prefix |
| | self.dir: AnyStr | None = dir |
| | self.ignore_cleanup_errors = ignore_cleanup_errors |
| | self.delete = delete |
| |
|
| | self._tempdir: tempfile.TemporaryDirectory | None = None |
| |
|
| | async def __aenter__(self) -> str: |
| | params: dict[str, Any] = { |
| | "suffix": self.suffix, |
| | "prefix": self.prefix, |
| | "dir": self.dir, |
| | } |
| | if sys.version_info >= (3, 10): |
| | params["ignore_cleanup_errors"] = self.ignore_cleanup_errors |
| |
|
| | if sys.version_info >= (3, 12): |
| | params["delete"] = self.delete |
| |
|
| | self._tempdir = await to_thread.run_sync( |
| | lambda: tempfile.TemporaryDirectory(**params) |
| | ) |
| | return await to_thread.run_sync(self._tempdir.__enter__) |
| |
|
| | async def __aexit__( |
| | self, |
| | exc_type: type[BaseException] | None, |
| | exc_value: BaseException | None, |
| | traceback: TracebackType | None, |
| | ) -> None: |
| | if self._tempdir is not None: |
| | await to_thread.run_sync( |
| | self._tempdir.__exit__, exc_type, exc_value, traceback |
| | ) |
| |
|
| | async def cleanup(self) -> None: |
| | if self._tempdir is not None: |
| | await to_thread.run_sync(self._tempdir.cleanup) |
| |
|
| |
|
| | @overload |
| | async def mkstemp( |
| | suffix: str | None = None, |
| | prefix: str | None = None, |
| | dir: str | None = None, |
| | text: bool = False, |
| | ) -> tuple[int, str]: ... |
| |
|
| |
|
| | @overload |
| | async def mkstemp( |
| | suffix: bytes | None = None, |
| | prefix: bytes | None = None, |
| | dir: bytes | None = None, |
| | text: bool = False, |
| | ) -> tuple[int, bytes]: ... |
| |
|
| |
|
| | async def mkstemp( |
| | suffix: AnyStr | None = None, |
| | prefix: AnyStr | None = None, |
| | dir: AnyStr | None = None, |
| | text: bool = False, |
| | ) -> tuple[int, str | bytes]: |
| | """ |
| | Asynchronously create a temporary file and return an OS-level handle and the file |
| | name. |
| | |
| | This function wraps `tempfile.mkstemp` and executes it in a background thread. |
| | |
| | :param suffix: Suffix to be added to the file name. |
| | :param prefix: Prefix to be added to the file name. |
| | :param dir: Directory in which the temporary file is created. |
| | :param text: Whether the file is opened in text mode. |
| | :return: A tuple containing the file descriptor and the file name. |
| | |
| | """ |
| | return await to_thread.run_sync(tempfile.mkstemp, suffix, prefix, dir, text) |
| |
|
| |
|
| | @overload |
| | async def mkdtemp( |
| | suffix: str | None = None, |
| | prefix: str | None = None, |
| | dir: str | None = None, |
| | ) -> str: ... |
| |
|
| |
|
| | @overload |
| | async def mkdtemp( |
| | suffix: bytes | None = None, |
| | prefix: bytes | None = None, |
| | dir: bytes | None = None, |
| | ) -> bytes: ... |
| |
|
| |
|
| | async def mkdtemp( |
| | suffix: AnyStr | None = None, |
| | prefix: AnyStr | None = None, |
| | dir: AnyStr | None = None, |
| | ) -> str | bytes: |
| | """ |
| | Asynchronously create a temporary directory and return its path. |
| | |
| | This function wraps `tempfile.mkdtemp` and executes it in a background thread. |
| | |
| | :param suffix: Suffix to be added to the directory name. |
| | :param prefix: Prefix to be added to the directory name. |
| | :param dir: Parent directory where the temporary directory is created. |
| | :return: The path of the created temporary directory. |
| | |
| | """ |
| | return await to_thread.run_sync(tempfile.mkdtemp, suffix, prefix, dir) |
| |
|
| |
|
| | async def gettempdir() -> str: |
| | """ |
| | Asynchronously return the name of the directory used for temporary files. |
| | |
| | This function wraps `tempfile.gettempdir` and executes it in a background thread. |
| | |
| | :return: The path of the temporary directory as a string. |
| | |
| | """ |
| | return await to_thread.run_sync(tempfile.gettempdir) |
| |
|
| |
|
| | async def gettempdirb() -> bytes: |
| | """ |
| | Asynchronously return the name of the directory used for temporary files in bytes. |
| | |
| | This function wraps `tempfile.gettempdirb` and executes it in a background thread. |
| | |
| | :return: The path of the temporary directory as bytes. |
| | |
| | """ |
| | return await to_thread.run_sync(tempfile.gettempdirb) |
| |
|