# Source: RAGFintech / src/enhanced_rag_system.py
# (Hugging Face Space file; last updated in commit 293b891 by JanviMl)
import os
import tempfile
# Fix HuggingFace cache directory issue for HuggingFace Spaces:
# the default cache locations are not writable there, so point every cache
# env var at a fresh writable temp dir before the model libraries are
# imported (presumably they read these variables at import time — confirm).
# NOTE(review): TRANSFORMERS_CACHE is deprecated in newer transformers
# releases in favour of HF_HOME, which is also set below.
os.environ['TRANSFORMERS_CACHE'] = tempfile.mkdtemp()
os.environ['HF_HOME'] = tempfile.mkdtemp()
os.environ['SENTENCE_TRANSFORMERS_HOME'] = tempfile.mkdtemp()
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from typing import List, Tuple, Dict, Optional
from langchain.schema import Document
import re
import json
import warnings
# Suppress ALL warnings globally — deliberate (keeps Space logs clean), but
# note this also hides deprecation warnings.
warnings.filterwarnings('ignore')
# Import vector store components with better error handling.
# VECTOR_STORE_AVAILABLE gates every ChromaDB/embedding code path below;
# when False the system degrades to keyword search (_fallback_search).
try:
    import chromadb
    from chromadb.config import Settings
    from sentence_transformers import SentenceTransformer
    VECTOR_STORE_AVAILABLE = True
    print("βœ… ChromaDB and SentenceTransformers imported successfully")
except ImportError as e:
    VECTOR_STORE_AVAILABLE = False
    print(f"⚠️ Vector store import error: {e}")
except Exception as e:
    # Any other import-time failure is treated the same as a missing package.
    VECTOR_STORE_AVAILABLE = False
    print(f"⚠️ Vector store initialization error: {e}")
# Import LLM components: the LLM path is enabled only when BOTH the openai
# package is importable AND an API key is present in the environment.
try:
    import openai
    LLM_AVAILABLE = bool(os.getenv("OPENAI_API_KEY"))
    if LLM_AVAILABLE:
        # NOTE(review): module-level `openai.api_key` is the pre-1.0 SDK
        # style; the rest of this file (openai.ChatCompletion) matches it —
        # confirm the pinned openai version stays < 1.0.
        openai.api_key = os.getenv("OPENAI_API_KEY")
        print("βœ… OpenAI API key found and configured")
    else:
        print("⚠️ OpenAI API key not found in environment")
except ImportError:
    LLM_AVAILABLE = False
    print("⚠️ OpenAI library not available")
# Import our custom modules
from document_processor import DocumentProcessor
from auth_system import AuthSystem
class EnhancedRAGSystem:
"""Complete RAG system with Vector Store, LLM, and RBAC enforcement"""
def __init__(self):
    """Set up collaborators and default state; heavy setup happens in initialize_system()."""
    self.document_processor = DocumentProcessor()
    self.auth_system = AuthSystem()
    self.documents = []        # all Documents loaded from the processor
    self.initialized = False   # flipped True by initialize_system()
    self.query_feedback = {}   # feedback_id -> feedback record (in-memory only)
    # Vector Store Components
    self.chroma_client = None
    self.collection = None         # ChromaDB collection "finsolve_documents"
    self.embedding_model = None    # SentenceTransformer instance once loaded
    self.vector_store_initialized = False
    # LLM Components
    self.llm_client = None         # set to the `openai` module once verified
    self.llm_model = "gpt-3.5-turbo"
    self.llm_initialized = False
    # Intent classification keywords: domain -> trigger words used by
    # _classify_query_intent (substring matching, most hits wins).
    self.intent_keywords = {
        "finance": ["revenue", "profit", "cost", "budget", "financial", "expense", "income", "cash", "margin", "roi", "sales"],
        "marketing": ["campaign", "customer", "acquisition", "brand", "marketing", "advertising", "engagement", "conversion", "retention"],
        "hr": ["employee", "hr", "policy", "leave", "benefits", "salary", "attendance", "performance", "training", "recruitment"],
        "engineering": ["architecture", "technology", "system", "development", "technical", "infrastructure", "deployment", "security", "api"],
        "general": ["company", "about", "overview", "mission", "values", "policy", "contact", "help"]
    }
