| import asyncio |
| import json |
|
|
| from fastapi.testclient import TestClient |
|
|
| from pluto.doc_index import ChunkMeta, DocIndex |
| from pluto.models import ChunkPlan, ChunkType, FinalAnswer, FinalOutput, Priority, TraceSummary |
| import pluto.server as server |
|
|
|
|
def test_server_blocks_run_while_background_understanding(monkeypatch):
    """POST /api/run must answer 409 while a registered doc is marked as processing."""
    # Build an index with a single document, then flag that document as in progress.
    busy_index = DocIndex()
    sole_chunk = ChunkMeta(chunk_id="C0", chunk_type="text", mode="MODE_REASONING")
    busy_index.register_doc(
        doc_id="paper",
        filename="paper.md",
        chunks=["chunk text"],
        chunk_meta=[sole_chunk],
    )
    busy_index.mark_processing("paper")

    # Swap the server's module-level index for the prepared one.
    monkeypatch.setattr(server, "_doc_index", busy_index)

    reply = TestClient(server.app).post(
        "/api/run",
        json={"query": "what is this paper about"},
    )

    assert reply.status_code == 409
    body = reply.json()
    assert "Please wait" in body["error"]
    assert body["processing_docs"] == ["paper"]
|
|
|
|
def test_server_compare_returns_json_error_payload(monkeypatch):
    """A runner exception on /api/compare is reported as a JSON ``error`` with HTTP 200."""
    import benchmark.compare as compare_module

    class BrokenRunner:
        """Comparison runner stand-in whose ``compare`` always raises."""

        def __init__(self, *args, **kwargs):
            # Accept and ignore any constructor arguments.
            pass

        def compare(self, query: str, selected_doc_ids=None, detail_level="standard"):
            raise RuntimeError("benchmark exploded")

    monkeypatch.setattr(compare_module, "ComparisonRunner", BrokenRunner)

    reply = TestClient(server.app).post(
        "/api/compare",
        json={"query": "what is this paper about"},
    )

    assert reply.status_code == 200
    assert reply.json()["error"] == "Benchmark error: benchmark exploded"
|
|
|
|
def test_server_run_forwards_selected_docs_and_detail_level(monkeypatch):
    """/api/run passes query, selected_doc_ids and detail_level through to the runner."""
    # Everything the fake runner observes is captured here for the assertions below.
    seen = {}

    class FakeCache:
        """Cache stub exposing only ``stats``."""

        def stats(self):
            return {"hits": 0, "misses": 0}

    class FakeRunner:
        """Pipeline runner stub that records the arguments it is called with."""

        def __init__(self, *args, **kwargs):
            self.cache = FakeCache()

        def on_progress(self, callback):
            seen["progress_callback_registered"] = callable(callback)

        def run(self, query: str, selected_doc_ids=None, detail_level="standard"):
            seen.update(
                query=query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            )
            answer = FinalAnswer(response="ok", sections=[])
            return FinalOutput(
                final_answer=answer,
                evidence=[],
                trace_summary=TraceSummary(),
                confidence=0.9,
            )

    monkeypatch.setattr(server, "PipelineRunner", FakeRunner)
    monkeypatch.setattr(server, "_doc_index", DocIndex())

    request_body = {
        "query": "summarize this",
        "selected_doc_ids": ["paper_a"],
        "detail_level": "detailed",
    }
    reply = TestClient(server.app).post("/api/run", json=request_body)

    assert reply.status_code == 200
    assert reply.json()["session_id"]
    assert seen["progress_callback_registered"] is True
    assert seen["query"] == "summarize this"
    assert seen["selected_doc_ids"] == ["paper_a"]
    assert seen["detail_level"] == "detailed"
|
|
|
|
def test_server_compare_forwards_selected_docs_and_detail_level(monkeypatch):
    """/api/compare passes query, selected_doc_ids and detail_level to the runner."""
    import benchmark.compare as compare_module

    # Arguments observed by the fake runner's ``compare`` call.
    seen = {}

    class FakeRunner:
        """Comparison runner stub that records its inputs and returns a fixed verdict."""

        def __init__(self, *args, **kwargs):
            # Accept and ignore any constructor arguments.
            pass

        def compare(self, query: str, selected_doc_ids=None, detail_level="standard"):
            seen.update(
                query=query,
                selected_doc_ids=selected_doc_ids,
                detail_level=detail_level,
            )
            verdict = {
                "query": query,
                "pluto": {"confidence": 1.0},
                "baseline": {"confidence": 0.5},
                "winner": "Pluto",
            }
            return verdict

    monkeypatch.setattr(compare_module, "ComparisonRunner", FakeRunner)
    monkeypatch.setattr(server, "_doc_index", DocIndex())

    request_body = {
        "query": "summarize this",
        "selected_doc_ids": ["paper_a", "paper_b"],
        "detail_level": "detailed",
    }
    reply = TestClient(server.app).post("/api/compare", json=request_body)

    assert reply.status_code == 200
    assert reply.json()["winner"] == "Pluto"
    assert seen["query"] == "summarize this"
    assert seen["selected_doc_ids"] == ["paper_a", "paper_b"]
    assert seen["detail_level"] == "detailed"
