Spaces:
Paused
Paused
| import os | |
| import tempfile | |
# Hugging Face Spaces workaround: give every model-cache environment variable
# its own freshly created temporary directory. This runs before the model
# libraries further down are imported.
for _cache_var in ("TRANSFORMERS_CACHE", "HF_HOME", "SENTENCE_TRANSFORMERS_HOME"):
    os.environ[_cache_var] = tempfile.mkdtemp()
del _cache_var
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from typing import List, Tuple, Dict, Optional | |
| from langchain.schema import Document | |
| import re | |
| import json | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
# Optional vector-store stack. Any failure while importing it is reported and
# recorded in VECTOR_STORE_AVAILABLE instead of aborting the module import.
try:
    import chromadb
    from chromadb.config import Settings
    from sentence_transformers import SentenceTransformer
    VECTOR_STORE_AVAILABLE = True
    print("β ChromaDB and SentenceTransformers imported successfully")
except Exception as e:
    VECTOR_STORE_AVAILABLE = False
    # Missing packages and other start-up failures get different labels.
    if isinstance(e, ImportError):
        print(f"β οΈ Vector store import error: {e}")
    else:
        print(f"β οΈ Vector store initialization error: {e}")
# Optional LLM dependency. LLM_AVAILABLE is True only when the openai package
# imports and OPENAI_API_KEY is set to a non-empty value.
try:
    import openai
except ImportError:
    LLM_AVAILABLE = False
    print("β οΈ OpenAI library not available")
else:
    LLM_AVAILABLE = bool(os.getenv("OPENAI_API_KEY"))
    if not LLM_AVAILABLE:
        print("β οΈ OpenAI API key not found in environment")
    else:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        print("β OpenAI API key found and configured")
| # Import our custom modules | |
| from document_processor import DocumentProcessor | |
| from auth_system import AuthSystem | |
| class EnhancedRAGSystem: | |
| """Complete RAG system with Vector Store, LLM, and RBAC enforcement""" | |
def __init__(self):
    """Create the system in an uninitialised state.

    Only the document processor and the auth system are constructed here.
    The vector-store and LLM handles start as ``None`` with their
    ``*_initialized`` flags set to False.
    """
    # Project-local collaborators.
    self.document_processor = DocumentProcessor()
    self.auth_system = AuthSystem()

    # Runtime state.
    self.documents = []
    self.initialized = False
    self.query_feedback = {}  # keyed by an integer feedback id

    # Vector store handles.
    self.chroma_client = None
    self.collection = None
    self.embedding_model = None
    self.vector_store_initialized = False

    # LLM handles.
    self.llm_client = None
    self.llm_model = "gpt-3.5-turbo"
    self.llm_initialized = False

    # Keyword lists used for intent classification, one list per domain.
    self.intent_keywords = {
        "finance": [
            "revenue", "profit", "cost", "budget", "financial", "expense",
            "income", "cash", "margin", "roi", "sales",
        ],
        "marketing": [
            "campaign", "customer", "acquisition", "brand", "marketing",
            "advertising", "engagement", "conversion", "retention",
        ],
        "hr": [
            "employee", "hr", "policy", "leave", "benefits", "salary",
            "attendance", "performance", "training", "recruitment",
        ],
        "engineering": [
            "architecture", "technology", "system", "development", "technical",
            "infrastructure", "deployment", "security", "api",
        ],
        "general": [
            "company", "about", "overview", "mission", "values", "policy",
            "contact", "help",
        ],
    }
def initialize_system(self):
    """Bring up the vector store, the LLM and the document set.

    Any exception during start-up is reported and the system is still marked
    as initialized, so later queries run in fallback mode.
    """
    try:
        print("π Initializing Complete RAG System...")

        # Back ends first: ChromaDB, then the OpenAI client.
        self._initialize_vector_store()
        self._initialize_llm()

        # Fetch the documents and index them when a vector store is available.
        self.documents = self.document_processor.get_all_documents()
        if self.vector_store_initialized:
            self._load_documents_to_vector_store()

        self.initialized = True
        self._print_initialization_status()
    except Exception as e:
        print(f"β Error initializing RAG system: {e}")
        # Deliberate best effort: stay usable with template responses.
        self.initialized = True
        print("β οΈ Using fallback mode with template responses")
