File size: 4,375 Bytes
1bbc850
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64ac601
1bbc850
 
 
64ac601
 
 
1bbc850
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# searcher/sparse_retriever.py

import os
import sqlite3
import math
import yaml
from collections import defaultdict


class SparseRetriever:
    """
    BM25 (Okapi BM25) lexical retrieval over the SQLite chunk store.

    Why BM25 alongside semantic search?
    - Dense retrieval can miss exact keyword matches (product codes, names, IDs)
    - BM25 is great for rare/specific terms that embeddings smooth over
    - Hybrid = best of both worlds

    BM25 formula:
        score(q, d) = Σ IDF(t) × (tf × (k1+1)) / (tf + k1 × (1 - b + b × dl/avgdl))

    The index lives in memory and is built once, at construction time, from
    the ``chunks`` table of ``<data_dir>/metadata.db``. Call
    ``_build_index()`` again to pick up rows written after construction.
    """

    def __init__(self, config_path: str = "config.yaml", *, k1: float = 1.5, b: float = 0.75) -> None:
        """
        Load the config, resolve the data directory and build the index.

        Args:
            config_path (str) — path to a YAML file containing a ``data_dir`` key;
                a relative ``data_dir`` is resolved against the config file's folder
            k1 (float) — term frequency saturation
            b (float) — length normalisation
        """
        config_path = os.path.abspath(config_path)
        with open(config_path, encoding="utf-8") as f:
            config = yaml.safe_load(f)

        config_dir = os.path.dirname(config_path)
        data_dir = config["data_dir"]
        self.data_dir = data_dir if os.path.isabs(data_dir) else os.path.normpath(os.path.join(config_dir, data_dir))
        self.db_path = f"{self.data_dir}/metadata.db"
        self.k1 = k1   # term frequency saturation
        self.b = b     # length normalisation

        # Build in-memory BM25 index from SQLite on startup
        self._corpus = []       # list of (chunk_id, token_list)
        self._doc_len = []      # token count per document, parallel to _corpus
        self._postings = {}     # term → list of (doc position in _corpus, tf)
        self._avgdl = 0.0
        self._N = 0
        self._df = defaultdict(int)   # term → doc frequency
        self._build_index()

    @staticmethod
    def _tokenize(text) -> list[str]:
        """
        Lower-case and whitespace-split ``text``; ``None`` yields no tokens.

        Shared by indexing and querying so both sides tokenize identically.
        Punctuation is not stripped, so "budget." and "budget" are different terms.
        """
        return (text or "").lower().split()

    def _build_index(self) -> None:
        """
        Load all chunks from SQLite and compute BM25 statistics.

        Every call rebuilds the index from scratch, so calling it again
        refreshes the statistics instead of double-counting them.
        """
        os.makedirs(self.data_dir, exist_ok=True)
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute("SELECT id, chunk_text FROM chunks").fetchall()
        except sqlite3.OperationalError:
            # Best effort: typically the chunks table does not exist yet
            # (nothing indexed so far) — treat that as an empty corpus.
            rows = []
        finally:
            conn.close()

        corpus = []
        doc_len = []
        postings = defaultdict(list)
        total_len = 0
        for doc_idx, (chunk_id, text) in enumerate(rows):
            tokens = self._tokenize(text)   # NULL chunk_text → zero tokens
            corpus.append((chunk_id, tokens))
            doc_len.append(len(tokens))
            total_len += len(tokens)

            tf_map = defaultdict(int)
            for token in tokens:
                tf_map[token] += 1
            for token, tf in tf_map.items():
                postings[token].append((doc_idx, tf))

        df = defaultdict(int)
        for term, plist in postings.items():
            df[term] = len(plist)

        # Swap in the freshly built state in one go.
        self._corpus = corpus
        self._doc_len = doc_len
        self._postings = dict(postings)
        self._df = df
        self._N = len(rows)
        # Fall back to 1.0 for an empty (or all-empty) corpus so the length
        # normalisation never divides by zero.
        self._avgdl = total_len / len(rows) if total_len else 1.0

    def _idf(self, term: str) -> float:
        """
        Inverse document frequency for a term.

        Uses log((N - df + 0.5) / (df + 0.5) + 1); the "+ 1" inside the log
        keeps the value positive even for terms present in most documents.
        """
        df = self._df.get(term, 0)
        return math.log((self._N - df + 0.5) / (df + 0.5) + 1)

    def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
        """
        Run BM25 retrieval over the corpus.

        Args:
            query (str) — raw or rewritten query (NOT expanded — BM25 is lexical)
            top_k (int) — number of results to return; values <= 0 return nothing

        Returns:
            list[dict] with chunk_id, chunk_text, filepath, chunk_index and
            sparse_score, sorted by score descending (ties keep index order).
            May hold fewer than top_k entries when few chunks match or when a
            chunk has been deleted from the database since the index was built.
        """
        if not self._corpus or top_k <= 0:
            return []

        query_terms = self._tokenize(query)
        if not query_terms:
            return []

        k1, b, avgdl = self.k1, self.b, self._avgdl
        scores = {}   # doc position in _corpus → accumulated BM25 score

        # Walk only the postings of the query terms instead of every document.
        # A term repeated in the query contributes once per occurrence.
        for term in query_terms:
            plist = self._postings.get(term)
            if not plist:
                continue
            idf = self._idf(term)
            for doc_idx, tf in plist:
                dl = self._doc_len[doc_idx]
                numerator = tf * (k1 + 1)
                denominator = tf + k1 * (1 - b + b * dl / avgdl)
                scores[doc_idx] = scores.get(doc_idx, 0.0) + idf * numerator / denominator

        # Highest score first; equal scores fall back to corpus order.
        ranked = sorted(
            ((doc_idx, score) for doc_idx, score in scores.items() if score > 0),
            key=lambda item: (-item[1], item[0]),
        )[:top_k]

        # Fetch text for top results
        results = []
        conn = sqlite3.connect(self.db_path)
        try:
            for doc_idx, score in ranked:
                chunk_id = self._corpus[doc_idx][0]
                row = conn.execute(
                    "SELECT chunk_text, filepath, chunk_index FROM chunks WHERE id = ?",
                    (chunk_id,)
                ).fetchone()
                if row is not None:
                    results.append({
                        "chunk_id": chunk_id,
                        "chunk_text": row[0],
                        "filepath": row[1],
                        "chunk_index": row[2],
                        "sparse_score": score,
                    })
        finally:
            conn.close()
        return results


if __name__ == "__main__":
    # Manual smoke test: builds the index from ./config.yaml and prints the
    # top BM25 hits for a sample query, one per line as
    # "[score] filepath: first 80 characters of the chunk".
    sr = SparseRetriever()
    results = sr.retrieve("quarterly budget", top_k=5)
    for r in results:
        # Separator added between path and preview; they used to run together.
        print(f"[{r['sparse_score']:.4f}] {r['filepath']}: {r['chunk_text'][:80]}")