Spaces:
Sleeping
Sleeping
| """Small retrieval helpers: tokenization, chunking, and embedding ranking.""" | |
| from __future__ import annotations | |
| import math | |
| import re | |
| from pathlib import Path | |
| from .types import ResearchChunk | |
# Default character cap used by trim_text() when no max_chars is given.
SECTION_MAX_CHARS = 900
# Default top_k for rank_chunks_for_query(): how many chunks it returns.
MAX_RETURNED_CHUNKS = 5
# Model identifier handed to fastembed's TextEmbedding in _get_embedding_model().
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
# Model cache location: ".cache/fastembed" under the parent of this file's directory.
EMBEDDING_CACHE_DIR = Path(__file__).resolve().parents[1] / ".cache" / "fastembed"
# Shared model instance; stays None until _get_embedding_model() builds it.
_EMBEDDING_MODEL = None
# Words that tokenize() drops from its output, grouped by rough category.
_STOP_WORDS = frozenset(
    (
        # articles
        "the", "a", "an",
        # forms of "to be"
        "is", "are", "was", "were", "be", "been", "being",
        # auxiliaries and modals
        "have", "has", "had", "do", "does", "did",
        "will", "would", "could", "should",
        # prepositions
        "to", "of", "in", "for", "on", "with", "at", "by", "from", "as",
        # conjunctions
        "and", "but", "or",
        # demonstratives and pronouns
        "this", "that", "these", "those", "it", "its",
    )
)
def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase word tokens.

    Tokens are the ``\\w+`` runs of the lowercased input. Single-character
    tokens and entries of ``_STOP_WORDS`` are left out; order and duplicates
    are preserved.
    """
    tokens: list[str] = []
    for candidate in re.findall(r"\w+", text.lower()):
        # Skip noise: stop words and one-character fragments.
        if candidate in _STOP_WORDS or len(candidate) <= 1:
            continue
        tokens.append(candidate)
    return tokens
def trim_text(text: str, max_chars: int = SECTION_MAX_CHARS) -> str:
    """Normalize whitespace in *text* and cut it to at most *max_chars* characters.

    Every run of whitespace becomes a single space and the ends are stripped
    before slicing. The slice is stripped again so a cut that lands right
    after a space does not leave trailing whitespace.
    """
    collapsed = re.sub(r"\s+", " ", text)
    collapsed = collapsed.strip()
    clipped = collapsed[:max_chars]
    return clipped.strip()
def chunk_markdown(text: str, fallback_title: str) -> list[tuple[str, str]]:
    """Split markdown-ish text into titled chunks.

    A line that starts with ``#`` opens a new chunk whose title is that line
    with the leading ``#`` characters and surrounding whitespace removed.
    Text before the first heading, and text under a heading that is empty
    after stripping, is titled with *fallback_title*. Chunks whose body is
    empty are dropped.

    Lines inside a fenced code block (opened by a line whose first
    non-blank characters are three backticks or ``~~~``) are never treated
    as headings, so a ``# comment`` in a code sample stays in the body of
    the surrounding chunk. An unclosed fence runs to the end of the text.

    Args:
        text: The markdown-like document to split.
        fallback_title: Title used when no usable heading applies.

    Returns:
        ``(title, body)`` tuples in document order.
    """
    chunks: list[tuple[str, str]] = []
    heading = fallback_title
    lines: list[str] = []
    open_fence = ""  # marker of the code fence we are inside, "" when outside

    for line in text.splitlines():
        stripped = line.lstrip()
        if open_fence:
            # Inside a code block: keep everything verbatim until the
            # matching fence marker closes it.
            if stripped.startswith(open_fence):
                open_fence = ""
            lines.append(line)
        elif stripped.startswith(("```", "~~~")):
            open_fence = stripped[:3]
            lines.append(line)
        elif line.startswith("#"):
            body = "\n".join(lines).strip()
            if body:
                chunks.append((heading, body))
            heading = line.lstrip("#").strip() or fallback_title
            lines = []
        else:
            lines.append(line)

    body = "\n".join(lines).strip()
    if body:
        chunks.append((heading, body))
    return chunks
def rank_chunks_for_query(
    query: str,
    intent: str,
    chunks: list[ResearchChunk],
    top_k: int = MAX_RETURNED_CHUNKS,
    embedding_model=None,
) -> list[ResearchChunk]:
    """Pick the *top_k* chunks most similar to the query and intent.

    The query and intent are joined into one text and embedded together
    with every chunk's title and text in a single ``embed`` call. Each chunk
    is scored by cosine similarity against the query vector. The chunk
    objects are updated in place: ``score`` on every chunk, ``rank``
    (1-based) on the returned ones.

    Args:
        query: The user's query text.
        intent: Extra intent text appended to the query.
        chunks: Candidate chunks to rank.
        top_k: Maximum number of chunks to return.
        embedding_model: Object with an ``embed(texts)`` method; when falsy
            the shared module-level model is used.

    Returns:
        The best-scoring chunks, highest score first. Empty when *chunks*
        is empty. When the combined query text is blank, the first *top_k*
        chunks are returned in their given order without embedding.

    Raises:
        RuntimeError: If the model yields a different number of vectors
            than texts submitted.
    """
    if not chunks:
        return []

    query_text = f"{query} {intent}".strip()
    if not query_text:
        # Nothing to compare against; keep the incoming order.
        return _assign_ranks(chunks[:top_k])

    model = embedding_model or _get_embedding_model()
    # The query goes first so its vector can be peeled off the front.
    texts = [query_text, *map(_chunk_embedding_text, chunks)]
    vectors = list(model.embed(texts))
    if len(vectors) != len(texts):
        raise RuntimeError("Embedding model returned an unexpected number of vectors")

    query_vec, *chunk_vecs = vectors
    for candidate, candidate_vec in zip(chunks, chunk_vecs):
        candidate.score = _cosine(query_vec, candidate_vec)

    ranked = sorted(chunks, key=lambda item: item.score, reverse=True)
    return _assign_ranks(ranked[:top_k])
def preload_embedding_model() -> None:
    """Warm up the shared embedding model ahead of serving traffic.

    Obtains the shared model and runs one throwaway embedding through it so
    the model files and runtime session are ready, not just configured.
    """
    warmup_vectors = _get_embedding_model().embed(["startup warmup"])
    # Consume the result in full so the embedding is actually carried out.
    for _ in warmup_vectors:
        pass
def _get_embedding_model():
    """Return the shared embedding model, building it on first use.

    The first call creates ``EMBEDDING_CACHE_DIR`` if needed, constructs a
    fastembed ``TextEmbedding`` for ``EMBEDDING_MODEL_NAME``, and stores it
    in the module-level ``_EMBEDDING_MODEL``. Later calls return that same
    object.
    """
    global _EMBEDDING_MODEL
    if _EMBEDDING_MODEL is not None:
        return _EMBEDDING_MODEL

    # Imported here so fastembed is only loaded when a model is first built.
    from fastembed import TextEmbedding

    EMBEDDING_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    _EMBEDDING_MODEL = TextEmbedding(
        model_name=EMBEDDING_MODEL_NAME,
        cache_dir=str(EMBEDDING_CACHE_DIR),
    )
    return _EMBEDDING_MODEL
def _chunk_embedding_text(chunk: ResearchChunk) -> str:
    """Build the text embedded for *chunk*: title, newline, body, stripped."""
    combined = f"{chunk.title}\n{chunk.text}"
    return combined.strip()
def _assign_ranks(chunks: list[ResearchChunk]) -> list[ResearchChunk]:
    """Number *chunks* 1, 2, 3, ... via their ``rank`` attribute.

    The chunk objects are modified in place and the same list is returned.
    """
    position = 0
    for chunk in chunks:
        position += 1
        chunk.rank = position
    return chunks
def _cosine(a, b) -> float:
    """Return the cosine similarity of vectors *a* and *b*.

    Components are converted with ``float()``. If either vector has zero
    length (including an empty vector) the result is ``0.0`` rather than a
    division error.
    """

    def _length(vec) -> float:
        # Euclidean norm of one vector.
        return math.sqrt(sum(float(v) * float(v) for v in vec))

    dot = sum(float(x) * float(y) for x, y in zip(a, b))
    len_a = _length(a)
    len_b = _length(b)
    if not (len_a and len_b):
        return 0.0
    return dot / (len_a * len_b)