# -*- coding: utf-8 -*-
"""
Document-level summary storage and context prefix helpers.
This module is deliberately lazy: importing it does not require provider keys or
database/network availability. LLM/provider errors are handled inside
generate_doc_summary with a fallback summary.
"""
from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from pluto.utils import extract_json_from_response
# Shared "pluto" logger; every warning emitted by this module goes through it.
logger = logging.getLogger("pluto")
# Name of the JSON file (joined onto the corpus directory) that stores all
# document summaries.
SUMMARY_FILENAME = ".doc_summaries.json"
class DocSummary(BaseModel):
    """Structured summary of one document, as persisted in the summary file.

    Field order matters: it is the order in which the fields are dumped to
    JSON when the summaries are saved.
    """

    # Identifier of the summarised document; also the key in the summary file.
    doc_id: str
    # Short title; the fallback summary uses the doc_id here.
    title: str = ""
    # Subject area / domain of the document.
    domain: str = ""
    # Main claims; joined with "; " when building a chunk context prefix.
    key_claims: list[str] = Field(default_factory=list)
    # Section outline of the document.
    structure: list[str] = Field(default_factory=list)
    # Questions the document leaves open.
    open_questions: list[str] = Field(default_factory=list)
    # Creation timestamp; generated summaries use an ISO-8601 UTC string.
    created_at: str
def generate_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary:
    """Build a summary for ``doc_id``, store it, and return it.

    The document text is read from ``corpus_dir`` and sent to the summary
    model. If the model call or the parsing of its answer raises, a warning
    is logged and a minimal fallback summary is used instead. In both cases
    the result is merged into the stored summaries and written back to disk.

    Args:
        doc_id: Identifier of the document (its file stem in the corpus).
        corpus_dir: Directory holding the documents and the summary file.

    Returns:
        The summary that was persisted for ``doc_id``.
    """
    root = Path(corpus_dir)
    text = _read_document_text(doc_id, root)
    timestamp = _utc_now()

    try:
        result = _parse_summary(
            doc_id=doc_id,
            raw=_call_summary_llm(doc_id=doc_id, doc_text=text),
            created_at=timestamp,
        )
    except Exception as exc:
        # Best effort by design: any provider/parsing problem degrades to the
        # fallback summary rather than failing the caller.
        logger.warning("Failed to generate document summary for %s: %s", doc_id, exc)
        result = _fallback_summary(doc_id=doc_id, created_at=timestamp)

    # Read-modify-write of the whole summary file.
    stored = load_doc_summaries(root)
    stored[doc_id] = result
    save_doc_summaries(root, stored)
    return result
def load_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary | None:
    """Return the stored summary for ``doc_id``, or ``None`` if there is none.

    Args:
        doc_id: Identifier of the document to look up.
        corpus_dir: Directory containing the summary file.
    """
    stored = load_doc_summaries(corpus_dir)
    return stored.get(doc_id)
def load_doc_summaries(corpus_dir: str | Path) -> dict[str, DocSummary]:
    """Load all document summaries from disk.

    This function is best-effort and does not raise: a missing, unreadable or
    malformed summary file yields an empty mapping (with a warning for the
    latter two). Entries are validated one by one, so a single invalid entry
    is skipped with a warning instead of discarding every other summary.

    Args:
        corpus_dir: Directory containing the summary file.

    Returns:
        Mapping of document id to its parsed summary.
    """
    path = _summary_path(corpus_dir)
    if not path.exists():
        return {}

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception as exc:
        # Deliberately broad: loading must never fail the caller.
        logger.warning("Failed to load document summaries from %s: %s", path, exc)
        return {}

    if not isinstance(data, dict):
        logger.warning(
            "Failed to load document summaries from %s: %s",
            path,
            "top-level JSON value is not an object",
        )
        return {}

    summaries: dict[str, DocSummary] = {}
    for doc_id, summary_data in data.items():
        if not isinstance(summary_data, dict):
            continue
        try:
            summaries[str(doc_id)] = DocSummary(**summary_data)
        except Exception as exc:
            # Keep the valid entries; callers rewrite the whole file from this
            # mapping, so dropping everything here would destroy good data.
            logger.warning(
                "Skipping invalid document summary %r in %s: %s", doc_id, path, exc
            )
    return summaries
