Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import faiss | |
| import numpy as np | |
| from shared_utilities import chunk_text, validate_chunk_sizes, generate_embeddings_batch | |
| import asyncio | |
| import logging | |
| logging.basicConfig(level=logging.WARNING) | |
| logger = logging.getLogger(__name__) | |
# ---------- Config ----------
TOP_K = 5  # default number of neighbors returned by FaissStore.search
# Prefer the persistent volume (e.g. on a hosted Space); fall back to a
# local directory when /persistent is not mounted.
PERSIST_DIR = "/persistent/faiss_store"
if not os.path.exists("/persistent"):  # fallback if running locally
    PERSIST_DIR = "./faiss_store"
os.makedirs(PERSIST_DIR, exist_ok=True)  # ensure the store directory exists
# Default (no-suffix) index/metadata file locations inside PERSIST_DIR.
INDEX_PATH = os.path.join(PERSIST_DIR, "store.index")
META_PATH = os.path.join(PERSIST_DIR, "store.json")
# OpenAI text-embedding-3-small dimension (used for FAISS)
OPENAI_EMBEDDING_DIM = 1536
| def _normalize_embeddings(embeddings: np.ndarray) -> np.ndarray: | |
| """Normalize embeddings for cosine similarity (inner product in FAISS).""" | |
| norms = np.linalg.norm(embeddings, axis=1, keepdims=True) | |
| return (embeddings / np.maximum(norms, 1e-9)).astype("float32") | |
# ---------- FAISS Vector Store ----------
class FaissStore:
    """Exact inner-product FAISS index paired with per-vector metadata.

    Vectors are expected to be L2-normalized float32, so inner-product
    scores equal cosine similarity.
    """

    def __init__(self, dim):
        # IndexFlatIP: exact (brute-force) inner-product search.
        self.index = faiss.IndexFlatIP(dim)  # Inner product for cosine similarity (normalized vectors)
        self.metadatas = []  # metadatas[i] describes vector i in the index

    def add(self, vectors: np.ndarray, metadatas):
        """Append vectors (float32, shape (n, dim)) with their metadata dicts."""
        self.index.add(vectors)
        self.metadatas.extend(metadatas)

    def search(self, q_vec: np.ndarray, k=TOP_K):
        """Return up to k (score, metadata) pairs for a query of shape (1, dim)."""
        if self.index.ntotal == 0:
            return []
        D, I = self.index.search(q_vec, k)
        results = []
        for score, idx in zip(D[0], I[0]):
            if idx < 0:  # FAISS pads with -1 when fewer than k vectors exist
                continue
            results.append((float(score), self.metadatas[idx]))
        return results

    def save(self, index_path, meta_path):
        """Persist the FAISS index and its metadata JSON side by side."""
        faiss.write_index(self.index, index_path)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self.metadatas, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, index_path, meta_path):
        """Rebuild a store from files written by save().

        Fix: this method takes ``cls`` but was missing the @classmethod
        decorator, so the call site ``FaissStore.load(index_path, meta_path)``
        passed the index path as ``cls`` and failed.
        """
        index = faiss.read_index(index_path)
        with open(meta_path, "r", encoding="utf-8") as f:
            metadatas = json.load(f)
        store = cls(index.d)
        store.index = index
        store.metadatas = metadatas
        return store
# ---------- Global store ----------
faiss_stores = {}  # Dictionary to store multiple FAISS indices by method suffix (e.g. "", "_hybrid")
VECTOR_DIM = None  # set once the first embedding batch is created or an index is loaded
def get_index_paths(method_suffix=""):
    """Return the (index_path, meta_path) pair for the given method suffix.

    An empty suffix maps to the default INDEX_PATH / META_PATH files;
    any other suffix yields store<suffix>.index / store<suffix>.json
    inside PERSIST_DIR.
    """
    if not method_suffix:
        return INDEX_PATH, META_PATH
    stem = f"store{method_suffix}"
    return (
        os.path.join(PERSIST_DIR, f"{stem}.index"),
        os.path.join(PERSIST_DIR, f"{stem}.json"),
    )
