import forge # Verify identical keys reuse one background job. def test_submit_dedupes_by_key() -> None: forge.reset() calls: list[int] = [] future = forge.submit("k", lambda: calls.append(1) or "a") assert forge.submit("k", lambda: calls.append(1) or "b") is future forge.drain() assert calls == [1] # Verify take computes inline when never prefetched and reuses results after. def test_take_computes_and_caches() -> None: forge.reset() assert forge.take("pack", lambda: "fresh") == "fresh" assert forge.take("pack", lambda: "other") == "fresh" # Verify take_ready polls completed jobs without blocking. def test_take_ready_polls_completed_jobs() -> None: forge.reset() assert forge.take_ready("missing", "empty") == "empty" forge.submit("art", lambda: "ready") forge.drain() assert forge.take_ready("art", "empty") == "ready" # Verify failed jobs are resubmitted on the next request. def test_failed_jobs_resubmit() -> None: forge.reset() def boom() -> str: raise RuntimeError("nope") forge.submit("x", boom) forge.drain() assert forge.take("x", lambda: "recovered") == "recovered" # Verify slow-lane jobs share the same key cache as fast-lane jobs. def test_slow_lane_shares_key_cache() -> None: forge.reset() future = forge.submit("slow-key", lambda: "art", lane="slow") assert forge.submit("slow-key", lambda: "other") is future assert forge.take("slow-key", lambda: "other") == "art"