Spaces:
Sleeping
Sleeping
File size: 6,804 Bytes
3506c42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | import os
import json
import faiss
import numpy as np
from shared_utilities import chunk_text, validate_chunk_sizes, generate_embeddings_batch
import asyncio
import logging
# Module-level logger; basicConfig at WARNING means logger.info() calls below
# are suppressed unless the hosting app reconfigures logging.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# ---------- Config ----------
TOP_K = 5  # default number of nearest neighbors returned by a search
PERSIST_DIR = "/persistent/faiss_store"
if not os.path.exists("/persistent"): # fallback if running locally
    PERSIST_DIR = "./faiss_store"
os.makedirs(PERSIST_DIR, exist_ok=True)
# Default on-disk locations for the FAISS index and its metadata sidecar.
INDEX_PATH = os.path.join(PERSIST_DIR, "store.index")
META_PATH = os.path.join(PERSIST_DIR, "store.json")
# OpenAI text-embedding-3-small dimension (used for FAISS)
OPENAI_EMBEDDING_DIM = 1536
def _normalize_embeddings(embeddings: np.ndarray) -> np.ndarray:
"""Normalize embeddings for cosine similarity (inner product in FAISS)."""
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
return (embeddings / np.maximum(norms, 1e-9)).astype("float32")
# ---------- FAISS Vector Store ----------
class FaissStore:
    """Exact inner-product FAISS index paired with an index-aligned metadata list.

    Vectors are expected to be L2-normalized so that inner product behaves as
    cosine similarity.
    """

    def __init__(self, dim):
        # IndexFlatIP = brute-force inner-product search over `dim`-sized vectors.
        self.index = faiss.IndexFlatIP(dim)
        self.metadatas = []

    def add(self, vectors: np.ndarray, metadatas):
        """Append vectors and their metadata, keeping both lists aligned."""
        self.index.add(vectors)
        self.metadatas.extend(metadatas)

    def search(self, q_vec: np.ndarray, k=TOP_K):
        """Return up to k (score, metadata) pairs for the query matrix q_vec."""
        if self.index.ntotal == 0:
            return []
        scores, ids = self.index.search(q_vec, k)
        # FAISS pads with id == -1 when fewer than k vectors are indexed.
        return [
            (float(score), self.metadatas[idx])
            for score, idx in zip(scores[0], ids[0])
            if idx >= 0
        ]

    def save(self, index_path, meta_path):
        """Persist the index and metadata to the given paths."""
        faiss.write_index(self.index, index_path)
        with open(meta_path, "w", encoding="utf-8") as fh:
            json.dump(self.metadatas, fh, ensure_ascii=False, indent=2)

    @classmethod
    def load(cls, index_path, meta_path):
        """Rehydrate a store previously written by save()."""
        index = faiss.read_index(index_path)
        with open(meta_path, "r", encoding="utf-8") as fh:
            metadatas = json.load(fh)
        store = cls(index.d)
        store.index = index
        store.metadatas = metadatas
        return store
# ---------- Global store ----------
faiss_stores = {}  # Dictionary to store multiple FAISS indices by method
VECTOR_DIM = None  # dimension of the most recently built/loaded index
def get_index_paths(method_suffix=""):
    """Resolve the (index_path, meta_path) pair for a given method suffix.

    An empty suffix maps to the module-default INDEX_PATH / META_PATH; any
    other suffix yields sibling files named `store{suffix}.index/.json`.
    """
    if not method_suffix:
        return INDEX_PATH, META_PATH
    return (
        os.path.join(PERSIST_DIR, f"store{method_suffix}.index"),
        os.path.join(PERSIST_DIR, f"store{method_suffix}.json"),
    )
