import streamlit as st
import requests
import json
from typing import Optional, List, Dict
import time
import os
import magic # For file type detection
# Configuration
API_BASE_URL = "http://localhost:8000/api/v1"
# Professional theme colors for ClariDoc
THEME_COLORS = {
"primary": "#1E3A8A", # Deep blue
"secondary": "#3B82F6", # Blue
"success": "#10B981", # Green
"warning": "#F59E0B", # Amber
"error": "#EF4444", # Red
"neutral": "#6B7280", # Gray
"light": "#2F89E3" # Light gray
}
class RAGApp:
def __init__(self):
self.session_id: Optional[str] = None
self.username: str = "anonymous"
def set_username(self, username: str):
"""Set the username for the app"""
self.username = username
def create_session(self) -> bool:
"""Create a new session with the API"""
try:
response = requests.post(f"{API_BASE_URL}/session", params={"username": self.username})
if response.status_code == 200:
data = response.json()
self.session_id = data["session_id"]
st.session_state.session_id = self.session_id
return True
except Exception as e:
st.error(f"Failed to create session: {e}")
return False
def get_user_sessions(self) -> List[Dict]:
"""Get all sessions for current user"""
try:
response = requests.get(f"{API_BASE_URL}/sessions/{self.username}")
if response.status_code == 200:
data = response.json()
return data.get("sessions", [])
except Exception as e:
st.error(f"Failed to get user sessions: {e}")
return []
def restore_session(self, session_id: str) -> bool:
"""Restore a session from database"""
try:
response = requests.post(f"{API_BASE_URL}/session/{session_id}/restore")
if response.status_code == 200:
self.session_id = session_id
st.session_state.session_id = session_id
return True
except Exception as e:
st.error(f"Failed to restore session: {e}")
return False
def detect_file_type(self, file_content: bytes, filename: str) -> str:
"""Detect file type from content and filename"""
# Try to detect from filename extension first
if filename.lower().endswith('.pdf'):
return 'pdf'
elif filename.lower().endswith(('.doc', '.docx')):
return 'word'
else:
# Try to detect from file content using python-magic
try:
file_type = magic.from_buffer(file_content, mime=True)
if 'pdf' in file_type:
return 'pdf'
elif any(word_type in file_type for word_type in ['word', 'msword', 'document']):
return 'word'
except:
pass
return 'unknown'
def upload_document(self, file=None, url=None, doc_type=None) -> bool:
"""Upload document to the API (file or URL)"""
try:
if file:
# Auto-detect file type if not provided
if not doc_type:
file_content = file.getvalue()
doc_type = self.detect_file_type(file_content, file.name)
if doc_type == 'unknown':
st.error("Could not detect file type. Please specify manually.")
return False
files = {"file": (file.name, file.getvalue(), file.type)}
data = {"doc_type": doc_type}
response = requests.post(
f"{API_BASE_URL}/upload/{self.session_id}",
files=files,
data=data
)
else:
# URL upload
data = {"url": url, "doc_type": doc_type or "pdf"}
response = requests.post(
f"{API_BASE_URL}/upload/{self.session_id}",
data=data
)
if response.status_code == 200:
result = response.json()
st.success(f"Document uploaded successfully! Created {result['chunks_created']} chunks.")
return True
else:
error_msg = response.json().get('detail', 'Unknown error') if response.text else f"HTTP {response.status_code}"
st.error(f"Upload failed: {error_msg}")
except Exception as e:
st.error(f"Upload error: {e}")
return False
def query_document(self, query: str) -> Optional[Dict]:
"""Query the uploaded document and return full response data"""
try:
response = requests.post(
f"{API_BASE_URL}/query/{self.session_id}",
json={"query": query}
)
if response.status_code == 200:
result = response.json()
return result
else:
error_detail = response.json().get('detail', 'Unknown error')
st.error(f"Query failed: {error_detail}")
except Exception as e:
st.error(f"Query error: {e}")
return None
def get_session_status(self) -> Optional[dict]:
"""Get current session status"""
try:
response = requests.get(f"{API_BASE_URL}/session/{self.session_id}/status")
if response.status_code == 200:
return response.json()
except Exception as e:
st.error(f"Status check error: {e}")
return None
def show_professional_header():
"""Display professional ClariDoc header"""
st.markdown("""
đ ClariDoc
Professional Document Analysis & RAG Platform
HR âĸ Insurance âĸ Legal âĸ Financial âĸ Government âĸ Technical Policies
""", unsafe_allow_html=True)
def show_login_form():
"""Show professional login form"""
show_professional_header()
col1, col2, col3 = st.columns([1, 2, 1])
with col2:
st.markdown("""
đ Welcome to ClariDoc
""", unsafe_allow_html=True)
with st.form("login_form"):
st.markdown('Enter your credentials to continue
', unsafe_allow_html=True)
username = st.text_input(
"Username",
value="professional_user",
help="Enter your username to access your document sessions",
placeholder="e.g., john.doe@company.com"
)
col_a, col_b = st.columns(2)
with col_a:
cancel = st.form_submit_button("Cancel", type="secondary")
with col_b:
submit = st.form_submit_button("Login", type="primary")
if submit and username:
st.session_state.username = username
st.session_state.logged_in = True
st.rerun()
def show_document_type_indicator(doc_type: str):
"""Show document type with appropriate icon and color"""
type_config = {
"HR/Employment": {"icon": "đĨ", "color": "#10B981"},
"Insurance": {"icon": "đĄī¸", "color": "#3B82F6"},
"Legal/Compliance": {"icon": "âī¸", "color": "#8B5CF6"},
"Financial/Regulatory": {"icon": "đ°", "color": "#F59E0B"},
"Government/Public Policy": {"icon": "đī¸", "color": "#EF4444"},
"Technical/IT Policies": {"icon": "âī¸", "color": "#6B7280"}
}
config = type_config.get(doc_type, {"icon": "đ", "color": "#6B7280"})
st.markdown(f"""
{config['icon']} {doc_type}
""", unsafe_allow_html=True)
def show_session_selector(app: RAGApp):
"""Show professional session selection interface"""
st.sidebar.markdown("### đ Document Sessions")
# Get user sessions
sessions = app.get_user_sessions()
if sessions:
st.sidebar.markdown("#### Active Sessions")
for i, session in enumerate(sessions):
with st.sidebar.container():
session_label = session['document_name'] or 'Untitled Document'
# Create a clean session card
st.markdown(f"""
{session_label}
đ {session['document_type']} âĸ {session['chunks_count']} chunks
đ {session['last_accessed']}
""", unsafe_allow_html=True)
if st.sidebar.button("đ Restore", key=f"restore_{session['session_id']}", help="Restore this session"):
if app.restore_session(session['session_id']):
st.success(f"â
Restored: {session_label}")
st.rerun()
st.sidebar.divider()
# Create new session button
if st.sidebar.button("â New Session", type="primary", use_container_width=True):
if app.create_session():
st.success("đ New session created!")
st.rerun()
def show_query_metadata(metadata: Dict):
"""Display extracted query metadata in a professional format"""
if not metadata:
return
st.markdown('đ Query Analysis
', unsafe_allow_html=True)
with st.expander("đ Extracted Metadata", expanded=True):
# Filter out None values and empty lists
filtered_metadata = {k: v for k, v in metadata.items()
if v is not None and v != [] and v != {}}
if filtered_metadata:
cols = st.columns(2)
col_idx = 0
for key, value in filtered_metadata.items():
with cols[col_idx % 2]:
# Format the key nicely
display_key = key.replace('_', ' ').title()
if isinstance(value, list):
value_str = ", ".join(str(v) for v in value[:3]) # Show first 3 items
if len(value) > 3:
value_str += f" (+{len(value)-3} more)"
else:
value_str = str(value)
st.markdown(f"""
{display_key}
{value_str}
""", unsafe_allow_html=True)
col_idx += 1
else:
st.info("âšī¸ No specific metadata extracted from this query")
def show_document_sources(top_clauses: List[Dict]):
"""Display top document sources with metadata"""
if not top_clauses:
return
st.markdown('đ Source Documents
', unsafe_allow_html=True)
for i, clause in enumerate(top_clauses[:3]): # Show top 3
metadata = clause.get('metadata', {})
# Extract key information
doc_id = metadata.get('doc_id', 'Unknown')[:8] + '...'
page_num = metadata.get('page_no', metadata.get('page', 'N/A'))
score = clause.get('score', 0)
# Get relevant metadata (skip technical fields)
skip_fields = {'doc_id', 'chunk_id', 'source', 'file_path', 'type', 'author', 'creator', 'producer','doc_category','format','keyword', 'doc_type','modDate','moddate','subject','title','total_pages','trapped','creationDate','creationdate' }
relevant_metadata = {k: v for k, v in metadata.items()
if k not in skip_fields and v is not None and v != [] and v != ''}
with st.expander(f"đ Document {i+1} (Page {page_num}) - Relevance: {score:.3f}", expanded=i==0):
# Show the text content
text_content = clause.get('text', '')
if text_content:
st.markdown('đ Content:
', unsafe_allow_html=True)
st.markdown(f'{text_content[:300]}{"..." if len(text_content) > 300 else ""}
',
unsafe_allow_html=True)
# Show metadata in a clean format
if relevant_metadata:
st.markdown('đ Document Properties:
', unsafe_allow_html=True)
# Create columns for metadata
if len(relevant_metadata) <= 4:
cols = st.columns(len(relevant_metadata))
else:
cols = st.columns(2)
for idx, (key, value) in enumerate(relevant_metadata.items()):
col_idx = idx % len(cols)
with cols[col_idx]:
display_key = key.replace('_', ' ').title()
if isinstance(value, list):
value_str = ", ".join(str(v) for v in value[:2])
if len(value) > 2:
value_str += f" (+{len(value)-2})"
else:
value_str = str(value)
st.markdown(f"""
{display_key}
{value_str}
""", unsafe_allow_html=True)
# Technical details in a collapsed section
with st.expander("đ§ Technical Details", expanded=False):
st.code(f"""
Document ID: {doc_id}
Page Number: {page_num}
Relevance Score: {score:.4f}
Source: {metadata.get('source', 'N/A')}
""")
def main():
st.set_page_config(
page_title="ClariDoc - Professional Document Analysis",
page_icon="đ",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for professional dark styling
st.markdown("""
""", unsafe_allow_html=True)
# Initialize session state
if 'logged_in' not in st.session_state:
st.session_state.logged_in = False
if 'username' not in st.session_state:
st.session_state.username = "professional_user"
if 'messages' not in st.session_state:
st.session_state.messages = []
# Show login form if not logged in
if not st.session_state.logged_in:
show_login_form()
return
# Show professional header
show_professional_header()
# Initialize app
if 'app' not in st.session_state:
st.session_state.app = RAGApp()
app = st.session_state.app
app.set_username(st.session_state.username)
# Restore session ID if it exists in session state
if hasattr(st.session_state, 'session_id') and st.session_state.session_id:
app.session_id = st.session_state.session_id
# Welcome message
st.markdown(f"""
đ Welcome back, {st.session_state.username}!
Ready to analyze your professional documents
""", unsafe_allow_html=True)
# Session management in sidebar
show_session_selector(app)
# Logout button in sidebar
st.sidebar.divider()
if st.sidebar.button("đĒ Logout", type="secondary", use_container_width=True):
st.session_state.logged_in = False
st.session_state.username = "professional_user"
st.session_state.clear()
st.rerun()
# Main content area
if not app.session_id:
st.info("đ Please create or select a session from the sidebar to begin document analysis")
return
# Display current session info
st.success(f"đ¯ **Active Session:** `{app.session_id[:8]}...`")
# Document upload section
st.markdown("---")
st.markdown('đ¤ Document Upload
', unsafe_allow_html=True)
# Upload type selection
upload_type = st.radio(
"đ Choose upload method:",
["đ File Upload", "đ URL Import"],
horizontal=True
)
col1, col2 = st.columns([3, 1])
if upload_type == "đ File Upload":
with col1:
uploaded_file = st.file_uploader(
"Choose a professional document",
type=['pdf', 'docx', 'doc'],
help="Upload PDF or Word documents for analysis"
)
with col2:
auto_detect = st.checkbox("đ Auto-detect type", value=True)
if not auto_detect:
doc_type = st.selectbox("Document Type", ["pdf", "word"])
else:
doc_type = None
if uploaded_file and st.button("đ Upload & Process", type="primary"):
with st.spinner("đ Processing document..."):
progress_bar = st.progress(0)
progress_bar.progress(25, "đ Analyzing document structure...")
time.sleep(0.5)
progress_bar.progress(50, "đ§ Extracting metadata...")
time.sleep(0.5)
progress_bar.progress(75, "đ Creating vector embeddings...")
time.sleep(0.5)
if app.upload_document(file=uploaded_file, doc_type=doc_type):
progress_bar.progress(100, "â
Document processed successfully!")
st.balloons()
time.sleep(1)
progress_bar.empty()
else: # URL Upload
with col1:
url = st.text_input(
"đ Enter document URL:",
placeholder="https://example.com/document.pdf",
help="Enter a direct URL to a PDF document"
)
with col2:
doc_type = st.selectbox("Document Type", ["pdf", "word"], index=0)
if url and st.button("đ Load from URL & Process", type="primary"):
with st.spinner("đ Processing document from URL..."):
progress_bar = st.progress(0)
progress_bar.progress(20, "đ Downloading document...")
time.sleep(0.5)
progress_bar.progress(50, "đ Analyzing document structure...")
time.sleep(0.5)
progress_bar.progress(80, "đ§ Extracting metadata...")
time.sleep(0.5)
if app.upload_document(url=url, doc_type=doc_type):
progress_bar.progress(100, "â
Document processed successfully!")
st.balloons()
time.sleep(1)
progress_bar.empty()
# Query section
st.markdown("---")
st.markdown('đŦ Document Analysis
', unsafe_allow_html=True)
# Display session status
status = app.get_session_status()
if status and status.get("document_uploaded"):
doc_info = status.get("document_info", {})
col1, col2, col3 = st.columns(3)
with col1:
st.metric("đ Document", doc_info.get('filename', 'Unknown'))
with col2:
st.metric("đ§Š Chunks", doc_info.get('chunks_count', 0))
with col3:
st.metric("đ Type", doc_info.get('type', 'Unknown'))
# Chat interface
for message in st.session_state.messages:
with st.chat_message(message["role"]):
if message["role"] == "assistant" and "metadata" in message:
# Show the answer
st.markdown(message["content"])
# Show metadata and sources
if message.get("metadata"):
show_query_metadata(message["metadata"])
if message.get("sources"):
show_document_sources(message["sources"])
else:
st.markdown(message["content"])
# Chat input
if prompt := st.chat_input("đ Ask a question about your document..."):
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# Get AI response
with st.chat_message("assistant"):
with st.spinner("đ¤ Analyzing your question..."):
progress_bar = st.progress(0)
progress_bar.progress(25, "đ Extracting query metadata...")
time.sleep(0.3)
progress_bar.progress(50, "đ Searching document database...")
time.sleep(0.3)
progress_bar.progress(75, "đ§ Generating response...")
time.sleep(0.3)
response_data = app.query_document(prompt)
progress_bar.progress(100, "â
Analysis complete!")
time.sleep(0.5)
progress_bar.empty()
if response_data:
answer = response_data.get("answer", "No answer available")
st.markdown(answer)
# Extract and display metadata
query_metadata = response_data.get("query_metadata", {})
sources = response_data.get("sources", [])
if query_metadata:
show_query_metadata(query_metadata)
if sources:
show_document_sources(sources)
# Add to chat history with metadata
st.session_state.messages.append({
"role": "assistant",
"content": answer,
"metadata": query_metadata,
"sources": sources
})
else:
error_msg = "â Sorry, I couldn't process your question."
st.markdown(error_msg)
st.session_state.messages.append({"role": "assistant", "content": error_msg})
# Clear chat button
if st.session_state.messages:
col1, col2, col3 = st.columns([1, 1, 1])
with col2:
if st.button("đī¸ Clear Conversation", type="secondary", use_container_width=True):
st.session_state.messages = []
st.rerun()
if __name__ == "__main__":
main()