Spaces:
Running
Running
File size: 1,502 Bytes
a8afc36 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | import forge
# Verify identical keys reuse one background job.
def test_submit_dedupes_by_key() -> None:
    """Check that a second submit under the same key hands back the first job.

    Two different callables are submitted under key "k". The second call
    must return the very same object as the first, and after ``drain()``
    only one of the callables may have run.
    """
    forge.reset()
    calls: list[int] = []

    def record(result: str) -> str:
        # Leave a mark for every execution so duplicate runs are detectable.
        calls.append(1)
        return result

    first = forge.submit("k", lambda: record("a"))
    second = forge.submit("k", lambda: record("b"))
    assert second is first

    forge.drain()
    assert calls == [1]
# Verify take computes inline when never prefetched and reuses results after.
def test_take_computes_and_caches() -> None:
    """Check that ``take`` keeps returning the first value produced for a key.

    Nothing is submitted for "pack" beforehand, so the first ``take`` is
    expected to return its own producer's value; the second ``take`` passes
    a different producer and must still return the first value.
    """
    forge.reset()
    key = "pack"

    first = forge.take(key, lambda: "fresh")
    assert first == "fresh"

    # This producer would yield "other"; the test expects it to be ignored.
    second = forge.take(key, lambda: "other")
    assert second == "fresh"
# Verify take_ready polls completed jobs without blocking.
def test_take_ready_polls_completed_jobs() -> None:
    """Check ``take_ready`` for an unknown key and for a drained job.

    An unknown key must yield the supplied default; once a job submitted
    under "art" has been drained, ``take_ready`` must yield that job's
    result instead of the default.
    """
    forge.reset()
    fallback = "empty"

    # Nothing was ever submitted under "missing".
    assert forge.take_ready("missing", fallback) == fallback

    forge.submit("art", lambda: "ready")
    forge.drain()
    assert forge.take_ready("art", fallback) == "ready"
# Verify failed jobs are resubmitted on the next request.
def test_failed_jobs_resubmit() -> None:
    """Check that a key whose job raised can still be served by ``take``.

    A job that raises ``RuntimeError`` is submitted under "x" and drained;
    a following ``take`` on the same key must return the value of the new
    producer it is given.
    """
    forge.reset()

    def always_fails() -> str:
        raise RuntimeError("nope")

    def recover() -> str:
        return "recovered"

    forge.submit("x", always_fails)
    forge.drain()

    outcome = forge.take("x", recover)
    assert outcome == "recovered"
# Verify slow-lane jobs share the same key cache as fast-lane jobs.
def test_slow_lane_shares_key_cache() -> None:
    """Check that a ``lane="slow"`` submit is deduplicated by key like any other.

    A job is submitted under "slow-key" with ``lane="slow"``. Submitting
    the same key again without a lane argument must return the same object,
    and ``take`` on that key must return the first job's value.
    """
    forge.reset()
    key = "slow-key"

    slow_job = forge.submit(key, lambda: "art", lane="slow")
    repeat_job = forge.submit(key, lambda: "other")
    assert repeat_job is slow_job

    # The producer passed here would yield "other"; the first job's value is expected.
    assert forge.take(key, lambda: "other") == "art"
|