# ---------- RAG Builder ----------
async def create_rag_from_text_selfhosted(extracted_text: str, source_info: str, progress_callback=None, method_suffix=""):
    """Chunk text, embed it with OpenAI, and index it into a FAISS store.

    The store is kept in the module-level `faiss_stores` dict keyed by
    method_suffix (re-used and appended to on repeated calls) and persisted
    to disk after every build. `progress_callback`, when given, is awaited
    with human-readable status strings.
    """
    global faiss_stores, VECTOR_DIM

    index_path, meta_path = get_index_paths(method_suffix)

    if progress_callback:
        await progress_callback("📄 Chunking text into segments...")
    chunks = chunk_text(extracted_text)
    chunks = validate_chunk_sizes(chunks, max_tokens=8000)

    if progress_callback:
        await progress_callback(f"🔢 Creating {len(chunks)} embeddings (OpenAI)...")

    # Embed with OpenAI text-embedding-3-small, then normalize so inner-product
    # search behaves like cosine similarity.
    raw_embeddings = await generate_embeddings_batch(chunks, progress_callback)
    embeddings = _normalize_embeddings(np.array(raw_embeddings, dtype=np.float32))
    VECTOR_DIM = embeddings.shape[1]

    # Re-use an existing store for this method, or register a fresh one.
    store = faiss_stores.setdefault(method_suffix, FaissStore(VECTOR_DIM))

    metas = [{"id": i, "text": c, "source": source_info} for i, c in enumerate(chunks)]
    store.add(embeddings, metas)

    # Persist so the index survives process restarts.
    store.save(index_path, meta_path)

    done_msg = f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}"
    if progress_callback:
        await progress_callback(done_msg)
    logger.info(done_msg)

    return {
        "status": "success",
        "message": f"✅ Indexed {len(chunks)} chunks and saved to {PERSIST_DIR}. Ready for queries.",
        "vector_index": f"faiss{method_suffix}",
    }
# ---------- RAG Search (Self-hosted) ----------
async def search_rag_documents_selfhosted(query: str, top_k: int = 5, method_suffix: str = "") -> list:
    """
    Search for relevant document chunks using the locally stored FAISS index.
    Returns a list of dicts with content, score, and metadata (empty list on
    any failure or when no index exists for this method).

    Args:
        query (str): Search query
        top_k (int): Number of results to return
        method_suffix (str): Optional suffix to append to index name (e.g., "_hybrid", "_regular")
    """
    global faiss_stores, VECTOR_DIM
    try:
        # Get index paths based on method suffix
        index_path, meta_path = get_index_paths(method_suffix)

        # Lazily load FAISS index + metadata from disk if not already in memory
        # (or if the in-memory store holds no vectors).
        if method_suffix not in faiss_stores or faiss_stores[method_suffix].index.ntotal == 0:
            if os.path.exists(index_path) and os.path.exists(meta_path):
                faiss_stores[method_suffix] = FaissStore.load(index_path, meta_path)
                VECTOR_DIM = faiss_stores[method_suffix].index.d
                logger.info(f"Loaded FAISS store with {faiss_stores[method_suffix].index.ntotal} vectors.")
            else:
                logger.warning(f"No FAISS store found for method '{method_suffix}'. Please create RAG index first.")
                return []
        faiss_store = faiss_stores[method_suffix]

        # Embed the query with OpenAI and normalize it so inner-product search
        # matches the cosine-style scoring used at index-build time.
        raw_query = await generate_embeddings_batch([query])
        q_vec = np.array([raw_query[0]], dtype=np.float32)
        q_vec = _normalize_embeddings(q_vec)  # shape: (1, dim)

        # Search for top_k similar chunks and shape them for callers.
        results = faiss_store.search(q_vec, k=top_k)
        return [
            {
                "score": float(score),
                "content": meta.get("text", ""),
                "source": meta.get("source", "selfhosted"),
                "chunk_index": meta.get("id", None),
                "title": meta.get("title", None),
            }
            for score, meta in results
        ]
    except Exception:
        # Fix: logger.exception records the full traceback; the previous
        # logger.error(f"... {e}") dropped it, making failures hard to debug.
        logger.exception("Error searching self-hosted RAG documents")
        return []