# rag-app / selfHosted_functions.py
# Self-hosted RAG functions: local FAISS vector store with OpenAI embeddings.
import os
import json
import faiss
import numpy as np
from shared_utilities import chunk_text, validate_chunk_sizes, generate_embeddings_batch
import asyncio
import logging
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
# ---------- Config ----------
TOP_K = 5  # default number of nearest chunks returned per search
PERSIST_DIR = "/persistent/faiss_store"
if not os.path.exists("/persistent"): # fallback if running locally
    PERSIST_DIR = "./faiss_store"
os.makedirs(PERSIST_DIR, exist_ok=True)  # import-time side effect: ensure the store directory exists
INDEX_PATH = os.path.join(PERSIST_DIR, "store.index")  # default (no-suffix) FAISS index file
META_PATH = os.path.join(PERSIST_DIR, "store.json")    # default (no-suffix) chunk-metadata file
# OpenAI text-embedding-3-small dimension (used for FAISS)
OPENAI_EMBEDDING_DIM = 1536
def _normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
"""Normalize embeddings for cosine similarity (inner product in FAISS)."""
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
return (embeddings / np.maximum(norms, 1e-9)).astype("float32")
# ---------- FAISS Vector Store ----------
class FaissStore:
    """In-memory FAISS index paired with a parallel list of chunk metadata.

    Vectors are expected to be unit-normalized float32 rows, so the
    inner-product index scores by cosine similarity.
    """

    def __init__(self, dim):
        # Inner product over normalized vectors == cosine similarity.
        self.index = faiss.IndexFlatIP(dim)
        self.metadatas = []

    def add(self, vectors: np.ndarray, metadatas):
        """Append vectors and their metadata; insertion order keeps ids aligned."""
        self.index.add(vectors)
        self.metadatas.extend(metadatas)

    def search(self, q_vec: np.ndarray, k=TOP_K):
        """Return up to k (score, metadata) pairs for the (1, dim) query vector."""
        if not self.index.ntotal:
            return []
        scores, ids = self.index.search(q_vec, k)
        # FAISS pads missing hits with index -1; skip those slots.
        return [
            (float(score), self.metadatas[idx])
            for score, idx in zip(scores[0], ids[0])
            if idx >= 0
        ]

    def save(self, index_path, meta_path):
        """Persist the FAISS index and its metadata side by side on disk."""
        faiss.write_index(self.index, index_path)
        with open(meta_path, "w", encoding="utf-8") as fh:
            json.dump(self.metadatas, fh, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, index_path, meta_path):
        """Rebuild a store from files previously written by save()."""
        loaded_index = faiss.read_index(index_path)
        with open(meta_path, "r", encoding="utf-8") as fh:
            loaded_metas = json.load(fh)
        store = cls(loaded_index.d)
        store.index = loaded_index
        store.metadatas = loaded_metas
        return store
# ---------- Global store ----------
faiss_stores = {}  # Dictionary to store multiple FAISS indices by method
VECTOR_DIM = None  # dimension of the most recently created/loaded index; set lazily
def get_index_paths(method_suffix=""):
    """Resolve the on-disk (index_path, meta_path) pair for a method suffix.

    An empty suffix maps to the default store files; otherwise the suffix is
    embedded in the filenames (e.g. "_hybrid" -> "store_hybrid.index").
    """
    if not method_suffix:
        return INDEX_PATH, META_PATH
    return (
        os.path.join(PERSIST_DIR, f"store{method_suffix}.index"),
        os.path.join(PERSIST_DIR, f"store{method_suffix}.json"),
    )
