| |
| """ |
| Document-level summary storage and context prefix helpers. |
| |
| This module is deliberately lazy: importing it does not require provider keys or |
| database/network availability. LLM/provider errors are handled inside |
| generate_doc_summary with a fallback summary. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from datetime import datetime, timezone |
| import json |
| import logging |
| from pathlib import Path |
| from typing import Any |
|
|
| from pydantic import BaseModel, Field |
|
|
| from pluto.utils import extract_json_from_response |
|
|
|
|
# Name of the JSON file, placed directly inside the corpus directory, that
# stores every document summary keyed by document id.
SUMMARY_FILENAME = ".doc_summaries.json"

# Logger registered under the fixed name "pluto" (not the module's __name__).
logger = logging.getLogger("pluto")
|
|
|
|
class DocSummary(BaseModel):
    """Structured, document-level summary kept in the corpus summary file.

    Only ``doc_id`` and ``created_at`` are required; every other field falls
    back to an empty string or an empty list.
    """

    # Identifier of the summarized document.
    doc_id: str
    # Short title of the document.
    title: str = Field(default="")
    # Subject area / domain of the document.
    domain: str = Field(default="")
    # Main claims made by the document.
    key_claims: list[str] = Field(default_factory=list)
    # Section outline of the document.
    structure: list[str] = Field(default_factory=list)
    # Questions the document leaves open.
    open_questions: list[str] = Field(default_factory=list)
    # Creation timestamp, stored as a string.
    created_at: str
|
|
|
|
def generate_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary:
    """Summarize one document, store the result, and return it.

    The document text is read from ``corpus_dir`` and passed to the summary
    model. Any exception raised while calling the model or parsing its reply
    is logged as a warning and replaced by a minimal fallback summary, so an
    entry for ``doc_id`` is written to the summary file either way.

    Args:
        doc_id: Identifier of the document to summarize.
        corpus_dir: Directory holding the document and the summary file.

    Returns:
        The summary that was persisted for ``doc_id``.
    """
    root = Path(corpus_dir)
    text = _read_document_text(doc_id, root)
    # Taken before the model call, so the same timestamp is used for both the
    # parsed summary and the fallback.
    stamp = _utc_now()

    try:
        summary = _parse_summary(
            doc_id=doc_id,
            raw=_call_summary_llm(doc_id=doc_id, doc_text=text),
            created_at=stamp,
        )
    except Exception as exc:
        logger.warning("Failed to generate document summary for %s: %s", doc_id, exc)
        summary = _fallback_summary(doc_id=doc_id, created_at=stamp)

    # Merge into the stored mapping so other documents' summaries are kept.
    store = load_doc_summaries(root)
    store[doc_id] = summary
    save_doc_summaries(root, store)
    return summary
|
|
|
|
def load_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary | None:
    """Return the stored summary for ``doc_id``, or ``None`` if there is none."""
    stored = load_doc_summaries(corpus_dir)
    return stored.get(doc_id)
|
|
|
|
def load_doc_summaries(corpus_dir: str | Path) -> dict[str, DocSummary]:
    """Load all document summaries stored for a corpus.

    The function never raises for a missing, unreadable, or malformed summary
    file; such cases are logged and yield an empty mapping. Entries are
    validated one by one, so a single invalid entry is skipped (with a
    warning) instead of discarding every other stored summary.

    Args:
        corpus_dir: Directory that contains the summary file.

    Returns:
        Mapping of document id to ``DocSummary``; empty when nothing usable
        is stored.
    """
    path = _summary_path(corpus_dir)
    if not path.exists():
        return {}

    # Deliberately broad: loading is best-effort and must not break callers.
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:
        logger.warning("Failed to load document summaries from %s: %s", path, exc)
        return {}

    if not isinstance(data, dict):
        logger.warning(
            "Failed to load document summaries from %s: %s",
            path,
            f"expected a JSON object, got {type(data).__name__}",
        )
        return {}

    summaries: dict[str, DocSummary] = {}
    for doc_id, summary_data in data.items():
        # Non-object entries are silently ignored.
        if not isinstance(summary_data, dict):
            continue
        try:
            summaries[str(doc_id)] = DocSummary(**summary_data)
        except (TypeError, ValueError) as exc:
            # Model validation errors land here; keep the remaining entries.
            logger.warning(
                "Skipping invalid document summary %r in %s: %s", doc_id, path, exc
            )
    return summaries
|
|
|
|
def save_doc_summaries(corpus_dir: str | Path, summaries: dict[str, DocSummary]) -> None:
    """Write the complete summary mapping to the corpus summary file.

    The parent directory is created when missing, and the file is written as
    UTF-8 JSON keyed by document id, replacing any previous content.
    """
    target = _summary_path(corpus_dir)
    target.parent.mkdir(parents=True, exist_ok=True)

    serializable = {}
    for doc_id, summary in summaries.items():
        serializable[doc_id] = summary.model_dump()

    payload = json.dumps(serializable, ensure_ascii=False, indent=1)
    target.write_text(payload, encoding="utf-8")
|
|
|
|
def apply_doc_summary_context(chunk_text: str, doc_id: str, corpus_dir: str | Path) -> str:
    """Return ``chunk_text`` preceded by a one-line document context header.

    The header carries the stored title, domain, and key claims (joined with
    ``"; "``) and is separated from the chunk by a blank line. When no summary
    is stored for ``doc_id``, a warning is logged and the chunk is returned
    unchanged.
    """
    summary = load_doc_summary(doc_id, corpus_dir)
    if summary:
        claims = "; ".join(summary.key_claims)
        header = (
            f"[Document context: {summary.title} | Domain: {summary.domain} | "
            f"Key claims: {claims}]"
        )
        return f"{header}\n\n{chunk_text}"

    logger.warning("No document summary found for %s", doc_id)
    return chunk_text
|
|
|
|
def _call_summary_llm(doc_id: str, doc_text: str) -> str:
    """Call the configured quick model for summary JSON.

    Builds a prompt that asks for a JSON object with ``title``, ``domain``,
    ``key_claims``, ``structure`` and ``open_questions``, embeds the document
    id and at most the first 14000 characters of ``doc_text``, and sends it
    through ``dispatch`` under ``"MODE_QUICK"``.

    Args:
        doc_id: Identifier of the document, included in the prompt.
        doc_text: Full document text; only a prefix is sent to the model.

    Returns:
        Whatever ``dispatch`` returns for the prompt (annotated as ``str``).
    """
    # Imported inside the function so that importing this module stays free
    # of dispatcher/mode dependencies, as the module docstring promises.
    from pluto.dispatcher import dispatch
    from pluto.modes import get_mode

    # Return value is discarded. NOTE(review): presumably this validates that
    # the quick mode is configured -- confirm against pluto.modes.
    get_mode("MODE_QUICK")
    # NOTE(review): whitespace inside the prompt template is reproduced as it
    # appears in the reviewed copy -- confirm against the original file.
    prompt = f"""Summarize this document as JSON only.

