# Keyed background-job runner: one cached Future per key, executed on per-workload thread pools.
import threading
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor
from typing import Any, Callable, Hashable

# One thread pool per workload class, so a busy lane cannot head-of-line block another:
#   fast - player draft packs the user is waiting on;
#   slow - the long boss-deck draft;
#   art  - card illustration, kept off the slow lane so it never queues behind the boss deck.
# The table maps each lane name to the worker count of its pool.
_lane_workers = {"fast": 3, "slow": 1, "art": 2}
_executors = {
    lane: ThreadPoolExecutor(max_workers=count)
    for lane, count in _lane_workers.items()
}
# Key -> most recently submitted Future for that key; every access goes through _lock.
_jobs: dict[Hashable, Future] = {}
_lock = threading.Lock()
# Sentinel used as the "no default supplied" marker in take_ready's signature.
_missing = object()


# Submit one background job per key on its lane; each lane has its own executor.
#
# key:  identity of the job; at most one live Future is cached per key.
# fn:   zero-argument callable run on the lane's thread pool.
# lane: name of the executor to use; an unknown name falls back to the "fast" lane.
#
# Returns the cached Future while it is still pending/running or finished
# successfully. A job that failed or was cancelled is treated as stale: fn is
# submitted again and the fresh Future replaces the cached one.
def submit(key: Hashable, fn: Callable[[], Any], lane: str = "fast") -> Future:
    with _lock:
        future = _jobs.get(key)
        if future is not None:
            if not future.done():
                # Still queued or running: share it with every caller.
                return future
            # done() is a terminal state, so these checks cannot race with a
            # late cancel(). cancelled() must be tested first because
            # exception() raises CancelledError on a cancelled future.
            if not future.cancelled() and future.exception() is None:
                return future
        executor = _executors.get(lane, _executors["fast"])
        future = executor.submit(fn)
        _jobs[key] = future
        return future


# Return the result of the job for key, blocking until it has finished.
# The job is obtained through submit(key, fn) with the default lane, so an
# existing job for key is reused and otherwise fn is queued on a pool thread
# (it does not run in the calling thread). An exception raised by the job
# propagates to the caller.
def take(key: Hashable, fn: Callable[[], Any]) -> Any:
    job = submit(key, fn)
    return job.result()


# Return the result of an already-finished job without blocking.
# Falls back to default (None when no default is given) if no job is cached
# under key, the job is not done yet, or fetching its result raises.
def take_ready(key: Hashable, default: Any = _missing) -> Any:
    fallback = None if default is _missing else default
    with _lock:
        job = _jobs.get(key)
    # The lock only covers the table lookup; the future is inspected outside it.
    if job is not None and job.done():
        try:
            return job.result()
        except Exception:
            # Best effort: a failed job yields the fallback instead of raising.
            return fallback
    return fallback


# Drop all cached jobs so a new run forges fresh cards.
# Only the key -> Future table is emptied (under the module lock); the futures
# themselves are not cancelled here, so callers that already hold one can still
# wait on it, and a later submit() for the same key starts a new job.
def reset() -> None:
    with _lock:
        _jobs.clear()


# Block until every job currently in the table settles; used by tests.
# A job counts as settled once it has finished, failed, or been cancelled.
# Job errors are not raised here. Jobs submitted after the snapshot is taken
# are not waited for.
def drain() -> None:
    with _lock:
        # Snapshot under the lock, then wait outside it so submitters are not
        # blocked for the duration of the drain.
        pending = list(_jobs.values())
    for future in pending:
        try:
            # exception() blocks until the job is done and returns (rather
            # than raises) whatever the job raised.
            future.exception()
        except CancelledError:
            # A cancelled job is settled too; keep waiting on the rest
            # instead of aborting the drain.
            continue