File size: 10,659 Bytes
23cdeed
66ad25b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# -*- coding: utf-8 -*-
"""
pluto/doc_index.py — In-memory document index with disk persistence.

Stores pre-processed document data so chunks are split once,
classified once, and the LLM overview is computed once per document.
All subsequent queries reuse this cached state.

Persists to a JSON file so data survives server restarts.
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class ChunkMeta:
    """
    Descriptive metadata attached to one chunk of a document.

    Attributes:
        chunk_id: Chunk identifier, e.g. "C0", "C1", ...
        chunk_type: Chunk category: "text", "math", "table", "figure",
            "code", "references", "noise".
        mode: Processing mode: "MODE_QUICK", "MODE_REASONING", "MODE_VISION".
        header: Nearest heading / section title.
        relevance: Query-time relevance score (updated per query).
    """

    chunk_id: str
    chunk_type: str
    mode: str
    header: str = field(default="")
    relevance: float = field(default=0.0)


@dataclass
class DocEntry:
    """
    Cached pre-processing results for one document.

    Attributes:
        doc_id: Identifier of the document.
        filename: Name of the file the document came from.
        chunks: Chunk texts of the document.
        chunk_meta: Metadata records for the chunks.
        overview: LLM-generated understanding of the document.
        chunk_topics: Topic tags per chunk, as {chunk_id: [topics]}.
        is_processed: True after Phase A completes.
        processing_status: pending | understanding | ready | failed.
        last_error: Most recent processing error message.
    """

    doc_id: str
    filename: str = field(default="")
    chunks: list[str] = field(default_factory=list)
    chunk_meta: list[ChunkMeta] = field(default_factory=list)
    overview: str = field(default="")
    chunk_topics: dict[str, list[str]] = field(default_factory=dict)
    is_processed: bool = field(default=False)
    processing_status: str = field(default="pending")
    last_error: str = field(default="")


class DocIndex:
    """
    In-memory index of pre-processed documents with disk persistence.

    Populated during upload (Phase A). Queried during pipeline run (Phase B).
    When a persist path is configured, every mutation is written through to a
    JSON file so data survives server restarts.

    Persistence is best-effort: read and write failures are logged and are
    never raised to the caller.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, persist_path: str | Path | None = None) -> None:
        """
        Create the index and load previously persisted state, if any.

        Args:
            persist_path: Location of the JSON file. A falsy value (``None``
                or ``""``) keeps the index purely in memory.
        """
        self._docs: dict[str, DocEntry] = {}
        self._persist_path = Path(persist_path) if persist_path else None
        # Load from disk if available
        if self._persist_path:
            self._load_from_disk()

    # ── Persistence ──────────────────────────────────────────────────

    @staticmethod
    def _entry_to_dict(entry: DocEntry) -> dict[str, Any]:
        """Convert one entry into its JSON-serialisable dictionary form."""
        return {
            "doc_id": entry.doc_id,
            "filename": entry.filename,
            "chunks": entry.chunks,
            # ``relevance`` is a per-query score, so it is not persisted.
            "chunk_meta": [
                {"chunk_id": m.chunk_id, "chunk_type": m.chunk_type,
                 "mode": m.mode, "header": m.header}
                for m in entry.chunk_meta
            ],
            "overview": entry.overview,
            "chunk_topics": entry.chunk_topics,
            "is_processed": entry.is_processed,
            "processing_status": entry.processing_status,
            "last_error": entry.last_error,
        }

    @staticmethod
    def _entry_from_dict(doc_id: str, entry_data: dict[str, Any]) -> DocEntry:
        """
        Rebuild one entry from its persisted dictionary form.

        Optional keys fall back to the same defaults a fresh entry would have;
        a missing ``"doc_id"`` falls back to the key the entry was stored under.
        Raises if ``entry_data`` does not have the expected shape.
        """
        meta_list = [
            ChunkMeta(
                chunk_id=m["chunk_id"],
                chunk_type=m["chunk_type"],
                mode=m["mode"],
                header=m.get("header", ""),
            )
            for m in entry_data.get("chunk_meta", [])
        ]
        return DocEntry(
            doc_id=entry_data.get("doc_id", doc_id),
            filename=entry_data.get("filename", ""),
            chunks=entry_data.get("chunks", []),
            chunk_meta=meta_list,
            overview=entry_data.get("overview", ""),
            chunk_topics=entry_data.get("chunk_topics", {}),
            is_processed=entry_data.get("is_processed", False),
            processing_status=entry_data.get("processing_status", "pending"),
            last_error=entry_data.get("last_error", ""),
        )

    def _save_to_disk(self) -> None:
        """
        Persist the index to disk as JSON.

        The payload is written to a sibling ``*.tmp`` file and then renamed
        over the target, so an interrupted write does not truncate the
        existing index file. Does nothing when no persist path is configured.
        """
        if not self._persist_path:
            return
        target = self._persist_path
        tmp_path = target.with_name(target.name + ".tmp")
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            payload = json.dumps(
                {
                    doc_id: self._entry_to_dict(entry)
                    for doc_id, entry in self._docs.items()
                },
                ensure_ascii=False,
                indent=1,
            )
            tmp_path.write_text(payload, encoding="utf-8")
            tmp_path.replace(target)
        except Exception:
            # Don't crash the pipeline for a cache write failure, but leave a
            # trace so the failure can be diagnosed.
            self._logger.warning(
                "Failed to persist document index to %s", target, exc_info=True
            )
            try:
                tmp_path.unlink()
            except OSError:
                pass  # Temp file was never created or is already gone

    def _load_from_disk(self) -> None:
        """
        Load the index from the JSON file, if it exists.

        An unreadable file or a non-object JSON root leaves the index empty.
        A malformed individual entry is skipped without discarding the
        remaining entries.
        """
        if not self._persist_path or not self._persist_path.exists():
            return
        try:
            raw = self._persist_path.read_text(encoding="utf-8")
            data = json.loads(raw)
        except Exception:
            # Corrupted file — start fresh
            self._logger.warning(
                "Could not read document index from %s; starting empty",
                self._persist_path,
                exc_info=True,
            )
            return
        if not isinstance(data, dict):
            self._logger.warning(
                "Document index at %s is not a JSON object; starting empty",
                self._persist_path,
            )
            return
        for doc_id, entry_data in data.items():
            try:
                self._docs[doc_id] = self._entry_from_dict(doc_id, entry_data)
            except Exception:
                self._logger.warning(
                    "Skipping malformed index entry %r in %s",
                    doc_id,
                    self._persist_path,
                    exc_info=True,
                )

    # ── Write API (Phase A) ──────────────────────────────────────────

    def register_doc(
        self,
        doc_id: str,
        filename: str,
        chunks: list[str],
        chunk_meta: list[ChunkMeta],
    ) -> None:
        """
        Register a document with its pre-split chunks and metadata.

        Any existing entry under ``doc_id`` is replaced by a fresh one in the
        "pending" state with no overview and no recorded error.
        """
        self._docs[doc_id] = DocEntry(
            doc_id=doc_id,
            filename=filename,
            chunks=chunks,
            chunk_meta=chunk_meta,
            overview="",
            is_processed=False,
            processing_status="pending",
            last_error="",
        )
        self._save_to_disk()

    def mark_processing(self, doc_id: str) -> None:
        """Mark a document as currently running Phase A. Unknown ids are ignored."""
        entry = self._docs.get(doc_id)
        if entry is None:
            return
        entry.processing_status = "understanding"
        entry.last_error = ""
        self._save_to_disk()

    def set_overview(self, doc_id: str, overview: str) -> None:
        """
        Store the LLM-generated understanding for a document.

        Also flags the document as processed and "ready" and clears the last
        error. Unknown ids are ignored.
        """
        entry = self._docs.get(doc_id)
        if entry is None:
            return
        entry.overview = overview
        entry.is_processed = True
        entry.processing_status = "ready"
        entry.last_error = ""
        self._save_to_disk()

    def set_chunk_topics(self, doc_id: str, chunk_topics: dict[str, list[str]]) -> None:
        """Store per-chunk topic tags for intelligent routing. Unknown ids are ignored."""
        entry = self._docs.get(doc_id)
        if entry is None:
            return
        entry.chunk_topics = chunk_topics
        self._save_to_disk()

    def mark_failed(self, doc_id: str, error: str) -> None:
        """Persist a Phase A failure so the UI can surface it. Unknown ids are ignored."""
        entry = self._docs.get(doc_id)
        if entry is None:
            return
        entry.processing_status = "failed"
        entry.last_error = error
        entry.is_processed = False
        self._save_to_disk()

    def remove_doc(self, doc_id: str) -> None:
        """Remove a document from the index. Unknown ids are ignored."""
        # Only touch the disk when something was actually removed.
        if self._docs.pop(doc_id, None) is not None:
            self._save_to_disk()

    # ── Read API (Phase B) ───────────────────────────────────────────

    def is_processed(self, doc_id: str) -> bool:
        """Check if a document has been fully processed (Phase A complete)."""
        entry = self._docs.get(doc_id)
        return entry.is_processed if entry else False

    def get_effective_status(self, doc_id: str) -> str:
        """
        Return the user-facing status for a document.

        This normalizes stale or partially-migrated state so processed documents
        are always treated as ready, even if an older status string lingers.

        Returns:
            "not_found" for an unknown id; "ready" when the entry is processed
            or has an overview; "failed" for a failed entry; "understanding"
            for a pending, running or empty status; otherwise the stored
            status string unchanged.
        """
        entry = self._docs.get(doc_id)
        if not entry:
            return "not_found"

        # An overview or the processed flag wins over any stored status string.
        if entry.is_processed or entry.overview:
            return "ready"

        if entry.processing_status == "failed":
            return "failed"

        if entry.processing_status in {"understanding", "pending", ""}:
            return "understanding"

        return entry.processing_status

    def get_chunks(self, doc_id: str) -> list[str]:
        """Return all chunk texts for a document ([] for an unknown id)."""
        entry = self._docs.get(doc_id)
        return entry.chunks if entry else []

    def get_chunk(self, doc_id: str, chunk_index: int) -> str:
        """Return a specific chunk by index, or "" when the index is out of range."""
        chunks = self.get_chunks(doc_id)
        if 0 <= chunk_index < len(chunks):
            return chunks[chunk_index]
        return ""

    def get_chunk_meta(self, doc_id: str) -> list[ChunkMeta]:
        """Return chunk metadata list for a document ([] for an unknown id)."""
        entry = self._docs.get(doc_id)
        return entry.chunk_meta if entry else []

    def get_overview(self, doc_id: str) -> str:
        """Return the LLM-generated understanding for a document ("" if unknown)."""
        entry = self._docs.get(doc_id)
        return entry.overview if entry else ""

    def get_chunk_topics(self, doc_id: str) -> dict[str, list[str]]:
        """Return chunk topic map: {chunk_id: [topic1, topic2, role]}."""
        entry = self._docs.get(doc_id)
        return entry.chunk_topics if entry else {}

    def get_chunk_count(self, doc_id: str) -> int:
        """Return number of chunks in a document (0 for an unknown id)."""
        return len(self.get_chunks(doc_id))

    def get_status(self, doc_id: str) -> str:
        """Return current Phase A state for a document (same as get_effective_status)."""
        return self.get_effective_status(doc_id)

    def get_last_error(self, doc_id: str) -> str:
        """Return the last recorded processing error for a document ("" if unknown)."""
        entry = self._docs.get(doc_id)
        return entry.last_error if entry else ""

    def get_filename(self, doc_id: str) -> str:
        """Return the original uploaded filename when known ("" otherwise)."""
        entry = self._docs.get(doc_id)
        return entry.filename if entry else ""

    def list_docs(self) -> list[dict[str, Any]]:
        """Return summary info for all indexed documents."""
        return [
            {
                "doc_id": e.doc_id,
                "filename": e.filename,
                "chunk_count": len(e.chunks),
                "is_processed": e.is_processed,
                "has_overview": bool(e.overview),
                # Look the status up by index key so it resolves even if a
                # loaded entry's stored doc_id differs from its key.
                "processing_status": self.get_effective_status(key),
                "last_error": e.last_error,
            }
            for key, e in self._docs.items()
        ]

    def has_doc(self, doc_id: str) -> bool:
        """Check if a document is in the index."""
        return doc_id in self._docs