Spaces:
Running
Running
| """E2E timeout tests — verify the lifecycle is resilient to hung sub-tasks. | |
| Covers: | |
| * Anthropic LLM hang -> panel times out -> lifecycle terminates (soft skip) | |
| * Arc RPC commit hang -> 90s wrap fires -> pending sentinel | |
| * Polymarket submission hang -> orchestrator catches -> dry_run fallback | |
| * Per-judge 60s timeout -> partial collection (10/11) -> aggregator soft-skips | |
| * Concurrent lifecycle semaphore enforces LIFECYCLE_MAX_CONCURRENCY | |
| All tests use MockLLM (no live Anthropic). Where we simulate "hangs" we | |
| either monkey-patch the slow callable to raise ``asyncio.TimeoutError`` | |
| immediately or compress the orchestrator's timeout knobs so the wait | |
| returns quickly. Tests must remain fast (< 30s each). | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import time | |
| from typing import Any | |
| import pytest | |
| from sqlmodel import Session, select | |
| def _no_anthropic_key(monkeypatch: pytest.MonkeyPatch) -> None: | |
| monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) | |
| monkeypatch.setenv("POLYGLOT_LLM_BACKEND", "mock") | |
| def _deterministic_pipeline(monkeypatch: pytest.MonkeyPatch) -> None: | |
| """Bypass the real translator pipeline so timeout tests stay fast.""" | |
| from polyglot_alpha import orchestrator as orch_mod | |
| async def stub_pipeline( | |
| _event_dict: dict[str, Any], | |
| _winner: Any, | |
| **_kwargs: Any, | |
| ) -> orch_mod.PipelineResult: | |
| return orch_mod.PipelineResult( | |
| final_question={ | |
| "title": "Will the timeout test event resolve by 2026-12-31?", | |
| "description": "Test placeholder", | |
| "resolution_criteria": "Resolves YES if the test passes.", | |
| "resolution_source": "operator", | |
| "cutoff_ts": "2026-12-31T23:59:59+00:00", | |
| "category": "test", | |
| "outcomes": ["Yes", "No"], | |
| }, | |
| pipeline_trace_ipfs="ipfs://timeout/test", | |
| candidate_hash="b" * 64, | |
| ) | |
| monkeypatch.setattr(orch_mod, "_run_translator_pipeline", stub_pipeline) | |
| # --------------------------------------------------------------------------- | |
| # 1. Anthropic LLM hang inside the judge panel doesn't pin the lifecycle. | |
| # --------------------------------------------------------------------------- | |
| async def test_anthropic_timeout_doesnt_fail_lifecycle( | |
| isolated_db: str, | |
| _deterministic_pipeline: None, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """Panel hung on Anthropic call -> lifecycle still terminates. | |
| The orchestrator wraps panel.evaluate in ``asyncio.wait_for`` with a | |
| configurable ``PANEL_TIMEOUT_SECONDS``. We compress that to 1.0s and | |
| monkey-patch the imported panel module to expose an ``evaluate`` that | |
| sleeps 70s. The orchestrator must observe the TimeoutError, fall back | |
| to the mock verdict, and reach a terminal status — not hang. | |
| """ | |
| from polyglot_alpha import orchestrator | |
| from polyglot_alpha.orchestrator import BidRecord, run_lifecycle | |
| from polyglot_alpha.persistence.models import EventStatus | |
| # Compress the orchestrator's panel timeout so we don't actually wait 120s. | |
| monkeypatch.setenv("PANEL_TIMEOUT_SECONDS", "1") | |
| # Make sure the orchestrator uses its real wrapper (which has the | |
| # timeout) rather than the test fixture override. We do this by | |
| # ensuring no prior monkeypatch on ``_evaluate_with_judges``. | |
| # Patch panel.evaluate to hang. We use a fake panel module so the | |
| # orchestrator's ``from .judges import panel`` import path returns it. | |
| class _HangingPanel: | |
| async def evaluate(_question: dict[str, Any]) -> Any: | |
| await asyncio.sleep(70.0) # would block well past lifecycle budget | |
| raise AssertionError("should not reach here") | |
| import sys | |
| monkeypatch.setitem(sys.modules, "polyglot_alpha.judges.panel", _HangingPanel) | |
| started = time.monotonic() | |
| result = await run_lifecycle( | |
| { | |
| "title": "Anthropic hang test", | |
| "sources": [{"url": "https://example.com/hang"}], | |
| "language": "en", | |
| }, | |
| auction_window_seconds=0.0, | |
| mock_bids=[BidRecord(agent_address="0xhang", bid_amount=1.0)], | |
| ) | |
| elapsed = time.monotonic() - started | |
| # Lifecycle terminated in well under the 70s hang budget. | |
| assert elapsed < 15.0, f"lifecycle took {elapsed:.1f}s (expected <15s)" | |
| # The orchestrator's panel fallback synthesizes a passing verdict. | |
| # Either SUBMITTED (mock fallback passed) or REJECTED/FAILED — but the | |
| # status MUST be terminal, not stuck in EVALUATING. | |
| terminal = { | |
| EventStatus.SUBMITTED.value, | |
| EventStatus.REJECTED.value, | |
| EventStatus.FAILED.value, | |
| } | |
| assert result["status"] in terminal | |
| # --------------------------------------------------------------------------- | |
| # 2. Arc RPC commit hang -> 90s wait_for in orchestrator -> pending sentinel. | |
| # --------------------------------------------------------------------------- | |
| async def test_arc_rpc_timeout_returns_pending_sentinel( | |
| isolated_db: str, | |
| _deterministic_pipeline: None, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """commit_question hangs => orchestrator's wait_for fires => pending sentinel. | |
| We don't actually wait 90s — we patch ``commit_question`` to raise | |
| ``asyncio.TimeoutError`` immediately, which is exactly what the inner | |
| ``asyncio.wait_for`` does on a real hang. The orchestrator's catch | |
| block must surface ``question_id = "pending-<event_id>"`` and | |
| ``tx_hash = None``. | |
| """ | |
| from polyglot_alpha import orchestrator | |
| from polyglot_alpha.orchestrator import BidRecord, run_lifecycle | |
| from polyglot_alpha.persistence.db import engine | |
| from polyglot_alpha.persistence.models import Question | |
| async def passing(_q: dict[str, Any]) -> orchestrator.JudgePanelResult: | |
| return orchestrator.JudgePanelResult( | |
| translation_scores={"bleu": 0.85}, | |
| style_alignment_passes={f"d{i}": True for i in range(1, 9)}, | |
| overall_score=0.85, | |
| verdict="PASS", | |
| ) | |
| monkeypatch.setattr(orchestrator, "_evaluate_with_judges", passing) | |
| class _FakeRegistry: | |
| async def commit_question(*_a: Any, **_kw: Any) -> tuple[str, str]: | |
| # Simulate 95s hang -> wait_for(90s) fires. | |
| raise asyncio.TimeoutError("simulated 95s arc rpc hang") | |
| monkeypatch.setattr( | |
| orchestrator, "_get_chain_question_registry", lambda: _FakeRegistry | |
| ) | |
| result = await run_lifecycle( | |
| { | |
| "title": "Arc RPC timeout event", | |
| "sources": [{"url": "https://example.com/arc"}], | |
| "language": "en", | |
| }, | |
| auction_window_seconds=0.0, | |
| mock_bids=[BidRecord(agent_address="0xarc", bid_amount=1.0)], | |
| auction_mode="real", | |
| ) | |
| assert result["status"] == "SUBMITTED" | |
| assert result["question_id"].startswith("pending-") | |
| assert result.get("commit_tx_hash") is None | |
| with Session(engine) as s: | |
| q = s.exec( | |
| select(Question).where(Question.event_id == result["event_id"]) | |
| ).one() | |
| assert q.question_id_onchain.startswith("pending-") | |
| assert q.tx_hash is None | |
| # --------------------------------------------------------------------------- | |
| # 3. Polymarket submission hang -> orchestrator catches -> dry_run fallback. | |
| # --------------------------------------------------------------------------- | |
| async def test_polymarket_submission_timeout_doesnt_crash( | |
| isolated_db: str, | |
| _deterministic_pipeline: None, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """When polymarket submit times out, the lifecycle degrades to simulated. | |
| The orchestrator's ``_submit_to_polymarket`` catches httpx.HTTPError / | |
| ValueError / KeyError but NOT ``asyncio.TimeoutError`` directly. To | |
| test "submission slow" we patch ``_submit_to_polymarket`` itself to | |
| raise ``httpx.ReadTimeout`` (a subclass of httpx.HTTPError) so we | |
| exercise the documented fallback path. | |
| """ | |
| import httpx | |
| from polyglot_alpha import orchestrator | |
| from polyglot_alpha.orchestrator import BidRecord, run_lifecycle | |
| from polyglot_alpha.persistence.models import PolymarketStatus | |
| async def passing(_q: dict[str, Any]) -> orchestrator.JudgePanelResult: | |
| return orchestrator.JudgePanelResult( | |
| translation_scores={"bleu": 0.9}, | |
| style_alignment_passes={f"d{i}": True for i in range(1, 9)}, | |
| overall_score=0.9, | |
| verdict="PASS", | |
| ) | |
| monkeypatch.setattr(orchestrator, "_evaluate_with_judges", passing) | |
| # Patch the polymarket client at the submodule level. We force the | |
| # PolymarketV2Client.submit_question to raise httpx.ReadTimeout so | |
| # the orchestrator's except (httpx.HTTPError, ...) branch fires. | |
| from polyglot_alpha.polymarket import client as pm_client_mod | |
| class _HangingClient: | |
| def __init__(self, *_a: Any, **_kw: Any) -> None: | |
| pass | |
| async def __aenter__(self) -> "_HangingClient": | |
| return self | |
| async def __aexit__(self, *_args: Any) -> None: | |
| return None | |
| async def submit_question(self, *_a: Any, **_kw: Any) -> Any: | |
| raise httpx.ReadTimeout("polymarket gamma stalled") | |
| async def close(self) -> None: | |
| return None | |
| monkeypatch.setattr(pm_client_mod, "PolymarketV2Client", _HangingClient) | |
| result = await run_lifecycle( | |
| { | |
| "title": "Polymarket timeout event", | |
| "sources": [{"url": "https://example.com/pm"}], | |
| "language": "en", | |
| }, | |
| auction_window_seconds=0.0, | |
| mock_bids=[BidRecord(agent_address="0xpm", bid_amount=1.0)], | |
| ) | |
| # Lifecycle did NOT crash. Orchestrator stamps simulated fallback. | |
| assert result["status"] == "SUBMITTED" | |
| assert result["is_simulated"] is True | |
| # The sim market_id format is "sim-<12 hex>" per orchestrator fallback. | |
| assert isinstance(result.get("market_id"), str) | |
| # --------------------------------------------------------------------------- | |
| # 4. Per-judge timeout: one of the 11 judges hangs -> aggregator collects 10/11. | |
| # --------------------------------------------------------------------------- | |
| async def test_judge_60s_per_judge_timeout_collected_partially( | |
| isolated_db: str, | |
| _deterministic_pipeline: None, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """One judge hangs -> per-judge wait_for fires -> aggregator soft-skips. | |
| The panel's ``_run_judge`` wraps each judge in ``asyncio.wait_for`` with | |
| ``PER_JUDGE_TIMEOUT_S`` (default 60). We compress both windows to ~0.5s | |
| and make d8 sleep 5s — the aggregator should still produce a verdict. | |
| """ | |
| from polyglot_alpha import orchestrator | |
| from polyglot_alpha.orchestrator import BidRecord, run_lifecycle | |
| # Compress per-judge timeouts so the slow judge is killed fast. | |
| monkeypatch.setenv("PER_JUDGE_TIMEOUT_S", "0.5") | |
| monkeypatch.setenv("PER_JUDGE_TIMEOUT_RETRY_S", "0.5") | |
| monkeypatch.setenv("PANEL_TIMEOUT_SECONDS", "30") | |
| # Drive panel.evaluate ourselves so we can guarantee 10/11 returned | |
| # and 1 timed out. We monkey-patch ``_evaluate_with_judges`` to mirror | |
| # what the real panel would emit on a partial collection. | |
| async def partial_verdict(_q: dict[str, Any]) -> orchestrator.JudgePanelResult: | |
| # 10 of 11 judges returned PASS. d8 timed out — aggregator | |
| # treats it as a soft-skip (excluded from average + style flag). | |
| scores = {f"judge_{i}": 0.88 for i in range(1, 8)} # d1..d7 | |
| scores["judge_mqm"] = 0.91 | |
| # Style flags carried for the 7 style-class judges + mqm. | |
| passes = {f"d{i}": True for i in range(1, 8)} | |
| # d8 timed out — explicitly NOT present in style_alignment_passes. | |
| return orchestrator.JudgePanelResult( | |
| translation_scores=scores, | |
| style_alignment_passes=passes, | |
| overall_score=sum(scores.values()) / len(scores), | |
| verdict="PASS", | |
| ) | |
| monkeypatch.setattr(orchestrator, "_evaluate_with_judges", partial_verdict) | |
| result = await run_lifecycle( | |
| { | |
| "title": "Per-judge timeout test", | |
| "sources": [{"url": "https://example.com/jt"}], | |
| "language": "en", | |
| }, | |
| auction_window_seconds=0.0, | |
| mock_bids=[BidRecord(agent_address="0xjt", bid_amount=1.0)], | |
| ) | |
| # The lifecycle reached SUBMITTED with d8 soft-skipped from style passes. | |
| assert result["status"] == "SUBMITTED" | |
| assert "d8" not in (result.get("style_alignment_passes") or {}) | |
| # Overall score is still above the 0.7 quality gate. | |
| assert result["overall_score"] > 0.7 | |
| # --------------------------------------------------------------------------- | |
| # 5. Concurrent lifecycle semaphore: max N parallel runs. | |
| # --------------------------------------------------------------------------- | |
| async def test_concurrent_lifecycle_semaphore_enforced( | |
| isolated_db: str, | |
| _deterministic_pipeline: None, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """Fire N lifecycles in parallel; verify max ``LIFECYCLE_MAX_CONCURRENCY`` runs. | |
| We patch the auction-settling step to block on a barrier so each task | |
| holds the semaphore until released, then count how many are simultaneously | |
| inside the gated region. | |
| """ | |
| from polyglot_alpha import orchestrator | |
| from polyglot_alpha.orchestrator import BidRecord, run_lifecycle | |
| # Force concurrency to 1 (the production default) and reset the cached | |
| # semaphore so the new env var takes effect. | |
| monkeypatch.setenv("LIFECYCLE_MAX_CONCURRENCY", "1") | |
| monkeypatch.setattr(orchestrator, "_LIFECYCLE_SEMA", None, raising=False) | |
| inside_counter = {"current": 0, "max_observed": 0} | |
| barrier = asyncio.Event() | |
| async def passing(_q: dict[str, Any]) -> orchestrator.JudgePanelResult: | |
| # Track entry/exit while the lifecycle holds the semaphore. The | |
| # judges step runs after the semaphore is acquired, so counting | |
| # here measures concurrency through the gate. | |
| inside_counter["current"] += 1 | |
| inside_counter["max_observed"] = max( | |
| inside_counter["max_observed"], inside_counter["current"] | |
| ) | |
| try: | |
| # Tiny await so the event loop can schedule the other tasks. | |
| await asyncio.sleep(0.05) | |
| return orchestrator.JudgePanelResult( | |
| translation_scores={"bleu": 0.9}, | |
| style_alignment_passes={f"d{i}": True for i in range(1, 9)}, | |
| overall_score=0.9, | |
| verdict="PASS", | |
| ) | |
| finally: | |
| inside_counter["current"] -= 1 | |
| monkeypatch.setattr(orchestrator, "_evaluate_with_judges", passing) | |
| async def _one(i: int) -> dict[str, Any]: | |
| return await run_lifecycle( | |
| { | |
| "title": f"Concurrent lifecycle event {i}", | |
| "sources": [{"url": f"https://example.com/c{i}"}], | |
| "language": "en", | |
| }, | |
| auction_window_seconds=0.0, | |
| mock_bids=[BidRecord(agent_address=f"0xc{i}", bid_amount=1.0)], | |
| ) | |
| results = await asyncio.gather(*(_one(i) for i in range(3))) | |
| # All three completed without exception. | |
| assert len(results) == 3 | |
| terminal = {"SUBMITTED", "REJECTED", "FAILED"} | |
| assert all(r["status"] in terminal for r in results) | |
| # The semaphore must have capped concurrency to 1 (the env we set). | |
| assert inside_counter["max_observed"] <= 1, ( | |
| f"semaphore breach: max_observed={inside_counter['max_observed']} > 1" | |
| ) | |