File size: 1,502 Bytes
a8afc36
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import forge


# Verify identical keys reuse one background job.
def test_submit_dedupes_by_key() -> None:
    forge.reset()
    calls: list[int] = []
    future = forge.submit("k", lambda: calls.append(1) or "a")
    assert forge.submit("k", lambda: calls.append(1) or "b") is future
    forge.drain()
    assert calls == [1]


# Verify take computes inline when never prefetched and reuses results after.
def test_take_computes_and_caches() -> None:
    forge.reset()
    assert forge.take("pack", lambda: "fresh") == "fresh"
    assert forge.take("pack", lambda: "other") == "fresh"


# Verify take_ready polls completed jobs without blocking.
def test_take_ready_polls_completed_jobs() -> None:
    forge.reset()
    assert forge.take_ready("missing", "empty") == "empty"
    forge.submit("art", lambda: "ready")
    forge.drain()
    assert forge.take_ready("art", "empty") == "ready"


# Verify failed jobs are resubmitted on the next request.
def test_failed_jobs_resubmit() -> None:
    forge.reset()

    def boom() -> str:
        raise RuntimeError("nope")

    forge.submit("x", boom)
    forge.drain()
    assert forge.take("x", lambda: "recovered") == "recovered"


# Verify slow-lane jobs share the same key cache as fast-lane jobs.
def test_slow_lane_shares_key_cache() -> None:
    forge.reset()
    future = forge.submit("slow-key", lambda: "art", lane="slow")
    assert forge.submit("slow-key", lambda: "other") is future
    assert forge.take("slow-key", lambda: "other") == "art"