def initialize_system(self):
    """Initialize the complete RAG system with all components.

    Brings up the vector store and LLM (each optional), loads documents,
    and indexes them. Any failure degrades to template/keyword mode rather
    than raising: self.initialized is set True on BOTH paths by design.
    """
    try:
        print("πŸš€ Initializing Complete RAG System...")
        # Initialize Vector Store (ChromaDB); on failure keyword fallback is used.
        self._initialize_vector_store()
        # Initialize LLM; on failure template responses are used.
        self._initialize_llm()
        # Load documents
        self.documents = self.document_processor.get_all_documents()
        # Index into the vector store only if it actually came up.
        if self.vector_store_initialized:
            self._load_documents_to_vector_store()
        self.initialized = True
        # Print initialization status banner.
        self._print_initialization_status()
    except Exception as e:
        print(f"❌ Error initializing RAG system: {str(e)}")
        # Graceful fallback: mark initialized anyway so query() still answers
        # via templates/keyword search.
        self.initialized = True
        print("⚠️ Using fallback mode with template responses")
def _initialize_vector_store(self):
    """Initialize the ChromaDB vector store and embedding model.

    Tries ChromaDB client configurations newest-to-oldest for cross-version
    compatibility, then loads (or creates) the collection and a small
    sentence-transformer model. Sets self.vector_store_initialized on
    success; never raises — any failure leaves the keyword-search fallback.
    """
    if not VECTOR_STORE_AVAILABLE:
        print("⚠️ ChromaDB/SentenceTransformers not available, using in-memory search")
        return
    try:
        print("πŸ”§ Initializing ChromaDB...")
        # ChromaDB needs a writable directory (HF Spaces' default dirs are not).
        chroma_dir = tempfile.mkdtemp(prefix="chroma_")
        print(f"πŸ“ Using ChromaDB directory: {chroma_dir}")
        # Try different ChromaDB configurations for HuggingFace compatibility.
        try:
            # First try: PersistentClient (newer API)
            self.chroma_client = chromadb.PersistentClient(path=chroma_dir)
            print("βœ… Using ChromaDB PersistentClient")
        except Exception:
            try:
                # Second try: Client with settings (older API)
                self.chroma_client = chromadb.Client(Settings(
                    chroma_db_impl="duckdb+parquet",
                    persist_directory=chroma_dir
                ))
                print("βœ… Using ChromaDB Client with Settings")
            except Exception:
                # Last resort: purely in-memory client (no persistence).
                self.chroma_client = chromadb.Client()
                print("βœ… Using ChromaDB in-memory client")
        # Get or create collection. get_collection raises when the collection
        # does not exist yet; catch Exception (was a bare `except:`, which
        # would also swallow KeyboardInterrupt/SystemExit).
        collection_name = "finsolve_documents"
        try:
            self.collection = self.chroma_client.get_collection(collection_name)
            print(f"βœ… Loaded existing ChromaDB collection: {collection_name}")
        except Exception:
            self.collection = self.chroma_client.create_collection(
                name=collection_name,
                metadata={"description": "FinSolve documents with RBAC"}
            )
            print(f"βœ… Created new ChromaDB collection: {collection_name}")
        # Load a small embedding model; fall back to an even smaller one.
        try:
            # Cache dir must be writable on HF Spaces.
            cache_dir = tempfile.mkdtemp(prefix="sentence_transformers_")
            self.embedding_model = SentenceTransformer(
                "all-MiniLM-L6-v2",
                cache_folder=cache_dir
            )
            print("βœ… Loaded sentence transformer model: all-MiniLM-L6-v2")
        except Exception:
            try:
                cache_dir = tempfile.mkdtemp(prefix="sentence_transformers_fallback_")
                self.embedding_model = SentenceTransformer(
                    "paraphrase-MiniLM-L3-v2",
                    cache_folder=cache_dir
                )
                print("βœ… Loaded fallback sentence transformer model: paraphrase-MiniLM-L3-v2")
            except Exception as model_err:
                print(f"❌ Failed to load embedding model: {model_err}")
                # Bare `raise` preserves the original traceback (was `raise e2`).
                raise
        self.vector_store_initialized = True
    except Exception as e:
        print(f"⚠️ ChromaDB initialization failed: {str(e)}")
        print("⚠️ Falling back to in-memory search")
        self.vector_store_initialized = False