def _initialize_vector_store(self):
    """Set up the ChromaDB client, the document collection and the embedding model.

    Only prints a notice when the optional vector-store packages failed to
    import. Any failure during setup is reported and leaves
    ``vector_store_initialized`` False, so searches use the keyword fallback.
    """
    if not VECTOR_STORE_AVAILABLE:
        print("β οΈ ChromaDB/SentenceTransformers not available, using in-memory search")
        return

    try:
        print("π§ Initializing ChromaDB...")

        # A fresh temporary directory is used as the ChromaDB storage path.
        chroma_dir = tempfile.mkdtemp(prefix="chroma_")
        print(f"π Using ChromaDB directory: {chroma_dir}")

        # Try the client constructors in order: newer API, older API, in-memory.
        try:
            self.chroma_client = chromadb.PersistentClient(path=chroma_dir)
            print("β Using ChromaDB PersistentClient")
        except Exception:
            try:
                self.chroma_client = chromadb.Client(Settings(
                    chroma_db_impl="duckdb+parquet",
                    persist_directory=chroma_dir,
                ))
                print("β Using ChromaDB Client with Settings")
            except Exception:
                self.chroma_client = chromadb.Client()
                print("β Using ChromaDB in-memory client")

        # Reuse the collection when it exists, otherwise create it.
        # `except Exception` (not a bare except) so Ctrl-C still propagates.
        collection_name = "finsolve_documents"
        try:
            self.collection = self.chroma_client.get_collection(collection_name)
            print(f"β Loaded existing ChromaDB collection: {collection_name}")
        except Exception:
            self.collection = self.chroma_client.create_collection(
                name=collection_name,
                metadata={"description": "FinSolve documents with RBAC"},
            )
            print(f"β Created new ChromaDB collection: {collection_name}")

        # Load the primary embedding model; fall back to a second model if
        # that fails. Each attempt gets its own temporary cache directory.
        try:
            cache_dir = tempfile.mkdtemp(prefix="sentence_transformers_")
            self.embedding_model = SentenceTransformer(
                "all-MiniLM-L6-v2",
                cache_folder=cache_dir,
            )
            print("β Loaded sentence transformer model: all-MiniLM-L6-v2")
        except Exception:
            try:
                cache_dir = tempfile.mkdtemp(prefix="sentence_transformers_fallback_")
                self.embedding_model = SentenceTransformer(
                    "paraphrase-MiniLM-L3-v2",
                    cache_folder=cache_dir,
                )
                print("β Loaded fallback sentence transformer model: paraphrase-MiniLM-L3-v2")
            except Exception as e2:
                print(f"β Failed to load embedding model: {e2}")
                raise  # re-raise with the original traceback intact

        self.vector_store_initialized = True
    except Exception as e:
        print(f"β οΈ ChromaDB initialization failed: {e}")
        print("β οΈ Falling back to in-memory search")
        self.vector_store_initialized = False
def _initialize_llm(self):
    """Probe the OpenAI chat endpoint and enable the LLM path if it answers.

    Leaves ``llm_initialized`` False when no API key was configured or when
    the probe request raises.
    """
    if not LLM_AVAILABLE:
        print("β οΈ OpenAI API key not found, using template responses")
        return

    # NOTE(review): openai.ChatCompletion is the pre-1.0 SDK interface; with a
    # newer SDK installed this probe presumably raises and the system stays in
    # template mode. Confirm the pinned openai version.
    probe_messages = [{"role": "user", "content": "Hello"}]
    try:
        # One tiny request to check the key and model; the reply is discarded.
        openai.ChatCompletion.create(
            model=self.llm_model,
            messages=probe_messages,
            max_tokens=10
        )
        self.llm_client = openai
        self.llm_initialized = True
        print(f"β OpenAI LLM initialized: {self.llm_model}")
    except Exception as e:
        print(f"β οΈ OpenAI initialization failed: {e}")
        print("β οΈ Using template-based responses")
