Spaces:
Sleeping
Sleeping
| # modules/rag_retriever.py | |
| import os | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import List, Tuple, Dict, Any | |
| import numpy as np | |
def _lazy_imports():
    """Defer the sentence-transformers import so module import stays cheap.

    Returns the SentenceTransformer class; raises ImportError only when an
    embedding is actually requested and the package is missing.
    """
    from sentence_transformers import SentenceTransformer

    return SentenceTransformer
| def _now() -> int: | |
| return int(time.time()) | |
try:
    from modules.utils import ensure_dirs, data_dir
except Exception:
    # modules.utils is unavailable (standalone run): provide minimal
    # fallbacks rooted at /tmp/agent_studio.
    def ensure_dirs() -> None:
        """Create the scratch data directory if it does not already exist."""
        Path("/tmp/agent_studio").mkdir(parents=True, exist_ok=True)

    def data_dir() -> Path:
        """Return the scratch data directory, creating it on first use."""
        ensure_dirs()
        root = Path("/tmp/agent_studio")
        return root
def _chunks_path() -> Path:
    """Location of the JSONL file holding the ingested text chunks."""
    return data_dir().joinpath("chunks.jsonl")
def _load_chunks() -> List[Dict[str, Any]]:
    """Load ingested chunks from the JSONL store.

    Returns:
        A list of chunk dicts that have a truthy "text" field, in file
        order.  Empty list when the store file does not exist yet.

    Blank lines and lines that are not valid JSON are skipped, so one
    corrupt append cannot break every later retrieval.
    """
    p = _chunks_path()
    if not p.exists():
        return []
    rows: List[Dict[str, Any]] = []
    with open(p, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Keep the try body minimal and catch only parse failures;
            # the original blanket `except Exception` could also hide
            # programming errors in the filtering below.
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(obj, dict) and obj.get("text"):
                rows.append(obj)
    return rows
| def _emb_model_name() -> str: | |
| return os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2") | |
def _emb_cache_dir() -> Path:
    """Directory under the data dir used to cache computed embeddings."""
    return data_dir().joinpath("emb_cache")
def _emb_cache_paths() -> Tuple[Path, Path]:
    """Return (embedding-matrix .npy path, metadata .json path)."""
    cache = _emb_cache_dir()
    npy = cache / "embeddings.npy"
    meta = cache / "meta.json"
    return npy, meta
def _load_or_build_embeddings(chunks: List[Dict[str, Any]]) -> Tuple[np.ndarray, List[int]]:
    """Return (embedding matrix, index map) for *chunks*, using a disk cache.

    The cached matrix is reused only when meta.json records the same chunk
    count and model name as the current state; anything stale or corrupt
    falls through to a rebuild.  The index map is the identity mapping
    (row i -> chunk i).

    Fix vs. original: the SentenceTransformer model was instantiated even
    when there were no texts to embed — an expensive load/download paid for
    nothing.  The empty check now runs before model construction.
    """
    ensure_dirs()
    _emb_cache_dir().mkdir(parents=True, exist_ok=True)
    npy_path, meta_path = _emb_cache_paths()
    if npy_path.exists() and meta_path.exists():
        try:
            with open(meta_path, "r", encoding="utf-8") as f:
                meta = json.load(f)
            if int(meta.get("n", -1)) == len(chunks) and meta.get("model") == _emb_model_name():
                emb = np.load(npy_path)
                if emb.shape[0] == len(chunks):
                    return emb, list(range(len(chunks)))
        except Exception:
            # Corrupt or unreadable cache: best-effort, rebuild below.
            pass
    texts = [str(c.get("text", "")) for c in chunks]
    if not texts:
        # NOTE(review): 384 matches all-MiniLM-L6-v2's output dim; other
        # EMBEDDING_MODEL values may differ — confirm if callers rely on it.
        return np.zeros((0, 384), dtype="float32"), []
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    emb = model.encode(texts, normalize_embeddings=True, convert_to_numpy=True)
    np.save(npy_path, emb)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump({"n": len(chunks), "model": _emb_model_name(), "ts": _now()}, f)
    return emb, list(range(len(chunks)))
| def _cosine_topk(matrix: np.ndarray, query_vec: np.ndarray, top_k: int) -> List[int]: | |
| if matrix.size == 0: | |
| return [] | |
| sims = matrix @ query_vec | |
| k = min(top_k, matrix.shape[0]) | |
| part = np.argpartition(-sims, k - 1)[:k] | |
| part_sorted = part[np.argsort(-sims[part])] | |
| return part_sorted.tolist() | |
def retrieve_contexts(query: str, top_k: int = 5) -> List[str]:
    """Return up to *top_k* ingested chunk texts most similar to *query*.

    Args:
        query: Free-text search query.
        top_k: Maximum number of contexts to return (default 5).

    Returns:
        Chunk texts in descending similarity order; when a chunk records a
        "source", it is appended as a trailing "[source] <src>" line.
        Empty list when no chunks are ingested or the index is empty.

    Fix vs. original: the SentenceTransformer model was loaded before the
    empty-index check, so an empty corpus still paid the full model-load
    (and possible download) cost.  The model is now created only when
    there is something to search.
    """
    chunks = _load_chunks()
    if not chunks:
        return []
    emb_matrix, idx_map = _load_or_build_embeddings(chunks)
    if emb_matrix.size == 0:
        return []
    SentenceTransformer = _lazy_imports()
    model = SentenceTransformer(_emb_model_name())
    q_vec = model.encode([query], normalize_embeddings=True, convert_to_numpy=True)[0]
    results: List[str] = []
    for i in _cosine_topk(emb_matrix, q_vec, top_k):
        ch = chunks[idx_map[i]]
        txt = str(ch.get("text", "")).strip()
        src = ch.get("source")
        results.append(f"{txt}\n[source] {src}" if src else txt)
    return results