# plutoV2_miniProject_3rd-yr / mp1 / test_session_memory.py
# Commit 23cdeed — "Add Pluto memory layer and pipeline fixes" (ayushKishor)
# -*- coding: utf-8 -*-
import asyncio
import json
from fastapi.testclient import TestClient
from pluto.session_memory import (
CompressedSession,
compress_session,
list_session_context,
)
import pluto.server as server
def test_compression_produces_valid_compressed_session(monkeypatch, tmp_path):
    """compress_session parses the LLM JSON into a CompressedSession and archives the raw session."""
    corpus_dir = tmp_path / "corpus"
    corpus_dir.mkdir()
    # Canned LLM response: valid compression JSON, so no network call is needed.
    llm_payload = """
    {
      "queries_resolved": [{"query": "q", "answer_summary": "a", "chunks_used": 2, "confidence": 0.8}],
      "key_findings": ["Finding A"],
      "open_questions": ["Question A"],
      "links_to_prior_sessions": []
    }
    """
    monkeypatch.setattr(
        "pluto.session_memory._call_compression_llm",
        lambda **kwargs: llm_payload,
    )
    # Postgres storage is a no-op here; the local-fallback path is covered elsewhere.
    monkeypatch.setattr("pluto.session_memory._store_postgres", lambda compressed, raw_path: None)

    result = compress_session("s1", "doc_a", {"query": "q", "confidence": 0.8}, corpus_dir)

    assert isinstance(result, CompressedSession)
    assert result.session_id == "s1"
    assert result.doc_id == "doc_a"
    assert result.key_findings == ["Finding A"]
    # The raw (uncompressed) session must land in the archive directory.
    assert (corpus_dir / ".session_archive" / "s1.json").exists()
def test_postgres_unavailable_falls_back_to_local_file(monkeypatch, tmp_path):
    """When the Postgres store raises, the compressed session is persisted to a local JSON file."""
    corpus_dir = tmp_path / "corpus"
    corpus_dir.mkdir()
    monkeypatch.setattr("pluto.session_memory._call_compression_llm", lambda **kwargs: "{}")

    def broken_store(compressed, raw_path):
        # Simulates an unreachable database; compress_session must swallow this.
        raise EnvironmentError("no database")

    monkeypatch.setattr("pluto.session_memory._store_postgres", broken_store)

    result = compress_session("s2", "doc_b", {"query": "q"}, corpus_dir)

    fallback = corpus_dir / ".session_memory" / "s2.json"
    assert fallback.exists()
    stored = json.loads(fallback.read_text(encoding="utf-8"))
    assert stored["session_id"] == result.session_id
def test_warm_start_endpoint_returns_sessions_in_order(monkeypatch):
    """GET /api/session-context/<doc_id> surfaces prior sessions newest-first."""
    canned = [
        {"session_id": "new", "doc_id": "paper", "timestamp": "2026-01-02T00:00:00+00:00"},
        {"session_id": "old", "doc_id": "paper", "timestamp": "2026-01-01T00:00:00+00:00"},
    ]
    # NOTE(review): patching the session_memory attribute assumes the server looks it
    # up through the module at call time (not `from ... import`) — confirm in server.py.
    monkeypatch.setattr(
        "pluto.session_memory.list_session_context",
        lambda doc_id, corpus_dir, limit=10: canned,
    )

    http = TestClient(server.app)
    resp = http.get("/api/session-context/paper")

    assert resp.status_code == 200
    body = resp.json()
    ordered_ids = [entry["session_id"] for entry in body["sessions"]]
    assert ordered_ids == ["new", "old"]
def test_list_session_context_local_fallback_orders_by_timestamp(monkeypatch, tmp_path):
    """With Postgres down, list_session_context reads local JSON files and sorts newest-first."""
    corpus_dir = tmp_path / "corpus"
    store = corpus_dir / ".session_memory"
    store.mkdir(parents=True)

    # Written oldest-first on disk so the ordering must come from the timestamps,
    # not from file-creation order.
    fixtures = {
        "old": "2026-01-01T00:00:00+00:00",
        "new": "2026-01-02T00:00:00+00:00",
    }
    for sid, stamp in fixtures.items():
        (store / f"{sid}.json").write_text(
            json.dumps({"session_id": sid, "doc_id": "paper", "timestamp": stamp}),
            encoding="utf-8",
        )

    def no_db(doc_id, limit):
        # Force the local-file fallback path.
        raise EnvironmentError("no db")

    monkeypatch.setattr("pluto.session_memory._list_postgres", no_db)

    sessions = list_session_context("paper", corpus_dir)
    assert [entry["session_id"] for entry in sessions] == ["new", "old"]
def test_compression_is_scheduled_async_without_blocking_sse(monkeypatch):
    """Draining the SSE stream to its 'done' event triggers compression scheduling exactly once."""
    scheduled = []

    async def exercise():
        session_id = "sse-session"
        events = asyncio.Queue()
        # Pre-load the terminal event so the stream completes immediately.
        await events.put({"stage": "done", "status": "complete", "session_id": session_id})
        monkeypatch.setattr(server, "session_queues", {session_id: events})
        monkeypatch.setattr(server, "session_results", {session_id: {"doc_id": "paper"}})
        monkeypatch.setattr(server, "session_cleanup_tasks", {})
        # Record scheduling calls instead of actually compressing.
        monkeypatch.setattr(server, "_schedule_session_compression", scheduled.append)

        http = TestClient(server.app)
        with http.stream("GET", f"/api/stream?session_id={session_id}") as resp:
            raw = b"".join(resp.iter_raw())

        assert resp.status_code == 200
        assert '"stage": "done"' in raw.decode("utf-8")
        # Scheduled once, for this session, without blocking stream delivery.
        assert scheduled == [session_id]

    asyncio.run(exercise())