| | from __future__ import annotations |
| |
|
| | import atexit |
| | import os |
| | import pickle |
| | import sys |
| | from collections import deque |
| | from collections.abc import Callable |
| | from textwrap import dedent |
| | from typing import Any, Final, TypeVar |
| |
|
| | from . import current_time, to_thread |
| | from ._core._exceptions import BrokenWorkerIntepreter |
| | from ._core._synchronization import CapacityLimiter |
| | from .lowlevel import RunVar |
| |
|
| | if sys.version_info >= (3, 11): |
| | from typing import TypeVarTuple, Unpack |
| | else: |
| | from typing_extensions import TypeVarTuple, Unpack |
| |
|
# Format flags attached to queue items: they tell the receiving side whether the
# payload has to be unpickled after it is read from the queue
FMT_UNPICKLED: Final = 0
FMT_PICKLED: Final = 1
# Passed as the trailing "unbound" argument to the _interpqueues create()/put() calls
UNBOUND: Final = 2
# Size of the default limiter when os.cpu_count() does not yield a usable value
DEFAULT_CPU_COUNT: Final = 8
# How long (in current_time() units, presumably seconds) a worker may sit idle
# before it becomes eligible for pruning
MAX_WORKER_IDLE_TIME = 30
| |
|
# Return type of the callable handed to run_sync()
T_Retval = TypeVar("T_Retval")
# Types of the positional arguments forwarded to that callable
PosArgsT = TypeVarTuple("PosArgsT")

# Pool of workers that are currently not running a call (newest at the right end)
_idle_workers = RunVar[deque["Worker"]]("_available_workers")
# Lazily created limiter returned by current_default_interpreter_limiter()
_default_interpreter_limiter = RunVar[CapacityLimiter]("_default_interpreter_limiter")
| |
|
| |
|
class Worker:
    """
    A reusable subinterpreter paired with a queue used to pass calls and results.

    The interpreter and its queue are created lazily on the first call (see
    :meth:`initialize`) and released with :meth:`destroy`.
    """

    # Code executed inside the subinterpreter for every call. It reads one pickled
    # ``(func, args)`` payload from the queue, runs it, and puts a
    # ``(retval, is_exception)`` tuple back. If that tuple cannot be shared as is,
    # the return value is pickled first and the item is flagged as FMT_PICKLED.
    # ``queue_id`` and the FMT_*/UNBOUND names are injected into the interpreter's
    # ``__main__`` namespace by initialize().
    _run_func = compile(
        dedent("""
        import _interpqueues as queues
        import _interpreters as interpreters
        from pickle import loads, dumps, HIGHEST_PROTOCOL

        item = queues.get(queue_id)[0]
        try:
            func, args = loads(item)
            retval = func(*args)
        except BaseException as exc:
            is_exception = True
            retval = exc
        else:
            is_exception = False

        try:
            queues.put(queue_id, (retval, is_exception), FMT_UNPICKLED, UNBOUND)
        except interpreters.NotShareableError:
            retval = dumps(retval, HIGHEST_PROTOCOL)
            queues.put(queue_id, (retval, is_exception), FMT_PICKLED, UNBOUND)
        """),
        "<string>",
        "exec",
    )

    # Time of the last finished call, as set by run_sync(); 0 for a fresh worker
    last_used: float = 0

    # True while the interpreter and queue identified below exist
    _initialized: bool = False
    _interpreter_id: int
    _queue_id: int

    def initialize(self) -> None:
        """
        Create the subinterpreter and its queue and seed the interpreter's globals.

        The names made available in the interpreter's ``__main__`` namespace are
        exactly the ones the code in ``_run_func`` refers to.
        """
        import _interpqueues as queues
        import _interpreters as interpreters

        self._interpreter_id = interpreters.create()
        self._queue_id = queues.create(2, FMT_UNPICKLED, UNBOUND)
        # Set before seeding the globals so that destroy() releases both resources
        # even if the call below fails
        self._initialized = True
        interpreters.set___main___attrs(
            self._interpreter_id,
            {
                "queue_id": self._queue_id,
                "FMT_PICKLED": FMT_PICKLED,
                "FMT_UNPICKLED": FMT_UNPICKLED,
                "UNBOUND": UNBOUND,
            },
        )

    def destroy(self) -> None:
        """
        Destroy the subinterpreter and its queue, if they were created.

        Does nothing for a worker that was never initialized or that has already
        been destroyed.
        """
        if not self._initialized:
            return

        # Imported only when there is something to destroy, so this stays a plain
        # no-op for workers that never created an interpreter
        import _interpqueues as queues
        import _interpreters as interpreters

        interpreters.destroy(self._interpreter_id)
        queues.destroy(self._queue_id)
        # Reset the flag so a repeated destroy() is a no-op and a later call sets up
        # a fresh interpreter instead of reusing the stale ids
        self._initialized = False

    def _call(
        self,
        func: Callable[..., T_Retval],
        args: tuple[Any, ...],
    ) -> tuple[Any, bool]:
        """
        Run ``func(*args)`` in the subinterpreter, blocking until it has finished.

        :param func: the callable to run (sent to the interpreter pickled)
        :param args: positional arguments for the callable (pickled with it)
        :return: a tuple of (return value or raised exception, whether the first
            item is an exception raised by the callable)
        :raises BrokenWorkerIntepreter: if executing the worker code itself reports
            an error

        """
        import _interpqueues as queues
        import _interpreters as interpreters

        if not self._initialized:
            self.initialize()

        payload = pickle.dumps((func, args), pickle.HIGHEST_PROTOCOL)
        queues.put(self._queue_id, payload, FMT_PICKLED, UNBOUND)

        res: Any
        is_exception: bool
        # exec() hands back exception info only when the worker code itself failed;
        # exceptions raised by ``func`` travel through the queue instead
        if exc_info := interpreters.exec(self._interpreter_id, self._run_func):
            raise BrokenWorkerIntepreter(exc_info)

        (res, is_exception), fmt = queues.get(self._queue_id)[:2]
        if fmt == FMT_PICKLED:
            # The worker code pickled the return value because it could not be
            # shared directly
            res = pickle.loads(res)

        return res, is_exception

    async def call(
        self,
        func: Callable[..., T_Retval],
        args: tuple[Any, ...],
        limiter: CapacityLimiter,
    ) -> T_Retval:
        """
        Run ``func(*args)`` in the subinterpreter without blocking the caller.

        The blocking :meth:`_call` is handed to ``to_thread.run_sync()`` together
        with ``limiter``.

        :param func: the callable to run
        :param args: positional arguments for the callable
        :param limiter: capacity limiter passed on to ``to_thread.run_sync()``
        :return: the return value of the callable
        :raises BrokenWorkerIntepreter: if executing the worker code itself reports
            an error

        """
        result, is_exception = await to_thread.run_sync(
            self._call,
            func,
            args,
            limiter=limiter,
        )
        if is_exception:
            # Re-raise what the callable raised inside the subinterpreter
            raise result

        return result
