import threading from concurrent.futures import Future, ThreadPoolExecutor from typing import Any, Callable, Hashable # Separate pools so each workload class runs without head-of-line blocking the others: # fast = player draft packs the user waits on; slow = the long boss-deck draft; # art = card illustration, kept off the slow lane so it never queues behind the boss deck. _executors = { "fast": ThreadPoolExecutor(max_workers=3), "slow": ThreadPoolExecutor(max_workers=1), "art": ThreadPoolExecutor(max_workers=2), } _lock = threading.Lock() _jobs: dict[Hashable, Future] = {} _missing = object() # Submit one background job per key on its lane; lanes never starve one another. def submit(key: Hashable, fn: Callable[[], Any], lane: str = "fast") -> Future: with _lock: future = _jobs.get(key) if future is None or (future.done() and future.exception() is not None): executor = _executors.get(lane, _executors["fast"]) future = executor.submit(fn) _jobs[key] = future return future # Return a job result, computing inline when it was never prefetched. def take(key: Hashable, fn: Callable[[], Any]) -> Any: return submit(key, fn).result() # Return a completed job result without blocking. def take_ready(key: Hashable, default: Any = _missing) -> Any: with _lock: future = _jobs.get(key) if future is None or not future.done(): if default is _missing: return None return default try: return future.result() except Exception: if default is _missing: return None return default # Drop all cached jobs so a new run forges fresh cards. def reset() -> None: with _lock: _jobs.clear() # Block until every submitted job settles; used by tests. def drain() -> None: with _lock: futures = list(_jobs.values()) for future in futures: future.exception()