import streamlit as st import os import tempfile from pathlib import Path from typing import List, Dict, Any from utils.pdf_processor import PDFProcessor from utils.vector_store import VectorStoreManager from utils.rag_chain import InsuranceRAGChain from config import Config # Page configuration st.set_page_config( page_title="Insurance Helper", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) def initialize_session_state(): """Initialize session state variables""" if 'messages' not in st.session_state: st.session_state.messages = [] if 'uploaded_files' not in st.session_state: st.session_state.uploaded_files = [] if 'documents_processed' not in st.session_state: st.session_state.documents_processed = True # if 'pdf_processor' not in st.session_state: st.session_state.pdf_processor = PDFProcessor() if 'vs_manager' not in st.session_state: try: st.session_state.vs_manager = VectorStoreManager() except Exception as e: st.error(f"Failed to initialize Vector Store: {str(e)}") st.stop() if 'rag_chain' not in st.session_state: st.session_state.rag_chain = InsuranceRAGChain(st.session_state.vs_manager) if 'collection_created' not in st.session_state: st.session_state.collection_created = True def detect_query_intent(question: str) -> str: """ Automatically detect the intent of the user's question Args: question: User's question Returns: Query mode string """ question_lower = question.lower() # Check for add-on related queries addon_keywords = ['addon', 'add-on', 'rider', 'optional cover', 'additional cover', 'recommend', 'should i take', 'which cover', 'extra protection'] if any(keyword in question_lower for keyword in addon_keywords): return "addons" # Check for exclusion related queries exclusion_keywords = ['exclusion', 'not covered', 'does not cover', "doesn't cover", 'what is excluded', 'not include', 'gap', 'missing'] if any(keyword in question_lower for keyword in exclusion_keywords): return "exclusions" # Check for coverage related queries coverage_keywords = ['what is covered', 'coverage', 'what does it cover', 'included', 'protection', 'insured for', 'claim for'] if any(keyword in question_lower for keyword in coverage_keywords): return "coverage" # Check for term explanation queries term_keywords = ['explain', 'what is', 'what does', 'meaning of', 'define', 'idv', 'ncb', 'depreciation', 'premium', 'term'] if any(keyword in question_lower for keyword in term_keywords): return "terms" # Default to general query return "general" def save_uploaded_file(uploaded_file) -> str: """Save uploaded file to temporary directory""" try: temp_dir = tempfile.gettempdir() file_path = os.path.join(temp_dir, uploaded_file.name) with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) return file_path except Exception as e: st.error(f"Error saving file: {str(e)}") return None def process_pdfs(file_paths: List[str]) -> bool: """Process PDFs and add to vector store""" try: with st.spinner("Processing PDFs..."): # Process all PDFs all_chunks, all_metadata = st.session_state.pdf_processor.process_multiple_pdfs(file_paths) if not all_chunks: st.error("No content extracted from PDFs") return False # Create collection if not exists if not st.session_state.collection_created: st.session_state.vs_manager.create_collection(recreate=False) st.session_state.collection_created = True # Add documents to vector store st.session_state.vs_manager.add_documents(all_chunks) st.success(f"Successfully processed {len(file_paths)} PDF(s) with {len(all_chunks)} chunks") return True except Exception as e: st.error(f"Error processing PDFs: {str(e)}") return False def display_message(message: Dict[str, Any]): """Display a chat message""" with st.chat_message(message["role"]): st.markdown(message["content"]) # Display sources if available if "sources" in message and message["sources"]: with st.expander(f"View {len(message['sources'])} Sources"): for source in message["sources"]: st.markdown(f"""