Spaces:
Running
Running
| """ | |
| ================================================================================ | |
| Phase 4 — Vector Database Storage (FAISS) — v3 | |
| University-Level RAG Pipeline | |
| ================================================================================ | |
| KEY CHANGES vs v2: | |
| - Embedding model unified as text-embedding-3-large (was mismatched). | |
| - API key loaded from .env / environment (not hardcoded). | |
| Requirements: | |
| pip install faiss-cpu numpy openai python-dotenv | |
| Input : rag_dataset_with_embeddings.json (Phase 3 output) | |
| Output : faiss_index/ | |
| ├── index.faiss | |
| └── metadata.json | |
| Usage: | |
| python vector_store.py | |
| ================================================================================ | |
| """ | |
| import json | |
| import math | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Tuple, Any | |
| import numpy as np | |
| import faiss | |
| from openai import OpenAI | |
| # ββ Configuration βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Dataset with embeddings attached (the Phase 3 output named in the header).
INPUT_FILE = Path("rag_dataset_with_embeddings.json")

# On-disk layout of the persisted index: one FAISS file plus a JSON sidecar.
INDEX_DIR = Path("faiss_index")
INDEX_FILE = INDEX_DIR.joinpath("index.faiss")
METADATA_FILE = INDEX_DIR.joinpath("metadata.json")

# Must match embedding_generator.py and app.py: queries are embedded with this
# model, so it has to be the one the stored document vectors were built with.
EMBEDDING_MODEL = "text-embedding-3-large"

# Default number of neighbours returned by a similarity search.
TOP_K = 5
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Load Dataset | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def load_dataset(path: Path) -> List[Dict[str, Any]]:
    """Load the embedded dataset from a JSON file and validate its shape.

    Args:
        path: Location of the JSON file holding the records.

    Returns:
        The parsed list of record dictionaries, each containing an
        ``"embedding"`` entry.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the file does not hold a non-empty JSON array of
            objects, or if any record lacks an ``"embedding"`` field.
    """
    if not path.exists():
        raise FileNotFoundError(f"'{path}' not found. Run embedding_generator.py first.")

    with open(path, "r", encoding="utf-8") as fh:
        data = json.load(fh)

    # A JSON object or scalar at the top level cannot be treated as a list of
    # records; reject it explicitly instead of failing on an unrelated
    # KeyError/TypeError further down.
    if not isinstance(data, list):
        raise ValueError(
            f"Expected a JSON array of records in '{path}', got {type(data).__name__}."
        )

    # Check every record, not only the first one: a single record without an
    # embedding would otherwise only blow up later while building the index.
    if not data or any(
        not isinstance(rec, dict) or "embedding" not in rec for rec in data
    ):
        raise ValueError("Records missing 'embedding' field. Run embedding_generator.py first.")

    print(f" β Loaded {len(data)} records from '{path}'.")
    return data
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Build FAISS Index | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def build_faiss_index(records: List[Dict[str, Any]]) -> Tuple[faiss.Index, List[Dict]]:
    """Build an exact inner-product FAISS index and its parallel metadata list.

    Each record's ``"embedding"`` becomes one row of a float32 matrix. Rows are
    scaled to unit length so that the inner product computed by
    ``IndexFlatIP`` equals cosine similarity against the unit-length query
    vectors produced by ``get_query_embedding``. Vector ids are the record
    positions ``0 .. n-1``, so an id returned by a search indexes directly
    into the returned metadata list.

    Args:
        records: Record dictionaries. Each must provide ``"embedding"``,
            ``"text"``, ``"source"``, ``"chunk_id"`` and ``"language"``;
            ``"was_translated"``, ``"doc_type"`` and ``"section_title"`` are
            optional and default to ``False``, ``"general"`` and ``""``.

    Returns:
        A tuple ``(index, metadata)`` where ``index`` is an ``IndexIDMap``
        wrapping an ``IndexFlatIP`` and ``metadata`` holds one dictionary per
        record, in record order, without the embedding.

    Raises:
        ValueError: If ``records`` is empty or the embeddings do not form a
            rectangular, non-empty numeric matrix.
        KeyError: If a record lacks one of the required fields.
    """
    if not records:
        raise ValueError("Cannot build a FAISS index from an empty record list.")

    try:
        embeddings = np.array([rec["embedding"] for rec in records], dtype=np.float32)
    except ValueError as err:
        # Raised by NumPy when the vectors have differing lengths.
        raise ValueError("Embeddings must all be numeric vectors of the same length.") from err
    if embeddings.ndim != 2 or embeddings.shape[1] == 0:
        raise ValueError("Embeddings must all be non-empty, flat numeric vectors.")

    # Inner product only equals cosine similarity on unit vectors, so enforce
    # that here rather than trusting the input. All-zero rows are left as is.
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    np.divide(embeddings, norms, out=embeddings, where=norms > 0)

    n_vectors, dims = embeddings.shape
    print(f" β Embedding matrix: {n_vectors} Γ {dims}")

    # IndexFlatIP + IDMap for exact cosine similarity (L2-normalised vectors)
    index = faiss.IndexFlatIP(dims)
    index_with_ids = faiss.IndexIDMap(index)
    ids = np.arange(n_vectors, dtype=np.int64)
    index_with_ids.add_with_ids(embeddings, ids)
    print(f" β FAISS index built. Vectors stored: {index_with_ids.ntotal}")

    metadata = [
        {
            "text": rec["text"],
            "source": rec["source"],
            "chunk_id": rec["chunk_id"],
            "language": rec["language"],
            "was_translated": rec.get("was_translated", False),
            "doc_type": rec.get("doc_type", "general"),
            "section_title": rec.get("section_title", ""),
        }
        for rec in records
    ]
    return index_with_ids, metadata
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Save & Load Index | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def save_index(index: faiss.Index, metadata: List[Dict], index_dir: Path) -> None:
    """Write the FAISS index and its metadata into ``index_dir``.

    The directory is created if needed. File names are taken from the
    module-level ``INDEX_FILE`` and ``METADATA_FILE`` constants, but the files
    are placed inside ``index_dir`` so the parameter is actually honoured.
    With ``index_dir == INDEX_DIR`` the resulting paths are exactly
    ``INDEX_FILE`` and ``METADATA_FILE``.

    Args:
        index: The FAISS index to serialise.
        metadata: JSON-serialisable metadata dictionaries, one per vector.
        index_dir: Target directory for both files.
    """
    index_dir.mkdir(parents=True, exist_ok=True)
    index_path = index_dir / INDEX_FILE.name
    metadata_path = index_dir / METADATA_FILE.name

    faiss.write_index(index, str(index_path))
    print(f" β FAISS index saved β '{index_path}'")

    with open(metadata_path, "w", encoding="utf-8") as fh:
        # ensure_ascii=False keeps non-Latin text readable in the JSON file.
        json.dump(metadata, fh, ensure_ascii=False, indent=2)
    print(f" β Metadata saved β '{metadata_path}'")

    # Report on-disk sizes in mebibytes.
    idx_mb = index_path.stat().st_size / (1024 * 1024)
    meta_mb = metadata_path.stat().st_size / (1024 * 1024)
    print(f" β Index: {idx_mb:.2f} MB | Metadata: {meta_mb:.2f} MB")
