# -*- coding: utf-8 -*- """ pluto/doc_index.py — In-memory document index with disk persistence. Stores pre-processed document data so chunks are split once, classified once, and the LLM overview is computed once per document. All subsequent queries reuse this cached state. Persists to a JSON file so data survives server restarts. """ from __future__ import annotations import json from dataclasses import dataclass, field from pathlib import Path from typing import Any @dataclass class ChunkMeta: """Metadata for a single chunk.""" chunk_id: str # "C0", "C1", ... chunk_type: str # "text", "math", "table", "figure", "code", "references", "noise" mode: str # "MODE_QUICK", "MODE_REASONING", "MODE_VISION" header: str = "" # nearest heading / section title relevance: float = 0.0 # query-time relevance score (updated per query) @dataclass class DocEntry: """All pre-processed data for a single document.""" doc_id: str filename: str = "" chunks: list[str] = field(default_factory=list) chunk_meta: list[ChunkMeta] = field(default_factory=list) overview: str = "" # LLM-generated understanding chunk_topics: dict[str, list[str]] = field(default_factory=dict) # {chunk_id: [topics]} is_processed: bool = False # True after Phase A completes processing_status: str = "pending" # pending | understanding | ready | failed last_error: str = "" class DocIndex: """ In-memory index of pre-processed documents with disk persistence. Populated during upload (Phase A). Queried during pipeline run (Phase B). Persists to a JSON file so data survives server restarts. """ def __init__(self, persist_path: str | Path | None = None) -> None: self._docs: dict[str, DocEntry] = {} self._persist_path = Path(persist_path) if persist_path else None # Load from disk if available if self._persist_path: self._load_from_disk() # ── Persistence ────────────────────────────────────────────────── def _save_to_disk(self) -> None: """Persist the index to disk as JSON.""" if not self._persist_path: return try: self._persist_path.parent.mkdir(parents=True, exist_ok=True) data = {} for doc_id, entry in self._docs.items(): data[doc_id] = { "doc_id": entry.doc_id, "filename": entry.filename, "chunks": entry.chunks, "chunk_meta": [ {"chunk_id": m.chunk_id, "chunk_type": m.chunk_type, "mode": m.mode, "header": m.header} for m in entry.chunk_meta ], "overview": entry.overview, "chunk_topics": entry.chunk_topics, "is_processed": entry.is_processed, "processing_status": entry.processing_status, "last_error": entry.last_error, } self._persist_path.write_text( json.dumps(data, ensure_ascii=False, indent=1), encoding="utf-8", ) except Exception: pass # Don't crash the pipeline for a cache write failure def _load_from_disk(self) -> None: """Load the index from disk JSON.""" if not self._persist_path or not self._persist_path.exists(): return try: raw = self._persist_path.read_text(encoding="utf-8") data = json.loads(raw) for doc_id, entry_data in data.items(): meta_list = [ ChunkMeta( chunk_id=m["chunk_id"], chunk_type=m["chunk_type"], mode=m["mode"], header=m.get("header", ""), ) for m in entry_data.get("chunk_meta", []) ] self._docs[doc_id] = DocEntry( doc_id=entry_data["doc_id"], filename=entry_data.get("filename", ""), chunks=entry_data.get("chunks", []), chunk_meta=meta_list, overview=entry_data.get("overview", ""), chunk_topics=entry_data.get("chunk_topics", {}), is_processed=entry_data.get("is_processed", False), processing_status=entry_data.get("processing_status", "pending"), last_error=entry_data.get("last_error", ""), ) except Exception: pass # Corrupted file — start fresh # ── Write API (Phase A) ────────────────────────────────────────── def register_doc( self, doc_id: str, filename: str, chunks: list[str], chunk_meta: list[ChunkMeta], ) -> None: """Register a document with its pre-split chunks and metadata.""" self._docs[doc_id] = DocEntry( doc_id=doc_id, filename=filename, chunks=chunks, chunk_meta=chunk_meta, overview="", is_processed=False, processing_status="pending", last_error="", ) self._save_to_disk() def mark_processing(self, doc_id: str) -> None: """Mark a document as currently running Phase A.""" if doc_id in self._docs: self._docs[doc_id].processing_status = "understanding" self._docs[doc_id].last_error = "" self._save_to_disk() def set_overview(self, doc_id: str, overview: str) -> None: """Store the LLM-generated understanding for a document.""" if doc_id in self._docs: self._docs[doc_id].overview = overview self._docs[doc_id].is_processed = True self._docs[doc_id].processing_status = "ready" self._docs[doc_id].last_error = "" self._save_to_disk() def set_chunk_topics(self, doc_id: str, chunk_topics: dict[str, list[str]]) -> None: """Store per-chunk topic tags for intelligent routing.""" if doc_id in self._docs: self._docs[doc_id].chunk_topics = chunk_topics self._save_to_disk() def mark_failed(self, doc_id: str, error: str) -> None: """Persist a Phase A failure so the UI can surface it.""" if doc_id in self._docs: self._docs[doc_id].processing_status = "failed" self._docs[doc_id].last_error = error self._docs[doc_id].is_processed = False self._save_to_disk() def remove_doc(self, doc_id: str) -> None: """Remove a document from the index.""" self._docs.pop(doc_id, None) self._save_to_disk() # ── Read API (Phase B) ─────────────────────────────────────────── def is_processed(self, doc_id: str) -> bool: """Check if a document has been fully processed (Phase A complete).""" entry = self._docs.get(doc_id) return entry.is_processed if entry else False def get_effective_status(self, doc_id: str) -> str: """ Return the user-facing status for a document. This normalizes stale or partially-migrated state so processed documents are always treated as ready, even if an older status string lingers. """ entry = self._docs.get(doc_id) if not entry: return "not_found" if entry.is_processed or entry.overview: return "ready" if entry.processing_status == "failed": return "failed" if entry.processing_status in {"understanding", "pending", ""}: return "understanding" return entry.processing_status def get_chunks(self, doc_id: str) -> list[str]: """Return all chunk texts for a document.""" entry = self._docs.get(doc_id) return entry.chunks if entry else [] def get_chunk(self, doc_id: str, chunk_index: int) -> str: """Return a specific chunk by index.""" chunks = self.get_chunks(doc_id) if 0 <= chunk_index < len(chunks): return chunks[chunk_index] return "" def get_chunk_meta(self, doc_id: str) -> list[ChunkMeta]: """Return chunk metadata list for a document.""" entry = self._docs.get(doc_id) return entry.chunk_meta if entry else [] def get_overview(self, doc_id: str) -> str: """Return the LLM-generated understanding for a document.""" entry = self._docs.get(doc_id) return entry.overview if entry else "" def get_chunk_topics(self, doc_id: str) -> dict[str, list[str]]: """Return chunk topic map: {chunk_id: [topic1, topic2, role]}.""" entry = self._docs.get(doc_id) return entry.chunk_topics if entry else {} def get_chunk_count(self, doc_id: str) -> int: """Return number of chunks in a document.""" return len(self.get_chunks(doc_id)) def get_status(self, doc_id: str) -> str: """Return current Phase A state for a document.""" return self.get_effective_status(doc_id) def get_last_error(self, doc_id: str) -> str: """Return the last recorded processing error for a document.""" entry = self._docs.get(doc_id) return entry.last_error if entry else "" def get_filename(self, doc_id: str) -> str: """Return the original uploaded filename when known.""" entry = self._docs.get(doc_id) return entry.filename if entry else "" def list_docs(self) -> list[dict[str, Any]]: """Return summary info for all indexed documents.""" return [ { "doc_id": e.doc_id, "filename": e.filename, "chunk_count": len(e.chunks), "is_processed": e.is_processed, "has_overview": bool(e.overview), "processing_status": self.get_effective_status(e.doc_id), "last_error": e.last_error, } for e in self._docs.values() ] def has_doc(self, doc_id: str) -> bool: """Check if a document is in the index.""" return doc_id in self._docs