File size: 4,230 Bytes
732e77c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# /memory/rag/indexer.py
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Iterable
from pathlib import Path
import json
import math
import re

# Default on-disk location for the persisted index: a .json file sitting
# next to this module (e.g. indexer.json).
DEFAULT_INDEX_PATH = Path(__file__).with_suffix(".json")

# A "word" is a run of ASCII letters, digits, or apostrophes (keeps "don't").
_WORD_RE = re.compile(r"[A-Za-z0-9']+")

def tokenize(text: str) -> List[str]:
    """Split *text* into lowercased word tokens.

    A token is a run of ASCII letters, digits, or apostrophes; ``None``
    or empty input yields an empty list.
    """
    words = re.findall(r"[A-Za-z0-9']+", text or "")
    return [w.lower() for w in words]

@dataclass(frozen=True)
class DocMeta:
    """Immutable metadata attached to one indexed document."""
    doc_id: str
    source: str
    title: Optional[str] = None
    tags: Optional[List[str]] = None

    def to_dict(self) -> Dict:
        """Serialize to a plain, JSON-friendly dict."""
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict) -> "DocMeta":
        """Rebuild a DocMeta from ``to_dict()`` output.

        Tolerates missing keys; an absent or empty tag list is
        normalized to ``None``.
        """
        raw_tags = d.get("tags")
        tags = list(raw_tags) if raw_tags else None
        return DocMeta(
            doc_id=str(d["doc_id"]),
            source=str(d.get("source", "")),
            title=d.get("title"),
            tags=tags,
        )

@dataclass(frozen=True)
class DocHit:
    """One search result: a document id paired with its relevance score."""
    doc_id: str   # key into TfidfIndex.docs
    score: float  # summed tf-idf over the query terms; higher is better

class TfidfIndex:
    """
    Minimal TF-IDF index used by tests:
      - add_text / add_file
      - save / load
      - search(query, k)

    Scoring is a plain sum over unique query terms of
    tf(term, doc) * idf(term); idf is smoothed (see _idf).
    """
    def __init__(self) -> None:
        self.docs: Dict[str, Dict] = {}          # doc_id -> {"text": str, "meta": DocMeta}
        self.df: Dict[str, int] = {}             # term -> number of docs containing the term
        self.n_docs: int = 0                     # kept equal to len(self.docs)

    # ---- building ----
    def add_text(self, doc_id: str, text: str, meta: DocMeta) -> None:
        """Add *text* under *doc_id*, replacing any previous version.

        Bug fix: re-adding an existing doc_id used to leave the old
        version's terms counted in ``df`` and then count the new terms
        on top, permanently inflating document frequencies (and thus
        deflating idf). The old document's contribution is now retracted
        before the new text is counted, so ``df`` always reflects the
        live corpus.
        """
        text = text or ""
        previous = self.docs.get(doc_id)
        if previous is not None:
            # Retract the replaced document's df contribution.
            for term in set(tokenize(previous["text"])):
                remaining = self.df.get(term, 0) - 1
                if remaining > 0:
                    self.df[term] = remaining
                else:
                    self.df.pop(term, None)
        self.docs[doc_id] = {"text": text, "meta": meta}
        self.n_docs = len(self.docs)
        # set() already deduplicates, so each unique term counts once per
        # document (the old extra `seen` set was redundant).
        for term in set(tokenize(text)):
            self.df[term] = self.df.get(term, 0) + 1

    def add_file(self, path: str | Path) -> None:
        """Index a UTF-8 text file; its resolved path is both doc_id and source."""
        p = Path(path)
        text = p.read_text(encoding="utf-8", errors="ignore")
        did = str(p.resolve())
        meta = DocMeta(doc_id=did, source=did, title=p.name, tags=None)
        self.add_text(did, text, meta)

    # ---- persistence ----
    def save(self, path: str | Path) -> None:
        """Write docs and metadata as JSON; df/n_docs are rebuilt on load."""
        p = Path(path)
        payload = {
            "n_docs": self.n_docs,
            "docs": {
                did: {"text": d["text"], "meta": d["meta"].to_dict()}
                for did, d in self.docs.items()
            },
        }
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> "TfidfIndex":
        """Load an index saved by save(); a missing file yields an empty index."""
        p = Path(path)
        idx = cls()
        if not p.exists():
            return idx
        raw = json.loads(p.read_text(encoding="utf-8"))
        for did, d in raw.get("docs", {}).items():
            meta = DocMeta.from_dict(d["meta"])
            idx.add_text(did, d.get("text", ""), meta)
        return idx

    # ---- search ----
    def _idf(self, term: str) -> float:
        """Smoothed inverse document frequency.

        The +1 in numerator and denominator avoids division by zero for
        unseen terms; the trailing +1.0 keeps every idf strictly positive.
        """
        df = self.df.get(term, 0)
        return math.log((self.n_docs + 1) / (df + 1)) + 1.0

    def search(self, query: str, k: int = 5) -> List[DocHit]:
        """Return up to *k* DocHits ranked by summed tf-idf, highest first.

        Documents with a zero score are omitted, so fewer than *k* hits
        may come back.
        """
        q_terms = set(tokenize(query))
        if not q_terms or self.n_docs == 0:
            return []
        # idf depends only on the query term, so compute it once per term
        # instead of once per (term, document) pair.
        idf = {qt: self._idf(qt) for qt in q_terms}
        scores: Dict[str, float] = {}
        for did, d in self.docs.items():
            tf: Dict[str, int] = {}
            for t in tokenize(d["text"]):
                tf[t] = tf.get(t, 0) + 1
            s = sum(tf.get(qt, 0) * weight for qt, weight in idf.items())
            if s > 0.0:
                scores[did] = s
        hits = [DocHit(doc_id=did, score=sc) for did, sc in scores.items()]
        hits.sort(key=lambda h: h.score, reverse=True)
        return hits[:k]

# -------- convenience used by retriever/tests --------
def load_index(path: str | Path = DEFAULT_INDEX_PATH) -> TfidfIndex:
    """Load the TfidfIndex persisted at *path* (empty index if absent)."""
    return TfidfIndex.load(path)

def search(query: str, k: int = 5, path: str | Path = DEFAULT_INDEX_PATH) -> List[DocHit]:
    """One-shot convenience: load the index at *path* and run *query* on it."""
    return load_index(path).search(query, k=k)