ayushKishor's picture
Add Pluto memory layer and pipeline fixes
23cdeed
# -*- coding: utf-8 -*-
"""
Document-level summary storage and context prefix helpers.
This module is deliberately lazy: importing it does not require provider keys or
database/network availability. LLM/provider errors are handled inside
generate_doc_summary with a fallback summary.
"""
from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from pluto.utils import extract_json_from_response
# Records go to the named "pluto" logger (not __name__), so they are grouped
# under that logger name regardless of this module's import path.
logger = logging.getLogger("pluto")

# Filename, placed inside each corpus directory, of the JSON file that holds
# every document summary for that corpus keyed by doc_id.
SUMMARY_FILENAME: str = ".doc_summaries.json"
class DocSummary(BaseModel):
    """Structured summary of a single corpus document.

    Summaries are persisted together in the corpus summary JSON file, keyed by
    ``doc_id``.
    """

    # Identifier of the summarized document (its file stem in the corpus dir).
    doc_id: str
    # Short human-readable title; empty when none was produced.
    title: str = Field(default="")
    # Subject area / domain label.
    domain: str = Field(default="")
    # Main claims extracted from the document.
    key_claims: list[str] = Field(default_factory=list)
    # Section outline of the document (e.g. intro, methodology, results).
    structure: list[str] = Field(default_factory=list)
    # Questions the document leaves open.
    open_questions: list[str] = Field(default_factory=list)
    # Creation timestamp string; this module fills it with an ISO-8601 UTC value.
    created_at: str
def generate_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary:
    """Generate a summary for one document, persist it, and return it.

    The document text is read from ``<corpus_dir>/<doc_id>.md`` (preferred) or
    ``<corpus_dir>/<doc_id>.txt``. Any error raised while calling the LLM or
    parsing its reply is logged and replaced by a minimal fallback summary.
    When the document is missing or blank, the LLM is not called at all and
    the fallback summary is stored directly.

    Args:
        doc_id: Identifier (file stem) of the document inside ``corpus_dir``.
        corpus_dir: Corpus directory holding the document and the summary file.

    Returns:
        The ``DocSummary`` that was stored (LLM-derived or fallback).

    Errors from reading the document or writing the summary file (e.g.
    ``OSError``) are not caught here and propagate to the caller.
    """
    corpus_path = Path(corpus_dir)
    doc_text = _read_document_text(doc_id, corpus_path)
    created_at = _utc_now()

    if not doc_text.strip():
        # Nothing to summarize: an LLM call on an empty document wastes a
        # request and could yield a made-up summary that would then be stored.
        logger.warning("No document text found for %s; storing fallback summary", doc_id)
        summary = _fallback_summary(doc_id=doc_id, created_at=created_at)
    else:
        try:
            raw = _call_summary_llm(doc_id=doc_id, doc_text=doc_text)
            summary = _parse_summary(doc_id=doc_id, raw=raw, created_at=created_at)
        except Exception as exc:
            # Deliberate best-effort boundary: provider/parse failures must not
            # abort the caller, so degrade to the fallback summary. The
            # traceback is kept in the log to make parse failures debuggable.
            logger.warning(
                "Failed to generate document summary for %s: %s",
                doc_id,
                exc,
                exc_info=True,
            )
            summary = _fallback_summary(doc_id=doc_id, created_at=created_at)

    # Read-modify-write of the shared summary file for this corpus.
    summaries = load_doc_summaries(corpus_path)
    summaries[doc_id] = summary
    save_doc_summaries(corpus_path, summaries)
    return summary
def load_doc_summary(doc_id: str, corpus_dir: str | Path) -> DocSummary | None:
    """Return the stored summary for ``doc_id``, or ``None`` if there is none.

    Each call reloads the full summary file for ``corpus_dir``.
    """
    all_summaries = load_doc_summaries(corpus_dir)
    return all_summaries.get(doc_id)
def load_doc_summaries(corpus_dir: str | Path) -> dict[str, DocSummary]:
    """Load all stored document summaries for a corpus.

    Best-effort: a missing, unreadable or malformed summary file yields an
    empty mapping (with a warning for the latter two). Individual entries that
    are not JSON objects are ignored, and entries that fail model validation
    are skipped with a warning, without discarding the valid ones.

    Args:
        corpus_dir: Corpus directory containing the summary JSON file.

    Returns:
        Mapping of ``doc_id`` to ``DocSummary``.
    """
    path = _summary_path(corpus_dir)
    if not path.exists():
        return {}

    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Failed to load document summaries from %s: %s", path, exc)
        return {}

    if not isinstance(data, dict):
        logger.warning(
            "Failed to load document summaries from %s: %s",
            path,
            "top-level JSON value is not an object",
        )
        return {}

    summaries: dict[str, DocSummary] = {}
    for doc_id, summary_data in data.items():
        if not isinstance(summary_data, dict):
            continue
        try:
            summaries[str(doc_id)] = DocSummary(**summary_data)
        except (TypeError, ValueError) as exc:
            # pydantic's ValidationError subclasses ValueError. Skipping only
            # this entry keeps one bad record from wiping every other summary
            # on the next save.
            logger.warning(
                "Skipping invalid document summary %r in %s: %s", doc_id, path, exc
            )
    return summaries