def save_doc_summaries(corpus_dir: str | Path, summaries: dict[str, DocSummary]) -> None:
    """Persist all document summaries as JSON.

    The JSON is first written to a sibling temporary file and then moved over
    the real summary file, so an interrupted write cannot leave a truncated
    file behind (which the loader would treat as "no summaries").

    Args:
        corpus_dir: Directory that holds (or will hold) the summary file;
            created if it does not exist.
        summaries: Mapping of document id to summary; replaces the file's
            previous content entirely.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    path = _summary_path(corpus_dir)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {doc_id: summary.model_dump() for doc_id, summary in summaries.items()}
    payload = json.dumps(data, ensure_ascii=False, indent=1)

    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        # Same directory => same filesystem, so the rename is atomic.
        tmp_path.replace(path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when writing or renaming failed.
        tmp_path.unlink(missing_ok=True)
def apply_doc_summary_context(chunk_text: str, doc_id: str, corpus_dir: str | Path) -> str:
    """Return ``chunk_text`` prefixed with its document's stored context.

    The prefix is a single bracketed line containing the summary's title,
    domain and key claims (joined with "; "), followed by a blank line. When
    no summary is stored for ``doc_id`` a warning is logged and the chunk is
    returned unchanged.

    Args:
        chunk_text: Text of the chunk to annotate.
        doc_id: Identifier of the document the chunk belongs to.
        corpus_dir: Directory containing the summary file.
    """
    summary = load_doc_summary(doc_id, corpus_dir)
    if summary:
        claims = "; ".join(summary.key_claims)
        header = (
            f"[Document context: {summary.title} | Domain: {summary.domain} | "
            f"Key claims: {claims}]"
        )
        return header + "\n\n" + chunk_text

    logger.warning("No document summary found for %s", doc_id)
    return chunk_text
def _call_summary_llm(doc_id: str, doc_text: str) -> str:
    """Call the configured quick model for summary JSON.

    Builds a prompt that asks for a JSON object with the keys ``title``,
    ``domain``, ``key_claims``, ``structure`` and ``open_questions``, and
    sends it through ``dispatch`` under the ``"MODE_QUICK"`` mode.

    Args:
        doc_id: Identifier of the document, embedded in the prompt.
        doc_text: Document text; only its first 14000 characters are sent.

    Returns:
        Whatever ``dispatch`` returns for the prompt (annotated as ``str``).
    """
    # Imported inside the function so that importing this module does not pull
    # in the dispatcher/mode machinery.
    from pluto.dispatcher import dispatch
    from pluto.modes import get_mode
    # The return value is discarded; presumably this call only checks that the
    # mode is configured — TODO confirm against pluto.modes.
    get_mode("MODE_QUICK")
    # Doubled braces in the f-string render as literal braces in the schema.
    prompt = f"""Summarize this document as JSON only.
Schema:
{{
"title": "short title",
"domain": "subject/domain",
"key_claims": ["claim1", "claim2"],
"structure": ["intro", "methodology", "results", "conclusion"],
"open_questions": ["question1"]
}}
Document id: {doc_id}
Document text:
---
{doc_text[:14000]}
---
"""
    return dispatch("MODE_QUICK", prompt)
def _parse_summary(doc_id: str, raw: str, created_at: str) -> DocSummary:
    """Turn a raw model response into a ``DocSummary``.

    The JSON payload is extracted from ``raw`` and decoded. Missing or
    ``null`` text fields become empty strings (rather than the literal text
    ``"None"``), and list fields are normalised through ``_string_list``.

    Args:
        doc_id: Identifier stored on the resulting summary.
        raw: Raw text returned by the summary model.
        created_at: Timestamp stored on the resulting summary.

    Returns:
        The parsed summary.

    Raises:
        ValueError: If the payload is not valid JSON or is not a JSON object.
    """
    data = json.loads(extract_json_from_response(raw))
    if not isinstance(data, dict):
        raise ValueError(
            f"Expected a JSON object for document summary, got {type(data).__name__}"
        )

    def _text(key: str) -> str:
        # JSON null must not be stringified into "None".
        value = data.get(key)
        return "" if value is None else str(value)

    return DocSummary(
        doc_id=doc_id,
        title=_text("title"),
        domain=_text("domain"),
        key_claims=_string_list(data.get("key_claims")),
        structure=_string_list(data.get("structure")),
        open_questions=_string_list(data.get("open_questions")),
        created_at=created_at,
    )
def _fallback_summary(doc_id: str, created_at: str) -> DocSummary:
    """Build the minimal summary used when generation fails.

    The document id doubles as the title; the domain is empty and all list
    fields are empty lists.

    Args:
        doc_id: Identifier of the document.
        created_at: Timestamp stored on the summary.
    """
    fields = {
        "doc_id": doc_id,
        "title": doc_id,
        "domain": "",
        "key_claims": [],
        "structure": [],
        "open_questions": [],
        "created_at": created_at,
    }
    return DocSummary(**fields)
def _read_document_text(doc_id: str, corpus_dir: Path) -> str:
for ext in (".md", ".txt"):
path = corpus_dir / f"{doc_id}{ext}"
if path.exists():
return path.read_text(encoding="utf-8", errors="replace")
return ""
def _summary_path(corpus_dir: str | Path) -> Path:
    """Return the location of the summary file inside ``corpus_dir``."""
    base = Path(corpus_dir)
    return base.joinpath(SUMMARY_FILENAME)
def _string_list(value: Any) -> list[str]:
if not isinstance(value, list):
return []
return [str(item) for item in value if str(item).strip()]
def _utc_now() -> str:
return datetime.now(timezone.utc).isoformat()
|