def load_index() -> Tuple[faiss.Index, List[Dict]]:
    """Read the persisted FAISS index and metadata back from disk.

    Returns:
        A tuple ``(index, metadata)`` loaded from ``INDEX_FILE`` and
        ``METADATA_FILE``.

    Raises:
        FileNotFoundError: If either of the two files is absent.
    """
    both_present = INDEX_FILE.exists() and METADATA_FILE.exists()
    if not both_present:
        raise FileNotFoundError(f"Index not found in '{INDEX_DIR}/'. Run vector_store.py first.")

    stored_index = faiss.read_index(str(INDEX_FILE))
    stored_metadata = json.loads(METADATA_FILE.read_text(encoding="utf-8"))

    print(f" β Index loaded. Vectors: {stored_index.ntotal}")
    return stored_index, stored_metadata
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OpenAI Embedding Helper | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def load_client() -> OpenAI:
    """Create an OpenAI client using the ``OPENAI_API_KEY`` environment variable.

    If python-dotenv is installed, a ``.env`` file is loaded first so the key
    may be supplied there as well as through the process environment. When no
    key is found, a warning is printed and the client is still created with an
    empty key.

    Returns:
        An ``OpenAI`` client instance.
    """
    try:
        # Optional: python-dotenv is listed in this script's requirements, but
        # the environment alone is enough when it is not installed.
        from dotenv import load_dotenv
    except ImportError:
        pass
    else:
        load_dotenv()

    api_key = os.environ.get("OPENAI_API_KEY", "")
    if not api_key:
        # Surface the misconfiguration now instead of at the first request.
        print(" ! OPENAI_API_KEY is not set; OpenAI requests are likely to be rejected.")
    return OpenAI(api_key=api_key)