| def _load_documents_to_vector_store(self): | |
| """Load documents into ChromaDB vector store""" | |
| if not self.vector_store_initialized or not self.embedding_model: | |
| return | |
| try: | |
| # Check if documents already loaded | |
| if self.collection.count() > 0: | |
| print(f"β ChromaDB already contains {self.collection.count()} documents") | |
| return | |
| print("π Loading documents into vector store...") | |
| texts = [] | |
| metadatas = [] | |
| ids = [] | |
| for i, doc in enumerate(self.documents): | |
| doc_id = f"doc_{i}_{hash(doc.page_content) % 10000}" | |
| metadata = { | |
| "content_type": doc.metadata.get("content_type", "general"), | |
| "title": doc.metadata.get("title", "Document"), | |
| "department": doc.metadata.get("department", "General"), | |
| "type": doc.metadata.get("type", "Document"), | |
| "chunk_id": str(doc.metadata.get("chunk_id", 0)), | |
| "source": doc.metadata.get("source", "unknown") | |
| } | |
| texts.append(doc.page_content) | |
| metadatas.append(metadata) | |
| ids.append(doc_id) | |
| # Generate embeddings in batches to avoid memory issues | |
| batch_size = 10 | |
| for i in range(0, len(texts), batch_size): | |
| batch_texts = texts[i:i+batch_size] | |
| batch_metadatas = metadatas[i:i+batch_size] | |
| batch_ids = ids[i:i+batch_size] | |
| # Generate embeddings | |
| embeddings = self.embedding_model.encode(batch_texts).tolist() | |
| # Add to ChromaDB | |
| self.collection.add( | |
| embeddings=embeddings, | |
| documents=batch_texts, | |
| metadatas=batch_metadatas, | |
| ids=batch_ids | |
| ) | |
| print(f"β Loaded {len(self.documents)} documents into ChromaDB") | |
| except Exception as e: | |
| print(f"β οΈ Error loading documents to vector store: {str(e)}") | |
def _print_initialization_status(self):
    """Print a status banner covering each component of the system."""
    vs_ready = self.vector_store_initialized
    llm_ready = self.llm_initialized
    rule = "=" * 50

    print("\n" + rule)
    print("π€ FINSOLVE RAG SYSTEM STATUS")
    print(rule)
    print("β Python: Core system initialized")

    vs_icon = 'β ' if vs_ready else 'β οΈ'
    vs_state = 'Ready' if vs_ready else 'Fallback mode'
    print(f"{vs_icon} ChromaDB Vector Store: {vs_state}")

    llm_icon = 'β ' if llm_ready else 'β οΈ'
    llm_state = 'OpenAI GPT' if llm_ready else 'Template mode'
    print(f"{llm_icon} OpenAI LLM: {llm_state}")

    print("β Streamlit: UI active")
    # The health probe runs here, between the surrounding prints.
    api_state = 'Real FastAPI' if self._check_fastapi_running() else 'Simulated API'
    print(f"π FastAPI: {api_state}")
    print("β Authentication: JWT-style RBAC")
    print(f"β NLP: Intent classification + {'LLM' if llm_ready else 'Templates'}")
    print("β RAG: Vector retrieval + context augmentation")
    print(f"π Documents loaded: {len(self.documents)}")
    print(rule)
| def _check_fastapi_running(self) -> bool: | |
| """Check if FastAPI server is running""" | |
| try: | |
| import requests | |
| response = requests.get("http://localhost:8000/health", timeout=2) | |
| return response.status_code == 200 | |
| except: | |
| return False | |
def _vector_similarity_search(self, query: str, role: str, k: int = 5) -> List[Document]:
    """Return up to ``k`` documents closest to ``query`` that ``role`` may read.

    Uses the keyword fallback search when the vector store is not
    initialised or when the vector query raises.
    """
    if not self.vector_store_initialized:
        return self._fallback_search(query, role, k)

    try:
        # Embed the query and run a search restricted by the role filter.
        query_vector = self.embedding_model.encode([query]).tolist()[0]
        results = self.collection.query(
            query_embeddings=[query_vector],
            n_results=k,
            where=self._build_role_filter(role),
            include=["documents", "metadatas", "distances"]
        )

        hits = results['documents']
        if not (hits and hits[0]):
            return []

        found = []
        for idx, (text, meta) in enumerate(zip(hits[0], results['metadatas'][0])):
            # The score is stored as 1 - distance; distance is 0 if none came back.
            dist = results['distances'][0][idx] if results['distances'] else 0
            meta['similarity_score'] = 1 - dist
            found.append(Document(page_content=text, metadata=meta))
        return found
    except Exception as e:
        print(f"β Vector search error: {e}")
        return self._fallback_search(query, role, k)
