Spaces:
Sleeping
Sleeping
File size: 3,665 Bytes
c52f5bf 1716018 95a6caf c52f5bf 95a6caf 1716018 95a6caf 1716018 95a6caf 1716018 95a6caf 1716018 c52f5bf 1716018 c52f5bf 1716018 c52f5bf 1716018 c52f5bf 1716018 95a6caf 1716018 c52f5bf 1716018 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# modules/rag_retriever.py
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
def _lazy_imports():
    # Deferred import: sentence_transformers is heavy to load, so it is
    # only pulled in when an embedding model is actually needed.
    from sentence_transformers import SentenceTransformer as _ST
    return _ST
def _now() -> int:
return int(time.time())
try:
    from modules.utils import ensure_dirs, data_dir
except Exception:
    # Stand-alone fallback: mirror the helpers from modules.utils with a
    # fixed scratch directory under /tmp when the package is unavailable.
    _FALLBACK_DIR = Path("/tmp/agent_studio")

    def ensure_dirs() -> None:
        """Create the fallback data directory if it does not exist."""
        _FALLBACK_DIR.mkdir(parents=True, exist_ok=True)

    def data_dir() -> Path:
        """Return the fallback data directory, creating it first."""
        ensure_dirs()
        return _FALLBACK_DIR
def _chunks_path() -> Path:
    """Path of the JSONL file holding the ingested text chunks."""
    return Path(data_dir(), "chunks.jsonl")
def _load_chunks() -> List[Dict[str, Any]]:
p = _chunks_path()
if not p.exists():
return []
rows: List[Dict[str, Any]] = []
with open(p, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
if isinstance(obj, dict) and obj.get("text"):
rows.append(obj)
except Exception:
continue
return rows
def _emb_model_name() -> str:
return os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
def _emb_cache_dir() -> Path:
    """Directory holding the cached embedding matrix and its metadata."""
    return data_dir().joinpath("emb_cache")
def _emb_cache_paths() -> Tuple[Path, Path]:
    """Return the (embeddings.npy, meta.json) paths inside the cache dir."""
    base = _emb_cache_dir()
    return (base / "embeddings.npy", base / "meta.json")
def _load_or_build_embeddings(chunks: List[Dict[str, Any]]) -> Tuple[np.ndarray, List[int]]:
    """Return (embedding matrix, index map) for the given chunks.

    The matrix is cached on disk and reused when it was built with the
    same model name and the same number of chunks.

    NOTE(review): the cache key is (model, count) only — edits that keep
    the chunk count unchanged will NOT invalidate the cache.

    Args:
        chunks: Chunk dicts whose "text" fields get embedded.

    Returns:
        The (n, dim) embedding matrix and the identity index map
        [0..n-1] from matrix rows back to chunk positions.
    """
    ensure_dirs()
    _emb_cache_dir().mkdir(parents=True, exist_ok=True)
    npy_path, meta_path = _emb_cache_paths()
    # Try the on-disk cache first.
    if npy_path.exists() and meta_path.exists():
        try:
            with open(meta_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            if int(meta.get("n", -1)) == len(chunks) and meta.get("model") == _emb_model_name():
                emb = np.load(npy_path)
                if emb.shape[0] == len(chunks):
                    return emb, list(range(len(chunks)))
        except Exception:
            # Corrupt or unreadable cache: fall through and rebuild.
            pass
    texts = [str(c.get("text", "")) for c in chunks]
    if not texts:
        # Nothing to embed — return early BEFORE instantiating the heavy
        # model (the original loaded it even for an empty corpus).
        # 384 is the all-MiniLM-L6-v2 dim; only the 0 row count matters
        # to callers here.
        return np.zeros((0, 384), dtype="float32"), []
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    emb = model.encode(texts, normalize_embeddings=True, convert_to_numpy=True)
    np.save(npy_path, emb)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump({"n": len(chunks), "model": _emb_model_name(), "ts": _now()}, f)
    return emb, list(range(len(chunks)))
def _cosine_topk(matrix: np.ndarray, query_vec: np.ndarray, top_k: int) -> List[int]:
if matrix.size == 0:
return []
sims = matrix @ query_vec
k = min(top_k, matrix.shape[0])
part = np.argpartition(-sims, k - 1)[:k]
part_sorted = part[np.argsort(-sims[part])]
return part_sorted.tolist()
def retrieve_contexts(query: str, top_k: int = 5) -> List[str]:
    """Return up to top_k chunk texts most relevant to query.

    Each result is the chunk's stripped text, with "[source] <src>"
    appended on a new line when the chunk carries a source field.

    Args:
        query: Free-text search query.
        top_k: Maximum number of contexts to return.

    Returns:
        Ranked context strings (empty when no chunks are ingested).
    """
    chunks = _load_chunks()
    if not chunks:
        return []
    # Resolve the corpus embeddings BEFORE instantiating the query model:
    # the original loaded the SentenceTransformer up front, paying the
    # model-load cost even when the matrix turned out to be empty.
    emb_matrix, idx_map = _load_or_build_embeddings(chunks)
    if emb_matrix.size == 0:
        return []
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    q_vec = model.encode([query], normalize_embeddings=True, convert_to_numpy=True)[0]
    top_idx = _cosine_topk(emb_matrix, q_vec, top_k)
    results: List[str] = []
    for i in top_idx:
        ch = chunks[idx_map[i]]
        txt = str(ch.get("text", "")).strip()
        src = ch.get("source")
        results.append(f"{txt}\n[source] {src}" if src else txt)
    return results
|