import os
import json
import asyncio
import logging

import faiss
import numpy as np

from shared_utilities import chunk_text, validate_chunk_sizes, generate_embeddings_batch

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# ---------- Config ----------
TOP_K = 5

PERSIST_DIR = "/persistent/faiss_store"
if not os.path.exists("/persistent"):
    # fallback if running locally
    PERSIST_DIR = "./faiss_store"
os.makedirs(PERSIST_DIR, exist_ok=True)

INDEX_PATH = os.path.join(PERSIST_DIR, "store.index")
META_PATH = os.path.join(PERSIST_DIR, "store.json")

# OpenAI text-embedding-3-small dimension (used for FAISS)
OPENAI_EMBEDDING_DIM = 1536


def _normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
    """Normalize embeddings for cosine similarity (inner product in FAISS)."""
    lengths = np.linalg.norm(embeddings, axis=1, keepdims=True)
    safe_lengths = np.maximum(lengths, 1e-9)  # guard against division by zero
    return (embeddings / safe_lengths).astype("float32")


# ---------- FAISS Vector Store ----------
class FaissStore:
    """Flat inner-product FAISS index paired with per-vector metadata.

    Vectors are expected to be L2-normalized so inner product equals
    cosine similarity.
    """

    def __init__(self, dim):
        # Inner product for cosine similarity (normalized vectors)
        self.index = faiss.IndexFlatIP(dim)
        self.metadatas = []

    def add(self, vectors: np.ndarray, metadatas):
        """Append vectors and their parallel metadata entries to the index."""
        self.index.add(vectors)
        self.metadatas.extend(metadatas)

    def search(self, q_vec: np.ndarray, k=TOP_K):
        """Return up to k (score, metadata) pairs for the query vector q_vec."""
        if self.index.ntotal == 0:
            return []
        scores, indices = self.index.search(q_vec, k)
        # FAISS pads missing hits with index -1 when k > ntotal; skip those.
        return [
            (float(score), self.metadatas[idx])
            for score, idx in zip(scores[0], indices[0])
            if idx >= 0
        ]

    def save(self, index_path, meta_path):
        """Persist the FAISS index and the JSON metadata list to disk."""
        faiss.write_index(self.index, index_path)
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(self.metadatas, f, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, index_path, meta_path):
        """Rebuild a store from the files written by save()."""
        index = faiss.read_index(index_path)
        with open(meta_path, "r", encoding="utf-8") as f:
            metadatas = json.load(f)
        store = cls(index.d)
        store.index = index
        store.metadatas = metadatas
        return store


# ---------- Global store ----------
faiss_stores = {}  # Dictionary to store multiple FAISS indices by method
VECTOR_DIM = None  # Dimension of the most recently built/loaded index


def get_index_paths(method_suffix=""):
    """Get index paths based on method suffix.

    Args:
        method_suffix (str): Optional suffix (e.g. "_hybrid", "_regular")
            selecting a method-specific index; empty string selects the
            default store paths.

    Returns:
        tuple: (index_path, meta_path) inside PERSIST_DIR.
    """
    if method_suffix:
        index_path = os.path.join(PERSIST_DIR, f"store{method_suffix}.index")
        meta_path = os.path.join(PERSIST_DIR, f"store{method_suffix}.json")
    else:
        index_path = INDEX_PATH
        meta_path = META_PATH
    return index_path, meta_path


# ---------- RAG Builder ----------
async def create_rag_from_text_selfhosted(extracted_text: str, source_info: str, progress_callback=None, method_suffix=""):
    """Chunk text, embed it with OpenAI, and index it in a persistent FAISS store.

    Args:
        extracted_text (str): Raw document text to index.
        source_info (str): Source label stored in every chunk's metadata.
        progress_callback: Optional async callable receiving progress strings.
        method_suffix (str): Optional suffix selecting a method-specific index.

    Returns:
        dict: {"status", "message", "vector_index"} summary of the build.
    """
    global faiss_stores, VECTOR_DIM

    # Get index paths based on method suffix
    index_path, meta_path = get_index_paths(method_suffix)

    if progress_callback:
        await progress_callback("📄 Chunking text into segments...")

    chunks = chunk_text(extracted_text)
    chunks = validate_chunk_sizes(chunks, max_tokens=8000)

    if progress_callback:
        await progress_callback(f"🔢 Creating {len(chunks)} embeddings (OpenAI)...")

    # Create embeddings using OpenAI text-embedding-3-small
    raw_embeddings = await generate_embeddings_batch(chunks, progress_callback)
    embeddings = np.array(raw_embeddings, dtype=np.float32)
    embeddings = _normalize_embeddings(embeddings)
    VECTOR_DIM = embeddings.shape[1]

    # Create the store for this method. If a cached store exists but its
    # dimension no longer matches the embeddings, index.add() would raise,
    # so rebuild it instead of reusing it.
    existing = faiss_stores.get(method_suffix)
    if existing is None or existing.index.d != VECTOR_DIM:
        faiss_stores[method_suffix] = FaissStore(VECTOR_DIM)
    faiss_store = faiss_stores[method_suffix]

    metas = [{"id": i, "text": c, "source": source_info} for i, c in enumerate(chunks)]
    faiss_store.add(embeddings, metas)

    # Save to persistent storage
    faiss_store.save(index_path, meta_path)

    if progress_callback:
        await progress_callback(f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    logger.info(f"Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")

    return {
        "status": "success",
        "message": f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}. Ready for queries.",
        "vector_index": f"faiss{method_suffix}",
    }


# ---------- RAG Search (Self-hosted) ----------
async def search_rag_documents_selfhosted(query: str, top_k: int = TOP_K, method_suffix: str = "") -> list:
    """
    Search for relevant document chunks using the locally stored FAISS index.
    Returns a list of dicts with content, score, and metadata.

    Args:
        query (str): Search query
        top_k (int): Number of results to return (defaults to TOP_K)
        method_suffix (str): Optional suffix to append to index name (e.g., "_hybrid", "_regular")
    """
    try:
        global faiss_stores, VECTOR_DIM

        # Get index paths based on method suffix
        index_path, meta_path = get_index_paths(method_suffix)

        # Load FAISS index + metadata if not already loaded (or loaded empty)
        if method_suffix not in faiss_stores or faiss_stores[method_suffix].index.ntotal == 0:
            if os.path.exists(index_path) and os.path.exists(meta_path):
                faiss_stores[method_suffix] = FaissStore.load(index_path, meta_path)
                VECTOR_DIM = faiss_stores[method_suffix].index.d
                logger.info(f"Loaded FAISS store with {faiss_stores[method_suffix].index.ntotal} vectors.")
            else:
                logger.warning(f"No FAISS store found for method '{method_suffix}'. Please create RAG index first.")
                return []

        faiss_store = faiss_stores[method_suffix]

        # Generate query embedding using OpenAI
        raw_query = await generate_embeddings_batch([query])
        q_vec = np.array([raw_query[0]], dtype=np.float32)
        q_vec = _normalize_embeddings(q_vec)  # shape: (1, dim)

        # Search for top_k similar chunks
        results = faiss_store.search(q_vec, k=top_k)

        return [
            {
                "score": float(score),
                "content": meta.get("text", ""),
                "source": meta.get("source", "selfhosted"),
                "chunk_index": meta.get("id", None),
                "title": meta.get("title", None),
            }
            for score, meta in results
        ]

    except Exception as e:
        # Best-effort search: log the full traceback (logger.error dropped it)
        # and degrade to "no results" rather than propagating.
        logger.exception(f"Error searching self-hosted RAG documents: {e}")
        return []