| def _build_role_filter(self, role: str) -> Dict: | |
| """Build ChromaDB filter based on user role""" | |
| role_access = { | |
| "Finance": ["financial_reports", "expense_data", "budget_info"], | |
| "Marketing": ["marketing_reports", "campaign_data", "customer_metrics"], | |
| "HR": ["employee_data", "hr_policies", "attendance_records"], | |
| "Engineering": ["technical_docs", "architecture", "development_processes"], | |
| "C-Level": ["financial_reports", "marketing_reports", "employee_data", "technical_docs", "all_data"], | |
| "Employee": ["general_policies", "company_info", "benefits"] | |
| } | |
| accessible_types = role_access.get(role, ["general_policies"]) | |
| if len(accessible_types) == 1: | |
| return {"content_type": {"$eq": accessible_types[0]}} | |
| else: | |
| return {"content_type": {"$in": accessible_types}} | |
def _fallback_search(self, query: str, role: str, k: int = 5) -> List[Document]:
    """Keyword search over the documents ``role`` may read.

    Each document scores one point per occurrence of every query term, plus
    ten when the whole lower-cased query appears verbatim. Documents scoring
    zero are dropped and the ``k`` best are returned. A blank query returns
    no results.
    """
    query_lower = query.lower()
    query_terms = query_lower.split()
    if not query_terms:
        # Without this guard "" would be "contained" in every document and
        # an empty query would return arbitrary documents.
        return []

    role_docs = self.document_processor.get_documents_for_role(role)

    scored_docs = []
    for doc in role_docs:
        content_lower = doc.page_content.lower()
        score = sum(content_lower.count(term) for term in query_terms)
        if query_lower in content_lower:
            score += 10
        if score > 0:
            scored_docs.append((doc, score))

    # list.sort is stable, so ties keep their original document order.
    scored_docs.sort(key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in scored_docs[:k]]
| def _classify_query_intent(self, query: str) -> str: | |
| """Classify query intent using keyword matching""" | |
| query_lower = query.lower() | |
| intent_scores = {} | |
| for intent, keywords in self.intent_keywords.items(): | |
| score = sum(1 for keyword in keywords if keyword in query_lower) | |
| if score > 0: | |
| intent_scores[intent] = score | |
| if intent_scores: | |
| return max(intent_scores, key=intent_scores.get) | |
| return "general" | |
def _enforce_rbac_at_retrieval(self, query: str, role: str) -> Tuple[List[Document], bool]:
    """Check the query's domain against ``role`` before retrieving documents.

    Returns ``(documents, True)`` when the classified intent is allowed for
    the role, otherwise ``([], False)``. Unknown roles may only ask
    "general" questions.
    """
    intent = self._classify_query_intent(query)

    domains_by_role = {
        "Finance": ["finance", "general"],
        "Marketing": ["marketing", "general"],
        "HR": ["hr", "general"],
        "Engineering": ["engineering", "general"],
        "C-Level": ["finance", "marketing", "hr", "engineering", "general"],
        "Employee": ["general"],
    }
    permitted = domains_by_role.get(role, ["general"])

    if intent in permitted:
        # Authorised: retrieve via vector search (or its fallback).
        return self._vector_similarity_search(query, role), True
    return [], False
