Spaces:
Running
Running
File size: 8,943 Bytes
0b89610 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 | """
Deterministic hybrid retrieval utilities for rag-context-optimizer.
The retriever combines a corpus-aware BM25 score with a keyword-overlap score, using
only Python standard library components so runs are reproducible across environments.
Doctest examples:
>>> from rag_optimizer_env.corpus import Chunk
>>> corpus = [
... Chunk(
... chunk_id="c1",
... domain="Climate Policy",
... text="Carbon tax rebates helped households accept climate policy.",
... tokens=40,
... keywords=["carbon tax", "rebates", "households", "policy", "climate"],
... relevance_tags=["carbon_pricing"],
... ),
... Chunk(
... chunk_id="c2",
... domain="Software Engineering Best Practices",
... text="Code review and rollback notes improve deployment safety.",
... tokens=35,
... keywords=["code review", "rollback", "deployment", "safety", "review"],
... relevance_tags=["code_review"],
... ),
... ]
>>> retriever = HybridRetriever(corpus)
>>> round(retriever.keyword_overlap_score("carbon tax rebates", corpus[0]), 3) > 0
True
>>> ranked = retriever.rank_chunks("carbon tax rebates", top_k=1)
>>> ranked[0][0].chunk_id
'c1'
>>> "c1" in retriever.get_ground_truth_relevant("carbon tax rebates", threshold=0.1)
True
"""
from __future__ import annotations
import math
import re
from collections import Counter, defaultdict
from typing import Iterable
from rag_optimizer_env.corpus import Chunk
class HybridRetriever:
"""Hybrid lexical retriever with deterministic BM25 and keyword overlap scoring."""
_STOPWORDS = {
"a", "an", "and", "are", "as", "at", "be", "but", "by", "do", "for", "from",
"how", "if", "in", "into", "is", "it", "its", "of", "on", "or", "should",
"that", "the", "their", "them", "there", "these", "this", "to", "using",
"what", "when", "where", "which", "while", "with", "without", "your",
}
def __init__(self, corpus: list[Chunk]):
self.corpus = list(corpus)
self._k1 = 1.5
self._b = 0.75
self._doc_tokens: dict[str, list[str]] = {}
self._doc_term_freqs: dict[str, Counter[str]] = {}
self._doc_lengths: dict[str, int] = {}
self._doc_freqs: dict[str, int] = defaultdict(int)
total_length = 0
for chunk in self.corpus:
tokens = self._tokenize_for_bm25(chunk.text)
self._doc_tokens[chunk.chunk_id] = tokens
term_freqs = Counter(tokens)
self._doc_term_freqs[chunk.chunk_id] = term_freqs
doc_len = len(tokens)
self._doc_lengths[chunk.chunk_id] = doc_len
total_length += doc_len
for term in term_freqs:
self._doc_freqs[term] += 1
self._avg_doc_length = total_length / len(self.corpus) if self.corpus else 0.0
@staticmethod
def _tokenize_for_bm25(text: str) -> list[str]:
return [
token
for token in re.findall(r"[a-z0-9]+", text.lower())
if token not in HybridRetriever._STOPWORDS and len(token) > 1
]
@staticmethod
def _tokenize_query_terms(text: str) -> set[str]:
return {
token
for token in re.findall(r"[a-z0-9]+", text.lower())
if token not in HybridRetriever._STOPWORDS and len(token) > 1
}
@staticmethod
def _normalize_score(value: float, maximum: float) -> float:
if maximum <= 0.0:
return 0.0
return max(0.0, min(1.0, value / maximum))
def _idf(self, term: str) -> float:
"""Compute BM25 IDF using corpus statistics."""
doc_count = len(self.corpus)
if doc_count == 0:
return 0.0
frequency = self._doc_freqs.get(term, 0)
return math.log(1.0 + ((doc_count - frequency + 0.5) / (frequency + 0.5)))
def _raw_bm25(self, query_terms: Iterable[str], chunk: Chunk) -> float:
term_freqs = self._doc_term_freqs.get(chunk.chunk_id, Counter())
doc_len = self._doc_lengths.get(chunk.chunk_id, 0)
avg_doc_len = self._avg_doc_length or 1.0
score = 0.0
for term in Counter(query_terms):
freq = term_freqs.get(term, 0)
if freq == 0:
continue
idf = self._idf(term)
numerator = freq * (self._k1 + 1.0)
denominator = freq + self._k1 * (1.0 - self._b + self._b * (doc_len / avg_doc_len))
score += idf * (numerator / denominator)
return score
def _max_raw_bm25_for_query(self, query_terms: list[str]) -> float:
if not self.corpus or not query_terms:
return 0.0
return max(self._raw_bm25(query_terms, chunk) for chunk in self.corpus)
def bm25_score(self, query: str, chunk: Chunk) -> float:
"""
Implement BM25 scoring using only stdlib and return a normalized score in [0.0, 1.0].
>>> from rag_optimizer_env.corpus import Chunk
>>> corpus = [Chunk(chunk_id="a", domain="Climate Policy", text="carbon tax rebates", tokens=10, keywords=["carbon"], relevance_tags=["x"])]
>>> retriever = HybridRetriever(corpus)
>>> 0.0 <= retriever.bm25_score("carbon tax", corpus[0]) <= 1.0
True
"""
query_terms = self._tokenize_for_bm25(query)
raw_score = self._raw_bm25(query_terms, chunk)
max_score = self._max_raw_bm25_for_query(query_terms)
return self._normalize_score(raw_score, max_score)
def keyword_overlap_score(self, query: str, chunk: Chunk) -> float:
"""
Compute Jaccard similarity between query tokens and chunk keyword tokens.
>>> from rag_optimizer_env.corpus import Chunk
>>> corpus = [Chunk(chunk_id="a", domain="Climate Policy", text="x", tokens=10, keywords=["carbon tax", "rebates"], relevance_tags=["x"])]
>>> retriever = HybridRetriever(corpus)
>>> round(retriever.keyword_overlap_score("carbon rebates", corpus[0]), 3)
0.667
"""
query_terms = self._tokenize_query_terms(query)
keyword_terms = self._tokenize_query_terms(" ".join(chunk.keywords))
if not query_terms or not keyword_terms:
return 0.0
union = query_terms | keyword_terms
if not union:
return 0.0
return len(query_terms & keyword_terms) / len(union)
def hybrid_score(
self,
query: str,
chunk: Chunk,
bm25_weight: float = 0.6,
keyword_weight: float = 0.4,
) -> float:
"""Return a weighted combination of BM25 and keyword overlap scores."""
if bm25_weight < 0 or keyword_weight < 0:
raise ValueError("Weights must be non-negative.")
weight_sum = bm25_weight + keyword_weight
if weight_sum == 0:
raise ValueError("At least one weight must be positive.")
bm25_component = self.bm25_score(query, chunk)
keyword_component = self.keyword_overlap_score(query, chunk)
score = ((bm25_component * bm25_weight) + (keyword_component * keyword_weight)) / weight_sum
return max(0.0, min(1.0, score))
def rank_chunks(self, query: str, top_k: int = 20) -> list[tuple[Chunk, float]]:
"""
Return chunks sorted by hybrid_score descending.
>>> from rag_optimizer_env.corpus import Chunk
>>> corpus = [
... Chunk(chunk_id="a", domain="Climate Policy", text="carbon tax rebates for households", tokens=10, keywords=["carbon tax"], relevance_tags=["x"]),
... Chunk(chunk_id="b", domain="Software Engineering Best Practices", text="code review safety", tokens=10, keywords=["code review"], relevance_tags=["y"]),
... ]
>>> retriever = HybridRetriever(corpus)
>>> [chunk.chunk_id for chunk, _ in retriever.rank_chunks("carbon tax", top_k=2)]
['a', 'b']
"""
scored = [(chunk, self.hybrid_score(query, chunk)) for chunk in self.corpus]
scored.sort(key=lambda item: (-item[1], item[0].chunk_id))
return scored[: max(0, top_k)]
def get_ground_truth_relevant(self, query: str, threshold: float = 0.3) -> set[str]:
"""
Return chunk_ids with hybrid score above or equal to the threshold.
>>> from rag_optimizer_env.corpus import Chunk
>>> corpus = [Chunk(chunk_id="a", domain="Climate Policy", text="carbon tax rebates", tokens=10, keywords=["carbon tax", "rebates"], relevance_tags=["x"])]
>>> retriever = HybridRetriever(corpus)
>>> retriever.get_ground_truth_relevant("carbon tax", threshold=0.1) == {'a'}
True
"""
if not 0.0 <= threshold <= 1.0:
raise ValueError("threshold must be between 0.0 and 1.0.")
return {
chunk.chunk_id
for chunk, score in self.rank_chunks(query, top_k=len(self.corpus))
if score >= threshold
}
|