"""Streamlit chat front-end for the Insurance Helper RAG assistant.

The script wires together three project components held in
``st.session_state`` (a ``PDFProcessor``, a ``VectorStoreManager`` and an
``InsuranceRAGChain``), classifies each user question with a keyword
heuristic, routes it to the matching RAG query method, and renders the
answer together with its sources.
"""

import html
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional

import streamlit as st

# Project-local imports are kept in their original relative order so any
# import-time side effects happen in the same sequence as before.
from utils.pdf_processor import PDFProcessor
from utils.vector_store import VectorStoreManager
from utils.rag_chain import InsuranceRAGChain
from config import Config

# Page configuration
st.set_page_config(
    page_title="Insurance Helper",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Custom CSS
# NOTE(review): the style payload is blank in this copy of the file --
# confirm whether a <style> block was lost before relying on custom classes.
st.markdown(""" """, unsafe_allow_html=True)

# Keyword tables for intent detection. Order matters: the first group with a
# matching keyword wins, so add-ons are checked before exclusions, then
# coverage, then term explanations.
_INTENT_KEYWORDS = (
    (
        "addons",
        (
            'addon', 'add-on', 'rider', 'optional cover', 'additional cover',
            'recommend', 'should i take', 'which cover', 'extra protection',
        ),
    ),
    (
        "exclusions",
        (
            'exclusion', 'not covered', 'does not cover', "doesn't cover",
            'what is excluded', 'not include', 'gap', 'missing',
        ),
    ),
    (
        "coverage",
        (
            'what is covered', 'coverage', 'what does it cover', 'included',
            'protection', 'insured for', 'claim for',
        ),
    ),
    (
        "terms",
        (
            'explain', 'what is', 'what does', 'meaning of', 'define',
            'idv', 'ncb', 'depreciation', 'premium', 'term',
        ),
    ),
)

# Intents answered through ``query_specific_section``; every other intent
# ("terms", "general") goes through the plain ``query`` method.
_SECTION_INTENTS = frozenset({"addons", "exclusions", "coverage"})

# Human-readable caption shown above each assistant answer.
_INTENT_LABELS = {
    "addons": "Add-ons Analysis",
    "exclusions": "Exclusions Check",
    "coverage": "Coverage Analysis",
    "terms": "Term Explanation",
    "general": "General Query",
}

# Help text for the sidebar expander. Kept flush-left so the markdown does
# not depend on any dedenting of the literal.
_SMART_MODE_HELP = """
**Exclusions Mode**
- "What's not covered?"
- "What are the exclusions?"

**Coverage Mode**
- "What is covered?"
- "What does this policy include?"

**Terms Explanation**
- "Explain IDV"
- "What does NCB mean?"

**General Mode**
- Everything else
"""


def initialize_session_state():
    """Populate ``st.session_state`` with defaults on the first run.

    Existing keys are never overwritten, so values survive Streamlit reruns.
    If the vector store cannot be constructed, an error is shown and the
    script run is stopped via ``st.stop()``.
    """
    state = st.session_state

    if 'messages' not in state:
        state.messages = []

    if 'uploaded_files' not in state:
        state.uploaded_files = []

    if 'documents_processed' not in state:
        state.documents_processed = True

    if 'pdf_processor' not in state:
        state.pdf_processor = PDFProcessor()

    if 'vs_manager' not in state:
        try:
            state.vs_manager = VectorStoreManager()
        except Exception as e:
            st.error(f"Failed to initialize Vector Store: {str(e)}")
            st.stop()

    if 'rag_chain' not in state:
        state.rag_chain = InsuranceRAGChain(state.vs_manager)

    if 'collection_created' not in state:
        # Defaults to True, so ``process_pdfs`` skips ``create_collection``
        # unless something else resets this flag.
        # NOTE(review): presumably the collection is created outside this
        # script -- confirm.
        state.collection_created = True


def detect_query_intent(question: str) -> str:
    """Classify a user question by case-insensitive substring matching.

    Args:
        question: User's question.

    Returns:
        One of "addons", "exclusions", "coverage", "terms" or "general".
        Keyword groups are tried in that order and the first hit wins;
        "general" is returned when nothing matches (including for an
        empty or ``None`` question).
    """
    question_lower = (question or "").lower()

    for intent, keywords in _INTENT_KEYWORDS:
        if any(keyword in question_lower for keyword in keywords):
            return intent

    # Default to general query
    return "general"


def save_uploaded_file(uploaded_file) -> Optional[str]:
    """Write an uploaded file into the system temporary directory.

    Only the final path component of ``uploaded_file.name`` is used, so a
    client-supplied name containing directory parts (for example
    ``../../x.pdf``) cannot escape the temporary directory.

    Args:
        uploaded_file: Object exposing ``name`` and ``getbuffer()``.

    Returns:
        The path of the written file, or ``None`` if saving failed (an
        error is displayed in that case).
    """
    try:
        # Normalise Windows separators first so basename works for both styles.
        safe_name = os.path.basename(str(uploaded_file.name).replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise ValueError(f"Invalid file name: {uploaded_file.name!r}")

        file_path = os.path.join(tempfile.gettempdir(), safe_name)
        with open(file_path, "wb") as f:
            f.write(uploaded_file.getbuffer())
        return file_path
    except Exception as e:
        st.error(f"Error saving file: {str(e)}")
        return None


def process_pdfs(file_paths: List[str]) -> bool:
    """Extract chunks from PDFs and add them to the vector store.

    Args:
        file_paths: Paths of the PDF files to process.

    Returns:
        ``True`` when chunks were extracted and added; ``False`` when no
        content was extracted or any step raised (an error is displayed).
    """
    state = st.session_state
    try:
        with st.spinner("Processing PDFs..."):
            # The metadata half of the result is not used here.
            all_chunks, _all_metadata = state.pdf_processor.process_multiple_pdfs(file_paths)

            if not all_chunks:
                st.error("No content extracted from PDFs")
                return False

            # Create collection if not exists
            if not state.collection_created:
                state.vs_manager.create_collection(recreate=False)
                state.collection_created = True

            # Add documents to vector store
            state.vs_manager.add_documents(all_chunks)

            st.success(
                f"Successfully processed {len(file_paths)} PDF(s) "
                f"with {len(all_chunks)} chunks"
            )
            return True

    except Exception as e:
        st.error(f"Error processing PDFs: {str(e)}")
        return False


def _render_sources(sources: Optional[List[Dict[str, Any]]]) -> None:
    """Render the sources of an answer inside an expander; no-op if empty.

    Each source must provide the keys ``index``, ``source_file``, ``page``,
    ``section_type`` and ``content_preview``. All values are HTML-escaped
    because the text is rendered with ``unsafe_allow_html=True`` and the
    preview originates from document content.
    """
    if not sources:
        return

    with st.expander(f"View {len(sources)} Sources"):
        for source in sources:
            index = html.escape(str(source['index']))
            source_file = html.escape(str(source['source_file']))
            page = html.escape(str(source['page']))
            section_type = html.escape(str(source['section_type']))
            content_preview = html.escape(str(source['content_preview']))
            # NOTE(review): no HTML tags are present in this template in this
            # copy of the file -- confirm whether wrapper markup was lost.
            st.markdown(f"""
Source {index}: {source_file} (Page {page})
Section: {section_type}
Preview: {content_preview}
""", unsafe_allow_html=True)


def display_message(message: Dict[str, Any]):
    """Display a chat message and, when present, its sources.

    Args:
        message: Dict with ``role`` and ``content`` and an optional
            ``sources`` list.
    """
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        # Display sources if available
        _render_sources(message.get("sources"))


def sidebar():
    """Render the sidebar: smart-mode explanation and a clear-chat button."""
    with st.sidebar:
        # Smart mode indicator
        st.markdown("#### Smart Mode")
        st.info("The system automatically detects your question type and uses the best search strategy!")

        st.markdown("---")

        # Show what queries trigger what modes
        with st.expander("How Smart Mode Works"):
            st.markdown(_SMART_MODE_HELP)

        st.markdown("---")

        # Clear chat button
        if st.button("Clear Chat History", use_container_width=True):
            st.session_state.messages = []
            st.rerun()


def _run_query(prompt: str, intent: str) -> Dict[str, Any]:
    """Send the prompt to the RAG chain method that matches the intent."""
    rag_chain = st.session_state.rag_chain
    if intent in _SECTION_INTENTS:
        return rag_chain.query_specific_section(prompt, section_type=intent)
    # "terms" and "general" both use the unrestricted query.
    return rag_chain.query(prompt)


def main():
    """Run the app: set up state, draw the UI and handle one chat turn."""
    # Initialize session state
    initialize_session_state()

    # Render sidebar
    sidebar()

    # Main header
    # NOTE(review): these strings carry no HTML tags in this copy of the
    # file -- confirm whether header markup was lost.
    st.markdown('\nInsurance Helper\n', unsafe_allow_html=True)
    st.markdown('\nUnderstanding your insurance made easy\n', unsafe_allow_html=True)

    # Display chat messages
    for message in st.session_state.messages:
        display_message(message)

    # Chat input
    if prompt := st.chat_input("Ask about your insurance..."):
        # Add user message
        user_message = {"role": "user", "content": prompt}
        st.session_state.messages.append(user_message)
        display_message(user_message)

        # Get response based on auto-detected query intent
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                try:
                    # Automatically detect query intent
                    detected_intent = detect_query_intent(prompt)
                    st.caption(_INTENT_LABELS.get(detected_intent, 'General Query'))

                    # Route to appropriate query method
                    result = _run_query(prompt, detected_intent)

                    # Display answer
                    st.markdown(result["answer"])

                    # Display sources
                    sources = result.get("sources", [])
                    _render_sources(sources)

                    # Add assistant message to history
                    st.session_state.messages.append({
                        "role": "assistant",
                        "content": result["answer"],
                        "sources": sources,
                    })

                except Exception as e:
                    # Keep the failure in the transcript so it survives reruns.
                    error_msg = f"Error processing query: {str(e)}"
                    st.error(error_msg)
                    st.session_state.messages.append({
                        "role": "assistant",
                        "content": error_msg,
                    })


if __name__ == "__main__":
    try:
        Config.validate_config()
        main()
    except ValueError as e:
        st.error(f"Configuration Error: {str(e)}")
        st.info("Please ensure your .env file contains all required API keys.")