Schema:
{{
"title": "short title",
"domain": "subject/domain",
"key_claims": ["claim1", "claim2"],
"structure": ["intro", "methodology", "results", "conclusion"],
"open_questions": ["question1"]
}}

Document id: {doc_id}

Document text:
---
{doc_text[:14000]}
---
"""
    return dispatch("MODE_QUICK", prompt)
|
|
|
|
def _parse_summary(doc_id: str, raw: str, created_at: str) -> DocSummary:
    """Build a ``DocSummary`` from the raw model reply.

    The JSON payload is extracted from ``raw`` with
    ``extract_json_from_response`` and decoded. ``title`` and ``domain`` are
    coerced to strings, with a missing key or JSON ``null`` mapped to ``""``
    rather than the literal text ``"None"``. The list fields are normalized
    with ``_string_list``.

    Args:
        doc_id: Identifier stored on the resulting summary.
        raw: Raw text returned by the summary model.
        created_at: Timestamp string stored on the resulting summary.

    Returns:
        The parsed summary.

    Raises:
        ValueError: If the payload is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or its top-level value is not an object.
    """
    data = json.loads(extract_json_from_response(raw))
    if not isinstance(data, dict):
        # Fail with a clear message instead of an AttributeError on `.get`.
        raise ValueError(
            f"Expected a JSON object for document summary, got {type(data).__name__}"
        )

    def text_field(key: str) -> str:
        # Missing key or JSON null -> empty string, never "None".
        value = data.get(key)
        return "" if value is None else str(value)

    return DocSummary(
        doc_id=doc_id,
        title=text_field("title"),
        domain=text_field("domain"),
        key_claims=_string_list(data.get("key_claims")),
        structure=_string_list(data.get("structure")),
        open_questions=_string_list(data.get("open_questions")),
        created_at=created_at,
    )
|
|
|
|
def _fallback_summary(doc_id: str, created_at: str) -> DocSummary:
    """Return a placeholder summary for ``doc_id``.

    The document id doubles as the title; the domain is empty and all list
    fields are empty lists.
    """
    fields = {
        "doc_id": doc_id,
        "title": doc_id,
        "domain": "",
        "key_claims": [],
        "structure": [],
        "open_questions": [],
        "created_at": created_at,
    }
    return DocSummary(**fields)
|
|
|
|
def _read_document_text(doc_id: str, corpus_dir: Path) -> str:
    """Return the text of document ``doc_id`` from ``corpus_dir``.

    ``<doc_id>.md`` is tried first, then ``<doc_id>.txt``. Files are decoded
    as UTF-8 with undecodable bytes replaced.

    Args:
        doc_id: Document identifier, used as the file stem.
        corpus_dir: Directory to search.

    Returns:
        The file content, or ``""`` when neither file exists.
    """
    for ext in (".md", ".txt"):
        candidate = corpus_dir / f"{doc_id}{ext}"
        # is_file() rather than exists(): a directory that happens to carry
        # the document's name must be skipped, not opened as text.
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8", errors="replace")
    return ""
|
|
|
|
def _summary_path(corpus_dir: str | Path) -> Path:
    """Return the path of the summary file inside ``corpus_dir``."""
    base = Path(corpus_dir)
    return base.joinpath(SUMMARY_FILENAME)
|
|
|
|
def _string_list(value: Any) -> list[str]:
    """Coerce a decoded JSON value into a list of non-blank strings.

    Args:
        value: Any decoded JSON value.

    Returns:
        ``[]`` when ``value`` is not a list. Otherwise each item converted
        with ``str``, skipping ``None`` items (so JSON ``null`` does not turn
        into the text ``"None"``) and items that are blank after stripping.
        Kept items are not stripped.
    """
    if not isinstance(value, list):
        return []
    # Convert once, then filter on the converted text.
    texts = (str(item) for item in value if item is not None)
    return [text for text in texts if text.strip()]
|
|
|
|
def _utc_now() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()
|
|