async def _generate_llm_response(self, query: str, context: str, user_role: str, query_intent: str) -> str:
    """Answer ``query`` with the OpenAI chat model, grounded in ``context``.

    Falls back to the template response (with no context documents) when the
    LLM is not initialised or the request raises.
    """
    if not self.llm_initialized:
        return self._generate_template_response(query, [], user_role, query_intent)

    try:
        system_prompt = f"""You are an AI assistant for FinSolve Technologies, a leading FinTech company.
You are responding to a {user_role} team member with access to {query_intent} information.
Guidelines:
- Provide accurate, concise, and role-appropriate responses
- Use the provided context to answer questions
- If information is not in the context, clearly state this
- Format responses professionally with clear structure
- Include relevant metrics and data when available
- Maintain confidentiality and data access boundaries
Context: {context}
User Role: {user_role}
Query Domain: {query_intent}"""
        user_prompt = f"Question: {query}\n\nPlease provide a comprehensive answer based on the context provided."

        chat = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        # NOTE(review): the call is synchronous although the method is async.
        completion = self.llm_client.ChatCompletion.create(
            model=self.llm_model,
            messages=chat,
            max_tokens=1000,
            temperature=0.7,
        )
        return completion.choices[0].message.content.strip()
    except Exception as e:
        print(f"β LLM error: {e}")
        return self._generate_template_response(query, [], user_role, query_intent)
def _generate_template_response(self, query: str, context_docs: List[Document], user_role: str, query_intent: str) -> str:
    """Build a canned answer for ``query_intent`` when no LLM is used.

    The reply starts with an access-level line, followed by the output of
    the intent-specific insight generator. Unrecognised intents use the
    general generator.
    """
    builders = {
        "finance": self._generate_finance_insights,
        "marketing": self._generate_marketing_insights,
        "hr": self._generate_hr_insights,
        "engineering": self._generate_technical_insights,
    }
    build = builders.get(query_intent, self._generate_general_insights)

    lines = [f"**Based on your {user_role} access level:**\n"]
    lines.extend(build(query, context_docs))
    return "\n".join(lines)
def _generate_finance_insights(self, query: str, context_docs: List[Document]) -> List[str]:
    """Return canned finance talking points chosen by keywords in ``query``.

    The revenue section is also included when the retrieved text mentions
    "2.6 billion". The figures are fixed strings, not read from documents.
    """
    q = query.lower()
    corpus = " ".join(doc.page_content for doc in context_docs)

    lines = ["π° **Financial Insights:**", ""]

    if "revenue" in q or "2.6 billion" in corpus:
        lines += [
            "π **Revenue Performance:**",
            "β’ Q4 2024: $2.6 billion (35% YoY growth)",
            "β’ Annual 2024: $9.4 billion (28% YoY increase)",
            "β’ Strong growth trajectory maintained throughout the year",
            "",
        ]

    if "margin" in q or "profit" in q:
        lines += [
            "π **Profitability Metrics:**",
            "β’ Gross Margin: 64% (improved from 58% in Q1)",
            "β’ Net Income: $325M (18% YoY increase)",
            "β’ Operating Income: $650M",
            "",
        ]

    if "cost" in q or "expense" in q:
        lines += [
            "πΈ **Cost Analysis:**",
            "β’ Vendor Services: $30M (18% increase)",
            "β’ Software Subscriptions: $25M (22% increase)",
            "β’ Marketing Investment: $2.3B with strong ROI",
            "",
        ]

    lines.append("π― **Key Takeaway:** Strong revenue growth with improving margins despite increased operational costs.")
    return lines
def _generate_marketing_insights(self, query: str, context_docs: List[Document]) -> List[str]:
    """Return the fixed marketing summary; ``query`` and ``context_docs`` are unused."""
    return [
        "π **Marketing Insights:**",
        "",
        "π― **Campaign Performance:**",
        "β’ Customer Acquisition: 20% increase year-over-year",
        "β’ Digital Campaign ROI: 3.5x return on $5M investment",
        "β’ Q4 Results: 220,000 new customers (exceeded target)",
        "",
        "π° **ROI Analysis:**",
        "β’ Overall Marketing ROI: 4.5x",
        "β’ Digital Channels: 3.5x return",
        "β’ Event Marketing: 5.0x return",
        "β’ Email Marketing: 2.0x return",
        "",
        "π **Key Takeaway:** Successful global expansion with strong ROI across all marketing channels.",
    ]
