File size: 3,665 Bytes
c52f5bf
1716018
 
 
95a6caf
c52f5bf
95a6caf
1716018
95a6caf
1716018
 
 
95a6caf
1716018
 
95a6caf
1716018
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c52f5bf
1716018
 
c52f5bf
1716018
c52f5bf
1716018
 
 
 
 
 
c52f5bf
1716018
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95a6caf
1716018
 
 
 
 
 
 
 
 
 
 
 
c52f5bf
 
 
 
1716018
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# modules/rag_retriever.py
import os
import json
import time
from pathlib import Path
from typing import List, Tuple, Dict, Any

import numpy as np

def _lazy_imports():
    """Import and return the SentenceTransformer class on demand.

    Keeps module import cheap; the heavy sentence-transformers
    dependency is loaded only when embeddings are actually needed.
    """
    import sentence_transformers

    return sentence_transformers.SentenceTransformer

def _now() -> int:
    return int(time.time())

try:
    from modules.utils import ensure_dirs, data_dir
except Exception:
    # Fallbacks so this module still works outside the full project tree.
    def ensure_dirs() -> None:
        """Create the fallback data directory if it is missing."""
        Path("/tmp/agent_studio").mkdir(parents=True, exist_ok=True)

    def data_dir() -> Path:
        """Return the fallback data directory, creating it first."""
        ensure_dirs()
        return Path("/tmp/agent_studio")

def _chunks_path() -> Path:
    """Location of the ingested-chunks JSONL store."""
    return data_dir().joinpath("chunks.jsonl")

def _load_chunks() -> List[Dict[str, Any]]:
    """Read all valid chunk records from the JSONL store.

    Blank lines, malformed JSON, and records without a non-empty
    "text" field are silently skipped (best-effort load). Returns []
    when the store does not exist yet.
    """
    path = _chunks_path()
    if not path.exists():
        return []
    records: List[Dict[str, Any]] = []
    with path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw:
                continue
            try:
                record = json.loads(raw)
            except Exception:
                # Skip malformed lines rather than failing the whole load.
                continue
            if isinstance(record, dict) and record.get("text"):
                records.append(record)
    return records

def _emb_model_name() -> str:
    return os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

def _emb_cache_dir() -> Path:
    """Directory holding the cached embedding matrix and its metadata."""
    return data_dir().joinpath("emb_cache")

def _emb_cache_paths() -> Tuple[Path, Path]:
    """Paths of the cached embeddings (.npy) and cache metadata (meta.json)."""
    cache_root = _emb_cache_dir()
    return cache_root / "embeddings.npy", cache_root / "meta.json"

def _load_or_build_embeddings(chunks: List[Dict[str, Any]]) -> Tuple[np.ndarray, List[int]]:
    """Return (embedding matrix, index map) for *chunks*, using a disk cache.

    The cache is reused only when the stored row count and model name
    match the current request; otherwise embeddings are recomputed with
    sentence-transformers and the cache is rewritten.

    NOTE(review): the cache is keyed on count + model name only — if chunk
    *contents* change without the count changing, stale embeddings are
    served. Consider hashing the texts if that matters.

    Returns:
        A (n, d) float array of L2-normalized embeddings and the list of
        row indices [0..n) mapping matrix rows back to ``chunks``.
    """
    ensure_dirs()
    _emb_cache_dir().mkdir(parents=True, exist_ok=True)
    npy_path, meta_path = _emb_cache_paths()

    # Fast path: reuse the on-disk cache when it matches this request.
    if npy_path.exists() and meta_path.exists():
        try:
            with open(meta_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            if int(meta.get("n", -1)) == len(chunks) and meta.get("model") == _emb_model_name():
                emb = np.load(npy_path)
                if emb.shape[0] == len(chunks):
                    return emb, list(range(len(chunks)))
        except Exception:
            # Corrupt or unreadable cache: fall through and rebuild.
            pass

    texts = [str(c.get("text", "")) for c in chunks]
    if not texts:
        # Bug fix: previously the SentenceTransformer model was instantiated
        # BEFORE this check, paying the full model-load cost on empty input.
        # 384 is the output dim of the default all-MiniLM-L6-v2 model.
        return np.zeros((0, 384), dtype="float32"), []

    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    emb = model.encode(texts, normalize_embeddings=True, convert_to_numpy=True)
    np.save(npy_path, emb)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump({"n": len(chunks), "model": _emb_model_name(), "ts": _now()}, f)
    return emb, list(range(len(chunks)))

def _cosine_topk(matrix: np.ndarray, query_vec: np.ndarray, top_k: int) -> List[int]:
    if matrix.size == 0:
        return []
    sims = matrix @ query_vec
    k = min(top_k, matrix.shape[0])
    part = np.argpartition(-sims, k - 1)[:k]
    part_sorted = part[np.argsort(-sims[part])]
    return part_sorted.tolist()

def retrieve_contexts(query: str, top_k: int = 5) -> List[str]:
    """Return up to *top_k* chunk texts most relevant to *query*.

    Each result is the chunk's text; when the chunk carries a "source"
    field, "[source] <src>" is appended on a new line. Returns [] when
    no chunks are indexed or no embeddings are available.
    """
    chunks = _load_chunks()
    if not chunks:
        return []

    emb_matrix, idx_map = _load_or_build_embeddings(chunks)
    if emb_matrix.size == 0:
        return []

    # Bug fix: the encoder was previously instantiated BEFORE the
    # empty-matrix check, paying the expensive model-load cost even when
    # retrieval could only return nothing.
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())

    q_vec = model.encode([query], normalize_embeddings=True, convert_to_numpy=True)[0]
    top_idx = _cosine_topk(emb_matrix, q_vec, top_k)
    results: List[str] = []
    for i in top_idx:
        ch = chunks[idx_map[i]]
        txt = str(ch.get("text", "")).strip()
        src = ch.get("source")
        results.append(f"{txt}\n[source] {src}" if src else txt)
    return results