Spaces:
Sleeping
Sleeping
JerameeUC
12 commits. PyTest runs, but some tests fail: the individual sections below need to be completed to fix them.
0c4f0e3 | # /memory/rag/indexer.py | |
| from __future__ import annotations | |
| from dataclasses import dataclass, asdict | |
| from typing import Dict, List, Optional, Iterable | |
| from pathlib import Path | |
| import json | |
| import math | |
| import re | |
# Default on-disk location of the index: this module's path with a .json suffix.
DEFAULT_INDEX_PATH = Path(__file__).with_suffix(".json")

# A word token is a run of ASCII letters, digits, and apostrophes (e.g. "don't").
_WORD_RE = re.compile(r"[A-Za-z0-9']+")


def tokenize(text: str) -> List[str]:
    """Return the lowercase word tokens of *text*; falsy input yields []."""
    return [word.lower() for word in _WORD_RE.findall(text or "")]
@dataclass
class DocMeta:
    """Metadata for one indexed document.

    The @dataclass decorator is required: to_dict() relies on asdict(), and
    from_dict()/callers construct instances with keyword arguments — both
    fail without a generated __init__.
    """

    doc_id: str                        # unique identifier (typically the resolved file path)
    source: str                        # where the text came from (path or logical name)
    title: Optional[str] = None        # human-readable title, if any
    tags: Optional[List[str]] = None   # optional labels; None when absent

    def to_dict(self) -> Dict:
        """Serialize to a plain, JSON-friendly dict."""
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict) -> "DocMeta":
        """Rebuild a DocMeta from a dict produced by to_dict().

        Tolerant of partial dicts: only "doc_id" is required; other keys
        fall back to defaults.
        """
        return DocMeta(
            doc_id=str(d["doc_id"]),
            source=str(d.get("source", "")),
            title=d.get("title"),
            # Normalize so that a missing or empty tag list round-trips as None.
            tags=list(d.get("tags") or []) or None,
        )
@dataclass
class DocHit:
    """A single search result: document id plus its tf-idf relevance score.

    The @dataclass decorator is required — search() constructs hits with
    DocHit(doc_id=..., score=...), which needs the generated __init__.
    """

    doc_id: str
    score: float
class TfidfIndex:
    """
    Minimal TF-IDF index used by tests:
      - add_text / add_file
      - save / load
      - search(query, k)
    """

    def __init__(self) -> None:
        self.docs: Dict[str, Dict] = {}  # doc_id -> {"text": str, "meta": DocMeta}
        self.df: Dict[str, int] = {}     # term -> number of documents containing it
        self.n_docs: int = 0             # current document count

    # ---- building ----
    def add_text(self, doc_id: str, text: str, meta: DocMeta) -> None:
        """Index *text* under *doc_id*; re-adding an id replaces the old text.

        Document frequencies are kept consistent on replacement: the old
        text's terms are retired before the new text's terms are counted
        (previously a re-add double-counted every surviving term).
        """
        text = text or ""
        if doc_id in self.docs:
            # Retire the df contributions of the text being replaced.
            for term in set(tokenize(self.docs[doc_id]["text"])):
                remaining = self.df.get(term, 0) - 1
                if remaining > 0:
                    self.df[term] = remaining
                else:
                    self.df.pop(term, None)
        self.docs[doc_id] = {"text": text, "meta": meta}
        self.n_docs = len(self.docs)
        # set() already dedups, so each term counts once per document.
        for term in set(tokenize(text)):
            self.df[term] = self.df.get(term, 0) + 1

    def add_file(self, path: str | Path) -> None:
        """Read *path* as UTF-8 (errors ignored) and index it under its resolved path."""
        p = Path(path)
        text = p.read_text(encoding="utf-8", errors="ignore")
        did = str(p.resolve())
        meta = DocMeta(doc_id=did, source=did, title=p.name, tags=None)
        self.add_text(did, text, meta)

    # ---- persistence ----
    def save(self, path: str | Path) -> None:
        """Write the index as JSON to *path*, creating parent dirs as needed."""
        p = Path(path)
        payload = {
            "n_docs": self.n_docs,
            "docs": {
                did: {"text": d["text"], "meta": d["meta"].to_dict()}
                for did, d in self.docs.items()
            },
        }
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> "TfidfIndex":
        """Load an index written by save(); a missing file yields an empty index.

        @classmethod is required — the method takes ``cls`` and is invoked as
        ``TfidfIndex.load(path)``; without the decorator that call binds the
        path to ``cls`` and fails.
        """
        p = Path(path)
        idx = cls()
        if not p.exists():
            return idx
        raw = json.loads(p.read_text(encoding="utf-8"))
        for did, d in raw.get("docs", {}).items():
            meta = DocMeta.from_dict(d["meta"])
            idx.add_text(did, d.get("text", ""), meta)
        return idx

    # ---- search ----
    def _idf(self, term: str) -> float:
        """Smoothed inverse document frequency (always > 0)."""
        df = self.df.get(term, 0)
        # +1 in numerator/denominator avoids div-by-zero for unseen terms.
        return math.log((self.n_docs + 1) / (df + 1)) + 1.0

    def search(self, query: str, k: int = 5) -> List[DocHit]:
        """Return up to *k* DocHits scored by summed tf-idf, best first.

        Documents with zero overlap with the query are omitted.
        """
        q_terms = tokenize(query)
        if not q_terms or self.n_docs == 0:
            return []
        scores: Dict[str, float] = {}
        for did, d in self.docs.items():
            text_terms = tokenize(d["text"])
            if not text_terms:
                continue
            # Raw term frequency for this document.
            tf: Dict[str, int] = {}
            for t in text_terms:
                tf[t] = tf.get(t, 0) + 1
            s = 0.0
            for qt in set(q_terms):
                s += tf.get(qt, 0) * self._idf(qt)
            if s > 0.0:
                scores[did] = s
        hits = [DocHit(doc_id=did, score=sc) for did, sc in scores.items()]
        hits.sort(key=lambda h: h.score, reverse=True)
        return hits[:k]
| # -------- convenience used by retriever/tests -------- | |
def load_index(path: str | Path = DEFAULT_INDEX_PATH) -> TfidfIndex:
    """Load the TF-IDF index persisted at *path* (defaults to the module index)."""
    index = TfidfIndex.load(path)
    return index
def search(query: str, k: int = 5, path: str | Path = DEFAULT_INDEX_PATH) -> List[DocHit]:
    """Convenience wrapper: load the index stored at *path* and run *query* on it."""
    return load_index(path).search(query, k=k)