tabras / tests /test_forge.py
vvennelakanti's picture
finished base
a8afc36
Raw
History Blame Contribute Delete
1.5 kB
import forge
# Verify identical keys reuse one background job.
def test_submit_dedupes_by_key() -> None:
forge.reset()
calls: list[int] = []
future = forge.submit("k", lambda: calls.append(1) or "a")
assert forge.submit("k", lambda: calls.append(1) or "b") is future
forge.drain()
assert calls == [1]
# Verify take computes inline when never prefetched and reuses results after.
def test_take_computes_and_caches() -> None:
forge.reset()
assert forge.take("pack", lambda: "fresh") == "fresh"
assert forge.take("pack", lambda: "other") == "fresh"
# Verify take_ready polls completed jobs without blocking.
def test_take_ready_polls_completed_jobs() -> None:
forge.reset()
assert forge.take_ready("missing", "empty") == "empty"
forge.submit("art", lambda: "ready")
forge.drain()
assert forge.take_ready("art", "empty") == "ready"
# Verify failed jobs are resubmitted on the next request.
def test_failed_jobs_resubmit() -> None:
forge.reset()
def boom() -> str:
raise RuntimeError("nope")
forge.submit("x", boom)
forge.drain()
assert forge.take("x", lambda: "recovered") == "recovered"
# Verify slow-lane jobs share the same key cache as fast-lane jobs.
def test_slow_lane_shares_key_cache() -> None:
forge.reset()
future = forge.submit("slow-key", lambda: "art", lane="slow")
assert forge.submit("slow-key", lambda: "other") is future
assert forge.take("slow-key", lambda: "other") == "art"