|
|
|
|
def test_server_exposes_processed_docs_as_ready_even_if_status_is_stale(monkeypatch):
    """Both status endpoints report "ready" although the stored status is "understanding".

    The document gets an overview and then has its ``processing_status`` forced
    back to "understanding"; presumably the overview is what makes the server
    treat it as processed (confirm against the server implementation).
    """
    doc_id = "agentic_ai"
    stale_index = DocIndex()
    sole_chunk = ChunkMeta(chunk_id="C0", chunk_type="text", mode="MODE_REASONING")
    stale_index.register_doc(
        doc_id=doc_id,
        filename="agentic ai.pdf",
        chunks=["chunk text"],
        chunk_meta=[sole_chunk],
    )
    stale_index.set_overview(doc_id, "overview text")
    # Overwrite the stored status directly to simulate a stale value.
    stale_index._docs[doc_id].processing_status = "understanding"

    monkeypatch.setattr(server, "_doc_index", stale_index)

    http = TestClient(server.app)
    status_reply = http.get("/api/doc-status/agentic_ai")
    corpus_reply = http.get("/api/corpus")

    assert status_reply.status_code == 200
    assert status_reply.json()["status"] == "ready"

    listed = corpus_reply.json()["documents"]
    entry = next(item for item in listed if item["doc_id"] == "agentic_ai")
    assert entry["processing_status"] == "ready"
    assert entry["is_processed"] is True
|
|
|
|
def test_stream_progress_serializes_pydantic_payloads(monkeypatch):
    """/api/stream emits a queued event holding a ChunkPlan as plain JSON.

    Also asserts that the session's queue and result entries are still
    registered once the stream has been fully consumed.
    """
    session_id = "test-session"
    pending = asyncio.Queue()
    monkeypatch.setattr(server, "session_queues", {session_id: pending})
    monkeypatch.setattr(server, "session_results", {session_id: {"ok": True}})
    monkeypatch.setattr(server, "session_cleanup_tasks", {})

    # A single terminal event whose payload carries a model object, not a dict.
    plan_entry = ChunkPlan(
        doc_id="paper",
        chunk_id="C0",
        where="chunk 0",
        chunk_type=ChunkType.TEXT,
        mode="MODE_REASONING",
        priority=Priority.HIGH,
        task="Extract facts",
    )
    event = {
        "stage": "done",
        "status": "complete",
        "payload": {"plan": [plan_entry]},
    }
    pending.put_nowait(event)

    http = TestClient(server.app)
    with http.stream("GET", f"/api/stream?session_id={session_id}") as response:
        raw = b"".join(response.iter_raw())
        body = raw.decode("utf-8")

    assert response.status_code == 200
    # The class name would only appear if the object had been stringified.
    assert "ChunkPlan" not in body
    decoded = json.loads(body.removeprefix("data: ").strip())
    first_plan = decoded["payload"]["plan"][0]
    assert first_plan["doc_id"] == "paper"
    assert first_plan["chunk_type"] == "text"
    assert session_id in server.session_queues
    assert session_id in server.session_results
|
|
|
|
def test_stream_progress_is_session_scoped(monkeypatch):
    """Streaming session "second" yields that session's event and keeps both queues."""
    # One queue per session, each preloaded with an event tagged by its own name.
    queues = {}
    for name in ("first", "second"):
        pending = asyncio.Queue()
        pending.put_nowait({"stage": "done", "status": "complete", "session_id": name})
        queues[name] = pending

    monkeypatch.setattr(server, "session_queues", queues)
    monkeypatch.setattr(server, "session_results", {name: {} for name in queues})
    monkeypatch.setattr(server, "session_cleanup_tasks", {})

    http = TestClient(server.app)
    with http.stream("GET", "/api/stream?session_id=second") as response:
        raw = b"".join(response.iter_raw())
        body = raw.decode("utf-8")

    decoded = json.loads(body.removeprefix("data: ").strip())
    assert decoded["session_id"] == "second"
    assert "first" in server.session_queues
    assert "second" in server.session_queues
|
|
|
|
def test_session_cleanup_is_delayed(monkeypatch):
    """Scheduled cleanup leaves the session in place at first, then removes it.

    The cleanup delay is patched to 0.01 s. Right after scheduling, the session
    must still be present in both registries; afterwards it must disappear from
    both. The wait polls against a deadline instead of using one fixed sleep,
    so a slow or heavily loaded machine does not cause a spurious failure.
    """

    async def run_check():
        session_id = "cleanup-session"
        queue = asyncio.Queue()
        monkeypatch.setattr(server, "SESSION_CLEANUP_DELAY_SECONDS", 0.01)
        monkeypatch.setattr(server, "session_queues", {session_id: queue})
        monkeypatch.setattr(server, "session_results", {session_id: {"ok": True}})
        monkeypatch.setattr(server, "session_cleanup_tasks", {})

        server._schedule_session_cleanup(session_id, queue)

        # Nothing may be removed synchronously by the scheduling call itself.
        assert session_id in server.session_queues
        assert session_id in server.session_results

        # Poll until both entries are gone, giving up after one second.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 1.0
        while loop.time() < deadline:
            await asyncio.sleep(0.01)
            queue_gone = session_id not in server.session_queues
            result_gone = session_id not in server.session_results
            if queue_gone and result_gone:
                break

        assert session_id not in server.session_queues
        assert session_id not in server.session_results

    asyncio.run(run_check())
|
|