# -*- coding: utf-8 -*-
"""
pluto/extraction_cache.py β Persistent cache for S1 EXTRACT results.
Stores LLM extraction outputs keyed by chunk content SHA-256 hash.
On cache hit, the expensive LLM call is skipped entirely.
Cache file: <corpus_dir>/.extraction_cache.json
"""
from __future__ import annotations
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
class ExtractionCache:
    """JSON-file-backed cache for chunk extraction results.

    Keys are chunk-content hashes (SHA-256 hex); values are the extraction
    dicts produced by the LLM, stamped with a ``cached_at`` ISO timestamp.
    The cache lives at ``<corpus_dir>/.extraction_cache.json`` and is a
    pure performance optimization: disk I/O errors are never fatal.
    """

    def __init__(self, corpus_dir: str) -> None:
        """Open (or initialize empty) the cache file under *corpus_dir*."""
        self._path = Path(corpus_dir).resolve() / ".extraction_cache.json"
        self._data: dict[str, dict[str, Any]] = {}
        self.hits = 0    # get() calls that found an entry
        self.misses = 0  # get() calls that found nothing
        self._load()

    # -- Public API ------------------------------------------------------------

    def get(self, chunk_hash: str) -> dict[str, Any] | None:
        """Return cached extraction dict for this chunk hash, or None."""
        entry = self._data.get(chunk_hash)
        # `is not None`, not truthiness: a cached-but-empty dict is still a hit.
        # (The old `if entry:` counted empty results as misses and re-ran the LLM.)
        if entry is not None:
            self.hits += 1
            return entry
        self.misses += 1
        return None

    def put(self, chunk_hash: str, extract_dict: dict[str, Any]) -> None:
        """Store an extraction result keyed by chunk hash.

        The caller's dict is copied before the ``cached_at`` timestamp is
        added, so the argument is never mutated as a side effect.
        """
        self._data[chunk_hash] = {
            **extract_dict,
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }

    def invalidate_doc(self, doc_id: str) -> int:
        """Remove all cached entries for a specific document. Returns count removed."""
        # Collect first, then delete: can't mutate the dict while iterating it.
        to_remove = [
            h for h, entry in self._data.items()
            if entry.get("doc_id") == doc_id
        ]
        for h in to_remove:
            del self._data[h]
        return len(to_remove)

    def save(self) -> None:
        """Persist cache to disk (best-effort; OSError is swallowed)."""
        try:
            self._path.write_text(
                json.dumps(self._data, indent=2, ensure_ascii=False),
                encoding="utf-8",
            )
        except OSError:
            pass  # Non-fatal: cache is a performance optimization

    def stats(self) -> dict[str, Any]:
        """Return cache statistics (entry count, hit/miss counters, hit rate)."""
        return {
            "total_entries": len(self._data),
            "hits": self.hits,
            "misses": self.misses,
            # max(..., 1) avoids ZeroDivisionError before any get() call.
            "hit_rate": round(self.hits / max(self.hits + self.misses, 1), 2),
            "cache_file": str(self._path),
        }

    # -- Internal --------------------------------------------------------------

    def _load(self) -> None:
        """Load cache from disk if it exists; corrupt/unreadable files reset it."""
        if not self._path.exists():
            return
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            loaded = {}
        # Guard against a valid-JSON file that isn't an object (e.g. a list),
        # which would break later .items()/.get() calls on self._data.
        self._data = loaded if isinstance(loaded, dict) else {}