def _generate_hr_insights(self, query: str, context_docs: List[Document]) -> List[str]:
    """Return canned HR talking points chosen by keywords in ``query``.

    Sections for benefits and leave are added only when the query mentions
    them; ``context_docs`` is not consulted.
    """
    q = query.lower()
    lines = ["π₯ **HR Insights:**", ""]

    if "benefits" in q:
        lines += [
            "π₯ **Employee Benefits:**",
            "β’ Health Insurance: Family floater policy",
            "β’ Provident Fund: 12% employer contribution",
            "β’ Maternity Leave: 26 weeks paid leave",
            "β’ Flexible Work: Up to 2 days/week WFH",
            "",
        ]

    if "leave" in q:
        lines += [
            "π **Leave Policies:**",
            "β’ Annual Leave: 15-21 days/year",
            "β’ Sick Leave: 12 days/year",
            "β’ Casual Leave: 7 days/year",
            "β’ Emergency Leave: Available with manager approval",
            "",
        ]

    lines.append("π‘ **Key Takeaway:** Comprehensive benefits package with competitive compensation and flexible work arrangements.")
    return lines
def _generate_technical_insights(self, query: str, context_docs: List[Document]) -> List[str]:
    """Return canned engineering talking points chosen by keywords in ``query``.

    Sections for architecture and technology are added only when the query
    mentions them; ``context_docs`` is not consulted.
    """
    q = query.lower()
    lines = ["π§ **Technical Insights:**", ""]

    if "architecture" in q:
        lines += [
            "ποΈ **System Architecture:**",
            "β’ Microservices-based, cloud-native design",
            "β’ AWS infrastructure with Kubernetes orchestration",
            "β’ PostgreSQL, MongoDB, Redis for data storage",
            "β’ 99.99% uptime target with auto-scaling",
            "",
        ]

    if "technology" in q:
        lines += [
            "π» **Technology Stack:**",
            "β’ Frontend: React 18, TypeScript, Tailwind CSS",
            "β’ Backend: Node.js, Python, Go",
            "β’ Mobile: Swift (iOS), Kotlin (Android)",
            "β’ Infrastructure: AWS, Kubernetes, Docker",
            "",
        ]

    lines.append("β‘ **Key Takeaway:** Modern, scalable architecture with strong security and compliance standards.")
    return lines
def _generate_general_insights(self, query: str, context_docs: List[Document]) -> List[str]:
    """Return the fixed company overview; ``query`` and ``context_docs`` are unused."""
    return [
        "π’ **Company Information:**",
        "",
        "π **About FinSolve Technologies:**",
        "β’ Founded: 2018",
        "β’ Headquarters: Bangalore, India",
        "β’ Global presence: North America, Europe, Asia-Pacific",
        "β’ Services: Digital banking, payments, wealth management",
        "",
        "π― **Mission & Values:**",
        "β’ Mission: Empower financial freedom through technology",
        "β’ Core Values: Integrity, Innovation, Customer Focus",
        "β’ Commitment: Secure, scalable financial solutions",
    ]
