Spaces:
Sleeping
Sleeping
| # searcher/sparse_retriever.py | |
| import os | |
| import sqlite3 | |
| import math | |
| import yaml | |
| from collections import defaultdict | |
| class SparseRetriever: | |
| """ | |
| BM25 (Okapi BM25) lexical retrieval over the SQLite chunk store. | |
| Why BM25 alongside semantic search? | |
| - Dense retrieval can miss exact keyword matches (product codes, names, IDs) | |
| - BM25 is great for rare/specific terms that embeddings smooth over | |
| - Hybrid = best of both worlds | |
| BM25 formula: | |
| score(q, d) = Σ IDF(t) × (tf × (k1+1)) / (tf + k1 × (1 - b + b × dl/avgdl)) | |
| """ | |
| def __init__(self, config_path="config.yaml"): | |
| config_path = os.path.abspath(config_path) | |
| with open(config_path) as f: | |
| config = yaml.safe_load(f) | |
| config_dir = os.path.dirname(config_path) | |
| data_dir = config["data_dir"] | |
| self.data_dir = data_dir if os.path.isabs(data_dir) else os.path.normpath(os.path.join(config_dir, data_dir)) | |
| self.db_path = f"{self.data_dir}/metadata.db" | |
| self.k1 = 1.5 # term frequency saturation | |
| self.b = 0.75 # length normalisation | |
| # Build in-memory BM25 index from SQLite on startup | |
| self._corpus = [] # list of (chunk_id, token_list) | |
| self._avgdl = 0.0 | |
| self._N = 0 | |
| self._df = defaultdict(int) # term → doc frequency | |
| self._build_index() | |
| def _build_index(self): | |
| """Load all chunks from SQLite and compute BM25 statistics.""" | |
| os.makedirs(self.data_dir, exist_ok=True) | |
| conn = sqlite3.connect(self.db_path) | |
| try: | |
| rows = conn.execute("SELECT id, chunk_text FROM chunks").fetchall() | |
| except sqlite3.OperationalError: | |
| rows = [] | |
| conn.close() | |
| total_len = 0 | |
| for chunk_id, text in rows: | |
| tokens = text.lower().split() | |
| self._corpus.append((chunk_id, tokens)) | |
| total_len += len(tokens) | |
| for token in set(tokens): | |
| self._df[token] += 1 | |
| self._avgdl = total_len / len(rows) if rows else 1.0 | |
| self._N = len(rows) | |
| def _idf(self, term: str) -> float: | |
| """Inverse document frequency for a term.""" | |
| df = self._df.get(term, 0) | |
| return math.log((self._N - df + 0.5) / (df + 0.5) + 1) | |
| def retrieve(self, query: str, top_k: int = 20) -> list[dict]: | |
| """ | |
| Run BM25 retrieval over the corpus. | |
| Args: | |
| query (str) — raw or rewritten query (NOT expanded — BM25 is lexical) | |
| top_k (int) — number of results to return | |
| Returns: | |
| list[dict] with chunk_id and sparse_score, sorted descending | |
| """ | |
| if not self._corpus: | |
| return [] | |
| query_terms = query.lower().split() | |
| scores = {} | |
| for chunk_id, tokens in self._corpus: | |
| dl = len(tokens) | |
| score = 0.0 | |
| tf_map = defaultdict(int) | |
| for t in tokens: | |
| tf_map[t] += 1 | |
| for term in query_terms: | |
| if term not in tf_map: | |
| continue | |
| tf = tf_map[term] | |
| idf = self._idf(term) | |
| numerator = tf * (self.k1 + 1) | |
| denominator = tf + self.k1 * (1 - self.b + self.b * dl / self._avgdl) | |
| score += idf * numerator / denominator | |
| if score > 0: | |
| scores[chunk_id] = score | |
| sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k] | |
| # Fetch text for top results | |
| conn = sqlite3.connect(self.db_path) | |
| results = [] | |
| for chunk_id, score in sorted_results: | |
| row = conn.execute( | |
| "SELECT chunk_text, filepath, chunk_index FROM chunks WHERE id = ?", | |
| (chunk_id,) | |
| ).fetchone() | |
| if row: | |
| results.append({ | |
| "chunk_id": chunk_id, | |
| "chunk_text": row[0], | |
| "filepath": row[1], | |
| "chunk_index": row[2], | |
| "sparse_score": score, | |
| }) | |
| conn.close() | |
| return results | |
| if __name__ == "__main__": | |
| sr = SparseRetriever() | |
| results = sr.retrieve("quarterly budget", top_k=5) | |
| for r in results: | |
| print(f"[{r['sparse_score']:.4f}] {r['filepath']} → {r['chunk_text'][:80]}") | |