"""ClariDoc Streamlit front-end.

Thin UI over the ClariDoc REST API: login, per-user document sessions,
document upload (file or URL) and a chat interface that displays answers
together with the query metadata and source clauses returned by the API.
"""

import html
import json
import os
import time
from typing import Dict, List, Optional
from urllib.parse import quote

import magic  # For file type detection
import requests
import streamlit as st

# Configuration
API_BASE_URL = "http://localhost:8000/api/v1"

# (connect, read) timeouts in seconds. Without a timeout `requests` waits
# forever, which would freeze the Streamlit script run if the API stalls.
API_TIMEOUT = (5, 30)
# Upload / query / restore trigger server-side processing, so allow longer.
API_LONG_TIMEOUT = (5, 300)

# Professional theme colors for ClariDoc
THEME_COLORS = {
    "primary": "#1E3A8A",    # Deep blue
    "secondary": "#3B82F6",  # Blue
    "success": "#10B981",    # Green
    "warning": "#F59E0B",    # Amber
    "error": "#EF4444",      # Red
    "neutral": "#6B7280",    # Gray
    "light": "#2F89E3",      # Light blue
}

# Clause metadata keys that are hidden from the "Document Properties" view.
_SKIPPED_METADATA_FIELDS = frozenset({
    'doc_id', 'chunk_id', 'source', 'file_path', 'type', 'author', 'creator',
    'producer', 'doc_category', 'format', 'keyword', 'doc_type', 'modDate',
    'moddate', 'subject', 'title', 'total_pages', 'trapped', 'creationDate',
    'creationdate',
})


def _esc(value) -> str:
    """Return ``value`` as HTML-escaped text.

    Every ``st.markdown(..., unsafe_allow_html=True)`` call that interpolates
    data coming from documents, the API or the user goes through this so that
    the data cannot inject markup into the page.
    """
    return html.escape(str(value))


def _error_detail(response) -> str:
    """Extract a human-readable error from a non-200 API response.

    Uses the ``detail`` field of a JSON object body when there is one and
    falls back to the HTTP status code when the body is empty or not JSON.
    """
    try:
        payload = response.json()
    except ValueError:  # empty or non-JSON body
        return f"HTTP {response.status_code}"
    if isinstance(payload, dict):
        return str(payload.get('detail', 'Unknown error'))
    return 'Unknown error'


