| |
| from __future__ import annotations |
| from dataclasses import dataclass, asdict |
| from typing import Dict, List, Optional, Iterable |
| from pathlib import Path |
| import json |
| import math |
| import re |
|
|
# Default on-disk location for the persisted index: a ".json" file that sits
# next to this module (module filename with its suffix swapped for ".json").
DEFAULT_INDEX_PATH = Path(__file__).with_suffix(".json")
|
|
# A "word" is a maximal run of ASCII letters, digits, or apostrophes
# (so contractions like "don't" stay a single token).
_WORD_RE = re.compile(r"[A-Za-z0-9']+")


def tokenize(text: str) -> List[str]:
    """Split *text* into lowercase word tokens; None or empty input yields []."""
    return [word.lower() for word in _WORD_RE.findall(text or "")]
|
|
@dataclass(frozen=True)
class DocMeta:
    """Immutable metadata attached to one indexed document."""

    doc_id: str                        # unique key of the document in the index
    source: str                        # origin of the text (e.g. a file path)
    title: Optional[str] = None        # human-readable title, if any
    tags: Optional[List[str]] = None   # optional labels; None when absent

    def to_dict(self) -> Dict:
        """Serialize to a plain, JSON-friendly dict."""
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict) -> "DocMeta":
        """Rebuild a DocMeta from a dict produced by to_dict().

        Tolerant of missing keys: only "doc_id" is required. An empty or
        missing tag list is normalized back to None.
        """
        raw_tags = list(d.get("tags") or [])
        return DocMeta(
            doc_id=str(d["doc_id"]),
            source=str(d.get("source", "")),
            title=d.get("title"),
            tags=raw_tags if raw_tags else None,
        )
|
|
@dataclass(frozen=True)
class DocHit:
    """One search result: a document id plus its (unnormalized) TF-IDF score."""

    doc_id: str   # key of the matching document
    score: float  # sum of tf * idf over the query terms; higher = more relevant
|
|
class TfidfIndex:
    """
    Minimal TF-IDF index used by tests:
      - add_text / add_file
      - save / load
      - search(query, k)

    State:
      docs:   doc_id -> {"text": raw text, "meta": DocMeta}
      df:     term   -> number of distinct documents containing the term
      n_docs: current document count (kept equal to len(self.docs))
    """

    def __init__(self) -> None:
        self.docs: Dict[str, Dict] = {}
        self.df: Dict[str, int] = {}
        self.n_docs: int = 0

    def add_text(self, doc_id: str, text: str, meta: DocMeta) -> None:
        """Index *text* under *doc_id*; re-adding an existing doc_id replaces it.

        Bug fix: the original overwrote self.docs[doc_id] but incremented
        self.df for the new text's terms anyway, so re-adding a document
        permanently inflated document frequencies (and diverged from what
        load() would rebuild). We now retract the old version's terms first.
        """
        text = text or ""
        previous = self.docs.get(doc_id)
        if previous is not None:
            # Remove the old version's contribution to df before re-counting.
            for term in set(tokenize(previous["text"])):
                remaining = self.df.get(term, 0) - 1
                if remaining > 0:
                    self.df[term] = remaining
                else:
                    self.df.pop(term, None)
        self.docs[doc_id] = {"text": text, "meta": meta}
        self.n_docs = len(self.docs)
        # set() already deduplicates, so each term counts once per document
        # (the original's extra `seen` set was redundant).
        for term in set(tokenize(text)):
            self.df[term] = self.df.get(term, 0) + 1

    def add_file(self, path: str | Path) -> None:
        """Read *path* as UTF-8 text (undecodable bytes ignored) and index it.

        The resolved absolute path is used as both doc_id and source.
        """
        p = Path(path)
        text = p.read_text(encoding="utf-8", errors="ignore")
        did = str(p.resolve())
        meta = DocMeta(doc_id=did, source=did, title=p.name, tags=None)
        self.add_text(did, text, meta)

    def save(self, path: str | Path) -> None:
        """Persist the index as JSON at *path*, creating parent directories.

        df is deliberately not stored: load() recomputes it via add_text.
        """
        p = Path(path)
        payload = {
            "n_docs": self.n_docs,
            "docs": {
                did: {"text": d["text"], "meta": d["meta"].to_dict()}
                for did, d in self.docs.items()
            },
        }
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")

    @classmethod
    def load(cls, path: str | Path) -> "TfidfIndex":
        """Load an index written by save(); a missing file yields an empty index."""
        p = Path(path)
        idx = cls()
        if not p.exists():
            return idx
        raw = json.loads(p.read_text(encoding="utf-8"))
        for did, d in raw.get("docs", {}).items():
            # Re-running add_text rebuilds df and n_docs consistently.
            idx.add_text(did, d.get("text", ""), DocMeta.from_dict(d["meta"]))
        return idx

    def _idf(self, term: str) -> float:
        """Smoothed inverse document frequency: log((N + 1) / (df + 1)) + 1.

        The +1 smoothing keeps the value finite for unseen terms and strictly
        positive even for terms that occur in every document.
        """
        df = self.df.get(term, 0)
        return math.log((self.n_docs + 1) / (df + 1)) + 1.0

    def search(self, query: str, k: int = 5) -> List[DocHit]:
        """Score every document against *query* and return the top *k* hits.

        A document's score is sum(tf(term) * idf(term)) over the distinct
        query terms; zero-score documents are omitted. An empty query or an
        empty index returns []. Results are sorted by descending score.
        """
        query_terms = set(tokenize(query))
        if not query_terms or self.n_docs == 0:
            return []
        # idf depends only on the term, never the document: compute it once
        # per query term instead of once per (term, document) pair as the
        # original did inside the per-document loop.
        idf = {term: self._idf(term) for term in query_terms}
        scores: Dict[str, float] = {}
        for did, d in self.docs.items():
            tf: Dict[str, int] = {}
            for t in tokenize(d["text"]):
                tf[t] = tf.get(t, 0) + 1
            score = 0.0
            for term, weight in idf.items():
                score += tf.get(term, 0) * weight
            if score > 0.0:
                scores[did] = score
        hits = [DocHit(doc_id=did, score=s) for did, s in scores.items()]
        hits.sort(key=lambda h: h.score, reverse=True)
        return hits[:k]
|
|
| |
def load_index(path: str | Path = DEFAULT_INDEX_PATH) -> TfidfIndex:
    """Load the TF-IDF index stored at *path* (empty index if no file exists)."""
    index = TfidfIndex.load(path)
    return index
|
|
def search(query: str, k: int = 5, path: str | Path = DEFAULT_INDEX_PATH) -> List[DocHit]:
    """Convenience wrapper: load the index at *path* and run *query* against it,
    returning at most *k* hits."""
    return load_index(path).search(query, k=k)
|
|