Spaces:
Running
Running
File size: 1,962 Bytes
a8afc36 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Hashable
# Separate pools so each workload class runs without head-of-line blocking the others:
# fast = player draft packs the user waits on; slow = the long boss-deck draft;
# art = card illustration, kept off the slow lane so it never queues behind the boss deck.
_executors = {
"fast": ThreadPoolExecutor(max_workers=3),
"slow": ThreadPoolExecutor(max_workers=1),
"art": ThreadPoolExecutor(max_workers=2),
}
_lock = threading.Lock()
_jobs: dict[Hashable, Future] = {}
_missing = object()
# Submit one background job per key on its lane; lanes never starve one another.
def submit(key: Hashable, fn: Callable[[], Any], lane: str = "fast") -> Future:
with _lock:
future = _jobs.get(key)
if future is None or (future.done() and future.exception() is not None):
executor = _executors.get(lane, _executors["fast"])
future = executor.submit(fn)
_jobs[key] = future
return future
# Return a job result, computing inline when it was never prefetched.
def take(key: Hashable, fn: Callable[[], Any]) -> Any:
return submit(key, fn).result()
# Return a completed job result without blocking.
def take_ready(key: Hashable, default: Any = _missing) -> Any:
with _lock:
future = _jobs.get(key)
if future is None or not future.done():
if default is _missing:
return None
return default
try:
return future.result()
except Exception:
if default is _missing:
return None
return default
# Drop all cached jobs so a new run forges fresh cards.
def reset() -> None:
with _lock:
_jobs.clear()
# Block until every submitted job settles; used by tests.
def drain() -> None:
with _lock:
futures = list(_jobs.values())
for future in futures:
future.exception()
|