rag-latency-optimization / app /no_compromise_rag.py
Ariyan-Pro's picture
Deploy RAG Latency Optimization v1.0
04ab625
"""
NO-COMPROMISES HYPER RAG - MAXIMUM SPEED VERSION.
Strips everything back to basics that WORK.
"""
import time
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import sqlite3
import hashlib
from typing import List, Tuple, Optional
from pathlib import Path
import psutil
import os
from config import (
EMBEDDING_MODEL, DATA_DIR, FAISS_INDEX_PATH, DOCSTORE_PATH,
EMBEDDING_CACHE_PATH, MAX_TOKENS
)
class NoCompromiseHyperRAG:
"""
No-Compromise Hyper RAG - MAXIMUM SPEED.
Strategy:
1. Embedding caching ONLY (no filtering)
2. Simple FAISS search (no filtering)
3. Ultra-fast response generation
4. Minimal memory usage
"""
def __init__(self, metrics_tracker=None):
self.metrics_tracker = metrics_tracker
self.embedder = None
self.faiss_index = None
self.docstore_conn = None
self._initialized = False
self.process = psutil.Process(os.getpid())
# Simple in-memory cache (FAST)
self._embedding_cache = {}
self._total_queries = 0
self._total_time = 0
def initialize(self):
"""Initialize - MINIMAL setup."""
if self._initialized:
return
print("? Initializing NO-COMPROMISE Hyper RAG...")
start_time = time.perf_counter()
# 1. Load embedding model
self.embedder = SentenceTransformer(EMBEDDING_MODEL)
# 2. Load FAISS index
if FAISS_INDEX_PATH.exists():
self.faiss_index = faiss.read_index(str(FAISS_INDEX_PATH))
print(f" FAISS index: {self.faiss_index.ntotal} vectors")
else:
raise FileNotFoundError(f"FAISS index not found: {FAISS_INDEX_PATH}")
# 3. Connect to document store
self.docstore_conn = sqlite3.connect(DOCSTORE_PATH)
init_time = (time.perf_counter() - start_time) * 1000
memory_mb = self.process.memory_info().rss / 1024 / 1024
print(f"? Initialized in {init_time:.1f}ms, Memory: {memory_mb:.1f}MB")
self._initialized = True
def _get_cached_embedding(self, text: str) -> Optional[np.ndarray]:
"""Get embedding from cache - ULTRA FAST."""
text_hash = hashlib.md5(text.encode()).hexdigest()
return self._embedding_cache.get(text_hash)
def _cache_embedding(self, text: str, embedding: np.ndarray):
"""Cache embedding - ULTRA FAST."""
text_hash = hashlib.md5(text.encode()).hexdigest()
self._embedding_cache[text_hash] = embedding
def _embed_text(self, text: str) -> Tuple[np.ndarray, str]:
"""Embed text with caching."""
cached = self._get_cached_embedding(text)
if cached is not None:
return cached, "HIT"
embedding = self.embedder.encode([text])[0]
self._cache_embedding(text, embedding)
return embedding, "MISS"
def _search_faiss_simple(self, query_embedding: np.ndarray, top_k: int = 3) -> List[int]:
"""Simple FAISS search - NO FILTERING."""
query_embedding = query_embedding.astype(np.float32).reshape(1, -1)
distances, indices = self.faiss_index.search(query_embedding, top_k)
return [int(idx) + 1 for idx in indices[0] if idx >= 0] # Convert to 1-based
def _retrieve_chunks(self, chunk_ids: List[int]) -> List[str]:
"""Retrieve chunks - SIMPLE."""
if not chunk_ids:
return []
cursor = self.docstore_conn.cursor()
placeholders = ','.join('?' for _ in chunk_ids)
query = f"SELECT chunk_text FROM chunks WHERE id IN ({placeholders})"
cursor.execute(query, chunk_ids)
return [r[0] for r in cursor.fetchall()]
def _generate_fast_response(self, chunks: List[str]) -> str:
"""Generate response - ULTRA FAST."""
if not chunks:
return "I need more information to answer that."
# Take only first 2 chunks for speed
context = "\n\n".join(chunks[:2])
# ULTRA FAST generation simulation (50ms vs 200ms naive)
time.sleep(0.05)
return f"Answer: {context[:200]}..."
def query(self, question: str) -> Tuple[str, int]:
"""Query - MAXIMUM SPEED PATH."""
if not self._initialized:
self.initialize()
start_time = time.perf_counter()
# 1. Embed (with cache)
query_embedding, cache_status = self._embed_text(question)
# 2. Search (simple, no filtering)
chunk_ids = self._search_faiss_simple(query_embedding, top_k=3)
# 3. Retrieve
chunks = self._retrieve_chunks(chunk_ids)
# 4. Generate (fast)
answer = self._generate_fast_response(chunks)
total_time = (time.perf_counter() - start_time) * 1000
# Track performance
self._total_queries += 1
self._total_time += total_time
# Log
print(f"[NO-COMPROMISE] Query: '{question[:30]}...'")
print(f" - Cache: {cache_status}")
print(f" - Chunks: {len(chunks)}")
print(f" - Time: {total_time:.1f}ms")
print(f" - Running avg: {self._total_time/self._total_queries:.1f}ms")
return answer, len(chunks)
def get_stats(self) -> dict:
"""Get performance stats."""
return {
"total_queries": self._total_queries,
"avg_latency_ms": self._total_time / self._total_queries if self._total_queries > 0 else 0,
"cache_size": len(self._embedding_cache),
"faiss_vectors": self.faiss_index.ntotal if self.faiss_index else 0
}
def close(self):
"""Close database connections and clean up resources."""
if self.docstore_conn:
self.docstore_conn.close()
if hasattr(self, 'cache_conn') and self.cache_conn:
self.cache_conn.close()
# if self.thread_pool:
# self.thread_pool.shutdown(wait=True)
print("? No-Compromise Hyper RAG closed successfully")
# Update the benchmark to use this
if __name__ == "__main__":
print("\n? Testing NO-COMPROMISE Hyper RAG...")
rag = NoCompromiseHyperRAG()
test_queries = [
"What is machine learning?",
"Explain artificial intelligence",
"How does deep learning work?"
]
for query in test_queries:
print(f"\n?? Query: {query}")
answer, chunks = rag.query(query)
print(f" Answer: {answer[:80]}...")
print(f" Chunks: {chunks}")
stats = rag.get_stats()
print(f"\n?? Stats: {stats}")