tabras / forge.py
vvennelakanti's picture
finished base
a8afc36
Raw
History Blame Contribute Delete
1.96 kB
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Hashable
# Separate pools so each workload class runs without head-of-line blocking the others:
# fast = player draft packs the user waits on; slow = the long boss-deck draft;
# art = card illustration, kept off the slow lane so it never queues behind the boss deck.
_executors = {
"fast": ThreadPoolExecutor(max_workers=3),
"slow": ThreadPoolExecutor(max_workers=1),
"art": ThreadPoolExecutor(max_workers=2),
}
_lock = threading.Lock()
_jobs: dict[Hashable, Future] = {}
_missing = object()
# Submit one background job per key on its lane; lanes never starve one another.
def submit(key: Hashable, fn: Callable[[], Any], lane: str = "fast") -> Future:
with _lock:
future = _jobs.get(key)
if future is None or (future.done() and future.exception() is not None):
executor = _executors.get(lane, _executors["fast"])
future = executor.submit(fn)
_jobs[key] = future
return future
# Return a job result, computing inline when it was never prefetched.
def take(key: Hashable, fn: Callable[[], Any]) -> Any:
return submit(key, fn).result()
# Return a completed job result without blocking.
def take_ready(key: Hashable, default: Any = _missing) -> Any:
with _lock:
future = _jobs.get(key)
if future is None or not future.done():
if default is _missing:
return None
return default
try:
return future.result()
except Exception:
if default is _missing:
return None
return default
# Drop all cached jobs so a new run forges fresh cards.
def reset() -> None:
with _lock:
_jobs.clear()
# Block until every submitted job settles; used by tests.
def drain() -> None:
with _lock:
futures = list(_jobs.values())
for future in futures:
future.exception()