File size: 1,581 Bytes
3193174
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
from collections.abc import Awaitable, Coroutine
from typing import Any

# Names exported by a star-import of this module.
# NOTE(review): timeout_wrapper is defined in this module but is not listed
# here — confirm whether leaving it out of the public API is intentional.
__all__ = ["gather_with_concurrency", "run_sync"]


def run_sync[T](coro: Awaitable[T] | Coroutine[Any, Any, T], *, context: str = "run_sync") -> T:
    """
    Run a coroutine synchronously when there is no active event loop.

    Raises RuntimeError if called from within an already-running loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        if not asyncio.iscoroutine(coro):

            async def _wrap() -> T:
                return await coro

            return asyncio.run(_wrap())
        return asyncio.run(coro)  # type: ignore[arg-type]

    msg = f"{context} cannot be called while an event loop is running. Use the asynchronous variant instead."
    raise RuntimeError(msg)


async def gather_with_concurrency[T](
    n: int,
    *coros: Awaitable[T],
) -> list[T]:
    """Run coroutines with a concurrency limit of n simultaneous tasks."""
    semaphore = asyncio.Semaphore(n)

    async def bounded(coro: Awaitable[T]) -> T:
        async with semaphore:
            return await coro

    return await asyncio.gather(*[bounded(c) for c in coros])


async def timeout_wrapper[T](
    coro: Awaitable[T],
    timeout_seconds: float,
    error_message: str = "Operation timed out",
) -> T:
    """Wrap a coroutine with a timeout, raising TimeoutError with the given message."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except TimeoutError as exc:
        raise TimeoutError(error_message) from exc