| from __future__ import annotations | |
| import sys | |
| from collections.abc import AsyncIterable, Iterable, Mapping, Sequence | |
| from io import BytesIO | |
| from os import PathLike | |
| from subprocess import PIPE, CalledProcessError, CompletedProcess | |
| from typing import IO, Any, Union, cast | |
| from ..abc import Process | |
| from ._eventloop import get_async_backend | |
| from ._tasks import create_task_group | |
| if sys.version_info >= (3, 10): | |
| from typing import TypeAlias | |
| else: | |
| from typing_extensions import TypeAlias | |
| StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"] | |
| async def run_process( | |
| command: StrOrBytesPath | Sequence[StrOrBytesPath], | |
| *, | |
| input: bytes | None = None, | |
| stdin: int | IO[Any] | None = None, | |
| stdout: int | IO[Any] | None = PIPE, | |
| stderr: int | IO[Any] | None = PIPE, | |
| check: bool = True, | |
| cwd: StrOrBytesPath | None = None, | |
| env: Mapping[str, str] | None = None, | |
| startupinfo: Any = None, | |
| creationflags: int = 0, | |
| start_new_session: bool = False, | |
| pass_fds: Sequence[int] = (), | |
| user: str | int | None = None, | |
| group: str | int | None = None, | |
| extra_groups: Iterable[str | int] | None = None, | |
| umask: int = -1, | |
| ) -> CompletedProcess[bytes]: | |
| """ | |
| Run an external command in a subprocess and wait until it completes. | |
| .. seealso:: :func:`subprocess.run` | |
| :param command: either a string to pass to the shell, or an iterable of strings | |
| containing the executable name or path and its arguments | |
| :param input: bytes passed to the standard input of the subprocess | |
| :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, | |
| a file-like object, or `None`; ``input`` overrides this | |
| :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, | |
| a file-like object, or `None` | |
| :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, | |
| :data:`subprocess.STDOUT`, a file-like object, or `None` | |
| :param check: if ``True``, raise :exc:`~subprocess.CalledProcessError` if the | |
| process terminates with a return code other than 0 | |
| :param cwd: If not ``None``, change the working directory to this before running the | |
| command | |
| :param env: if not ``None``, this mapping replaces the inherited environment | |
| variables from the parent process | |
| :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used | |
| to specify process startup parameters (Windows only) | |
| :param creationflags: flags that can be used to control the creation of the | |
| subprocess (see :class:`subprocess.Popen` for the specifics) | |
| :param start_new_session: if ``true`` the setsid() system call will be made in the | |
| child process prior to the execution of the subprocess. (POSIX only) | |
| :param pass_fds: sequence of file descriptors to keep open between the parent and | |
| child processes. (POSIX only) | |
| :param user: effective user to run the process as (Python >= 3.9, POSIX only) | |
| :param group: effective group to run the process as (Python >= 3.9, POSIX only) | |
| :param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9, | |
| POSIX only) | |
| :param umask: if not negative, this umask is applied in the child process before | |
| running the given command (Python >= 3.9, POSIX only) | |
| :return: an object representing the completed process | |
| :raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process | |
| exits with a nonzero return code | |
| """ | |
| async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None: | |
| buffer = BytesIO() | |
| async for chunk in stream: | |
| buffer.write(chunk) | |
| stream_contents[index] = buffer.getvalue() | |
| if stdin is not None and input is not None: | |
| raise ValueError("only one of stdin and input is allowed") | |
| async with await open_process( | |
| command, | |
| stdin=PIPE if input else stdin, | |
| stdout=stdout, | |
| stderr=stderr, | |
| cwd=cwd, | |
| env=env, | |
| startupinfo=startupinfo, | |
| creationflags=creationflags, | |
| start_new_session=start_new_session, | |
| pass_fds=pass_fds, | |
| user=user, | |
| group=group, | |
| extra_groups=extra_groups, | |
| umask=umask, | |
| ) as process: | |
| stream_contents: list[bytes | None] = [None, None] | |
| async with create_task_group() as tg: | |
| if process.stdout: | |
| tg.start_soon(drain_stream, process.stdout, 0) | |
| if process.stderr: | |
| tg.start_soon(drain_stream, process.stderr, 1) | |
| if process.stdin and input: | |
| await process.stdin.send(input) | |
| await process.stdin.aclose() | |
| await process.wait() | |
| output, errors = stream_contents | |
| if check and process.returncode != 0: | |
| raise CalledProcessError(cast(int, process.returncode), command, output, errors) | |
| return CompletedProcess(command, cast(int, process.returncode), output, errors) | |
| async def open_process( | |
| command: StrOrBytesPath | Sequence[StrOrBytesPath], | |
| *, | |
| stdin: int | IO[Any] | None = PIPE, | |
| stdout: int | IO[Any] | None = PIPE, | |
| stderr: int | IO[Any] | None = PIPE, | |
| cwd: StrOrBytesPath | None = None, | |
| env: Mapping[str, str] | None = None, | |
| startupinfo: Any = None, | |
| creationflags: int = 0, | |
| start_new_session: bool = False, | |
| pass_fds: Sequence[int] = (), | |
| user: str | int | None = None, | |
| group: str | int | None = None, | |
| extra_groups: Iterable[str | int] | None = None, | |
| umask: int = -1, | |
| ) -> Process: | |
| """ | |
| Start an external command in a subprocess. | |
| .. seealso:: :class:`subprocess.Popen` | |
| :param command: either a string to pass to the shell, or an iterable of strings | |
| containing the executable name or path and its arguments | |
| :param stdin: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, a | |
| file-like object, or ``None`` | |
| :param stdout: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, | |
| a file-like object, or ``None`` | |
| :param stderr: one of :data:`subprocess.PIPE`, :data:`subprocess.DEVNULL`, | |
| :data:`subprocess.STDOUT`, a file-like object, or ``None`` | |
| :param cwd: If not ``None``, the working directory is changed before executing | |
| :param env: If env is not ``None``, it must be a mapping that defines the | |
| environment variables for the new process | |
| :param creationflags: flags that can be used to control the creation of the | |
| subprocess (see :class:`subprocess.Popen` for the specifics) | |
| :param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used | |
| to specify process startup parameters (Windows only) | |
| :param start_new_session: if ``true`` the setsid() system call will be made in the | |
| child process prior to the execution of the subprocess. (POSIX only) | |
| :param pass_fds: sequence of file descriptors to keep open between the parent and | |
| child processes. (POSIX only) | |
| :param user: effective user to run the process as (POSIX only) | |
| :param group: effective group to run the process as (POSIX only) | |
| :param extra_groups: supplementary groups to set in the subprocess (POSIX only) | |
| :param umask: if not negative, this umask is applied in the child process before | |
| running the given command (POSIX only) | |
| :return: an asynchronous process object | |
| """ | |
| kwargs: dict[str, Any] = {} | |
| if user is not None: | |
| kwargs["user"] = user | |
| if group is not None: | |
| kwargs["group"] = group | |
| if extra_groups is not None: | |
| kwargs["extra_groups"] = group | |
| if umask >= 0: | |
| kwargs["umask"] = umask | |
| return await get_async_backend().open_process( | |
| command, | |
| stdin=stdin, | |
| stdout=stdout, | |
| stderr=stderr, | |
| cwd=cwd, | |
| env=env, | |
| startupinfo=startupinfo, | |
| creationflags=creationflags, | |
| start_new_session=start_new_session, | |
| pass_fds=pass_fds, | |
| **kwargs, | |
| ) | |