| |
|
| |
|
| | def _stop_workers(workers: deque[Worker]) -> None: |
| | for worker in workers: |
| | worker.destroy() |
| |
|
| | workers.clear() |
| |
|
| |
|
async def run_sync(
    func: Callable[[Unpack[PosArgsT]], T_Retval],
    *args: Unpack[PosArgsT],
    limiter: CapacityLimiter | None = None,
) -> T_Retval:
    """
    Call the given function with the given arguments in a subinterpreter.

    The callable and its arguments are pickled in order to be sent to the
    subinterpreter, so both must be picklable.

    .. warning:: This feature is **experimental**. The upstream interpreter API has not
        yet been finalized or thoroughly tested, so don't rely on this for anything
        mission critical.

    :param func: a callable
    :param args: positional arguments for the callable
    :param limiter: capacity limiter to use to limit the total amount of subinterpreters
        running (if omitted, the default limiter is used)
    :return: the result of the call
    :raises BrokenWorkerIntepreter: if there's an internal error in a subinterpreter
    :raises RuntimeError: if running on a Python version older than 3.13

    """
    if sys.version_info < (3, 13):
        raise RuntimeError("subinterpreters require at least Python 3.13")

    if limiter is None:
        limiter = current_default_interpreter_limiter()

    try:
        idle_workers = _idle_workers.get()
    except LookupError:
        # First use: create the pool and make sure any workers still idle at
        # interpreter exit get destroyed
        idle_workers = deque()
        _idle_workers.set(idle_workers)
        atexit.register(_stop_workers, idle_workers)

    # The limiter is held only while picking a worker; worker.call() passes the same
    # limiter on to to_thread.run_sync() for the duration of the actual call
    async with limiter:
        try:
            # Reuse the most recently used idle worker
            worker = idle_workers.pop()
        except IndexError:
            worker = Worker()

    try:
        return await worker.call(func, args, limiter)
    finally:
        # Return the worker to the pool before awaiting anything, so that an
        # exception raised while pruning cannot drop it
        now = current_time()
        worker.last_used = now
        idle_workers.append(worker)

        # Prune workers that have been idle for too long; the longest-idle ones sit
        # at the left end of the deque
        while idle_workers:
            if now - idle_workers[0].last_used <= MAX_WORKER_IDLE_TIME:
                break

            await to_thread.run_sync(idle_workers.popleft().destroy, limiter=limiter)
| |
|
| |
|
def current_default_interpreter_limiter() -> CapacityLimiter:
    """
    Return the capacity limiter used by default to cap the number of subinterpreters
    running at the same time.

    The limiter is created on first use and stored in a run variable. Its capacity is
    the number of CPU cores reported by :func:`os.cpu_count`, or
    ``DEFAULT_CPU_COUNT`` when that gives no usable value.

    :return: a capacity limiter object

    """
    try:
        default_limiter = _default_interpreter_limiter.get()
    except LookupError:
        total_tokens = os.cpu_count() or DEFAULT_CPU_COUNT
        default_limiter = CapacityLimiter(total_tokens)
        _default_interpreter_limiter.set(default_limiter)

    return default_limiter
| |
|