# -*- coding: utf-8 -*- import asyncio import json from fastapi.testclient import TestClient from pluto.session_memory import ( CompressedSession, compress_session, list_session_context, ) import pluto.server as server def test_compression_produces_valid_compressed_session(monkeypatch, tmp_path): corpus = tmp_path / "corpus" corpus.mkdir() monkeypatch.setattr( "pluto.session_memory._call_compression_llm", lambda **kwargs: """ { "queries_resolved": [{"query": "q", "answer_summary": "a", "chunks_used": 2, "confidence": 0.8}], "key_findings": ["Finding A"], "open_questions": ["Question A"], "links_to_prior_sessions": [] } """, ) monkeypatch.setattr("pluto.session_memory._store_postgres", lambda compressed, raw_path: None) compressed = compress_session("s1", "doc_a", {"query": "q", "confidence": 0.8}, corpus) assert isinstance(compressed, CompressedSession) assert compressed.session_id == "s1" assert compressed.doc_id == "doc_a" assert compressed.key_findings == ["Finding A"] assert (corpus / ".session_archive" / "s1.json").exists() def test_postgres_unavailable_falls_back_to_local_file(monkeypatch, tmp_path): corpus = tmp_path / "corpus" corpus.mkdir() monkeypatch.setattr("pluto.session_memory._call_compression_llm", lambda **kwargs: "{}") def fail_store(compressed, raw_path): raise EnvironmentError("no database") monkeypatch.setattr("pluto.session_memory._store_postgres", fail_store) compressed = compress_session("s2", "doc_b", {"query": "q"}, corpus) path = corpus / ".session_memory" / "s2.json" assert path.exists() assert json.loads(path.read_text(encoding="utf-8"))["session_id"] == compressed.session_id def test_warm_start_endpoint_returns_sessions_in_order(monkeypatch): sessions = [ {"session_id": "new", "doc_id": "paper", "timestamp": "2026-01-02T00:00:00+00:00"}, {"session_id": "old", "doc_id": "paper", "timestamp": "2026-01-01T00:00:00+00:00"}, ] monkeypatch.setattr("pluto.session_memory.list_session_context", lambda doc_id, corpus_dir, limit=10: sessions) client = TestClient(server.app) response = client.get("/api/session-context/paper") assert response.status_code == 200 payload = response.json() assert [item["session_id"] for item in payload["sessions"]] == ["new", "old"] def test_list_session_context_local_fallback_orders_by_timestamp(monkeypatch, tmp_path): corpus = tmp_path / "corpus" memory = corpus / ".session_memory" memory.mkdir(parents=True) (memory / "old.json").write_text( json.dumps({"session_id": "old", "doc_id": "paper", "timestamp": "2026-01-01T00:00:00+00:00"}), encoding="utf-8", ) (memory / "new.json").write_text( json.dumps({"session_id": "new", "doc_id": "paper", "timestamp": "2026-01-02T00:00:00+00:00"}), encoding="utf-8", ) monkeypatch.setattr("pluto.session_memory._list_postgres", lambda doc_id, limit: (_ for _ in ()).throw(EnvironmentError("no db"))) sessions = list_session_context("paper", corpus) assert [item["session_id"] for item in sessions] == ["new", "old"] def test_compression_is_scheduled_async_without_blocking_sse(monkeypatch): calls = [] async def run_check(): session_id = "sse-session" queue = asyncio.Queue() await queue.put({"stage": "done", "status": "complete", "session_id": session_id}) monkeypatch.setattr(server, "session_queues", {session_id: queue}) monkeypatch.setattr(server, "session_results", {session_id: {"doc_id": "paper"}}) monkeypatch.setattr(server, "session_cleanup_tasks", {}) monkeypatch.setattr(server, "_schedule_session_compression", lambda sid: calls.append(sid)) client = TestClient(server.app) with client.stream("GET", f"/api/stream?session_id={session_id}") as response: body = b"".join(response.iter_raw()).decode("utf-8") assert response.status_code == 200 assert '"stage": "done"' in body assert calls == [session_id] asyncio.run(run_check())