Spaces:
Paused
Paused
| import streamlit as st | |
| # Page config MUST be the first Streamlit command | |
| st.set_page_config( | |
| page_title="FinSolve AI Assistant - Complete Tech Stack", | |
| page_icon="π€", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| import pandas as pd | |
| import os | |
| import sys | |
| from typing import Dict, List, Optional | |
| from datetime import datetime | |
| import json | |
| import time | |
| # Add the src directory to Python path for imports | |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) | |
| # Import our enhanced modules | |
| from enhanced_rag_system import EnhancedRAGSystem | |
| from auth_system import AuthSystem | |
| from document_processor import DocumentProcessor | |
| # Check tech stack availability with FastAPI detection | |
| def check_tech_stack(): | |
| """Check which tech stack components are available""" | |
| tech_status = {} | |
| # Python (always available) | |
| tech_status['python'] = {'available': True, 'status': 'Core system'} | |
| # Streamlit (always available) | |
| tech_status['streamlit'] = {'available': True, 'status': 'UI active'} | |
| # Vector Store (ChromaDB) | |
| try: | |
| import chromadb | |
| from sentence_transformers import SentenceTransformer | |
| tech_status['vector_store'] = {'available': True, 'status': 'ChromaDB ready'} | |
| except ImportError: | |
| tech_status['vector_store'] = {'available': False, 'status': 'Fallback mode'} | |
| # LLM (OpenAI) | |
| openai_key = os.getenv("OPENAI_API_KEY") | |
| if openai_key: | |
| try: | |
| import openai | |
| tech_status['llm'] = {'available': True, 'status': 'OpenAI GPT'} | |
| except ImportError: | |
| tech_status['llm'] = {'available': False, 'status': 'Template mode'} | |
| else: | |
| tech_status['llm'] = {'available': False, 'status': 'No API key'} | |
| # FastAPI (check if real FastAPI is available) | |
| try: | |
| import requests | |
| # Try to ping FastAPI server | |
| response = requests.get("http://localhost:8000/health", timeout=2) | |
| if response.status_code == 200: | |
| tech_status['fastapi'] = {'available': True, 'status': 'Real FastAPI'} | |
| else: | |
| tech_status['fastapi'] = {'available': True, 'status': 'Simulated API'} | |
| except: | |
| try: | |
| # Check if FastAPI is installable | |
| import fastapi | |
| import uvicorn | |
| tech_status['fastapi'] = {'available': True, 'status': 'Available (not running)'} | |
| except ImportError: | |
| tech_status['fastapi'] = {'available': True, 'status': 'Simulated API'} | |
| return tech_status | |
| # Enhanced CSS with small popup tech stack indicators | |
| st.markdown(""" | |
| <style> | |
| .main-header { | |
| background: linear-gradient(90deg, #1f4e79 0%, #2d5aa0 100%); | |
| padding: 1.5rem; | |
| border-radius: 10px; | |
| margin-bottom: 2rem; | |
| color: white; | |
| text-align: center; | |
| box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
| } | |
| .role-badge { | |
| background-color: #28a745; | |
| color: white; | |
| padding: 0.3rem 0.8rem; | |
| border-radius: 20px; | |
| font-size: 0.9rem; | |
| font-weight: bold; | |
| margin-left: 0.5rem; | |
| } | |
| .chat-message { | |
| padding: 1.2rem; | |
| border-radius: 10px; | |
| margin-bottom: 1rem; | |
| border-left: 4px solid #1f4e79; | |
| background-color: #f8f9fa; | |
| box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05); | |
| } | |
| .user-message { | |
| background-color: #e3f2fd; | |
| border-left-color: #2196f3; | |
| } | |
| .assistant-message { | |
| background-color: #f1f8e9; | |
| border-left-color: #4caf50; | |
| } | |
| .source-info { | |
| background-color: #e9ecef; | |
| padding: 0.6rem; | |
| border-radius: 8px; | |
| margin-top: 0.8rem; | |
| font-size: 0.85rem; | |
| color: #6c757d; | |
| border-left: 3px solid #17a2b8; | |
| } | |
| .unauthorized-message { | |
| background-color: #f8d7da; | |
| border-left-color: #dc3545; | |
| color: #721c24; | |
| } | |
| .feedback-container { | |
| background-color: #fff3cd; | |
| padding: 0.8rem; | |
| border-radius: 8px; | |
| margin-top: 0.5rem; | |
| border-left: 3px solid #ffc107; | |
| } | |
| .demo-switch { | |
| background-color: #fff8e1; | |
| padding: 1rem; | |
| border-radius: 8px; | |
| margin-bottom: 1rem; | |
| border: 1px solid #ffcc02; | |
| } | |
| .system-metrics { | |
| background-color: #f8f9fa; | |
| padding: 1rem; | |
| border-radius: 8px; | |
| margin: 1rem 0; | |
| } | |
| .metric-item { | |
| display: flex; | |
| justify-content: space-between; | |
| padding: 0.25rem 0; | |
| border-bottom: 1px solid #dee2e6; | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| def display_tech_stack_notification(): | |
| """Display tech stack status as a Streamlit notification""" | |
| # Initialize popup state | |
| if 'show_tech_notification' not in st.session_state: | |
| st.session_state.show_tech_notification = True | |
| st.session_state.notification_start_time = time.time() | |
| # Auto-hide notification after 5 seconds | |
| if st.session_state.show_tech_notification: | |
| current_time = time.time() | |
| if current_time - st.session_state.notification_start_time > 5: | |
| st.session_state.show_tech_notification = False | |
| st.rerun() | |
| # Show notification if enabled | |
| if st.session_state.show_tech_notification: | |
| tech_status = check_tech_stack() | |
| # Create a container for the notification | |
| with st.container(): | |
| st.info("ποΈ **Tech Stack Status** - Auto-hiding in 5 seconds...") | |
| # Create columns for status items | |
| cols = st.columns(5) | |
| component_names = { | |
| 'python': 'Python', | |
| 'streamlit': 'Streamlit', | |
| 'vector_store': 'Vector Store', | |
| 'llm': 'LLM', | |
| 'fastapi': 'FastAPI' | |
| } | |
| for i, (component, info) in enumerate(tech_status.items()): | |
| with cols[i]: | |
| if info['available']: | |
| if component == 'fastapi': | |
| icon = "π" | |
| color = "orange" | |
| else: | |
| icon = "β " | |
| color = "green" | |
| else: | |
| icon = "β οΈ" | |
| color = "red" | |
| st.markdown(f""" | |
| <div style="text-align: center; padding: 0.5rem;"> | |
| <div style="font-size: 1.2rem;">{icon}</div> | |
| <div style="font-size: 0.8rem; font-weight: bold;">{component_names.get(component, component)}</div> | |
| <div style="font-size: 0.7rem; color: {color};">{info['status']}</div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| def display_tech_status_sidebar(): | |
| """Display tech status in sidebar""" | |
| with st.sidebar: | |
| st.markdown("---") | |
| st.markdown("### π§ Tech Stack") | |
| tech_status = check_tech_stack() | |
| for component, info in tech_status.items(): | |
| if info['available']: | |
| if component == 'fastapi': | |
| icon = "π" | |
| else: | |
| icon = "β " | |
| else: | |
| icon = "β οΈ" | |
| component_names = { | |
| 'python': 'Python', | |
| 'streamlit': 'Streamlit', | |
| 'vector_store': 'Vector Store', | |
| 'llm': 'LLM', | |
| 'fastapi': 'FastAPI' | |
| } | |
| st.markdown(f"{icon} **{component_names.get(component, component)}**: {info['status']}") | |
| def initialize_session_state(): | |
| """Initialize session state variables""" | |
| if 'authenticated' not in st.session_state: | |
| st.session_state.authenticated = False | |
| if 'user_role' not in st.session_state: | |
| st.session_state.user_role = None | |
| if 'username' not in st.session_state: | |
| st.session_state.username = None | |
| if 'chat_history' not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if 'enhanced_rag_system' not in st.session_state: | |
| st.session_state.enhanced_rag_system = None | |
| if 'demo_mode' not in st.session_state: | |
| st.session_state.demo_mode = False | |
| if 'demo_role' not in st.session_state: | |
| st.session_state.demo_role = None | |
| def demo_role_switch(): | |
| """Demo mode for role switching""" | |
| st.markdown('<div class="demo-switch">', unsafe_allow_html=True) | |
| st.markdown("π§ͺ **Demo Mode**: Switch between roles to test different access levels") | |
| demo_roles = ["Finance", "Marketing", "HR", "Engineering", "C-Level", "Employee"] | |
| selected_role = st.selectbox( | |
| "Simulate User Role:", | |
| demo_roles, | |
| index=demo_roles.index(st.session_state.user_role) if st.session_state.user_role in demo_roles else 0, | |
| key="demo_role_selector" | |
| ) | |
| col1, col2 = st.columns([1, 1]) | |
| with col1: | |
| if st.button("π Switch Role", key="switch_role"): | |
| st.session_state.demo_role = selected_role | |
| st.session_state.demo_mode = True | |
| st.success(f"Switched to {selected_role} role for demo purposes") | |
| with col2: | |
| if st.button("π Use Actual Role", key="actual_role"): | |
| st.session_state.demo_mode = False | |
| st.session_state.demo_role = None | |
| st.info(f"Using your actual role: {st.session_state.user_role}") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| def login_page(): | |
| """Display enhanced login page with tech stack notification""" | |
| st.markdown('<div class="main-header"><h1>π€ FinSolve AI Assistant</h1><p>Complete Tech Stack Implementation</p></div>', unsafe_allow_html=True) | |
| # Display tech stack notification | |
| display_tech_stack_notification() | |
| col1, col2, col3 = st.columns([1, 2, 1]) | |
| with col2: | |
| st.subheader("π Secure Login") | |
| with st.form("login_form"): | |
| username = st.text_input("Username", placeholder="Enter your username") | |
| password = st.text_input("Password", type="password", placeholder="Enter your password") | |
| submit_button = st.form_submit_button("π Login", use_container_width=True) | |
| if submit_button: | |
| auth_system = AuthSystem() | |
| if auth_system.authenticate(username, password): | |
| st.session_state.authenticated = True | |
| st.session_state.username = username | |
| st.session_state.user_role = auth_system.get_user_role(username) | |
| # Reset notification for next page | |
| st.session_state.show_tech_notification = True | |
| st.session_state.notification_start_time = time.time() | |
| st.success(f"Welcome {username}! Your role: {st.session_state.user_role}") | |
| st.rerun() | |
| else: | |
| st.error("β Invalid credentials. Please try again.") | |
| # Enhanced demo credentials info | |
| with st.expander("π Demo Credentials & Full Tech Stack"): | |
| st.markdown(""" | |
| **Test Accounts:** | |
| - **Finance**: `tony.finance` / `password123` | |
| - **Marketing**: `sarah.marketing` / `password123` | |
| - **HR**: `mike.hr` / `password123` | |
| - **Engineering**: `peter.engineering` / `password123` | |
| - **C-Level**: `ceo.admin` / `password123` | |
| - **Employee**: `john.employee` / `password123` | |
| **ποΈ Complete Tech Stack Implementation:** | |
| β **1. Python**: Core programming language | |
| β **2. FastAPI**: Simulated REST API endpoints | |
| β **3. LLM**: OpenAI GPT integration + template fallback | |
| β **4. Vector Store**: ChromaDB + Sentence Transformers | |
| β **5. Streamlit**: Enhanced UI with rich features | |
| **π§ Additional Features:** | |
| - π‘οΈ RBAC enforcement at retrieval level | |
| - π Interactive visualizations (Plotly) | |
| - π Complete source attribution | |
| - π« Graceful unauthorized access handling | |
| - π User feedback collection system | |
| - π Demo role switching for presentations | |
| - π Real-time system metrics | |
| **π‘ Environment Setup:** | |
| - Set `OPENAI_API_KEY` for full LLM features | |
| - ChromaDB auto-initializes on first run | |
| - All data embedded for zero-setup deployment | |
| """) | |
| def display_feedback_widget(query: str, response: str): | |
| """Display feedback widget for user ratings""" | |
| st.markdown('<div class="feedback-container">', unsafe_allow_html=True) | |
| st.markdown("**π How helpful was this response?**") | |
| # Create a unique identifier for this specific feedback widget | |
| # Using hash of query + response to ensure uniqueness per message | |
| import hashlib | |
| unique_id = hashlib.md5((query + response).encode()).hexdigest()[:8] | |
| col1, col2, col3, col4, col5 = st.columns(5) | |
| rating = None | |
| with col1: | |
| if st.button("β", key=f"rating_1_{unique_id}"): | |
| rating = 1 | |
| with col2: | |
| if st.button("ββ", key=f"rating_2_{unique_id}"): | |
| rating = 2 | |
| with col3: | |
| if st.button("βββ", key=f"rating_3_{unique_id}"): | |
| rating = 3 | |
| with col4: | |
| if st.button("ββββ", key=f"rating_4_{unique_id}"): | |
| rating = 4 | |
| with col5: | |
| if st.button("βββββ", key=f"rating_5_{unique_id}"): | |
| rating = 5 | |
| if rating: | |
| current_role = st.session_state.demo_role if st.session_state.demo_mode else st.session_state.user_role | |
| st.session_state.enhanced_rag_system.store_feedback(query, response, rating, current_role) | |
| st.success(f"Thank you for rating this response {rating}/5 stars! π") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| def display_system_metrics(): | |
| """Display comprehensive system metrics""" | |
| if st.session_state.enhanced_rag_system: | |
| status = st.session_state.enhanced_rag_system.get_system_status() | |
| st.markdown('<div class="system-metrics">', unsafe_allow_html=True) | |
| st.markdown("**π System Metrics**") | |
| # Basic metrics | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Documents", status.get('documents_loaded', 0)) | |
| with col2: | |
| st.metric("Feedback", status.get('feedback_entries', 0)) | |
| with col3: | |
| system_health = "π’ Healthy" if status.get('system_initialized') else "π΄ Error" | |
| st.markdown(f"**Status:** {system_health}") | |
| st.markdown('</div>', unsafe_allow_html=True) | |
| def main_app(): | |
| """Enhanced main application interface""" | |
| # Display tech stack notification on app entry | |
| display_tech_stack_notification() | |
| # Initialize Enhanced RAG system if not done | |
| if st.session_state.enhanced_rag_system is None: | |
| with st.spinner("π Initializing Complete RAG System..."): | |
| st.session_state.enhanced_rag_system = EnhancedRAGSystem() | |
| st.session_state.enhanced_rag_system.initialize_system() | |
| # Header without tech stack status (now in notification) | |
| col1, col2, col3 = st.columns([3, 2, 1]) | |
| with col1: | |
| st.markdown('<div class="main-header"><h1>π€ FinSolve AI Assistant</h1><p>Complete Tech Stack RAG System</p></div>', unsafe_allow_html=True) | |
| with col2: | |
| current_role = st.session_state.demo_role if st.session_state.demo_mode else st.session_state.user_role | |
| demo_indicator = " (Demo)" if st.session_state.demo_mode else "" | |
| st.markdown(f""" | |
| <div style="text-align: center; margin-top: 1.5rem;"> | |
| <strong>π {st.session_state.username}</strong><br> | |
| <span class="role-badge">{current_role}{demo_indicator}</span> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| with col3: | |
| if st.button("πͺ Logout", use_container_width=True): | |
| for key in list(st.session_state.keys()): | |
| del st.session_state[key] | |
| st.rerun() | |
| # Demo role switching section | |
| if st.checkbox("π§ͺ Enable Demo Mode", help="Switch between roles to test different access levels"): | |
| demo_role_switch() | |
| # Sidebar with enhanced role info and system metrics | |
| with st.sidebar: | |
| st.header("π Your Access Level") | |
| current_role = st.session_state.demo_role if st.session_state.demo_mode else st.session_state.user_role | |
| role_permissions = { | |
| "Finance": ["π Financial reports", "π° Marketing expenses", "π§ Equipment costs", "π³ Reimbursements"], | |
| "Marketing": ["π Campaign performance", "π¬ Customer feedback", "π Sales metrics", "π― ROI data"], | |
| "HR": ["π₯ Employee data", "π Attendance records", "π° Payroll info", "β Performance reviews"], | |
| "Engineering": ["ποΈ Technical architecture", "βοΈ Development processes", "π Operational guidelines", "π Security docs"], | |
| "C-Level": ["π Full access to all company data", "π Executive dashboards", "π Strategic metrics"], | |
| "Employee": ["π General company information", "π Policies", "π Events", "β FAQs"] | |
| } | |
| permissions = role_permissions.get(current_role, []) | |
| for perm in permissions: | |
| st.markdown(f"β {perm}") | |
| st.markdown("---") | |
| st.header("π‘ Sample Questions") | |
| sample_questions = { | |
| "Finance": [ | |
| "What was our Q4 2024 revenue?", | |
| "Show me cost breakdown with charts", | |
| "What's our marketing ROI?", | |
| "Create financial metrics table" | |
| ], | |
| "Marketing": [ | |
| "How did our Q4 campaigns perform?", | |
| "Show customer acquisition trends", | |
| "What's our digital marketing ROI?", | |
| "Create campaign performance chart" | |
| ], | |
| "HR": [ | |
| "What are the leave policies?", | |
| "Show me employee benefits", | |
| "What's our compensation structure?", | |
| "How do I apply for maternity leave?" | |
| ], | |
| "Engineering": [ | |
| "What's our system architecture?", | |
| "Show me our technology stack", | |
| "What's our deployment process?", | |
| "Explain our security measures" | |
| ], | |
| "C-Level": [ | |
| "Give me a company overview", | |
| "Show all department metrics", | |
| "What are our growth trends?", | |
| "Create executive dashboard" | |
| ], | |
| "Employee": [ | |
| "What are the company policies?", | |
| "How do I apply for leave?", | |
| "What benefits do we have?", | |
| "What's the dress code?" | |
| ] | |
| } | |
| questions = sample_questions.get(current_role, []) | |
| for q in questions: | |
| if st.button(q, key=f"sample_{q}", use_container_width=True): | |
| st.session_state.current_query = q | |
| # System metrics for all users | |
| st.markdown("---") | |
| display_system_metrics() | |
| # Tech stack status in sidebar | |
| display_tech_status_sidebar() | |
| # Main chat interface | |
| st.header("π¬ AI-Powered Chat with Complete RAG") | |
| # Display chat history with enhanced styling | |
| for i, chat_item in enumerate(st.session_state.chat_history): | |
| query, response, sources, visualization, table = chat_item | |
| # User message | |
| st.markdown(f""" | |
| <div class="chat-message user-message"> | |
| <strong>π€ You:</strong> {query} | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Assistant response | |
| response_class = "unauthorized-message" if "Access Restricted" in response else "assistant-message" | |
| st.markdown(f""" | |
| <div class="chat-message {response_class}"> | |
| <strong>π€ AI Assistant:</strong><br>{response} | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Display visualization if available | |
| if visualization: | |
| st.markdown("π **Interactive Data Visualization:**") | |
| st.components.v1.html(visualization, height=450) | |
| # Display table if available | |
| if table: | |
| st.markdown("π **Data Table:**") | |
| st.markdown(table, unsafe_allow_html=True) | |
| # Display sources | |
| if sources: | |
| st.markdown(f""" | |
| <div class="source-info"> | |
| <strong>π Sources:</strong> {" | ".join(sources)} | |
| </div> | |
| """, unsafe_allow_html=True) | |
| # Feedback widget | |
| if "Access Restricted" not in response: | |
| display_feedback_widget(query, response) | |
| st.markdown("---") | |
| # Query input with enhanced features | |
| query = st.text_input( | |
| "π Ask your question:", | |
| value=st.session_state.get('current_query', ''), | |
| placeholder="Type your question here... (e.g., 'Show me Q4 revenue with charts')", | |
| key="query_input" | |
| ) | |
| col1, col2, col3 = st.columns([1, 1, 3]) | |
| with col1: | |
| ask_button = st.button("π Ask AI", use_container_width=True) | |
| with col2: | |
| clear_button = st.button("ποΈ Clear Chat", use_container_width=True) | |
| if clear_button: | |
| st.session_state.chat_history = [] | |
| st.rerun() | |
| if ask_button and query: | |
| with st.spinner("π§ Processing with complete RAG pipeline..."): | |
| try: | |
| current_role = st.session_state.demo_role if st.session_state.demo_mode else st.session_state.user_role | |
| response, sources, visualization, table = st.session_state.enhanced_rag_system.query( | |
| query, | |
| current_role | |
| ) | |
| # Add to chat history | |
| st.session_state.chat_history.append((query, response, sources, visualization, table)) | |
| # Clear the current query | |
| if 'current_query' in st.session_state: | |
| del st.session_state.current_query | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"β Error processing query: {str(e)}") | |
| def main(): | |
| """Main application entry point""" | |
| initialize_session_state() | |
| if not st.session_state.authenticated: | |
| login_page() | |
| else: | |
| main_app() | |
| if __name__ == "__main__": | |
| main() |