def _as_float(value, default: float = 0.0) -> float:
    """Coerce ``value`` to float, returning ``default`` when that fails."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _format_metadata_value(value, limit: int, more_suffix: str) -> str:
    """Render a metadata value as text.

    Lists show their first ``limit`` items; when items were dropped,
    ``more_suffix`` is appended with ``{n}`` replaced by the dropped count.
    Anything else is passed through ``str``.
    """
    if not isinstance(value, list):
        return str(value)
    text = ", ".join(str(item) for item in value[:limit])
    if len(value) > limit:
        text += more_suffix.format(n=len(value) - limit)
    return text


def _play_progress(progress_bar, steps, delay: float) -> None:
    """Advance ``progress_bar`` through ``(percent, label)`` steps.

    Sleeps ``delay`` seconds after each step; the steps are cosmetic and do
    not track real work.
    """
    for percent, label in steps:
        progress_bar.progress(percent, label)
        time.sleep(delay)


class RAGApp:
    """Client for the ClariDoc API bound to one user and one active session.

    Methods report failures through ``st.error`` and signal them with a
    falsy return value instead of raising, so a failed call never aborts the
    Streamlit script run.
    """

    def __init__(self):
        self.session_id: Optional[str] = None
        self.username: str = "anonymous"

    def set_username(self, username: str):
        """Set the username for the app"""
        self.username = username

    def create_session(self) -> bool:
        """Create a new API session for the current user.

        On success the new id is stored on the instance and in
        ``st.session_state.session_id``.
        """
        try:
            response = requests.post(
                f"{API_BASE_URL}/session",
                params={"username": self.username},
                timeout=API_TIMEOUT,
            )
            if response.status_code == 200:
                self.session_id = response.json()["session_id"]
                st.session_state.session_id = self.session_id
                return True
            # Previously a non-200 answer failed without any feedback.
            st.error(f"Failed to create session: {_error_detail(response)}")
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Failed to create session: {exc}")
        return False

    def get_user_sessions(self) -> List[Dict]:
        """Return the sessions the API lists for the current user.

        Returns an empty list on any failure or non-200 response.
        """
        try:
            # The username is user-typed (the login placeholder suggests an
            # e-mail address), so encode it before using it as a path segment.
            user_segment = quote(self.username, safe="")
            response = requests.get(
                f"{API_BASE_URL}/sessions/{user_segment}",
                timeout=API_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json().get("sessions", [])
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Failed to get user sessions: {exc}")
        return []

    def restore_session(self, session_id: str) -> bool:
        """Ask the API to restore ``session_id`` and make it the active session."""
        try:
            response = requests.post(
                f"{API_BASE_URL}/session/{session_id}/restore",
                timeout=API_LONG_TIMEOUT,
            )
            if response.status_code == 200:
                self.session_id = session_id
                st.session_state.session_id = session_id
                return True
            # Previously a non-200 answer failed without any feedback.
            st.error(f"Failed to restore session: {_error_detail(response)}")
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Failed to restore session: {exc}")
        return False

    def detect_file_type(self, file_content: bytes, filename: str) -> str:
        """Classify a file as ``'pdf'``, ``'word'`` or ``'unknown'``.

        The filename extension is checked first; only when it is not
        recognised is the content sniffed with python-magic.
        """
        lowered = filename.lower()
        if lowered.endswith('.pdf'):
            return 'pdf'
        if lowered.endswith(('.doc', '.docx')):
            return 'word'

        # Content sniffing is best-effort: any failure means "unknown".
        try:
            mime_type = magic.from_buffer(file_content, mime=True)
        except Exception:
            return 'unknown'
        if 'pdf' in mime_type:
            return 'pdf'
        if any(marker in mime_type for marker in ('word', 'msword', 'document')):
            return 'word'
        return 'unknown'

    def upload_document(self, file=None, url=None, doc_type=None) -> bool:
        """Upload a document (file object or URL) to the active session.

        Args:
            file: Uploaded file object exposing ``getvalue()``, ``name`` and
                ``type``; takes precedence over ``url``.
            url: Document URL, used when ``file`` is not given.
            doc_type: ``'pdf'`` or ``'word'``. For files it is auto-detected
                when omitted; for URLs it defaults to ``'pdf'``.

        Returns:
            True when the API accepted the document, False otherwise.
        """
        if file is None and not url:
            st.error("Upload failed: no file or URL provided")
            return False

        endpoint = f"{API_BASE_URL}/upload/{self.session_id}"
        try:
            if file is not None:
                content = file.getvalue()  # read once, reuse for sniffing and upload
                if not doc_type:
                    doc_type = self.detect_file_type(content, file.name)
                    if doc_type == 'unknown':
                        st.error("Could not detect file type. Please specify manually.")
                        return False
                response = requests.post(
                    endpoint,
                    files={"file": (file.name, content, file.type)},
                    data={"doc_type": doc_type},
                    timeout=API_LONG_TIMEOUT,
                )
            else:
                response = requests.post(
                    endpoint,
                    data={"url": url, "doc_type": doc_type or "pdf"},
                    timeout=API_LONG_TIMEOUT,
                )

            if response.status_code == 200:
                chunks = response.json().get('chunks_created', '?')
                st.success(f"Document uploaded successfully! Created {chunks} chunks.")
                return True
            st.error(f"Upload failed: {_error_detail(response)}")
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Upload error: {exc}")
        return False

    def query_document(self, query: str) -> Optional[Dict]:
        """Send ``query`` to the active session and return the JSON response.

        Returns None (after showing an error) when the request fails.
        """
        try:
            response = requests.post(
                f"{API_BASE_URL}/query/{self.session_id}",
                json={"query": query},
                timeout=API_LONG_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
            st.error(f"Query failed: {_error_detail(response)}")
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Query error: {exc}")
        return None

    def get_session_status(self) -> Optional[dict]:
        """Return the status JSON for the active session, or None on failure.

        A non-200 response is deliberately silent: this runs on every rerun.
        """
        try:
            response = requests.get(
                f"{API_BASE_URL}/session/{self.session_id}/status",
                timeout=API_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()
        except Exception as exc:  # UI boundary: report, never crash the page
            st.error(f"Status check error: {exc}")
        return None


def show_professional_header():
    """Display professional ClariDoc header"""
    st.markdown("""

📄 ClariDoc

Professional Document Analysis & RAG Platform

HR • Insurance • Legal • Financial • Government • Technical Policies

""", unsafe_allow_html=True)


def show_login_form():
    """Render the login form and mark the user as logged in on submit.

    No credentials are verified; a non-blank username is enough.
    """
    show_professional_header()

    _, center, _ = st.columns([1, 2, 1])
    with center:
        st.markdown("""