def save_doc_summaries(corpus_dir: str | Path, summaries: dict[str, DocSummary]) -> None:
    """Persist all document summaries as a single JSON file.

    The JSON is first written to a sibling ``.tmp`` file, which is then moved
    over the real file. An interrupted write therefore cannot leave a
    truncated summary file behind. Such a file would otherwise be loaded as
    empty and then overwritten, losing every stored summary.

    Args:
        corpus_dir: Corpus directory; created if it does not exist.
        summaries: Mapping of ``doc_id`` to summary to write (replaces the file).

    Raises:
        OSError: if the directory, temp file or final file cannot be written.
    """
    path = _summary_path(corpus_dir)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {doc_id: summary.model_dump() for doc_id, summary in summaries.items()}
    payload = json.dumps(data, ensure_ascii=False, indent=1)

    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        # Same directory => same filesystem, so this is an atomic rename on POSIX.
        tmp_path.replace(path)
    except BaseException:
        # Don't leave a stale temp file around; never mask the original error.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def apply_doc_summary_context(chunk_text: str, doc_id: str, corpus_dir: str | Path) -> str:
    """Return ``chunk_text`` prefixed with its document's stored summary context.

    The prefix has the form
    ``[Document context: <title> | Domain: <domain> | Key claims: <c1>; <c2>]``
    and is separated from the chunk by a blank line. If no summary is stored
    for ``doc_id``, a warning is logged and ``chunk_text`` is returned unchanged.
    """
    summary = load_doc_summary(doc_id, corpus_dir)
    if summary:
        claims = "; ".join(summary.key_claims)
        header = (
            f"[Document context: {summary.title} | Domain: {summary.domain} | "
            f"Key claims: {claims}]"
        )
        return f"{header}\n\n{chunk_text}"

    logger.warning("No document summary found for %s", doc_id)
    return chunk_text
def _call_summary_llm(doc_id: str, doc_text: str) -> str:
    """Ask the quick-mode model for a JSON summary of ``doc_text``.

    Only the first 14000 characters of the document are placed in the prompt.
    The provider modules are imported here rather than at module level, so
    importing this module does not pull them in.

    Returns:
        The raw model response, expected to contain a JSON object.
    """
    from pluto.dispatcher import dispatch
    from pluto.modes import get_mode

    mode_name = "MODE_QUICK"
    # NOTE(review): the return value is discarded; presumably this call exists
    # for a validation/setup side effect inside get_mode — confirm.
    get_mode(mode_name)

    excerpt = doc_text[:14000]
    prompt = f"""Summarize this document as JSON only.
Schema:
{{
"title": "short title",
"domain": "subject/domain",
"key_claims": ["claim1", "claim2"],
"structure": ["intro", "methodology", "results", "conclusion"],
"open_questions": ["question1"]
}}
Document id: {doc_id}
Document text:
---
{excerpt}
---
"""
    return dispatch(mode_name, prompt)
def _parse_summary(doc_id: str, raw: str, created_at: str) -> DocSummary:
    """Build a ``DocSummary`` from the model's raw JSON reply.

    Text fields that are missing or JSON ``null`` become ``""`` (instead of the
    literal string ``"None"``) and are stripped of surrounding whitespace.
    List fields are normalized with ``_string_list``.

    Args:
        doc_id: Identifier of the summarized document.
        raw: Raw model response containing a JSON object.
        created_at: Timestamp string to store on the summary.

    Raises:
        ValueError: if the extracted text is not valid JSON
            (``json.JSONDecodeError``) or the JSON value is not an object.
    """
    data = json.loads(extract_json_from_response(raw))
    if not isinstance(data, dict):
        raise ValueError(
            f"expected a JSON object in summary response, got {type(data).__name__}"
        )

    def text_field(key: str) -> str:
        # Null-safe: str(None) would otherwise store the text "None".
        value = data.get(key)
        return "" if value is None else str(value).strip()

    return DocSummary(
        doc_id=doc_id,
        title=text_field("title"),
        domain=text_field("domain"),
        key_claims=_string_list(data.get("key_claims")),
        structure=_string_list(data.get("structure")),
        open_questions=_string_list(data.get("open_questions")),
        created_at=created_at,
    )
def _fallback_summary(doc_id: str, created_at: str) -> DocSummary:
    """Return a placeholder summary for when no real summary is available.

    The document id doubles as the title; all other descriptive fields are empty.
    """
    return DocSummary(
        created_at=created_at,
        doc_id=doc_id,
        title=doc_id,
        domain="",
        key_claims=[],
        structure=[],
        open_questions=[],
    )
def _read_document_text(doc_id: str, corpus_dir: Path) -> str:
    """Return the text of ``doc_id`` from ``corpus_dir``, or ``""`` if not found.

    ``<doc_id>.md`` takes precedence over ``<doc_id>.txt``. Undecodable bytes
    are replaced instead of raising. Only regular files are considered. A
    directory that happens to be named ``<doc_id>.md`` is therefore skipped,
    instead of making ``read_text`` raise an ``OSError``.
    """
    for ext in (".md", ".txt"):
        candidate = corpus_dir / f"{doc_id}{ext}"
        if candidate.is_file():
            return candidate.read_text(encoding="utf-8", errors="replace")
    return ""
def _summary_path(corpus_dir: str | Path) -> Path:
    """Return the location of the summary JSON file inside ``corpus_dir``."""
    return Path(corpus_dir).joinpath(SUMMARY_FILENAME)
def _string_list(value: Any) -> list[str]:
    """Normalize a model-provided field into a clean list of strings.

    - A list yields its items converted to ``str`` and stripped; ``None`` items
      and items that are blank after stripping are dropped.
    - A bare string is treated as a one-item list (so a single claim returned
      without brackets is kept rather than silently lost).
    - Anything else yields an empty list.
    """
    if isinstance(value, str):
        value = [value]
    if not isinstance(value, list):
        return []
    stripped = (str(item).strip() for item in value if item is not None)
    return [text for text in stripped if text]
def _utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string (``+00:00`` offset)."""
    now = datetime.now(tz=timezone.utc)
    return now.isoformat()