Spaces:
Sleeping
Sleeping
| """E2E tests for the 5-minute sliding-window dedup and the RSS uuid salt. | |
| Exercises the trigger route's dedup behaviour via httpx.AsyncClient + | |
| ASGITransport (in-process FastAPI, no live server). MockLLM is forced | |
| on by clearing ANTHROPIC_API_KEY. | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Any | |
| import httpx | |
| import pytest | |
| from sqlmodel import Session, select | |
| def _no_anthropic_key(monkeypatch: pytest.MonkeyPatch) -> None: | |
| monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False) | |
| monkeypatch.setenv("POLYGLOT_LLM_BACKEND", "mock") | |
| def _force_judges_pass(monkeypatch: pytest.MonkeyPatch) -> None: | |
| """Force lifecycle to reach SUBMITTED quickly.""" | |
| from polyglot_alpha import orchestrator | |
| async def passing(_q: dict[str, Any]) -> orchestrator.JudgePanelResult: | |
| return orchestrator.JudgePanelResult( | |
| translation_scores={"bleu": 0.9}, | |
| style_alignment_passes={f"d{i}": True for i in range(1, 9)}, | |
| overall_score=0.92, | |
| verdict="PASS", | |
| ) | |
| monkeypatch.setattr(orchestrator, "_evaluate_with_judges", passing) | |
| def _build_app() -> Any: | |
| from polyglot_alpha.api.main import create_app | |
| return create_app() | |
| async def _post_trigger(client: httpx.AsyncClient, title: str) -> dict[str, Any]: | |
| r = await client.post( | |
| "/trigger/event", | |
| json={ | |
| "title": title, | |
| "sources": [{"name": "test", "url": "https://example.com/x"}], | |
| "auction_window_seconds": 0.0, | |
| "mock_bids": [ | |
| {"agent_address": "0xdedup", "bid_amount": 1.0}, | |
| {"agent_address": "0xdedup_b", "bid_amount": 3.0}, | |
| ], | |
| }, | |
| ) | |
| assert r.status_code == 200, r.text | |
| return r.json() | |
| async def test_5_min_sliding_dedup_returns_same_event_id( | |
| isolated_db: str, | |
| ) -> None: | |
| """Two posts with identical title within 5 min => same event_id, deduped=True.""" | |
| app = _build_app() | |
| transport = httpx.ASGITransport(app=app) | |
| async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: | |
| first = await _post_trigger(client, "Sliding dedup test event") | |
| first_id = first["event_id"] | |
| assert first.get("deduped") is not True | |
| second = await _post_trigger(client, "Sliding dedup test event") | |
| assert second["event_id"] == first_id | |
| assert second["deduped"] is True | |
| async def test_5_min_sliding_dedup_expires_after_window( | |
| isolated_db: str, | |
| ) -> None: | |
| """Once an event ages past 5 min the next identical-title click is fresh. | |
| We don't actually wait 5 minutes — we backdate the persisted ``triggered_at`` | |
| of the first event by 6 minutes so the sliding window query stops | |
| matching it. The orchestrator's permanent ``content_hash`` dedup is | |
| sidestepped by giving each event a different ``sources`` payload, | |
| matching how the trigger route degrades. | |
| """ | |
| from polyglot_alpha.persistence.db import engine | |
| from polyglot_alpha.persistence.models import Event | |
| app = _build_app() | |
| transport = httpx.ASGITransport(app=app) | |
| async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: | |
| title = "Expiring dedup test event" | |
| # First click — vanilla. | |
| first = await _post_trigger(client, title) | |
| first_id = first["event_id"] | |
| # Backdate the first event 6 minutes ago so the 5-min slider misses it. | |
| with Session(engine) as s: | |
| row = s.get(Event, first_id) | |
| assert row is not None | |
| row.triggered_at = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=6) | |
| # Also tweak the content_hash so the orchestrator's permanent | |
| # content-hash dedup doesn't short-circuit the second post — | |
| # this matches what a fresh RSS click with different sources | |
| # would produce in production. | |
| row.content_hash = row.content_hash + "_aged" | |
| s.add(row) | |
| s.commit() | |
| # Second click — sliding dedup miss => new event row. | |
| r = await client.post( | |
| "/trigger/event", | |
| json={ | |
| "title": title, | |
| # Different sources => different content_hash; sidesteps the | |
| # permanent dedup that runs after the slider. | |
| "sources": [{"name": "test2", "url": "https://example.com/y"}], | |
| "auction_window_seconds": 0.0, | |
| "mock_bids": [ | |
| {"agent_address": "0xexpired", "bid_amount": 1.0}, | |
| {"agent_address": "0xexpired_b", "bid_amount": 3.0}, | |
| ], | |
| }, | |
| ) | |
| assert r.status_code == 200, r.text | |
| second = r.json() | |
| assert second["event_id"] != first_id, "expected fresh event_id after window" | |
| assert second.get("deduped") is not True | |
| async def test_uuid_salt_makes_each_rss_click_unique( | |
| isolated_db: str, | |
| monkeypatch: pytest.MonkeyPatch, | |
| ) -> None: | |
| """Three RSS placeholder clicks produce three distinct event_ids. | |
| The trigger route salts the RSS placeholder title with | |
| ``uuid.uuid4().hex`` so dedup never collapses repeat clicks. We | |
| monkey-patch the RSS-fetch helper so the BackgroundTask never runs | |
| (we don't need it for the dedup check) and just verify three POSTs | |
| produce three event_ids. | |
| """ | |
| from polyglot_alpha.api.routes import trigger as trigger_mod | |
| async def _no_op_fetch(*_args: Any, **_kwargs: Any) -> None: | |
| return None | |
| monkeypatch.setattr(trigger_mod, "_fetch_rss_demo_event", _no_op_fetch) | |
| app = _build_app() | |
| transport = httpx.ASGITransport(app=app) | |
| seen_ids: set[int] = set() | |
| async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: | |
| for _ in range(3): | |
| r = await client.post( | |
| "/trigger/event", | |
| json={ | |
| "event_source": "rss", | |
| "auction_window_seconds": 0.0, | |
| "mock_bids": [ | |
| {"agent_address": "0xrss_a", "bid_amount": 1.0}, | |
| {"agent_address": "0xrss_b", "bid_amount": 3.0}, | |
| ], | |
| }, | |
| ) | |
| assert r.status_code == 200, r.text | |
| body = r.json() | |
| assert body.get("deduped") is not True, "RSS placeholder must never dedup" | |
| event_id = body.get("event_id") | |
| assert isinstance(event_id, int) | |
| seen_ids.add(event_id) | |
| assert len(seen_ids) == 3, f"expected 3 distinct RSS event_ids, got {seen_ids}" | |