# ---------- RAG Builder ----------
async def create_rag_from_text_selfhosted(extracted_text: str, source_info: str, progress_callback=None, method_suffix=""):
    """Chunk text, embed it via OpenAI, and add it to a per-method FAISS store.

    Args:
        extracted_text: Raw text to index.
        source_info: Source label stored in each chunk's metadata.
        progress_callback: Optional async callable awaited with status strings.
        method_suffix: Optional suffix selecting which store/files to use.

    Returns:
        Dict with "status", "message", and "vector_index" keys.
    """
    global faiss_stores, VECTOR_DIM

    index_path, meta_path = get_index_paths(method_suffix)

    async def _report(message):
        # Only await the callback when one was supplied.
        if progress_callback:
            await progress_callback(message)

    await _report("π Chunking text into segments...")
    chunks = validate_chunk_sizes(chunk_text(extracted_text), max_tokens=8000)

    await _report(f"π’ Creating {len(chunks)} embeddings (OpenAI)...")
    # Embed with OpenAI text-embedding-3-small, then normalize for cosine search.
    raw_embeddings = await generate_embeddings_batch(chunks, progress_callback)
    vectors = _normalize_embeddings(np.array(raw_embeddings, dtype=np.float32))
    VECTOR_DIM = vectors.shape[1]

    # Reuse the store for this method if it exists, otherwise create one.
    if method_suffix not in faiss_stores:
        faiss_stores[method_suffix] = FaissStore(VECTOR_DIM)
    store = faiss_stores[method_suffix]

    chunk_metas = [
        {"id": i, "text": chunk, "source": source_info}
        for i, chunk in enumerate(chunks)
    ]
    store.add(vectors, chunk_metas)
    store.save(index_path, meta_path)  # persist index + metadata to disk

    await _report(f"β Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    logger.info(f"Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    return {
        "status": "success",
        "message": f"β Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}. Ready for queries.",
        "vector_index": f"faiss{method_suffix}",
    }
# ---------- RAG Search (Self-hosted) ----------
async def search_rag_documents_selfhosted(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """
    Search for relevant document chunks using the locally stored FAISS index.
    Returns a list of dicts with content, score, and metadata; returns [] when
    no index exists yet or on any failure.

    Args:
        query (str): Search query
        top_k (int): Number of results to return
        method_suffix (str): Optional suffix to append to index name (e.g., "_hybrid", "_regular")
    """
    try:
        global faiss_stores, VECTOR_DIM

        index_path, meta_path = get_index_paths(method_suffix)

        # Lazily load the persisted index if this method's store is missing or empty.
        if method_suffix not in faiss_stores or faiss_stores[method_suffix].index.ntotal == 0:
            if os.path.exists(index_path) and os.path.exists(meta_path):
                faiss_stores[method_suffix] = FaissStore.load(index_path, meta_path)
                VECTOR_DIM = faiss_stores[method_suffix].index.d
                # Lazy %-args: formatting is skipped when INFO is disabled.
                logger.info("Loaded FAISS store with %s vectors.", faiss_stores[method_suffix].index.ntotal)
            else:
                logger.warning("No FAISS store found for method '%s'. Please create RAG index first.", method_suffix)
                return []
        faiss_store = faiss_stores[method_suffix]

        # Embed the query with the same model as the corpus, then normalize so
        # inner-product scores equal cosine similarity.
        raw_query = await generate_embeddings_batch([query])
        q_vec = _normalize_embeddings(np.array([raw_query[0]], dtype=np.float32))  # shape: (1, dim)

        results = faiss_store.search(q_vec, k=top_k)
        return [
            {
                "score": float(score),
                "content": meta.get("text", ""),
                "source": meta.get("source", "selfhosted"),
                "chunk_index": meta.get("id", None),
                "title": meta.get("title", None),
            }
            for score, meta in results
        ]
    except Exception as e:
        # Fix: logger.error(f"...{e}") dropped the traceback; logger.exception
        # records it. Still fail soft by returning no results at this boundary.
        logger.exception("Error searching self-hosted RAG documents: %s", e)
        return []