| def _generate_unauthorized_response(self, query: str, user_role: str, query_intent: str) -> str: | |
| """Generate graceful unauthorized access message""" | |
| intent_role_map = { | |
| "finance": "Finance and Executive", | |
| "marketing": "Marketing and Executive", | |
| "hr": "HR and Executive", | |
| "engineering": "Engineering and Executive" | |
| } | |
| required_roles = intent_role_map.get(query_intent, "appropriate") | |
| return f"""π‘οΈ **Access Restricted** | |
| This information is restricted to **{required_roles}** roles only. | |
| Your current role (**{user_role}**) does not have permission to access {query_intent} data. | |
| **Available to you:** | |
| {chr(10).join(['β’ ' + doc.replace('_', ' ').title() for doc in self.auth_system.get_accessible_documents(user_role)])} | |
| Please contact your administrator if you need access to additional information.""" | |
| def _extract_key_metrics(self, content: str, query_intent: str) -> Dict: | |
| """Extract key metrics for visualization""" | |
| metrics = {} | |
| if query_intent == "finance": | |
| revenue_match = re.search(r'revenue[:\s]*\$?([\d.,]+)\s*(billion|million)', content.lower()) | |
| if revenue_match: | |
| amount = revenue_match.group(1).replace(',', '') | |
| unit = revenue_match.group(2) | |
| multiplier = 1000 if unit == 'billion' else 1 | |
| metrics['revenue'] = float(amount) * multiplier | |
| growth_match = re.search(r'(\d+)%\s*(yoy|growth)', content.lower()) | |
| if growth_match: | |
| metrics['growth_rate'] = int(growth_match.group(1)) | |
| elif query_intent == "marketing": | |
| acq_match = re.search(r'(\d+,?\d*)\s*new customers', content.lower()) | |
| if acq_match: | |
| metrics['customer_acquisition'] = int(acq_match.group(1).replace(',', '')) | |
| roi_match = re.search(r'(\d+\.?\d*)x\s*r[oe]i', content.lower()) | |
| if roi_match: | |
| metrics['roi'] = float(roi_match.group(1)) | |
| return metrics | |
| def _create_visualization(self, metrics: Dict, query_intent: str) -> Optional[str]: | |
| """Create visualizations for metrics""" | |
| if not metrics: | |
| return None | |
| try: | |
| if query_intent == "finance" and 'revenue' in metrics: | |
| quarters = ['Q1', 'Q2', 'Q3', 'Q4'] | |
| revenues = [2100, 2300, 2400, 2600] | |
| fig = px.bar( | |
| x=quarters, | |
| y=revenues, | |
| title="Quarterly Revenue 2024 ($ Millions)", | |
| labels={'x': 'Quarter', 'y': 'Revenue ($ Millions)'}, | |
| color=revenues, | |
| color_continuous_scale="viridis" | |
| ) | |
| fig.update_layout(height=400, showlegend=False) | |
| return fig.to_html(include_plotlyjs='cdn', div_id="revenue_chart") | |
| elif query_intent == "marketing" and 'customer_acquisition' in metrics: | |
| months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun'] | |
| acquisitions = [18000, 22000, 25000, 28000, 32000, 35000] | |
| fig = px.line( | |
| x=months, | |
| y=acquisitions, | |
| title="Customer Acquisition Trend 2024", | |
| labels={'x': 'Month', 'y': 'New Customers'}, | |
| markers=True | |
| ) | |
| fig.update_layout(height=400, showlegend=False) | |
| return fig.to_html(include_plotlyjs='cdn', div_id="acquisition_chart") | |
| return None | |
| except Exception as e: | |
| print(f"β Error creating visualization: {str(e)}") | |
| return None | |
| def _create_data_table(self, content: str, query_intent: str) -> Optional[str]: | |
| """Create data tables from content""" | |
| try: | |
| if query_intent == "finance": | |
| data = { | |
| 'Metric': ['Q4 Revenue', 'Annual Revenue', 'Net Income', 'Gross Margin', 'ROI'], | |
| 'Value': ['$2.6B', '$9.4B', '$325M', '64%', '15%'], | |
| 'YoY Growth': ['+35%', '+28%', '+18%', '+6%', '+3%'] | |
| } | |
| df = pd.DataFrame(data) | |
| return df.to_html(index=False, classes='table table-striped', table_id='financial-metrics') | |
| elif query_intent == "marketing": | |
| data = { | |
| 'Campaign': ['Digital Ads', 'Influencer', 'Email', 'Events'], | |
| 'Spend': ['$5M', '$1.5M', '$0.2M', '$2M'], | |
| 'ROI': ['3.5x', '4.2x', '2.0x', '5.0x'], | |
| 'Leads': ['180K', '60K', '25K', '300'] | |
| } | |
| df = pd.DataFrame(data) | |
| return df.to_html(index=False, classes='table table-striped', table_id='marketing-metrics') | |
| return None | |
| except Exception as e: | |
| print(f"β Error creating table: {str(e)}") | |
| return None | |
def store_feedback(self, query: str, response: str, rating: int, role: str):
    """Record one feedback entry under the next integer id.

    The id is the current number of stored entries. The entry also records
    the current timestamp and the classified intent of ``query``.
    """
    entry = dict(
        query=query,
        response=response,
        rating=rating,
        role=role,
        timestamp=pd.Timestamp.now(),
        intent=self._classify_query_intent(query),
    )
    next_id = len(self.query_feedback)
    self.query_feedback[next_id] = entry