def _initialize_llm(self):
    """Verify the OpenAI connection and enable LLM-backed answers.

    Sends a tiny throwaway completion request as a smoke test; on success
    stores the openai module as self.llm_client and sets
    self.llm_initialized. Never raises — failures leave template mode.
    """
    if not LLM_AVAILABLE:
        print("⚠️ OpenAI API key not found, using template responses")
        return
    try:
        # Smoke-test the API; the reply itself is discarded (the previous
        # `response =` binding was never used).
        # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface,
        # matching how this module configures `openai.api_key` at import
        # time — confirm the pinned openai version stays < 1.0.
        openai.ChatCompletion.create(
            model=self.llm_model,
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=10
        )
        self.llm_client = openai
        self.llm_initialized = True
        print(f"βœ… OpenAI LLM initialized: {self.llm_model}")
    except Exception as e:
        print(f"⚠️ OpenAI initialization failed: {str(e)}")
        print("⚠️ Using template-based responses")
def _load_documents_to_vector_store(self):
"""Load documents into ChromaDB vector store"""
if not self.vector_store_initialized or not self.embedding_model:
return
try:
# Check if documents already loaded
if self.collection.count() > 0:
print(f"βœ… ChromaDB already contains {self.collection.count()} documents")
return
print("πŸ“„ Loading documents into vector store...")
texts = []
metadatas = []
ids = []
for i, doc in enumerate(self.documents):
doc_id = f"doc_{i}_{hash(doc.page_content) % 10000}"
metadata = {
"content_type": doc.metadata.get("content_type", "general"),
"title": doc.metadata.get("title", "Document"),
"department": doc.metadata.get("department", "General"),
"type": doc.metadata.get("type", "Document"),
"chunk_id": str(doc.metadata.get("chunk_id", 0)),
"source": doc.metadata.get("source", "unknown")
}
texts.append(doc.page_content)
metadatas.append(metadata)
ids.append(doc_id)
# Generate embeddings in batches to avoid memory issues
batch_size = 10
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
batch_metadatas = metadatas[i:i+batch_size]
batch_ids = ids[i:i+batch_size]
# Generate embeddings
embeddings = self.embedding_model.encode(batch_texts).tolist()
# Add to ChromaDB
self.collection.add(
embeddings=embeddings,
documents=batch_texts,
metadatas=batch_metadatas,
ids=batch_ids
)
print(f"βœ… Loaded {len(self.documents)} documents into ChromaDB")
except Exception as e:
print(f"⚠️ Error loading documents to vector store: {str(e)}")
def _print_initialization_status(self):
"""Print comprehensive initialization status"""
print("\n" + "="*50)
print("πŸ€– FINSOLVE RAG SYSTEM STATUS")
print("="*50)
print(f"βœ… Python: Core system initialized")
print(f"{'βœ…' if self.vector_store_initialized else '⚠️'} ChromaDB Vector Store: {'Ready' if self.vector_store_initialized else 'Fallback mode'}")
print(f"{'βœ…' if self.llm_initialized else '⚠️'} OpenAI LLM: {'OpenAI GPT' if self.llm_initialized else 'Template mode'}")
print(f"βœ… Streamlit: UI active")
print(f"πŸ”„ FastAPI: {'Real FastAPI' if self._check_fastapi_running() else 'Simulated API'}")
print(f"βœ… Authentication: JWT-style RBAC")
print(f"βœ… NLP: Intent classification + {'LLM' if self.llm_initialized else 'Templates'}")
print(f"βœ… RAG: Vector retrieval + context augmentation")
print(f"πŸ“Š Documents loaded: {len(self.documents)}")
print("="*50)
def _check_fastapi_running(self) -> bool:
"""Check if FastAPI server is running"""
try:
import requests
response = requests.get("http://localhost:8000/health", timeout=2)
return response.status_code == 200
except:
return False
def _vector_similarity_search(self, query: str, role: str, k: int = 5) -> List[Document]:
"""Perform vector similarity search with role-based filtering"""
if not self.vector_store_initialized:
return self._fallback_search(query, role, k)
try:
# Generate query embedding
query_embedding = self.embedding_model.encode([query]).tolist()[0]
# Build role-based filter
where_clause = self._build_role_filter(role)
# Perform vector search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=k,
where=where_clause,
include=["documents", "metadatas", "distances"]
)
# Convert to Document objects
documents = []
if results['documents'] and results['documents'][0]:
for i, (doc, metadata) in enumerate(zip(results['documents'][0], results['metadatas'][0])):
distance = results['distances'][0][i] if results['distances'] else 0
metadata['similarity_score'] = 1 - distance
documents.append(Document(
page_content=doc,
metadata=metadata
))
return documents
except Exception as e:
print(f"❌ Vector search error: {str(e)}")
return self._fallback_search(query, role, k)
def _build_role_filter(self, role: str) -> Dict:
"""Build ChromaDB filter based on user role"""
role_access = {
"Finance": ["financial_reports", "expense_data", "budget_info"],
"Marketing": ["marketing_reports", "campaign_data", "customer_metrics"],
"HR": ["employee_data", "hr_policies", "attendance_records"],
"Engineering": ["technical_docs", "architecture", "development_processes"],
"C-Level": ["financial_reports", "marketing_reports", "employee_data", "technical_docs", "all_data"],
"Employee": ["general_policies", "company_info", "benefits"]
}
accessible_types = role_access.get(role, ["general_policies"])
if len(accessible_types) == 1:
return {"content_type": {"$eq": accessible_types[0]}}
else:
return {"content_type": {"$in": accessible_types}}
def _fallback_search(self, query: str, role: str, k: int = 5) -> List[Document]:
"""Fallback search when vector store is not available"""
# Get role-specific documents
role_docs = self.document_processor.get_documents_for_role(role)
# Simple keyword matching
query_terms = query.lower().split()
scored_docs = []
for doc in role_docs:
content_lower = doc.page_content.lower()
score = 0
for term in query_terms:
score += content_lower.count(term)
if query.lower() in content_lower:
score += 10
if score > 0:
scored_docs.append((doc, score))
scored_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in scored_docs[:k]]
def _classify_query_intent(self, query: str) -> str:
"""Classify query intent using keyword matching"""
query_lower = query.lower()
intent_scores = {}
for intent, keywords in self.intent_keywords.items():
score = sum(1 for keyword in keywords if keyword in query_lower)
if score > 0:
intent_scores[intent] = score
if intent_scores:
return max(intent_scores, key=intent_scores.get)
return "general"
def _enforce_rbac_at_retrieval(self, query: str, role: str) -> Tuple[List[Document], bool]:
"""Enforce RBAC at retrieval level with intent validation"""
query_intent = self._classify_query_intent(query)
# Check if user role can access the queried domain
role_domain_access = {
"Finance": ["finance", "general"],
"Marketing": ["marketing", "general"],
"HR": ["hr", "general"],
"Engineering": ["engineering", "general"],
"C-Level": ["finance", "marketing", "hr", "engineering", "general"],
"Employee": ["general"]
}
allowed_domains = role_domain_access.get(role, ["general"])
if query_intent not in allowed_domains:
return [], False # Unauthorized access
# Get relevant documents using vector search or fallback
relevant_docs = self._vector_similarity_search(query, role)
return relevant_docs, True
async def _generate_llm_response(self, query: str, context: str, user_role: str, query_intent: str) -> str:
    """Generate response using OpenAI LLM.

    Falls back to the deterministic template answer (without context docs)
    when the LLM is not initialized or the API call fails.
    """
    if not self.llm_initialized:
        return self._generate_template_response(query, [], user_role, query_intent)
    try:
        # The system prompt embeds role, domain and the retrieved context so
        # the model answers within the caller's access boundary.
        system_prompt = f"""You are an AI assistant for FinSolve Technologies, a leading FinTech company.
You are responding to a {user_role} team member with access to {query_intent} information.
Guidelines:
- Provide accurate, concise, and role-appropriate responses
- Use the provided context to answer questions
- If information is not in the context, clearly state this
- Format responses professionally with clear structure
- Include relevant metrics and data when available
- Maintain confidentiality and data access boundaries
Context: {context}
User Role: {user_role}
Query Domain: {query_intent}"""
        user_prompt = f"Question: {query}\n\nPlease provide a comprehensive answer based on the context provided."
        # NOTE(review): this coroutine contains no awaits and calls the
        # blocking pre-1.0 `openai.ChatCompletion` API through self.llm_client
        # — confirm the pinned openai version; a plain sync method would do.
        response = self.llm_client.ChatCompletion.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            max_tokens=1000,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        print(f"❌ LLM error: {str(e)}")
        # Any API failure degrades to the template answer (context docs dropped).
        return self._generate_template_response(query, [], user_role, query_intent)
