Spaces:
Runtime error
Runtime error
File size: 2,246 Bytes
6f72e2b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | """Utilities"""
from __future__ import annotations
import asyncio
import sys
import typing as t
from collections.abc import Mapping
from contextvars import copy_context
from functools import partial, wraps
if t.TYPE_CHECKING:
from collections.abc import Callable
from contextvars import Context
class LazyDict(Mapping[str, t.Any]):
"""Lazy evaluated read-only dictionary.
Initialised with a dictionary of key-value pairs where the values are either
constants or callables. Callables are evaluated each time the respective item is
read.
"""
def __init__(self, dict):
self._dict = dict
def __getitem__(self, key):
item = self._dict.get(key)
return item() if callable(item) else item
def __len__(self):
return len(self._dict)
def __iter__(self):
return iter(self._dict)
T = t.TypeVar("T")
U = t.TypeVar("U")
V = t.TypeVar("V")
def _async_in_context(
f: Callable[..., t.Coroutine[T, U, V]], context: Context | None = None
) -> Callable[..., t.Coroutine[T, U, V]]:
"""
Wrapper to run a coroutine in a persistent ContextVar Context.
Backports asyncio.create_task(context=...) behavior from Python 3.11
"""
if context is None:
context = copy_context()
if sys.version_info >= (3, 11):
@wraps(f)
async def run_in_context(*args, **kwargs):
coro = f(*args, **kwargs)
return await asyncio.create_task(coro, context=context)
return run_in_context
# don't need this backport when we require 3.11
# context_holder so we have a modifiable container for later calls
context_holder = [context] # type: ignore[unreachable]
async def preserve_context(f, *args, **kwargs):
"""call a coroutine, preserving the context after it is called"""
try:
return await f(*args, **kwargs)
finally:
# persist changes to the context for future calls
context_holder[0] = copy_context()
@wraps(f)
async def run_in_context_pre311(*args, **kwargs):
ctx = context_holder[0]
return await ctx.run(partial(asyncio.create_task, preserve_context(f, *args, **kwargs)))
return run_in_context_pre311
|