def query(self, query: str, user_role: str) -> Tuple[str, List[str], Optional[str], Optional[str]]:
    """Answer ``query`` for ``user_role`` through the full RAG pipeline.

    Returns ``(response, sources, visualization_html, table_html)``. The last
    three are ``[]``, ``None``, ``None`` when the system is uninitialised,
    the role is not authorised, nothing relevant is found, or an error occurs.
    """
    try:
        if not self.initialized:
            return "System not initialized. Please try again.", [], None, None

        # RBAC is enforced before any content is retrieved.
        relevant_docs, authorized = self._enforce_rbac_at_retrieval(query, user_role)

        if not authorized:
            denied_intent = self._classify_query_intent(query)
            return (
                self._generate_unauthorized_response(query, user_role, denied_intent),
                [],
                None,
                None,
            )

        if not relevant_docs:
            return f"No relevant information found in your accessible documents for: {query}", [], None, None

        query_intent = self._classify_query_intent(query)

        if not self.llm_initialized:
            response = self._generate_template_response(query, relevant_docs, user_role, query_intent)
        else:
            context = "\n\n".join(doc.page_content for doc in relevant_docs)

            import asyncio
            # Reuse this thread's event loop if it has one, else install a new one.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

            response = loop.run_until_complete(
                self._generate_llm_response(query, context, user_role, query_intent)
            )

        # Unique source titles in first-seen order.
        sources = []
        for doc in relevant_docs:
            title = doc.metadata.get('title', 'Company Documents')
            if title in sources:
                continue
            sources.append(title)

        # Optional chart and table built from the retrieved text.
        joined_text = " ".join(doc.page_content for doc in relevant_docs)
        metrics = self._extract_key_metrics(joined_text, query_intent)
        visualization = self._create_visualization(metrics, query_intent)
        table = self._create_data_table(joined_text, query_intent)

        return response, sources, visualization, table

    except Exception as e:
        return f"I apologize, but I encountered an error while processing your query: {e}", [], None, None
def get_system_status(self) -> Dict:
    """Return a snapshot of component availability and counters.

    The nested ``tech_stack`` dict holds display strings per component; its
    ``fastapi`` entry triggers a live health probe.
    """
    vs_ready = self.vector_store_initialized
    llm_ready = self.llm_initialized

    tech_stack = {
        "python": "β Active",
        "streamlit": "β Active",
        "vector_store": "β ChromaDB" if vs_ready else "β οΈ Fallback",
        "llm": f"β {self.llm_model}" if llm_ready else "β οΈ Templates",
        "fastapi": "β Real FastAPI" if self._check_fastapi_running() else "π Simulated",
        "authentication": "β JWT-style RBAC",
    }

    return {
        "documents_loaded": len(self.documents),
        "system_initialized": self.initialized,
        "vector_store_available": vs_ready,
        "llm_available": llm_ready,
        "feedback_entries": len(self.query_feedback),
        "tech_stack": tech_stack,
    }
def get_available_documents_for_role(self, role: str) -> List[Dict]:
    """List the documents ``role`` may access, with their metadata.

    Document names without an entry in the processor's document info are
    skipped. Each result carries ``content_type`` (the document name) merged
    with that document's info; keys from the info win on a clash.
    """
    permitted_names = self.auth_system.get_accessible_documents(role)
    info_by_name = self.document_processor.get_document_info()

    return [
        {"content_type": name, **info_by_name[name]}
        for name in permitted_names
        if name in info_by_name
    ]