# /memory/rag/indexer.py from __future__ import annotations from dataclasses import dataclass, asdict from typing import Dict, List, Optional, Iterable from pathlib import Path import json import math import re DEFAULT_INDEX_PATH = Path(__file__).with_suffix(".json") _WORD_RE = re.compile(r"[A-Za-z0-9']+") def tokenize(text: str) -> List[str]: return [m.group(0).lower() for m in _WORD_RE.finditer(text or "")] @dataclass(frozen=True) class DocMeta: doc_id: str source: str title: Optional[str] = None tags: Optional[List[str]] = None def to_dict(self) -> Dict: return asdict(self) @staticmethod def from_dict(d: Dict) -> "DocMeta": return DocMeta( doc_id=str(d["doc_id"]), source=str(d.get("source", "")), title=d.get("title"), tags=list(d.get("tags") or []) or None, ) @dataclass(frozen=True) class DocHit: doc_id: str score: float class TfidfIndex: """ Minimal TF-IDF index used by tests: - add_text / add_file - save / load - search(query, k) """ def __init__(self) -> None: self.docs: Dict[str, Dict] = {} # doc_id -> {"text": str, "meta": DocMeta} self.df: Dict[str, int] = {} # term -> document frequency self.n_docs: int = 0 # ---- building ---- def add_text(self, doc_id: str, text: str, meta: DocMeta) -> None: text = text or "" self.docs[doc_id] = {"text": text, "meta": meta} self.n_docs = len(self.docs) seen = set() for t in set(tokenize(text)): if t not in seen: self.df[t] = self.df.get(t, 0) + 1 seen.add(t) def add_file(self, path: str | Path) -> None: p = Path(path) text = p.read_text(encoding="utf-8", errors="ignore") did = str(p.resolve()) meta = DocMeta(doc_id=did, source=did, title=p.name, tags=None) self.add_text(did, text, meta) # ---- persistence ---- def save(self, path: str | Path) -> None: p = Path(path) payload = { "n_docs": self.n_docs, "docs": { did: {"text": d["text"], "meta": d["meta"].to_dict()} for did, d in self.docs.items() } } p.parent.mkdir(parents=True, exist_ok=True) p.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8") @classmethod def load(cls, path: str | Path) -> "TfidfIndex": p = Path(path) idx = cls() if not p.exists(): return idx raw = json.loads(p.read_text(encoding="utf-8")) docs = raw.get("docs", {}) for did, d in docs.items(): meta = DocMeta.from_dict(d["meta"]) idx.add_text(did, d.get("text", ""), meta) return idx # ---- search ---- def _idf(self, term: str) -> float: df = self.df.get(term, 0) # smooth to avoid div-by-zero; +1 in both numerator/denominator return math.log((self.n_docs + 1) / (df + 1)) + 1.0 def search(self, query: str, k: int = 5) -> List[DocHit]: q_terms = tokenize(query) if not q_terms or self.n_docs == 0: return [] # doc scores via simple tf-idf (sum over terms) scores: Dict[str, float] = {} for did, d in self.docs.items(): text_terms = tokenize(d["text"]) if not text_terms: continue tf: Dict[str, int] = {} for t in text_terms: tf[t] = tf.get(t, 0) + 1 s = 0.0 for qt in set(q_terms): s += (tf.get(qt, 0) * self._idf(qt)) if s > 0.0: scores[did] = s hits = [DocHit(doc_id=did, score=sc) for did, sc in scores.items()] hits.sort(key=lambda h: h.score, reverse=True) return hits[:k] # -------- convenience used by retriever/tests -------- def load_index(path: str | Path = DEFAULT_INDEX_PATH) -> TfidfIndex: return TfidfIndex.load(path) def search(query: str, k: int = 5, path: str | Path = DEFAULT_INDEX_PATH) -> List[DocHit]: idx = load_index(path) return idx.search(query, k=k)