| |
|
|
| import asyncio |
| import json |
|
|
| from fastapi.testclient import TestClient |
|
|
| from pluto.session_memory import ( |
| CompressedSession, |
| compress_session, |
| list_session_context, |
| ) |
| import pluto.server as server |
|
|
|
|
def test_compression_produces_valid_compressed_session(monkeypatch, tmp_path):
    """Check that compress_session returns a populated CompressedSession.

    The compression LLM call and the Postgres store are both stubbed out, so
    the test only exercises how compress_session turns the canned reply into
    a result object and that an archive file for the session appears under
    the corpus directory.
    """
    corpus = tmp_path / "corpus"
    corpus.mkdir()

    # Canned reply handed back in place of the real compression LLM call.
    llm_reply = """
    {
      "queries_resolved": [{"query": "q", "answer_summary": "a", "chunks_used": 2, "confidence": 0.8}],
      "key_findings": ["Finding A"],
      "open_questions": ["Question A"],
      "links_to_prior_sessions": []
    }
    """

    def fake_llm(**kwargs):
        return llm_reply

    def skip_postgres(compressed, raw_path):
        # Pretend the database write succeeded without touching a database.
        return None

    monkeypatch.setattr("pluto.session_memory._call_compression_llm", fake_llm)
    monkeypatch.setattr("pluto.session_memory._store_postgres", skip_postgres)

    result = compress_session(
        "s1",
        "doc_a",
        {"query": "q", "confidence": 0.8},
        corpus,
    )

    archive_file = corpus / ".session_archive" / "s1.json"
    assert isinstance(result, CompressedSession)
    assert result.session_id == "s1"
    assert result.doc_id == "doc_a"
    assert result.key_findings == ["Finding A"]
    assert archive_file.exists()
|
|
|
|
def test_postgres_unavailable_falls_back_to_local_file(monkeypatch, tmp_path):
    """Check that a failing Postgres store still leaves a local JSON record.

    The Postgres store stub raises EnvironmentError; compress_session is then
    expected to have written ``.session_memory/s2.json`` under the corpus
    directory, carrying the same session id as the returned object.
    """
    corpus = tmp_path / "corpus"
    corpus.mkdir()

    def empty_llm_reply(**kwargs):
        # An empty JSON object stands in for the LLM's compression output.
        return "{}"

    def broken_store(compressed, raw_path):
        raise EnvironmentError("no database")

    monkeypatch.setattr("pluto.session_memory._call_compression_llm", empty_llm_reply)
    monkeypatch.setattr("pluto.session_memory._store_postgres", broken_store)

    result = compress_session("s2", "doc_b", {"query": "q"}, corpus)

    fallback_file = corpus / ".session_memory" / "s2.json"
    assert fallback_file.exists()

    stored = json.loads(fallback_file.read_text(encoding="utf-8"))
    assert stored["session_id"] == result.session_id
|
|
|
|
def test_warm_start_endpoint_returns_sessions_in_order(monkeypatch):
    """Check that the session-context endpoint keeps the listed session order.

    list_session_context is replaced with a stub returning two sessions
    (newest first); the ``sessions`` array in the JSON response of
    ``GET /api/session-context/paper`` must list them in that same order.
    """
    newest = {
        "session_id": "new",
        "doc_id": "paper",
        "timestamp": "2026-01-02T00:00:00+00:00",
    }
    oldest = {
        "session_id": "old",
        "doc_id": "paper",
        "timestamp": "2026-01-01T00:00:00+00:00",
    }
    sessions = [newest, oldest]

    def fake_list(doc_id, corpus_dir, limit=10):
        return sessions

    monkeypatch.setattr("pluto.session_memory.list_session_context", fake_list)

    response = TestClient(server.app).get("/api/session-context/paper")

    assert response.status_code == 200
    returned_ids = [entry["session_id"] for entry in response.json()["sessions"]]
    assert returned_ids == ["new", "old"]
|
|
|
|
def test_list_session_context_local_fallback_orders_by_timestamp(monkeypatch, tmp_path):
    """Check that the local fallback lists sessions newest-first.

    Two session files are written to ``.session_memory`` (older one first)
    and the Postgres listing stub raises EnvironmentError, so the result of
    list_session_context must come from the local files, ordered with the
    later timestamp first.
    """
    corpus = tmp_path / "corpus"
    memory = corpus / ".session_memory"
    memory.mkdir(parents=True)

    # Written oldest-first so that the expected order differs from write order.
    fixtures = (
        ("old", "2026-01-01T00:00:00+00:00"),
        ("new", "2026-01-02T00:00:00+00:00"),
    )
    for session_id, timestamp in fixtures:
        record = {"session_id": session_id, "doc_id": "paper", "timestamp": timestamp}
        (memory / f"{session_id}.json").write_text(json.dumps(record), encoding="utf-8")

    def postgres_down(doc_id, limit):
        raise EnvironmentError("no db")

    monkeypatch.setattr("pluto.session_memory._list_postgres", postgres_down)

    listed = list_session_context("paper", corpus)

    listed_ids = [entry["session_id"] for entry in listed]
    assert listed_ids == ["new", "old"]
|
|
|
|
def test_compression_is_scheduled_async_without_blocking_sse(monkeypatch):
    """Check that finishing an SSE stream schedules compression exactly once.

    The server's per-session state is replaced with a queue already holding a
    single "done" event, and ``_schedule_session_compression`` is replaced
    with a recorder. Streaming ``/api/stream`` must return 200, include the
    done event in the body, and leave exactly one recorded scheduling call
    for the session id.
    """
    scheduled = []

    def record_schedule(sid):
        scheduled.append(sid)

    async def drive_stream():
        session_id = "sse-session"

        # Pre-load the terminal event so the stream can finish immediately.
        events = asyncio.Queue()
        await events.put({"stage": "done", "status": "complete", "session_id": session_id})

        replacements = (
            ("session_queues", {session_id: events}),
            ("session_results", {session_id: {"doc_id": "paper"}}),
            ("session_cleanup_tasks", {}),
            ("_schedule_session_compression", record_schedule),
        )
        for attribute, value in replacements:
            monkeypatch.setattr(server, attribute, value)

        client = TestClient(server.app)
        with client.stream("GET", f"/api/stream?session_id={session_id}") as response:
            raw_chunks = response.iter_raw()
            body = b"".join(raw_chunks).decode("utf-8")

        assert response.status_code == 200
        assert '"stage": "done"' in body
        assert scheduled == [session_id]

    asyncio.run(drive_stream())
|
|