File size: 3,032 Bytes
23cdeed
66ad25b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-
"""
pluto/extraction_cache.py — Persistent cache for S1 EXTRACT results.

Stores LLM extraction outputs keyed by chunk content SHA-256 hash.
On cache hit, the expensive LLM call is skipped entirely.

Cache file: <corpus_dir>/.extraction_cache.json
"""

from __future__ import annotations

import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


class ExtractionCache:
    """JSON-file-backed cache for chunk extraction results.

    Entries are held in an in-memory dict keyed by chunk hash and are only
    written to ``<corpus_dir>/.extraction_cache.json`` when :meth:`save` is
    called. ``hits`` and ``misses`` count the outcomes of :meth:`get` calls
    made on this instance.
    """

    def __init__(self, corpus_dir: str) -> None:
        """Bind the cache to *corpus_dir* and load any existing cache file."""
        self._path = Path(corpus_dir).resolve() / ".extraction_cache.json"
        self._data: dict[str, dict[str, Any]] = {}
        self.hits = 0
        self.misses = 0
        self._load()

    # ── Public API ────────────────────────────────────────────────────────────

    def get(self, chunk_hash: str) -> dict[str, Any] | None:
        """Return the cached extraction dict for *chunk_hash*, or None.

        Updates the ``hits`` / ``misses`` counters. An empty entry carries no
        extraction result, so it is counted as a miss.
        """
        entry = self._data.get(chunk_hash)
        if entry:
            self.hits += 1
            return entry
        self.misses += 1
        return None

    def put(self, chunk_hash: str, extract_dict: dict[str, Any]) -> None:
        """Store an extraction result keyed by *chunk_hash*.

        A shallow copy stamped with a UTC ``cached_at`` ISO-8601 timestamp is
        stored, so the caller's dict is left unmodified. The entry lives in
        memory only until :meth:`save` is called.
        """
        entry = dict(extract_dict)
        entry["cached_at"] = datetime.now(timezone.utc).isoformat()
        self._data[chunk_hash] = entry

    def invalidate_doc(self, doc_id: str) -> int:
        """Remove all cached entries whose ``doc_id`` equals *doc_id*.

        Returns:
            The number of entries removed.
        """
        # Collect keys first: a dict must not change size while iterated.
        to_remove = [
            h for h, entry in self._data.items()
            if entry.get("doc_id") == doc_id
        ]
        for h in to_remove:
            del self._data[h]
        return len(to_remove)

    def save(self) -> None:
        """Persist the cache to disk, replacing the previous file atomically.

        The payload is written to a sibling temporary file and then moved
        over the real cache file, so an interrupted write cannot leave a
        truncated cache behind. I/O errors are swallowed on purpose: the
        cache is only a performance optimization.
        """
        payload = json.dumps(self._data, indent=2, ensure_ascii=False)
        # The pid keeps concurrent processes from sharing one temp file.
        tmp_path = self._path.with_name(f"{self._path.name}.{os.getpid()}.tmp")
        try:
            tmp_path.write_text(payload, encoding="utf-8")
            os.replace(tmp_path, self._path)
        except OSError:
            # Non-fatal: cache is a performance optimization.
            # Best-effort cleanup of a partially written temp file.
            try:
                tmp_path.unlink()
            except OSError:
                pass

    def stats(self) -> dict[str, Any]:
        """Return cache statistics.

        Returns:
            A dict with ``total_entries``, ``hits``, ``misses``, ``hit_rate``
            (hits / lookups rounded to two decimals, 0.0 when there were no
            lookups) and ``cache_file`` (the cache path as a string).
        """
        return {
            "total_entries": len(self._data),
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": round(self.hits / max(self.hits + self.misses, 1), 2),
            "cache_file": str(self._path),
        }

    # ── Internal ──────────────────────────────────────────────────────────────

    def _load(self) -> None:
        """Load the cache from disk if the cache file exists.

        A missing file leaves the in-memory data untouched. An unreadable,
        undecodable or malformed file resets the cache to empty. Entries
        whose value is not a JSON object are dropped, so the other methods
        can rely on every entry being a dict.
        """
        try:
            loaded = json.loads(self._path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for invalid UTF-8 bytes.
            loaded = None

        if not isinstance(loaded, dict):
            self._data = {}
            return
        self._data = {
            key: entry for key, entry in loaded.items()
            if isinstance(entry, dict)
        }