🔐 Welcome to ClariDoc

""", unsafe_allow_html=True)

        with st.form("login_form"):
            st.markdown(
                "\n\nEnter your credentials to continue\n\n",
                unsafe_allow_html=True,
            )
            username = st.text_input(
                "Username",
                value="professional_user",
                help="Enter your username to access your document sessions",
                placeholder="e.g., john.doe@company.com",
            )

            left, right = st.columns(2)
            with left:
                # Cancel only resubmits the form; its result is not used.
                st.form_submit_button("Cancel", type="secondary")
            with right:
                submitted = st.form_submit_button("Login", type="primary")

            username = username.strip()  # whitespace-only input is not a username
            if submitted and username:
                st.session_state.username = username
                st.session_state.logged_in = True
                st.rerun()


def show_document_type_indicator(doc_type: str):
    """Show ``doc_type`` prefixed with the icon configured for it.

    Unknown types fall back to a generic document icon.
    """
    type_config = {
        "HR/Employment": {"icon": "👥", "color": "#10B981"},
        "Insurance": {"icon": "🛡️", "color": "#3B82F6"},
        "Legal/Compliance": {"icon": "⚖️", "color": "#8B5CF6"},
        "Financial/Regulatory": {"icon": "💰", "color": "#F59E0B"},
        "Government/Public Policy": {"icon": "🏛️", "color": "#EF4444"},
        "Technical/IT Policies": {"icon": "⚙️", "color": "#6B7280"},
    }
    config = type_config.get(doc_type, {"icon": "📄", "color": "#6B7280"})

    st.markdown(f"""
{config['icon']} {_esc(doc_type)}
""", unsafe_allow_html=True)


def show_session_selector(app: RAGApp):
    """Render the sidebar session list plus the "New Session" button.

    Restoring or creating a session resets the chat history so that one
    document's conversation is not shown under another session.
    """
    st.sidebar.markdown("### 📂 Document Sessions")

    sessions = app.get_user_sessions()
    if sessions:
        st.sidebar.markdown("#### Active Sessions")

        for session in sessions:
            session_id = session.get('session_id')
            if not session_id:
                continue  # cannot restore (or key a button on) an id-less entry
            session_label = session.get('document_name') or 'Untitled Document'

            with st.sidebar.container():
                # Create a clean session card
                st.markdown(f"""
{_esc(session_label)}
📄 {_esc(session.get('document_type', 'Unknown'))} • {_esc(session.get('chunks_count', 0))} chunks
🕐 {_esc(session.get('last_accessed', 'N/A'))}
""", unsafe_allow_html=True)

            if st.sidebar.button("🔄 Restore", key=f"restore_{session_id}",
                                 help="Restore this session"):
                if app.restore_session(session_id):
                    st.session_state.messages = []
                    st.success(f"✅ Restored: {session_label}")
                    st.rerun()

        st.sidebar.divider()

    # Create new session button
    if st.sidebar.button("➕ New Session", type="primary", use_container_width=True):
        if app.create_session():
            st.session_state.messages = []
            st.success("🎉 New session created!")
            st.rerun()


def show_query_metadata(metadata: Dict):
    """Display the metadata extracted from a query in a two-column layout.

    Entries that are None, an empty list or an empty dict are hidden; list
    values show at most three items.
    """
    if not metadata:
        return

    st.markdown("\n\n🔍 Query Analysis\n\n", unsafe_allow_html=True)

    with st.expander("📊 Extracted Metadata", expanded=True):
        visible = {
            key: value for key, value in metadata.items()
            if value is not None and value != [] and value != {}
        }
        if not visible:
            st.info("ℹ️ No specific metadata extracted from this query")
            return

        columns = st.columns(2)
        for position, (key, value) in enumerate(visible.items()):
            with columns[position % 2]:
                display_key = key.replace('_', ' ').title()
                value_str = _format_metadata_value(value, 3, " (+{n} more)")
                st.markdown(f"""
{_esc(display_key)}
{_esc(value_str)}
""", unsafe_allow_html=True)


def show_document_sources(top_clauses: List[Dict]):
    """Display up to three source clauses with their text and metadata.

    Each clause is read as a dict with optional ``metadata``, ``score`` and
    ``text`` keys; missing or malformed values fall back to placeholders
    instead of raising.
    """
    if not top_clauses:
        return

    st.markdown("\n\n📚 Source Documents\n\n", unsafe_allow_html=True)

    for index, clause in enumerate(top_clauses[:3]):  # Show top 3
        metadata = clause.get('metadata') or {}

        # Extract key information; doc_id may be missing, None or non-string.
        doc_id = str(metadata.get('doc_id') or 'Unknown')[:8] + '...'
        page_num = metadata.get('page_no', metadata.get('page', 'N/A'))
        score = _as_float(clause.get('score', 0))

        relevant_metadata = {
            key: value for key, value in metadata.items()
            if key not in _SKIPPED_METADATA_FIELDS
            and value is not None and value != [] and value != ''
        }

        with st.expander(
            f"📄 Document {index + 1} (Page {page_num}) - Relevance: {score:.3f}",
            expanded=index == 0,
        ):
            text_content = str(clause.get('text') or '')
            if text_content:
                st.markdown("\n\n📝 Content:\n\n", unsafe_allow_html=True)
                # Truncate first, then escape, so an entity is never cut in half.
                snippet = text_content[:300] + ("..." if len(text_content) > 300 else "")
                st.markdown(f"\n{_esc(snippet)}\n", unsafe_allow_html=True)

            if relevant_metadata:
                st.markdown("\n\n📊 Document Properties:\n\n", unsafe_allow_html=True)

                # Up to four properties get one column each, otherwise wrap in two.
                column_count = len(relevant_metadata) if len(relevant_metadata) <= 4 else 2
                columns = st.columns(column_count)
                for position, (key, value) in enumerate(relevant_metadata.items()):
                    with columns[position % column_count]:
                        display_key = key.replace('_', ' ').title()
                        value_str = _format_metadata_value(value, 2, " (+{n})")
                        st.markdown(f"""
{_esc(display_key)}
{_esc(value_str)}
""", unsafe_allow_html=True)

        # Technical details in a collapsed section. Kept as a sibling of the
        # document expander rather than nested inside it: some Streamlit
        # releases reject an expander placed inside another expander.
        with st.expander("🔧 Technical Details", expanded=False):
            st.code("\n".join([
                f"Document ID: {doc_id}",
                f"Page Number: {page_num}",
                f"Relevance Score: {score:.4f}",
                f"Source: {metadata.get('source', 'N/A')}",
            ]))


def _render_upload_section(app: RAGApp) -> None:
    """Render the file / URL upload controls and run the upload on request."""
    st.markdown("---")
    st.markdown("\n\n📤 Document Upload\n\n", unsafe_allow_html=True)

    upload_type = st.radio(
        "📂 Choose upload method:",
        ["📁 File Upload", "🌐 URL Import"],
        horizontal=True,
    )
    col1, col2 = st.columns([3, 1])

    if upload_type == "📁 File Upload":
        with col1:
            uploaded_file = st.file_uploader(
                "Choose a professional document",
                type=['pdf', 'docx', 'doc'],
                help="Upload PDF or Word documents for analysis",
            )
        with col2:
            auto_detect = st.checkbox("🔍 Auto-detect type", value=True)
            # None asks RAGApp.upload_document to detect the type itself.
            doc_type = None if auto_detect else st.selectbox("Document Type", ["pdf", "word"])

        if uploaded_file and st.button("🚀 Upload & Process", type="primary"):
            with st.spinner("🔄 Processing document..."):
                progress_bar = st.progress(0)
                _play_progress(progress_bar, [
                    (25, "📄 Analyzing document structure..."),
                    (50, "🧠 Extracting metadata..."),
                    (75, "🔗 Creating vector embeddings..."),
                ], 0.5)
                if app.upload_document(file=uploaded_file, doc_type=doc_type):
                    progress_bar.progress(100, "✅ Document processed successfully!")
                    st.balloons()
                    time.sleep(1)
                # Clear the bar on failure too, so it is not left at 75%.
                progress_bar.empty()
    else:  # URL Upload
        with col1:
            url = st.text_input(
                "📎 Enter document URL:",
                placeholder="https://example.com/document.pdf",
                help="Enter a direct URL to a PDF document",
            )
        with col2:
            doc_type = st.selectbox("Document Type", ["pdf", "word"], index=0)

        if url and st.button("🚀 Load from URL & Process", type="primary"):
            with st.spinner("🔄 Processing document from URL..."):
                progress_bar = st.progress(0)
                _play_progress(progress_bar, [
                    (20, "🌐 Downloading document..."),
                    (50, "📄 Analyzing document structure..."),
                    (80, "🧠 Extracting metadata..."),
                ], 0.5)
                if app.upload_document(url=url, doc_type=doc_type):
                    progress_bar.progress(100, "✅ Document processed successfully!")
                    st.balloons()
                    time.sleep(1)
                # Clear the bar on failure too, so it is not left at 80%.
                progress_bar.empty()


def _render_chat_section(app: RAGApp) -> None:
    """Render the document metrics, chat history, chat input and clear button.

    Nothing below the section title is shown until the API reports that the
    session has a document uploaded.
    """
    st.markdown("---")
    st.markdown("\n\n💬 Document Analysis\n\n", unsafe_allow_html=True)

    status = app.get_session_status()
    if not (status and status.get("document_uploaded")):
        return

    doc_info = status.get("document_info") or {}
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("📄 Document", doc_info.get('filename', 'Unknown'))
    with col2:
        st.metric("🧩 Chunks", doc_info.get('chunks_count', 0))
    with col3:
        st.metric("📊 Type", doc_info.get('type', 'Unknown'))

    # Replay the stored conversation.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
            if message["role"] == "assistant" and "metadata" in message:
                if message.get("metadata"):
                    show_query_metadata(message["metadata"])
                if message.get("sources"):
                    show_document_sources(message["sources"])

    # Chat input
    if prompt := st.chat_input("💭 Ask a question about your document..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            with st.spinner("🤔 Analyzing your question..."):
                progress_bar = st.progress(0)
                _play_progress(progress_bar, [
                    (25, "🔍 Extracting query metadata..."),
                    (50, "🔎 Searching document database..."),
                    (75, "🧠 Generating response..."),
                ], 0.3)
                response_data = app.query_document(prompt)
                progress_bar.progress(100, "✅ Analysis complete!")
                time.sleep(0.5)
                progress_bar.empty()

            if response_data:
                answer = response_data.get("answer", "No answer available")
                st.markdown(answer)

                query_metadata = response_data.get("query_metadata", {})
                sources = response_data.get("sources", [])
                if query_metadata:
                    show_query_metadata(query_metadata)
                if sources:
                    show_document_sources(sources)

                # Store metadata and sources so the replay above can show them.
                st.session_state.messages.append({
                    "role": "assistant",
                    "content": answer,
                    "metadata": query_metadata,
                    "sources": sources,
                })
            else:
                error_msg = "❌ Sorry, I couldn't process your question."
                st.markdown(error_msg)
                st.session_state.messages.append({"role": "assistant", "content": error_msg})

    # Clear chat button
    if st.session_state.messages:
        _, middle, _ = st.columns([1, 1, 1])
        with middle:
            if st.button("🗑️ Clear Conversation", type="secondary", use_container_width=True):
                st.session_state.messages = []
                st.rerun()


def main():
    """Streamlit entry point: login gate, sidebar sessions, upload and chat."""
    st.set_page_config(
        page_title="ClariDoc - Professional Document Analysis",
        page_icon="📄",
        layout="wide",
        initial_sidebar_state="expanded",
    )

    # Custom CSS for professional dark styling
    st.markdown(""" """, unsafe_allow_html=True)

    # Initialize session state
    if 'logged_in' not in st.session_state:
        st.session_state.logged_in = False
    if 'username' not in st.session_state:
        st.session_state.username = "professional_user"
    if 'messages' not in st.session_state:
        st.session_state.messages = []

    # Show login form if not logged in
    if not st.session_state.logged_in:
        show_login_form()
        return

    show_professional_header()

    # Initialize app
    if 'app' not in st.session_state:
        st.session_state.app = RAGApp()
    app = st.session_state.app
    app.set_username(st.session_state.username)

    # Restore session ID if it exists in session state
    if st.session_state.get('session_id'):
        app.session_id = st.session_state.session_id

    # Welcome message
    st.markdown(f"""

👋 Welcome back, {_esc(st.session_state.username)}!

Ready to analyze your professional documents

""", unsafe_allow_html=True)

    # Session management in sidebar
    show_session_selector(app)

    # Logout button in sidebar
    st.sidebar.divider()
    if st.sidebar.button("🚪 Logout", type="secondary", use_container_width=True):
        # clear() drops every key (login flag, username, app, messages);
        # the defaults are re-created at the top of the next run.
        st.session_state.clear()
        st.rerun()

    # Main content area
    if not app.session_id:
        st.info("👈 Please create or select a session from the sidebar to begin document analysis")
        return

    # Display current session info
    st.success(f"🎯 **Active Session:** `{app.session_id[:8]}...`")

    _render_upload_section(app)
    _render_chat_section(app)


if __name__ == "__main__":
    main()