File size: 4,250 Bytes
8fa7af1
43f41de
 
 
 
 
8fa7af1
43f41de
 
 
 
 
8fa7af1
 
 
43f41de
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8fa7af1
 
 
 
 
 
 
 
 
 
 
 
43f41de
 
 
8fa7af1
 
 
43f41de
8fa7af1
 
 
 
 
43f41de
8fa7af1
43f41de
8fa7af1
 
43f41de
 
8fa7af1
43f41de
 
8fa7af1
 
 
 
 
43f41de
8fa7af1
 
 
 
43f41de
 
8fa7af1
 
 
 
 
 
43f41de
 
8fa7af1
 
 
 
 
 
 
 
43f41de
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Small retrieval helpers: tokenization, chunking, and embedding ranking."""

from __future__ import annotations

import math
import re
from pathlib import Path

from .types import ResearchChunk

# Default character cap applied by trim_text().
SECTION_MAX_CHARS = 900
# Default ``top_k`` for rank_chunks_for_query(): how many chunks it returns.
MAX_RETURNED_CHUNKS = 5
# Model name passed to fastembed's TextEmbedding in _get_embedding_model().
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"
# Directory handed to TextEmbedding as ``cache_dir``; _get_embedding_model()
# creates it on first use. It is ".cache/fastembed" under the parent of the
# directory that contains this file.
EMBEDDING_CACHE_DIR = Path(__file__).resolve().parents[1] / ".cache" / "fastembed"
# Module-level cache of the embedding model; stays None until
# _get_embedding_model() builds the instance.
_EMBEDDING_MODEL = None

# Common English function words that tokenize() filters out.
_STOP_WORDS = frozenset(
    (
        "the a an is are was were be been being "
        "have has had do does did will would could should "
        "to of in for on with at by from as "
        "and but or this that these those it its"
    ).split()
)


def tokenize(text: str) -> list[str]:
    """Return the lowercased word tokens of ``text``.

    The text is lowercased and split into runs of word characters (letters,
    digits, underscore). Stop words and single-character tokens are dropped;
    the remaining tokens keep their original order.
    """
    tokens: list[str] = []
    for candidate in re.findall(r"\w+", text.lower()):
        if len(candidate) <= 1 or candidate in _STOP_WORDS:
            continue
        tokens.append(candidate)
    return tokens


def trim_text(text: str, max_chars: int = SECTION_MAX_CHARS) -> str:
    """Collapse whitespace runs to single spaces and cap the text at ``max_chars``."""
    single_spaced = re.sub(r"\s+", " ", text)
    trimmed = single_spaced.strip()
    clipped = trimmed[:max_chars]
    # The cut can land right after a space, so strip once more.
    return clipped.strip()


def chunk_markdown(text: str, fallback_title: str) -> list[tuple[str, str]]:
    """Split markdown-ish text into ``(heading, body)`` chunks.

    A line that starts with ``#`` in column 0 opens a new chunk. Its title is
    the line with leading ``#`` characters and surrounding whitespace removed,
    or ``fallback_title`` when nothing is left. Text before the first heading
    is filed under ``fallback_title``. Chunks whose body is empty after
    stripping are dropped.

    Lines inside fenced code blocks (opened by a line starting with triple
    backticks or triple tildes) are never treated as headings, so ``#``
    comments in code samples do not split the surrounding section.

    Args:
        text: The markdown-ish document to split.
        fallback_title: Title used when no heading text is available.

    Returns:
        The chunks in document order as ``(heading, body)`` tuples.
    """
    chunks: list[tuple[str, str]] = []
    heading = fallback_title
    body_lines: list[str] = []
    open_fence = ""  # fence marker while inside a code block, else ""

    def flush() -> None:
        # Emit the section collected so far, unless it has no real content.
        body = "\n".join(body_lines).strip()
        if body:
            chunks.append((heading, body))

    for line in text.splitlines():
        stripped = line.strip()

        if open_fence:
            body_lines.append(line)
            # A closing fence repeats the opening marker and carries no other text.
            if stripped.startswith(open_fence) and not stripped.strip(open_fence[0]):
                open_fence = ""
            continue

        if stripped.startswith(("```", "~~~")):
            info = stripped.lstrip(stripped[0])
            # A backtick fence's info string cannot contain backticks; a line
            # like ```x``` is inline code and must not open a block.
            if not (stripped[0] == "`" and "`" in info):
                open_fence = stripped[:3]
            body_lines.append(line)
            continue

        if line.startswith("#"):
            flush()
            heading = line.lstrip("#").strip() or fallback_title
            body_lines = []
        else:
            body_lines.append(line)

    flush()
    return chunks


def rank_chunks_for_query(
    query: str,
    intent: str,
    chunks: list[ResearchChunk],
    top_k: int = MAX_RETURNED_CHUNKS,
    embedding_model=None,
) -> list[ResearchChunk]:
    """Return the final top chunks for query+intent.

    The pipeline is: source results -> text chunks -> embedding similarity
    against query+intent -> final top-k chunks.

    Chunks are updated in place: when embeddings are computed, every chunk
    gets ``score`` set to its cosine similarity with the query text, and each
    returned chunk gets a 1-based ``rank``.

    Args:
        query: The user query.
        intent: Extra intent text; joined to ``query`` with a space.
        chunks: Candidate chunks to rank.
        top_k: Maximum number of chunks to return. Values below zero are
            treated as zero.
        embedding_model: Optional object exposing ``embed(texts)``. When it is
            ``None`` the shared model from ``_get_embedding_model()`` is used.

    Returns:
        Up to ``top_k`` chunks, highest score first. If query and intent are
        both blank, the first ``top_k`` chunks in their given order, without
        scoring.

    Raises:
        RuntimeError: If the model yields a different number of vectors than
            the number of texts it was given.
    """
    if not chunks:
        return []

    # A negative slice bound would drop chunks from the tail instead of
    # limiting the result, so clamp it.
    limit = max(top_k, 0)

    query_text = f"{query} {intent}".strip()
    if not query_text:
        return _assign_ranks(chunks[:limit])

    # Explicit None check: a caller-supplied model must be honored even if the
    # object happens to evaluate as falsy.
    model = embedding_model if embedding_model is not None else _get_embedding_model()
    texts = [query_text] + [_chunk_embedding_text(chunk) for chunk in chunks]
    vectors = list(model.embed(texts))
    if len(vectors) != len(texts):
        raise RuntimeError("Embedding model returned an unexpected number of vectors")

    query_vec = vectors[0]
    for chunk, vec in zip(chunks, vectors[1:]):
        chunk.score = _cosine(query_vec, vec)

    # sorted() is stable, so equally scored chunks keep their input order.
    ranked = sorted(chunks, key=lambda chunk: chunk.score, reverse=True)
    return _assign_ranks(ranked[:limit])


def preload_embedding_model() -> None:
    """Load the shared embedding model and run one warmup embedding.

    Intended to be called before serving traffic so the first real request
    does not pay for the download/cache and initialization.
    """
    embedder = _get_embedding_model()
    warmup_inputs = ["startup warmup"]
    # Consume the embeddings so the model files and runtime session are
    # actually ready, not just configured.
    for _ in embedder.embed(warmup_inputs):
        pass


def _get_embedding_model():
    """Return the module-wide embedding model, building it on first call."""
    global _EMBEDDING_MODEL
    if _EMBEDDING_MODEL is not None:
        return _EMBEDDING_MODEL

    # Imported here so fastembed is only loaded when a model is really needed.
    from fastembed import TextEmbedding

    EMBEDDING_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    _EMBEDDING_MODEL = TextEmbedding(
        model_name=EMBEDDING_MODEL_NAME,
        cache_dir=str(EMBEDDING_CACHE_DIR),
    )
    return _EMBEDDING_MODEL


def _chunk_embedding_text(chunk: ResearchChunk) -> str:
    """Build the text embedded for ``chunk``: title, newline, body, stripped."""
    combined = "\n".join((f"{chunk.title}", f"{chunk.text}"))
    return combined.strip()


def _assign_ranks(chunks: list[ResearchChunk]) -> list[ResearchChunk]:
    """Number ``chunks`` in place with 1-based ``rank`` values and return the list."""
    position = 0
    for chunk in chunks:
        position += 1
        chunk.rank = position
    return chunks


def _cosine(a, b) -> float:
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Components are converted with ``float()``. If either vector has zero
    magnitude the result is ``0.0`` rather than a division error.
    """
    dot = sum(float(p) * float(q) for p, q in zip(a, b))
    magnitudes = [
        math.sqrt(sum(float(v) * float(v) for v in vec))
        for vec in (a, b)
    ]
    if not all(magnitudes):
        return 0.0
    return dot / (magnitudes[0] * magnitudes[1])