# -*- coding: utf-8 -*- """ Document-level summary storage and context prefix helpers. This module is deliberately lazy: importing it does not require provider keys or database/network availability. LLM/provider errors are handled inside generate_doc_summary with a fallback summary. """ from __future__ import annotations from datetime import datetime, timezone import json import logging from pathlib import Path from typing import Any from pydantic import BaseModel, Field from pluto.utils import extract_json_from_response logger = logging.getLogger("pluto") SUMMARY_FILENAME = ".doc_summaries.json" class DocSummary(BaseModel): doc_id: str title: str = "" domain: str = "" key_claims: list[str] = Field(default_factory=list) structure: list[str] = Field(default_factory=list) open_questions: list[str] = Field(default_factory=list) created_at: str def generate_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary: """Generate and persist a document summary, falling back on failure.""" corpus_path = Path(corpus_dir) doc_text = _read_document_text(doc_id, corpus_path) created_at = _utc_now() try: raw = _call_summary_llm(doc_id=doc_id, doc_text=doc_text) summary = _parse_summary(doc_id=doc_id, raw=raw, created_at=created_at) except Exception as exc: logger.warning("Failed to generate document summary for %s: %s", doc_id, exc) summary = _fallback_summary(doc_id=doc_id, created_at=created_at) summaries = load_doc_summaries(corpus_path) summaries[doc_id] = summary save_doc_summaries(corpus_path, summaries) return summary def load_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary | None: """Load one stored document summary if present.""" return load_doc_summaries(corpus_dir).get(doc_id) def load_doc_summaries(corpus_dir: str | Path) -> dict[str, DocSummary]: """Load all document summaries from disk.""" path = _summary_path(corpus_dir) if not path.exists(): return {} try: raw = path.read_text(encoding="utf-8") data = json.loads(raw) return { str(doc_id): DocSummary(**summary_data) for doc_id, summary_data in data.items() if isinstance(summary_data, dict) } except Exception as exc: logger.warning("Failed to load document summaries from %s: %s", path, exc) return {} def save_doc_summaries(corpus_dir: str | Path, summaries: dict[str, DocSummary]) -> None: """Persist all document summaries as JSON.""" path = _summary_path(corpus_dir) path.parent.mkdir(parents=True, exist_ok=True) data = {doc_id: summary.model_dump() for doc_id, summary in summaries.items()} path.write_text(json.dumps(data, ensure_ascii=False, indent=1), encoding="utf-8") def apply_doc_summary_context(chunk_text: str, doc_id: str, corpus_dir: str | Path) -> str: """Prepend stored document context to a chunk, if available.""" summary = load_doc_summary(doc_id, corpus_dir) if not summary: logger.warning("No document summary found for %s", doc_id) return chunk_text key_claims = "; ".join(summary.key_claims) prefix = ( f"[Document context: {summary.title} | Domain: {summary.domain} | " f"Key claims: {key_claims}]" ) return f"{prefix}\n\n{chunk_text}" def _call_summary_llm(doc_id: str, doc_text: str) -> str: """Call the configured quick model for summary JSON.""" from pluto.dispatcher import dispatch from pluto.modes import get_mode get_mode("MODE_QUICK") prompt = f"""Summarize this document as JSON only. Schema: {{ "title": "short title", "domain": "subject/domain", "key_claims": ["claim1", "claim2"], "structure": ["intro", "methodology", "results", "conclusion"], "open_questions": ["question1"] }} Document id: {doc_id} Document text: --- {doc_text[:14000]} --- """ return dispatch("MODE_QUICK", prompt) def _parse_summary(doc_id: str, raw: str, created_at: str) -> DocSummary: data = json.loads(extract_json_from_response(raw)) return DocSummary( doc_id=doc_id, title=str(data.get("title", "")), domain=str(data.get("domain", "")), key_claims=_string_list(data.get("key_claims")), structure=_string_list(data.get("structure")), open_questions=_string_list(data.get("open_questions")), created_at=created_at, ) def _fallback_summary(doc_id: str, created_at: str) -> DocSummary: return DocSummary( doc_id=doc_id, title=doc_id, domain="", key_claims=[], structure=[], open_questions=[], created_at=created_at, ) def _read_document_text(doc_id: str, corpus_dir: Path) -> str: for ext in (".md", ".txt"): path = corpus_dir / f"{doc_id}{ext}" if path.exists(): return path.read_text(encoding="utf-8", errors="replace") return "" def _summary_path(corpus_dir: str | Path) -> Path: return Path(corpus_dir) / SUMMARY_FILENAME def _string_list(value: Any) -> list[str]: if not isinstance(value, list): return [] return [str(item) for item in value if str(item).strip()] def _utc_now() -> str: return datetime.now(timezone.utc).isoformat()