"""RAG retrieval pipelines: Base-RAG and Hier-RAG."""
import time
import logging
from typing import List, Dict, Any, Optional, Tuple
from core.index import VectorStore, IndexManager
from openai import OpenAI
import openai
import os
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
logger = logging.getLogger(__name__)
class BaseRAG:
"""Standard RAG pipeline without hierarchical filtering."""
def __init__(
self,
vector_store: VectorStore,
llm_model: str = "gpt-3.5-turbo",
api_key: Optional[str] = None
):
"""
Initialize Base RAG pipeline.
Args:
vector_store: Vector store instance
llm_model: OpenAI model name
api_key: OpenAI API key
"""
self.vector_store = vector_store
self.llm_model = llm_model
# Set OpenAI API key
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.client = OpenAI(api_key=self.api_key)
def retrieve(
self,
query: str,
n_results: int = 5
) -> Tuple[List[Dict[str, Any]], float]:
"""
Retrieve relevant documents.
Args:
query: Search query
n_results: Number of results to retrieve
Returns:
Tuple of (results, retrieval_time)
"""
start_time = time.time()
results = self.vector_store.search(query, n_results=n_results)
retrieval_time = time.time() - start_time
logger.info(f"Retrieved {len(results)} documents in {retrieval_time:.3f}s")
return results, retrieval_time
    @retry(
        retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    def _call_llm(self, prompt: str, max_tokens: int):
        """Call the chat completions API, retrying transient errors with backoff."""
        return self.client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.3
        )

    def generate(
        self,
        query: str,
        contexts: List[str],
        max_tokens: int = 500
    ) -> Tuple[str, float]:
        """
        Generate an answer using the LLM, retrying transient API errors.

        The @retry decorator sits on `_call_llm` rather than on this method:
        catching RateLimitError or APITimeoutError here would swallow the
        exception before the decorator could see it, so the handlers below
        only fire once the retry budget is exhausted.

        Args:
            query: User query
            contexts: Retrieved context documents
            max_tokens: Maximum tokens in response

        Returns:
            Tuple of (answer, generation_time)
        """
        # Build prompt from the retrieved contexts
        context_text = "\n\n".join(f"Context {i+1}:\n{ctx}" for i, ctx in enumerate(contexts))
        prompt = f"""Based on the following context documents, please answer the question.

{context_text}

Question: {query}

Answer:"""
        start_time = time.time()
        try:
            response = self._call_llm(prompt, max_tokens)
            answer = response.choices[0].message.content
            generation_time = time.time() - start_time
            logger.info(f"Generated answer in {generation_time:.3f}s")
            return answer, generation_time
        except openai.AuthenticationError as e:
            logger.error(f"Authentication failed: {e}")
            return "❌ **Authentication Error**: Invalid OpenAI API key. Please check your credentials in Settings → Secrets.", 0.0
        except openai.RateLimitError as e:
            logger.error(f"Rate limit exceeded after retries: {e}")
            return "⚠️ **Rate Limit Exceeded**: Too many requests. Please wait a moment and try again.", 0.0
        except openai.APITimeoutError as e:
            logger.error(f"API timeout after retries: {e}")
            return "⏱️ **Timeout Error**: Request took too long. Please try again with a shorter query.", 0.0
        except openai.APIConnectionError as e:
            logger.error(f"Connection error: {e}")
            return "🌐 **Connection Error**: Unable to reach OpenAI API. Please check your internet connection.", 0.0
        except Exception as e:
            logger.error(f"Unexpected error during generation: {e}")
            return f"❌ **Error**: {e}", 0.0
def query(
self,
query: str,
n_results: int = 5,
max_tokens: int = 500
) -> Dict[str, Any]:
"""
Complete RAG pipeline: retrieve + generate.
Args:
query: User query
n_results: Number of documents to retrieve
max_tokens: Maximum tokens in response
Returns:
Dictionary with answer, contexts, and timing info
"""
# Retrieve
results, retrieval_time = self.retrieve(query, n_results)
# Extract contexts
contexts = [r["document"] for r in results]
# Generate
answer, generation_time = self.generate(query, contexts, max_tokens)
total_time = retrieval_time + generation_time
logger.info(f"Base-RAG query completed in {total_time:.3f}s (retrieval: {retrieval_time:.3f}s, generation: {generation_time:.3f}s)")
return {
"query": query,
"answer": answer,
"contexts": results,
"retrieval_time": retrieval_time,
"generation_time": generation_time,
"total_time": total_time,
"pipeline": "Base-RAG"
}
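
# Usage sketch (illustrative, not part of the pipeline): assumes `store` is an
# already-populated VectorStore and OPENAI_API_KEY is set in the environment.
#
#   rag = BaseRAG(store)
#   result = rag.query("What is the admission protocol?", n_results=5)
#   print(result["answer"], result["total_time"])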
class HierarchicalRAG:
"""Hierarchical RAG pipeline with metadata filtering."""
def __init__(
self,
vector_store: VectorStore,
llm_model: str = "gpt-3.5-turbo",
api_key: Optional[str] = None
):
"""
Initialize Hierarchical RAG pipeline.
Args:
vector_store: Vector store instance
llm_model: OpenAI model name
api_key: OpenAI API key
"""
self.vector_store = vector_store
self.llm_model = llm_model
# Set OpenAI API key
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.client = OpenAI(api_key=self.api_key)
def infer_hierarchy_from_query(self, query: str) -> Dict[str, Optional[str]]:
"""
Infer hierarchical filters from query using simple keyword matching.
Args:
query: User query
Returns:
Dictionary with level1, level2, level3, doc_type filters
"""
query_lower = query.lower()
# This is a simple heuristic - in production, use an LLM classifier
filters = {
"level1": None,
"level2": None,
"level3": None,
"doc_type": None
}
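        # NOTE: only level1 and doc_type are ever inferred here; level2/level3
        # stay None. The elif chain below is first-match-wins, so branch order
        # sets precedence (e.g. "compliance" maps to Administrative, not
        # Compliance & Legal, because the hospital branches are checked first).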
# Simple keyword-based inference (can be enhanced with LLM)
# Hospital domain keywords
if any(kw in query_lower for kw in ["patient", "clinical", "medical", "treatment", "admission", "hospital", "nurse", "doctor"]):
filters["level1"] = "Clinical Care"
elif any(kw in query_lower for kw in ["policy", "compliance", "administrative", "staff"]):
filters["level1"] = "Administrative"
elif any(kw in query_lower for kw in ["infection", "safety", "quality", "incident", "error"]):
filters["level1"] = "Quality & Safety"
elif any(kw in query_lower for kw in ["training", "education", "course", "certification"]):
filters["level1"] = "Education & Training"
# Bank domain keywords
elif any(kw in query_lower for kw in ["account", "loan", "banking", "retail", "customer", "deposit"]):
filters["level1"] = "Retail Banking"
elif any(kw in query_lower for kw in ["risk", "credit", "fraud", "default"]):
filters["level1"] = "Risk Management"
elif any(kw in query_lower for kw in ["compliance", "kyc", "aml", "regulatory", "legal"]):
filters["level1"] = "Compliance & Legal"
elif any(kw in query_lower for kw in ["corporate", "business", "commercial", "treasury"]):
filters["level1"] = "Corporate Banking"
# Fluid simulation keywords
elif any(kw in query_lower for kw in ["turbulence", "flow", "simulation", "cfd", "solver", "algorithm"]):
filters["level1"] = "Physical Models"
elif any(kw in query_lower for kw in ["mesh", "grid", "discretization", "numerical", "finite"]):
filters["level1"] = "Numerical Methods"
elif any(kw in query_lower for kw in ["validation", "verification", "benchmark", "accuracy"]):
filters["level1"] = "Validation & Verification"
elif any(kw in query_lower for kw in ["software", "tool", "platform", "parallel", "computing"]):
filters["level1"] = "Software & Tools"
# Doc type inference
if any(kw in query_lower for kw in ["policy", "policies"]):
filters["doc_type"] = "policy"
elif any(kw in query_lower for kw in ["manual", "guide", "handbook"]):
filters["doc_type"] = "manual"
elif any(kw in query_lower for kw in ["report", "analysis", "findings"]):
filters["doc_type"] = "report"
elif any(kw in query_lower for kw in ["protocol", "procedure", "standard"]):
filters["doc_type"] = "protocol"
elif any(kw in query_lower for kw in ["paper", "research", "study"]):
filters["doc_type"] = "paper"
logger.info(f"Inferred filters: {filters}")
return filters
def retrieve(
self,
query: str,
n_results: int = 5,
level1: Optional[str] = None,
level2: Optional[str] = None,
level3: Optional[str] = None,
doc_type: Optional[str] = None,
auto_infer: bool = True
) -> Tuple[List[Dict[str, Any]], float, Dict[str, Optional[str]]]:
"""
Retrieve relevant documents with hierarchical filtering.
Args:
query: Search query
n_results: Number of results to retrieve
level1: Domain filter
level2: Section filter
level3: Topic filter
doc_type: Document type filter
auto_infer: Whether to auto-infer filters from query
Returns:
Tuple of (results, retrieval_time, applied_filters)
"""
        # Auto-infer filters only when the caller supplied none explicitly;
        # inside this branch all four filters are None, so assign directly.
        if auto_infer and not any([level1, level2, level3, doc_type]):
            inferred = self.infer_hierarchy_from_query(query)
            level1 = inferred["level1"]
            level2 = inferred["level2"]
            level3 = inferred["level3"]
            doc_type = inferred["doc_type"]
applied_filters = {
"level1": level1,
"level2": level2,
"level3": level3,
"doc_type": doc_type
}
start_time = time.time()
results = self.vector_store.search_with_hierarchy(
query=query,
n_results=n_results,
level1=level1,
level2=level2,
level3=level3,
doc_type=doc_type
)
retrieval_time = time.time() - start_time
logger.info(f"Retrieved {len(results)} documents with filters in {retrieval_time:.3f}s. Filters: {applied_filters}")
return results, retrieval_time, applied_filters
    @retry(
        retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        reraise=True
    )
    def _call_llm(self, prompt: str, max_tokens: int):
        """Call the chat completions API, retrying transient errors with backoff."""
        return self.client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on provided context."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.3
        )

    def generate(
        self,
        query: str,
        contexts: List[str],
        max_tokens: int = 500
    ) -> Tuple[str, float]:
        """
        Generate an answer using the LLM, retrying transient API errors.

        Mirrors BaseRAG.generate: the @retry decorator lives on `_call_llm`
        so transient exceptions are retried before the handlers below convert
        them into user-facing messages.

        Args:
            query: User query
            contexts: Retrieved context documents
            max_tokens: Maximum tokens in response

        Returns:
            Tuple of (answer, generation_time)
        """
        # Build prompt from the retrieved contexts
        context_text = "\n\n".join(f"Context {i+1}:\n{ctx}" for i, ctx in enumerate(contexts))
        prompt = f"""Based on the following context documents, please answer the question.

{context_text}

Question: {query}

Answer:"""
        start_time = time.time()
        try:
            response = self._call_llm(prompt, max_tokens)
            answer = response.choices[0].message.content
            generation_time = time.time() - start_time
            logger.info(f"Generated answer in {generation_time:.3f}s")
            return answer, generation_time
        except openai.AuthenticationError as e:
            logger.error(f"Authentication failed: {e}")
            return "❌ **Authentication Error**: Invalid OpenAI API key. Please check your credentials.", 0.0
        except openai.RateLimitError as e:
            logger.error(f"Rate limit exceeded after retries: {e}")
            return "⚠️ **Rate Limit Exceeded**: Too many requests. Please wait a moment and try again.", 0.0
        except openai.APITimeoutError as e:
            logger.error(f"API timeout after retries: {e}")
            return "⏱️ **Timeout Error**: Request took too long. Please try again.", 0.0
        except openai.APIConnectionError as e:
            logger.error(f"Connection error: {e}")
            return "🌐 **Connection Error**: Unable to reach OpenAI API. Check your connection.", 0.0
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            return f"❌ **Error**: {e}", 0.0
def query(
self,
query: str,
n_results: int = 5,
max_tokens: int = 500,
level1: Optional[str] = None,
level2: Optional[str] = None,
level3: Optional[str] = None,
doc_type: Optional[str] = None,
auto_infer: bool = True
) -> Dict[str, Any]:
"""
Complete Hierarchical RAG pipeline: filter + retrieve + generate.
Args:
query: User query
n_results: Number of documents to retrieve
max_tokens: Maximum tokens in response
level1: Domain filter
level2: Section filter
level3: Topic filter
doc_type: Document type filter
auto_infer: Whether to auto-infer filters from query
Returns:
Dictionary with answer, contexts, filters, and timing info
"""
# Retrieve with hierarchy
results, retrieval_time, applied_filters = self.retrieve(
query=query,
n_results=n_results,
level1=level1,
level2=level2,
level3=level3,
doc_type=doc_type,
auto_infer=auto_infer
)
# Extract contexts
contexts = [r["document"] for r in results]
# Generate
answer, generation_time = self.generate(query, contexts, max_tokens)
total_time = retrieval_time + generation_time
logger.info(f"Hier-RAG query completed in {total_time:.3f}s (retrieval: {retrieval_time:.3f}s, generation: {generation_time:.3f}s)")
return {
"query": query,
"answer": answer,
"contexts": results,
"applied_filters": applied_filters,
"retrieval_time": retrieval_time,
"generation_time": generation_time,
"total_time": total_time,
"pipeline": "Hier-RAG"
}
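
# Usage sketch (illustrative): any explicit filter disables auto-inference,
# so the call below searches only Retail Banking policy chunks.
#
#   hier = HierarchicalRAG(store)
#   out = hier.query("deposit account fees", level1="Retail Banking", doc_type="policy")
#   out["applied_filters"]  # {"level1": "Retail Banking", "level2": None,
#                           #  "level3": None, "doc_type": "policy"}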
class RAGComparator:
"""Compare Base-RAG and Hier-RAG side-by-side."""
def __init__(
self,
vector_store: VectorStore,
llm_model: str = "gpt-3.5-turbo",
api_key: Optional[str] = None
):
"""
Initialize RAG comparator.
Args:
vector_store: Vector store instance
llm_model: OpenAI model name
api_key: OpenAI API key
"""
self.base_rag = BaseRAG(vector_store, llm_model, api_key)
self.hier_rag = HierarchicalRAG(vector_store, llm_model, api_key)
def compare(
self,
query: str,
n_results: int = 5,
max_tokens: int = 500,
level1: Optional[str] = None,
level2: Optional[str] = None,
level3: Optional[str] = None,
doc_type: Optional[str] = None,
auto_infer: bool = True
) -> Dict[str, Any]:
"""
Run both pipelines and compare results.
Args:
query: User query
n_results: Number of documents to retrieve
max_tokens: Maximum tokens in response
level1: Domain filter (Hier-RAG only)
level2: Section filter (Hier-RAG only)
level3: Topic filter (Hier-RAG only)
doc_type: Document type filter (Hier-RAG only)
auto_infer: Whether to auto-infer filters (Hier-RAG only)
Returns:
Dictionary with results from both pipelines
"""
logger.info(f"Comparing pipelines for query: {query}")
# Run Base-RAG
base_results = self.base_rag.query(query, n_results, max_tokens)
# Run Hier-RAG
hier_results = self.hier_rag.query(
query=query,
n_results=n_results,
max_tokens=max_tokens,
level1=level1,
level2=level2,
level3=level3,
doc_type=doc_type,
auto_infer=auto_infer
)
# Calculate speedup
speedup = base_results["total_time"] / hier_results["total_time"] if hier_results["total_time"] > 0 else 0
logger.info(f"Comparison complete. Speedup: {speedup:.2f}x")
return {
"query": query,
"base_rag": base_results,
"hier_rag": hier_results,
"speedup": speedup
}
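

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not the deployed entry point). Assumes
    # core.index.VectorStore can be constructed with no arguments and has
    # already been populated via IndexManager; adapt to the real constructor
    # if it differs. Requires OPENAI_API_KEY in the environment.
    logging.basicConfig(level=logging.INFO)
    store = VectorStore()  # hypothetical no-arg constructor
    comparator = RAGComparator(store, llm_model="gpt-3.5-turbo")
    report = comparator.compare("What is the infection control protocol?")
    print("Base-RAG:", report["base_rag"]["answer"])
    print("Hier-RAG:", report["hier_rag"]["answer"])
    print(f"Speedup: {report['speedup']:.2f}x")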