def get_query_embedding(query: str, client: OpenAI) -> np.ndarray:
    """Embed ``query`` with ``EMBEDDING_MODEL`` and return a unit-length row.

    Args:
        query: Text to embed.
        client: OpenAI client used to request the embedding.

    Returns:
        A float32 array of shape ``(1, dims)``, scaled to Euclidean length 1
        unless the returned vector is all zeros.
    """
    reply = client.embeddings.create(model=EMBEDDING_MODEL, input=query)
    raw = reply.data[0].embedding

    length = math.sqrt(sum(component * component for component in raw))
    # A zero vector cannot be scaled; pass it through unchanged.
    unit = [component / length for component in raw] if length > 0 else raw

    return np.array(unit, dtype=np.float32).reshape(1, -1)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Similarity Search | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def search(query: str, index: faiss.Index, metadata: List[Dict], client: OpenAI, top_k: int = TOP_K) -> List[Dict]:
    """Return the stored chunks most similar to ``query``.

    Args:
        query: Text to search for.
        index: FAISS index whose vector ids are positions in ``metadata``.
        metadata: Per-vector metadata dictionaries.
        client: OpenAI client used to embed the query.
        top_k: Maximum number of results to return.

    Returns:
        A list of dictionaries, best match first. Each holds ``"rank"``
        (1-based), ``"score"`` (rounded to 4 decimals) and all fields of the
        matching metadata entry. The list is empty when ``top_k`` is not
        positive or the index holds no vectors.
    """
    # Nothing can be returned in these cases, so skip the embedding request
    # and avoid handing FAISS a non-positive k.
    if top_k <= 0 or index.ntotal == 0:
        return []

    query_vec = get_query_embedding(query, client)
    # Asking for more neighbours than exist only yields -1 padding.
    scores, ids = index.search(query_vec, min(top_k, index.ntotal))

    results = []
    for rank, (score, doc_id) in enumerate(zip(scores[0], ids[0]), start=1):
        if doc_id < 0:
            # FAISS marks unfilled result slots with id -1.
            continue
        results.append({"rank": rank, "score": round(float(score), 4), **metadata[int(doc_id)]})
    return results
def print_results(query: str, results: List[Dict]) -> None:
    """Print a query and its search results in a readable layout.

    Args:
        query: The query text shown in the heading.
        results: Result dictionaries with ``"rank"``, ``"score"``,
            ``"source"``, ``"chunk_id"`` and ``"text"``; ``"language"`` is
            optional and shown as ``?`` when absent.
    """
    rule = "β" * 64

    print(f'\n Query: "{query}"')
    print(f" {rule}")

    if not results:
        print(" No results found.")
        return

    for hit in results:
        language = hit.get("language", "?")
        print(
            f"\n #{hit['rank']} Score: {hit['score']:.4f} [{language}] "
            f"Source: {hit['source']} Chunk: {hit['chunk_id']}"
        )
        # Show at most 200 characters on a single line; flag any truncation.
        body = hit["text"]
        preview = body[:200].replace("\n", " ")
        suffix = "..." if len(body) > 200 else ""
        print(f" {preview}{suffix}")

    print(f"\n {rule}")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def main() -> None:
    """Run the pipeline: load, index, save, reload, then run demo searches.

    Steps, in order: load the embedded dataset from ``INPUT_FILE``, build the
    FAISS index, save it under ``INDEX_DIR``, reload it from disk as a
    verification, create the OpenAI client, and print the results of a few
    demo queries followed by a summary.
    """
    print("=" * 70)
    print(" Phase 4 β FAISS Vector Database v3")
    print(f" Model: {EMBEDDING_MODEL}")
    print("=" * 70)

    print("\n[STEP 1] Loading embedded dataset ...")
    records = load_dataset(INPUT_FILE)

    print("\n[STEP 2] Building FAISS index ...")
    index, metadata = build_faiss_index(records)

    print("\n[STEP 3] Saving index to disk ...")
    save_index(index, metadata, INDEX_DIR)

    print("\n[STEP 4] Reloading index (verification) ...")
    index, metadata = load_index()

    print("\n[STEP 5] Initialising OpenAI client ...")
    client = load_client()

    print("\n[STEP 6] Demo similarity searches ...")
    print("=" * 70)
    demo_queries = [
        "What courses are offered this semester?",
        # Arabic: "What are the final exam dates?"
        "ما هي مواعيد الامتحانات النهائية؟",
        "graduation credit hour requirements",
    ]
    for query in demo_queries:
        results = search(query, index, metadata, client, top_k=TOP_K)
        print_results(query, results)

    print("\n" + "=" * 70)
    print(" β Phase 4 Complete!")
    print(f" Vectors : {index.ntotal}")
    # Report the dimensionality of the index itself rather than a fixed number.
    print(f" Dims : {index.d} ({EMBEDDING_MODEL})")
    print(f" Saved : {INDEX_DIR}/")
    print("=" * 70)


if __name__ == "__main__":
    main()