File size: 4,221 Bytes
23cdeed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-

import asyncio
import json

from fastapi.testclient import TestClient

from pluto.session_memory import (
    CompressedSession,
    compress_session,
    list_session_context,
)
import pluto.server as server


def test_compression_produces_valid_compressed_session(monkeypatch, tmp_path):
    """Check ``compress_session`` output when the LLM and Postgres hooks are stubbed.

    The LLM hook is replaced by a stub returning a fixed JSON document and the
    Postgres store hook by a no-op. The test then verifies the type and
    selected fields of the returned object, and that ``s1.json`` exists under
    ``.session_archive`` inside the corpus directory.
    """
    canned_reply = """
        {
          "queries_resolved": [{"query": "q", "answer_summary": "a", "chunks_used": 2, "confidence": 0.8}],
          "key_findings": ["Finding A"],
          "open_questions": ["Question A"],
          "links_to_prior_sessions": []
        }
        """

    def fake_llm(**kwargs):
        # Whatever keyword arguments are passed, answer with the canned JSON.
        return canned_reply

    def skip_store(compressed, raw_path):
        # Stand-in for the Postgres store hook: accept the call and do nothing.
        return None

    corpus_dir = tmp_path / "corpus"
    corpus_dir.mkdir()
    monkeypatch.setattr("pluto.session_memory._call_compression_llm", fake_llm)
    monkeypatch.setattr("pluto.session_memory._store_postgres", skip_store)

    result = compress_session("s1", "doc_a", {"query": "q", "confidence": 0.8}, corpus_dir)

    assert isinstance(result, CompressedSession)
    assert result.session_id == "s1"
    assert result.doc_id == "doc_a"
    assert result.key_findings == ["Finding A"]

    archive_file = corpus_dir / ".session_archive" / "s1.json"
    assert archive_file.exists()


def test_postgres_unavailable_falls_back_to_local_file(monkeypatch, tmp_path):
    """Check that a failing Postgres store still leaves a local JSON file.

    The Postgres store hook is replaced by a stub that raises
    ``EnvironmentError``. After ``compress_session`` returns, the test expects
    ``.session_memory/s2.json`` in the corpus directory, holding the same
    ``session_id`` as the returned object.
    """

    def empty_llm_reply(**kwargs):
        # An empty JSON object is the stubbed LLM answer for this scenario.
        return "{}"

    def postgres_down(compressed, raw_path):
        raise EnvironmentError("no database")

    corpus_dir = tmp_path / "corpus"
    corpus_dir.mkdir()
    monkeypatch.setattr("pluto.session_memory._call_compression_llm", empty_llm_reply)
    monkeypatch.setattr("pluto.session_memory._store_postgres", postgres_down)

    result = compress_session("s2", "doc_b", {"query": "q"}, corpus_dir)

    fallback_file = corpus_dir / ".session_memory" / "s2.json"
    assert fallback_file.exists()

    stored = json.loads(fallback_file.read_text(encoding="utf-8"))
    assert stored["session_id"] == result.session_id


def test_warm_start_endpoint_returns_sessions_in_order(monkeypatch):
    """Check that the session-context endpoint returns sessions in the order supplied.

    ``pluto.session_memory.list_session_context`` is replaced by a stub that
    returns two sessions, newest first. The endpoint response must be HTTP 200
    and list the same session ids in the same order.
    """
    newer = {"session_id": "new", "doc_id": "paper", "timestamp": "2026-01-02T00:00:00+00:00"}
    older = {"session_id": "old", "doc_id": "paper", "timestamp": "2026-01-01T00:00:00+00:00"}
    canned_sessions = [newer, older]

    def fake_list(doc_id, corpus_dir, limit=10):
        # Arguments are ignored; the canned list is always returned.
        return canned_sessions

    monkeypatch.setattr("pluto.session_memory.list_session_context", fake_list)

    response = TestClient(server.app).get("/api/session-context/paper")

    assert response.status_code == 200
    returned_ids = [entry["session_id"] for entry in response.json()["sessions"]]
    assert returned_ids == ["new", "old"]


def test_list_session_context_local_fallback_orders_by_timestamp(monkeypatch, tmp_path):
    """Check the ordering of ``list_session_context`` when the Postgres listing fails.

    Two session files are written under ``.session_memory`` (older one first),
    and the Postgres listing hook is replaced by a stub that raises
    ``EnvironmentError``. The returned sessions are expected newest first.
    """
    corpus = tmp_path / "corpus"
    memory = corpus / ".session_memory"
    memory.mkdir(parents=True)

    # Write the older session first so the expected order is not simply the
    # order in which the files were created.
    fixtures = (
        ("old", "2026-01-01T00:00:00+00:00"),
        ("new", "2026-01-02T00:00:00+00:00"),
    )
    for session_id, timestamp in fixtures:
        record = {"session_id": session_id, "doc_id": "paper", "timestamp": timestamp}
        (memory / f"{session_id}.json").write_text(json.dumps(record), encoding="utf-8")

    def fail_list(doc_id, limit):
        # A named stub raises directly; a lambda cannot contain ``raise`` and
        # would need a generator ``.throw()`` workaround.
        raise EnvironmentError("no db")

    monkeypatch.setattr("pluto.session_memory._list_postgres", fail_list)

    sessions = list_session_context("paper", corpus)

    assert [item["session_id"] for item in sessions] == ["new", "old"]


def test_compression_is_scheduled_async_without_blocking_sse(monkeypatch):
    """Check that streaming a finished session invokes the compression scheduler.

    A queue pre-loaded with a single ``done`` event is installed as the
    server's queue for the session, and ``_schedule_session_compression`` is
    replaced by a recorder. The test streams ``/api/stream`` to completion,
    then checks the status code, that the ``done`` event appears in the body,
    and that the recorder saw the session id exactly once.

    NOTE(review): only the scheduling call is asserted here; nothing in this
    test measures whether the stream was blocked.
    """
    scheduled = []

    def record_schedule(sid):
        # Recorder standing in for the real scheduler hook.
        scheduled.append(sid)

    async def drive_stream():
        stream_id = "sse-session"
        events = asyncio.Queue()
        await events.put({"stage": "done", "status": "complete", "session_id": stream_id})

        # Swap the server's module-level session state for test-local objects.
        monkeypatch.setattr(server, "session_queues", {stream_id: events})
        monkeypatch.setattr(server, "session_results", {stream_id: {"doc_id": "paper"}})
        monkeypatch.setattr(server, "session_cleanup_tasks", {})
        monkeypatch.setattr(server, "_schedule_session_compression", record_schedule)

        client = TestClient(server.app)
        with client.stream("GET", f"/api/stream?session_id={stream_id}") as response:
            # Drain the whole stream before inspecting anything.
            raw_chunks = b"".join(response.iter_raw())
            body = raw_chunks.decode("utf-8")

        assert response.status_code == 200
        assert '"stage": "done"' in body
        assert scheduled == [stream_id]

    asyncio.run(drive_stream())