def _generate_template_response(self, query: str, context_docs: List[Document], user_role: str, query_intent: str) -> str:
"""Generate template-based response when LLM is not available"""
response_parts = []
response_parts.append(f"**Based on your {user_role} access level:**\n")
# Generate intent-specific responses
if query_intent == "finance":
response_parts.extend(self._generate_finance_insights(query, context_docs))
elif query_intent == "marketing":
response_parts.extend(self._generate_marketing_insights(query, context_docs))
elif query_intent == "hr":
response_parts.extend(self._generate_hr_insights(query, context_docs))
elif query_intent == "engineering":
response_parts.extend(self._generate_technical_insights(query, context_docs))
else:
response_parts.extend(self._generate_general_insights(query, context_docs))
return "\n".join(response_parts)
def _generate_finance_insights(self, query: str, context_docs: List[Document]) -> List[str]:
"""Generate finance-specific insights"""
insights = ["πŸ’° **Financial Insights:**", ""]
# Extract content for analysis
content = " ".join([doc.page_content for doc in context_docs])
if "revenue" in query.lower() or "2.6 billion" in content:
insights.extend([
"πŸ“ˆ **Revenue Performance:**",
"β€’ Q4 2024: $2.6 billion (35% YoY growth)",
"β€’ Annual 2024: $9.4 billion (28% YoY increase)",
"β€’ Strong growth trajectory maintained throughout the year",
""
])
if "margin" in query.lower() or "profit" in query.lower():
insights.extend([
"πŸ“Š **Profitability Metrics:**",
"β€’ Gross Margin: 64% (improved from 58% in Q1)",
"β€’ Net Income: $325M (18% YoY increase)",
"β€’ Operating Income: $650M",
""
])
if "cost" in query.lower() or "expense" in query.lower():
insights.extend([
"πŸ’Έ **Cost Analysis:**",
"β€’ Vendor Services: $30M (18% increase)",
"β€’ Software Subscriptions: $25M (22% increase)",
"β€’ Marketing Investment: $2.3B with strong ROI",
""
])
insights.append("🎯 **Key Takeaway:** Strong revenue growth with improving margins despite increased operational costs.")
return insights
def _generate_marketing_insights(self, query: str, context_docs: List[Document]) -> List[str]:
"""Generate marketing-specific insights"""
insights = ["πŸ“ˆ **Marketing Insights:**", ""]
insights.extend([
"🎯 **Campaign Performance:**",
"β€’ Customer Acquisition: 20% increase year-over-year",
"β€’ Digital Campaign ROI: 3.5x return on $5M investment",
"β€’ Q4 Results: 220,000 new customers (exceeded target)",
"",
"πŸ’° **ROI Analysis:**",
"β€’ Overall Marketing ROI: 4.5x",
"β€’ Digital Channels: 3.5x return",
"β€’ Event Marketing: 5.0x return",
"β€’ Email Marketing: 2.0x return",
"",
"πŸš€ **Key Takeaway:** Successful global expansion with strong ROI across all marketing channels."
])
return insights
def _generate_hr_insights(self, query: str, context_docs: List[Document]) -> List[str]:
"""Generate HR-specific insights"""
insights = ["πŸ‘₯ **HR Insights:**", ""]
if "benefits" in query.lower():
insights.extend([
"πŸ₯ **Employee Benefits:**",
"β€’ Health Insurance: Family floater policy",
"β€’ Provident Fund: 12% employer contribution",
"β€’ Maternity Leave: 26 weeks paid leave",
"β€’ Flexible Work: Up to 2 days/week WFH",
""
])
if "leave" in query.lower():
insights.extend([
"πŸ“… **Leave Policies:**",
"β€’ Annual Leave: 15-21 days/year",
"β€’ Sick Leave: 12 days/year",
"β€’ Casual Leave: 7 days/year",
"β€’ Emergency Leave: Available with manager approval",
""
])
insights.append("πŸ’‘ **Key Takeaway:** Comprehensive benefits package with competitive compensation and flexible work arrangements.")
return insights
def _generate_technical_insights(self, query: str, context_docs: List[Document]) -> List[str]:
"""Generate technical/engineering insights"""
insights = ["πŸ”§ **Technical Insights:**", ""]
if "architecture" in query.lower():
insights.extend([
"πŸ—οΈ **System Architecture:**",
"β€’ Microservices-based, cloud-native design",
"β€’ AWS infrastructure with Kubernetes orchestration",
"β€’ PostgreSQL, MongoDB, Redis for data storage",
"β€’ 99.99% uptime target with auto-scaling",
""
])
if "technology" in query.lower():
insights.extend([
"πŸ’» **Technology Stack:**",
"β€’ Frontend: React 18, TypeScript, Tailwind CSS",
"β€’ Backend: Node.js, Python, Go",
"β€’ Mobile: Swift (iOS), Kotlin (Android)",
"β€’ Infrastructure: AWS, Kubernetes, Docker",
""
])
insights.append("⚑ **Key Takeaway:** Modern, scalable architecture with strong security and compliance standards.")
return insights
def _generate_general_insights(self, query: str, context_docs: List[Document]) -> List[str]:
"""Generate general company insights"""
insights = ["🏒 **Company Information:**", ""]
insights.extend([
"πŸ“‹ **About FinSolve Technologies:**",
"β€’ Founded: 2018",
"β€’ Headquarters: Bangalore, India",
"β€’ Global presence: North America, Europe, Asia-Pacific",
"β€’ Services: Digital banking, payments, wealth management",
"",
"🎯 **Mission & Values:**",
"β€’ Mission: Empower financial freedom through technology",
"β€’ Core Values: Integrity, Innovation, Customer Focus",
"β€’ Commitment: Secure, scalable financial solutions",
])
return insights
def _generate_unauthorized_response(self, query: str, user_role: str, query_intent: str) -> str:
"""Generate graceful unauthorized access message"""
intent_role_map = {
"finance": "Finance and Executive",
"marketing": "Marketing and Executive",
"hr": "HR and Executive",
"engineering": "Engineering and Executive"
}
required_roles = intent_role_map.get(query_intent, "appropriate")
return f"""πŸ›‘οΈ **Access Restricted**
This information is restricted to **{required_roles}** roles only.
Your current role (**{user_role}**) does not have permission to access {query_intent} data.
**Available to you:**
{chr(10).join(['β€’ ' + doc.replace('_', ' ').title() for doc in self.auth_system.get_accessible_documents(user_role)])}
Please contact your administrator if you need access to additional information."""
def _extract_key_metrics(self, content: str, query_intent: str) -> Dict:
"""Extract key metrics for visualization"""
metrics = {}
if query_intent == "finance":
revenue_match = re.search(r'revenue[:\s]*\$?([\d.,]+)\s*(billion|million)', content.lower())
if revenue_match:
amount = revenue_match.group(1).replace(',', '')
unit = revenue_match.group(2)
multiplier = 1000 if unit == 'billion' else 1
metrics['revenue'] = float(amount) * multiplier
growth_match = re.search(r'(\d+)%\s*(yoy|growth)', content.lower())
if growth_match:
metrics['growth_rate'] = int(growth_match.group(1))
elif query_intent == "marketing":
acq_match = re.search(r'(\d+,?\d*)\s*new customers', content.lower())
if acq_match:
metrics['customer_acquisition'] = int(acq_match.group(1).replace(',', ''))
roi_match = re.search(r'(\d+\.?\d*)x\s*r[oe]i', content.lower())
if roi_match:
metrics['roi'] = float(roi_match.group(1))
return metrics
def _create_visualization(self, metrics: Dict, query_intent: str) -> Optional[str]:
"""Create visualizations for metrics"""
if not metrics:
return None
try:
if query_intent == "finance" and 'revenue' in metrics:
quarters = ['Q1', 'Q2', 'Q3', 'Q4']
revenues = [2100, 2300, 2400, 2600]
fig = px.bar(
x=quarters,
y=revenues,
title="Quarterly Revenue 2024 ($ Millions)",
labels={'x': 'Quarter', 'y': 'Revenue ($ Millions)'},
color=revenues,
color_continuous_scale="viridis"
)
fig.update_layout(height=400, showlegend=False)
return fig.to_html(include_plotlyjs='cdn', div_id="revenue_chart")
elif query_intent == "marketing" and 'customer_acquisition' in metrics:
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']
acquisitions = [18000, 22000, 25000, 28000, 32000, 35000]
fig = px.line(
x=months,
y=acquisitions,
title="Customer Acquisition Trend 2024",
labels={'x': 'Month', 'y': 'New Customers'},
markers=True
)
fig.update_layout(height=400, showlegend=False)
return fig.to_html(include_plotlyjs='cdn', div_id="acquisition_chart")
return None
except Exception as e:
print(f"❌ Error creating visualization: {str(e)}")
return None
def _create_data_table(self, content: str, query_intent: str) -> Optional[str]:
"""Create data tables from content"""
try:
if query_intent == "finance":
data = {
'Metric': ['Q4 Revenue', 'Annual Revenue', 'Net Income', 'Gross Margin', 'ROI'],
'Value': ['$2.6B', '$9.4B', '$325M', '64%', '15%'],
'YoY Growth': ['+35%', '+28%', '+18%', '+6%', '+3%']
}
df = pd.DataFrame(data)
return df.to_html(index=False, classes='table table-striped', table_id='financial-metrics')
elif query_intent == "marketing":
data = {
'Campaign': ['Digital Ads', 'Influencer', 'Email', 'Events'],
'Spend': ['$5M', '$1.5M', '$0.2M', '$2M'],
'ROI': ['3.5x', '4.2x', '2.0x', '5.0x'],
'Leads': ['180K', '60K', '25K', '300']
}
df = pd.DataFrame(data)
return df.to_html(index=False, classes='table table-striped', table_id='marketing-metrics')
return None
except Exception as e:
print(f"❌ Error creating table: {str(e)}")
return None
def store_feedback(self, query: str, response: str, rating: int, role: str):
"""Store user feedback for system improvement"""
feedback_id = len(self.query_feedback)
self.query_feedback[feedback_id] = {
'query': query,
'response': response,
'rating': rating,
'role': role,
'timestamp': pd.Timestamp.now(),
'intent': self._classify_query_intent(query)
}
def query(self, query: str, user_role: str) -> Tuple[str, List[str], Optional[str], Optional[str]]:
    """Enhanced query method with complete RAG pipeline.

    Returns a 4-tuple: (answer_text, source_titles, chart_html_or_None,
    table_html_or_None). Errors are reported in the answer text rather
    than raised.
    """
    try:
        if not self.initialized:
            return "System not initialized. Please try again.", [], None, None
        # RBAC first: deny before any retrieval can leak restricted content.
        relevant_docs, authorized = self._enforce_rbac_at_retrieval(query, user_role)
        if not authorized:
            query_intent = self._classify_query_intent(query)
            unauthorized_msg = self._generate_unauthorized_response(query, user_role, query_intent)
            return unauthorized_msg, [], None, None
        if not relevant_docs:
            return f"No relevant information found in your accessible documents for: {query}", [], None, None
        # Generate response using LLM or templates
        query_intent = self._classify_query_intent(query)
        if self.llm_initialized:
            # Concatenate retrieved chunks as LLM context.
            context = "\n\n".join([doc.page_content for doc in relevant_docs])
            import asyncio
            try:
                # Try to get event loop, create one if it doesn't exist
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            # NOTE(review): run_until_complete raises if a loop is already
            # running in this thread (possible under some UI frameworks);
            # that case falls to the outer except and returns the apology
            # message — confirm this is acceptable.
            response = loop.run_until_complete(
                self._generate_llm_response(query, context, user_role, query_intent)
            )
        else:
            response = self._generate_template_response(query, relevant_docs, user_role, query_intent)
        # Collect unique source titles, preserving retrieval order.
        sources = []
        for doc in relevant_docs:
            source = doc.metadata.get('title', 'Company Documents')
            if source not in sources:
                sources.append(source)
        # Generate visualizations and tables from the retrieved text.
        context_content = " ".join([doc.page_content for doc in relevant_docs])
        metrics = self._extract_key_metrics(context_content, query_intent)
        visualization = self._create_visualization(metrics, query_intent)
        table = self._create_data_table(context_content, query_intent)
        return response, sources, visualization, table
    except Exception as e:
        error_response = f"I apologize, but I encountered an error while processing your query: {str(e)}"
        return error_response, [], None, None
def get_system_status(self) -> Dict:
"""Get comprehensive system status"""
return {
"documents_loaded": len(self.documents),
"system_initialized": self.initialized,
"vector_store_available": self.vector_store_initialized,
"llm_available": self.llm_initialized,
"feedback_entries": len(self.query_feedback),
"tech_stack": {
"python": "βœ… Active",
"streamlit": "βœ… Active",
"vector_store": "βœ… ChromaDB" if self.vector_store_initialized else "⚠️ Fallback",
"llm": f"βœ… {self.llm_model}" if self.llm_initialized else "⚠️ Templates",
"fastapi": "βœ… Real FastAPI" if self._check_fastapi_running() else "πŸ”„ Simulated",
"authentication": "βœ… JWT-style RBAC"
}
}
def get_available_documents_for_role(self, role: str) -> List[Dict]:
"""Get list of documents available for a specific role"""
accessible_docs = self.auth_system.get_accessible_documents(role)
doc_info = self.document_processor.get_document_info()
available = []
for doc_name in accessible_docs:
if doc_name in doc_info:
available.append({
"content_type": doc_name,
**doc_info[doc_name]
})
return available