# ---------- RAG Builder ----------
async def create_rag_from_text_selfhosted(extracted_text: str, source_info: str, progress_callback=None, method_suffix=""):
    """Chunk text, embed it with OpenAI, and index it into a persistent FAISS store.

    Args:
        extracted_text (str): Raw text to chunk and index.
        source_info (str): Source label stored with every chunk's metadata.
        progress_callback: Optional async callable receiving progress strings.
        method_suffix (str): Optional suffix selecting a separate named store.

    Returns:
        dict: {"status", "message", "vector_index"} describing the outcome.
    """
    global faiss_stores, VECTOR_DIM
    # Get index paths based on method suffix
    index_path, meta_path = get_index_paths(method_suffix)
    if progress_callback:
        await progress_callback("📄 Chunking text into segments...")
    chunks = chunk_text(extracted_text)
    chunks = validate_chunk_sizes(chunks, max_tokens=8000)
    # Guard: with no chunks, np.array([]) is 1-D and embeddings.shape[1]
    # below would raise IndexError. Report the problem instead of crashing.
    if not chunks:
        logger.warning("No chunks produced from input text; nothing to index.")
        return {
            "status": "error",
            "message": "No text chunks could be extracted; nothing was indexed.",
            "vector_index": f"faiss{method_suffix}",
        }
    if progress_callback:
        await progress_callback(f"🔢 Creating {len(chunks)} embeddings (OpenAI)...")
    # Create embeddings using OpenAI text-embedding-3-small
    raw_embeddings = await generate_embeddings_batch(chunks, progress_callback)
    embeddings = np.array(raw_embeddings, dtype=np.float32)
    embeddings = _normalize_embeddings(embeddings)
    VECTOR_DIM = embeddings.shape[1]
    # Create or get store for this method; NOTE(review): assumes an existing
    # store for this suffix has the same dimension -- confirm against callers.
    if method_suffix not in faiss_stores:
        faiss_stores[method_suffix] = FaissStore(VECTOR_DIM)
    faiss_store = faiss_stores[method_suffix]
    metas = [{"id": i, "text": c, "source": source_info} for i, c in enumerate(chunks)]
    faiss_store.add(embeddings, metas)
    # Save to persistent storage
    faiss_store.save(index_path, meta_path)
    if progress_callback:
        await progress_callback(f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    logger.info(f"Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}")
    return {
        "status": "success",
        "message": f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}. Ready for queries.",
        "vector_index": f"faiss{method_suffix}"
    }
# ---------- RAG Search (Self-hosted) ----------
async def search_rag_documents_selfhosted(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """
    Search for relevant document chunks using the locally stored FAISS index.
    Returns a list of dicts with content, score, and metadata; returns an
    empty list when no store exists or on any search error.
    Args:
        query (str): Search query
        top_k (int): Number of results to return
        method_suffix (str): Optional suffix to append to index name (e.g., "_hybrid", "_regular")
    """
    try:
        global faiss_stores, VECTOR_DIM
        # Get index paths based on method suffix
        index_path, meta_path = get_index_paths(method_suffix)
        # Lazily (re)load the FAISS index + metadata from disk when the
        # in-memory store is missing or empty.
        if method_suffix not in faiss_stores or faiss_stores[method_suffix].index.ntotal == 0:
            if os.path.exists(index_path) and os.path.exists(meta_path):
                faiss_stores[method_suffix] = FaissStore.load(index_path, meta_path)
                VECTOR_DIM = faiss_stores[method_suffix].index.d
                logger.info(f"Loaded FAISS store with {faiss_stores[method_suffix].index.ntotal} vectors.")
            else:
                logger.warning(f"No FAISS store found for method '{method_suffix}'. Please create RAG index first.")
                return []
        faiss_store = faiss_stores[method_suffix]
        # Generate the query embedding using OpenAI, then normalize so the
        # inner-product index scores by cosine similarity.
        raw_query = await generate_embeddings_batch([query])
        q_vec = np.array([raw_query[0]], dtype=np.float32)
        q_vec = _normalize_embeddings(q_vec)  # shape: (1, dim)
        # Search for top_k similar chunks and flatten to plain dicts.
        results = faiss_store.search(q_vec, k=top_k)
        return [
            {
                "score": float(score),
                "content": meta.get("text", ""),
                "source": meta.get("source", "selfhosted"),
                "chunk_index": meta.get("id", None),
                "title": meta.get("title", None),
            }
            for score, meta in results
        ]
    except Exception:
        # logger.exception records the full traceback; logger.error with just
        # the message text loses it, which made failures hard to diagnose.
        logger.exception("Error searching self-hosted RAG documents")
        return []