# modules/rag_retriever.py
"""Embedding-based retrieval over locally stored text chunks.

Chunks live in a JSONL file under the data directory; their embeddings
are cached on disk and rebuilt whenever the chunk count, the chunk
content, or the embedding model changes.
"""

import hashlib
import json
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np


def _lazy_imports():
    """Defer the heavy sentence-transformers import until first use."""
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer


def _now() -> int:
    """Current Unix time as an integer (recorded in cache metadata)."""
    return int(time.time())


try:
    from modules.utils import ensure_dirs, data_dir
except Exception:
    # Fallbacks so this module stays importable outside the full project.
    def ensure_dirs() -> None:
        Path("/tmp/agent_studio").mkdir(parents=True, exist_ok=True)

    def data_dir() -> Path:
        ensure_dirs()
        return Path("/tmp/agent_studio")


def _chunks_path() -> Path:
    """Path of the JSONL file holding the retrievable chunks."""
    return data_dir() / "chunks.jsonl"


def _load_chunks() -> List[Dict[str, Any]]:
    """Load chunk records from disk, best-effort.

    Blank lines, lines that are not valid JSON, non-dict values, and
    records without a truthy "text" field are all skipped silently so a
    partially corrupt file still yields its good rows.
    """
    p = _chunks_path()
    if not p.exists():
        return []
    rows: List[Dict[str, Any]] = []
    with open(p, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except Exception:
                continue  # tolerate corrupt lines rather than failing retrieval
            if isinstance(obj, dict) and obj.get("text"):
                rows.append(obj)
    return rows


def _emb_model_name() -> str:
    """Embedding model name, overridable via the EMBEDDING_MODEL env var."""
    return os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2")


def _emb_cache_dir() -> Path:
    """Directory holding the on-disk embedding cache."""
    return data_dir() / "emb_cache"


def _emb_cache_paths() -> Tuple[Path, Path]:
    """Return (embeddings .npy path, metadata .json path) for the cache."""
    d = _emb_cache_dir()
    return d / "embeddings.npy", d / "meta.json"


def _content_hash(texts: List[str]) -> str:
    """Stable digest of the chunk texts, used to invalidate stale caches."""
    h = hashlib.sha256()
    for t in texts:
        h.update(t.encode("utf-8"))
        h.update(b"\x00")  # separator so ["ab"] hashes differently from ["a", "b"]
    return h.hexdigest()


def _load_or_build_embeddings(
    chunks: List[Dict[str, Any]], model: Any = None
) -> Tuple[np.ndarray, List[int]]:
    """Return (embedding matrix, index map into *chunks*), using the disk cache.

    Bug fix: the cache used to be considered valid when only the chunk
    count and model name matched, so editing a chunk's text without
    changing the count served stale embeddings. A digest of the chunk
    texts is now also stored and checked; caches written by the old
    format lack the digest and rebuild once.

    Args:
        chunks: chunk records, each with a "text" field.
        model: optional pre-loaded SentenceTransformer instance; loaded
            lazily only when a rebuild is needed and none is supplied
            (avoids instantiating the model twice per query).
    """
    ensure_dirs()
    _emb_cache_dir().mkdir(parents=True, exist_ok=True)
    npy_path, meta_path = _emb_cache_paths()
    texts = [str(c.get("text", "")) for c in chunks]
    digest = _content_hash(texts)

    if npy_path.exists() and meta_path.exists():
        try:
            with open(meta_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            if (
                int(meta.get("n", -1)) == len(chunks)
                and meta.get("model") == _emb_model_name()
                and meta.get("hash") == digest
            ):
                emb = np.load(npy_path)
                if emb.shape[0] == len(chunks):
                    return emb, list(range(len(chunks)))
        except Exception:
            pass  # any cache corruption simply triggers a rebuild

    if not texts:
        # No chunks: empty matrix with the MiniLM default width (384).
        return np.zeros((0, 384), dtype="float32"), []

    if model is None:
        SentenceTransformer = _lazy_imports()
        model = SentenceTransformer(_emb_model_name())
    emb = model.encode(texts, normalize_embeddings=True, convert_to_numpy=True)
    np.save(npy_path, emb)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(
            {"n": len(chunks), "model": _emb_model_name(), "hash": digest, "ts": _now()},
            f,
        )
    return emb, list(range(len(chunks)))


def _cosine_topk(matrix: np.ndarray, query_vec: np.ndarray, top_k: int) -> List[int]:
    """Indices of the top_k rows of *matrix* most similar to *query_vec*, best first.

    Rows and the query are assumed L2-normalized, so the dot product is
    the cosine similarity. Returns [] for an empty matrix or a
    non-positive top_k (bug fix: a negative top_k previously fed a
    negative pivot/slice into argpartition and returned a wrong,
    non-empty result).
    """
    if matrix.size == 0:
        return []
    k = min(top_k, matrix.shape[0])
    if k <= 0:
        return []
    sims = matrix @ query_vec
    top = np.argpartition(-sims, k - 1)[:k]  # unordered top-k, O(n)
    return top[np.argsort(-sims[top])].tolist()  # order just the k winners


def retrieve_contexts(query: str, top_k: int = 5) -> List[str]:
    """Return up to *top_k* chunk texts most similar to *query*.

    Each result is the chunk text; when the chunk records a "source",
    "\\n[source] <src>" is appended. Returns [] when no chunks are
    indexed or the embedding matrix is empty. The embedding model is
    instantiated once and shared with the cache builder.
    """
    chunks = _load_chunks()
    if not chunks:
        return []
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    emb_matrix, idx_map = _load_or_build_embeddings(chunks, model=model)
    if emb_matrix.size == 0:
        return []
    q_vec = model.encode([query], normalize_embeddings=True, convert_to_numpy=True)[0]
    results: List[str] = []
    for i in _cosine_topk(emb_matrix, q_vec, top_k):
        ch = chunks[idx_map[i]]
        txt = str(ch.get("text", "")).strip()
        src = ch.get("source")
        results.append(f"{txt}\n[source] {src}" if src else txt)
    return results