# app.py
"""Streamlit entry point for the SYNAPTYX RFP analysis agent.

The module wires together database initialization, session-state defaults,
the collection/document sidebar and the chat interface.
"""
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Iterable, List, Optional, Tuple

import streamlit as st
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_community.chat_models import ChatOpenAI
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.runnables import RunnablePassthrough

from components.chat import display_chat_interface
from components.collection_manager import (
    display_enhanced_collections,
    show_collection_creation_dialog,
)
from components.document_store import display_documents_tab
from utils.analytics import display_analytics_dashboard
from utils.database import (
    # Base database operations
    create_connection,
    create_tables,
    get_all_documents,
    force_recreate_collections_tables,
    # Collection operations
    get_collections,
    create_collection,
    add_document_to_collection,
    remove_from_collection,
    update_collection,
    get_collection_documents,
    handle_document_upload,
    # NOTE: this imported name is overridden by the local
    # verify_database_tables() defined further down in this module.
    verify_database_tables,
    initialize_qa_system,
    get_embeddings_model,
    # Search functionality
    search_documents,
)

# Create locks for thread-safe operations
conn_lock = Lock()

# Tables verify_database_tables() expects to find in the SQLite database.
REQUIRED_TABLES = frozenset({
    'documents',
    'queries',
    'annotations',
    'collections',
    'document_collections',
    'chats',
    'chat_messages',
})

# Text-splitting parameters shared by every code path that builds an index.
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
CHUNK_SEPARATORS = ["\n\n", "\n", " ", ""]

# When no '/data' directory exists, ask the project's setup module to create
# the directory layout; abort the script run if that fails.
if not os.path.exists('/data'):
    try:
        from setup import setup_directories
        if not setup_directories():
            raise RuntimeError("Failed to set up directories")
    except Exception as e:
        st.error(f"Setup error: {e}")
        st.stop()


def handle_chat_state_change():
    """Handle changes in chat state when switching tabs or collections.

    When chat is ready and the ``reinitialize_chat`` flag is set, the chat
    system is rebuilt from the current collection and the flag is cleared.
    """
    if not st.session_state.get('chat_ready'):
        return
    # Check if we need to reinitialize with new documents
    if st.session_state.get('reinitialize_chat'):
        initialize_chat_from_collection()
        st.session_state.reinitialize_chat = False


def _list_tables(conn) -> set:
    """Return the names of all tables present in the SQLite database."""
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    return {row[0] for row in cursor.fetchall()}


def verify_database_tables(conn):
    """Verify that all required tables exist.

    If the ``collections`` table is absent, the collection tables are
    recreated through ``force_recreate_collections_tables``. Any other table
    from ``REQUIRED_TABLES`` that is missing is reported with a warning but
    does not fail verification.

    Args:
        conn: Open database connection (must provide ``cursor()``).

    Returns:
        bool: False when the collection tables could not be recreated or an
        error occurred while inspecting the database; True otherwise.
    """
    try:
        tables = _list_tables(conn)

        # If collections table doesn't exist, force recreate it
        if 'collections' not in tables:
            if force_recreate_collections_tables(conn):
                st.session_state.show_debug = True
                # Re-read so the missing-table report reflects the new state.
                tables = _list_tables(conn)
            else:
                st.error("Failed to create required database tables.")
                return False

        missing_tables = REQUIRED_TABLES - tables
        if missing_tables:
            st.warning(
                "Missing database tables: "
                + ", ".join(sorted(missing_tables))
            )

        return True

    except Exception as e:
        st.error(f"Error verifying database tables: {e}")
        return False


def initialize_database():
    """Initialize database with persistent storage.

    On the first run of a session the database file is created under
    ``/data`` (when that directory exists) or ``<cwd>/data``, the tables are
    created, and the connection is stored in ``st.session_state.db_conn``.
    On later runs the stored connection is only re-verified.

    Returns:
        bool: True when a connection is available and table verification
        passed, False otherwise.
    """
    try:
        if 'db_conn' in st.session_state:
            # Verify tables exist in existing connection
            return verify_database_tables(st.session_state.db_conn)

        # Get storage path
        if os.path.exists('/data'):
            base_path = Path('/data')
        else:
            base_path = Path(os.getcwd()) / 'data'

        # Ensure data directory exists
        base_path.mkdir(parents=True, exist_ok=True)

        # Create database path
        db_path = base_path / 'rfp_analysis.db'

        try:
            # Ensure file can be created
            db_path.touch(exist_ok=True)
        except OSError as e:
            st.error(f"Failed to create database file: {e}")
            return False

        # Create connection
        conn = create_connection(str(db_path))
        if conn is None:
            return False

        # Create all tables (includes collection tables)
        create_tables(conn)
        st.session_state.db_conn = conn

        # Verify tables were created; a failed verification is now reported
        # to the caller instead of being silently ignored.
        return verify_database_tables(conn)

    except Exception as e:
        st.error(f"Database initialization error: {e}")
        return False


@dataclass
class SessionStateDefaults:
    """Default values for session state variables."""
    # Collection Management
    show_collection_dialog: bool = False
    selected_collection: Optional[Dict] = None
    current_collection: Optional[Dict] = None

    # Chat System
    chat_ready: bool = False
    messages: Optional[List] = None  # None is replaced by a fresh list at init
    current_chat_id: Optional[int] = None

    # Document Processing
    vector_store: Optional[Any] = None
    qa_system: Optional[Any] = None

    # State Management
    reinitialize_chat: bool = False
    debug_mode: bool = False


def initialize_session_state() -> None:
    """Initialize all session state variables that are not yet present.

    Existing values in ``st.session_state`` are never overwritten, so the
    function is safe to call on every script run.
    """
    defaults = SessionStateDefaults()

    # Dictionary mapping state variables to their defaults and descriptions
    state_vars: Dict[str, Dict[str, Any]] = {
        'show_collection_dialog': {
            'default': defaults.show_collection_dialog,
            'desc': 'Controls visibility of collection creation dialog'
        },
        'selected_collection': {
            'default': defaults.selected_collection,
            'desc': 'Currently selected collection for viewing/editing'
        },
        'current_collection': {
            'default': defaults.current_collection,
            'desc': 'Active collection for chat/analysis'
        },
        'chat_ready': {
            'default': defaults.chat_ready,
            'desc': 'Indicates if chat system is initialized and ready'
        },
        'messages': {
            'default': [] if defaults.messages is None else defaults.messages,
            'desc': 'List of chat messages in the current session'
        },
        'current_chat_id': {
            'default': defaults.current_chat_id,
            'desc': 'ID of the current chat session'
        },
        'vector_store': {
            'default': defaults.vector_store,
            'desc': 'FAISS vector store for document embeddings'
        },
        'qa_system': {
            'default': defaults.qa_system,
            'desc': 'Initialized QA system for chat'
        },
        'reinitialize_chat': {
            'default': defaults.reinitialize_chat,
            'desc': 'Flag to trigger chat system reinitialization'
        },
        'debug_mode': {
            'default': defaults.debug_mode,
            'desc': 'Enable/disable debug information'
        }
    }

    # Initialize each state variable if not already present
    for var_name, config in state_vars.items():
        if var_name not in st.session_state:
            st.session_state[var_name] = config['default']


def reset_chat_state() -> None:
    """Reset chat-related session state variables to defaults."""
    st.session_state.messages = []
    st.session_state.current_chat_id = None
    st.session_state.chat_ready = False
    st.session_state.qa_system = None


def reset_collection_state() -> None:
    """Reset collection-related session state variables to defaults."""
    st.session_state.selected_collection = None
    st.session_state.current_collection = None
    st.session_state.show_collection_dialog = False


def get_current_state() -> Dict[str, Any]:
    """Return a snapshot of the session state for debugging.

    Keys starting with an underscore are excluded.
    """
    return {
        key: value for key, value in st.session_state.items()
        if not key.startswith('_')  # Exclude internal Streamlit states
    }


def display_top_bar():
    """Display the application's top navigation bar."""
    col1, col2, col3 = st.columns([1, 3, 1])

    with col1:
        if os.path.exists("img/logo.png"):
            st.image("img/logo.png", width=100)
        else:
            st.warning("Logo not found at img/logo.png")

    with col2:
        st.title("Synaptyx.AI - RFP Analysis Agent")

    with col3:
        if st.session_state.current_collection:
            st.info(f"📁 Active Collection: {st.session_state.current_collection['name']}")


def _chunk_documents(documents: Iterable[Any]) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Split documents into text chunks with matching metadata entries.

    Each document is expected to expose ``'content'``, ``'name'`` and
    ``'id'`` by key. Documents whose content is empty or None are skipped.

    Returns:
        A ``(texts, metadatas)`` pair of equal length.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len,
        separators=CHUNK_SEPARATORS,
    )
    texts: List[str] = []
    metadatas: List[Dict[str, Any]] = []
    for doc in documents:
        content = doc['content']
        if not content:
            continue
        for chunk in text_splitter.split_text(content):
            texts.append(chunk)
            metadatas.append({'source': doc['name'], 'document_id': doc['id']})
    return texts, metadatas


def _build_vector_store(documents: Iterable[Any]) -> Optional[Any]:
    """Build a FAISS vector store from documents.

    Returns:
        The new vector store, or None when the documents yield no text
        chunks (an index cannot be built from zero texts).
    """
    texts, metadatas = _chunk_documents(documents)
    if not texts:
        return None
    embeddings = get_embeddings_model()
    return FAISS.from_texts(texts, embeddings, metadatas=metadatas)


def _activate_chat(vector_store: Any, document_ids: Iterable[Any]) -> None:
    """Store the vector store and QA system in session state and mark chat ready."""
    st.session_state.vector_store = vector_store
    # Remember which documents the store covers so it can be reused later.
    st.session_state.vector_store_document_ids = frozenset(document_ids)
    st.session_state.qa_system = initialize_qa_system(vector_store)
    st.session_state.chat_ready = True


def get_existing_vector_store(document_ids: Iterable[Any]) -> Optional[Any]:
    """Return the session's vector store if it covers exactly these documents.

    The check is based on document ids only; changes to a document's content
    under an unchanged id are not detected.

    Args:
        document_ids: Ids of the documents the caller wants indexed.

    Returns:
        The cached vector store, or None when there is none or it was built
        for a different set of documents.
    """
    cached_store = st.session_state.get('vector_store')
    cached_ids = st.session_state.get('vector_store_document_ids')
    if cached_store is not None and cached_ids == frozenset(document_ids):
        return cached_store
    return None


def initialize_chat_from_existing():
    """Initialize chat system from existing documents in database.

    Does nothing when chat is already ready.

    Returns:
        bool: True when the chat system was initialized by this call; False
        when chat was already ready, no usable documents exist, or an error
        occurred.
    """
    if st.session_state.get('chat_ready'):
        return False

    try:
        documents = get_all_documents(st.session_state.db_conn)
        if not documents:
            return False

        # Initialize vector store and QA system with existing documents
        vector_store = _build_vector_store(documents)
        if vector_store is None:
            st.warning("No text content found in the stored documents.")
            return False

        _activate_chat(vector_store, [doc['id'] for doc in documents])
        return True
    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False


def initialize_chat_from_collection():
    """Initialize chat system with vector store reuse.

    Uses the documents of ``st.session_state.current_collection`` when one is
    active, otherwise all documents. An existing vector store covering the
    same documents is reused; otherwise a new one is built.

    Returns:
        bool: True when the chat system is ready after this call, False when
        there are no usable documents or an error occurred.
    """
    try:
        current_collection = st.session_state.get('current_collection')
        if current_collection:
            documents = get_collection_documents(
                st.session_state.db_conn,
                current_collection['id']
            )
        else:
            documents = get_all_documents(st.session_state.db_conn)

        if not documents:
            return False

        document_ids = [doc['id'] for doc in documents]

        # Check for existing vector store
        vector_store = get_existing_vector_store(document_ids)

        if vector_store is None:
            # If no existing vector store, create new one
            with st.spinner("Initializing chat system..."):
                vector_store = _build_vector_store(documents)

        if vector_store is None:
            st.warning("No text content found in the selected documents.")
            return False

        _activate_chat(vector_store, document_ids)
        return True

    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False


def initialize_chat_for_document(document_id: int) -> bool:
    """Initialize the chat system for a single document.

    The document is looked up by id among all stored documents.

    Args:
        document_id: Id of the document to chat about.

    Returns:
        bool: True when the chat system is ready after this call, False when
        the document was not found, has no text, or an error occurred.
    """
    try:
        all_documents = get_all_documents(st.session_state.db_conn) or []
        documents = [doc for doc in all_documents if doc['id'] == document_id]
        if not documents:
            st.warning("Document not found.")
            return False

        document_ids = [document_id]
        vector_store = get_existing_vector_store(document_ids)
        if vector_store is None:
            with st.spinner("Initializing chat system..."):
                vector_store = _build_vector_store(documents)

        if vector_store is None:
            st.warning("No text content found in the selected documents.")
            return False

        _activate_chat(vector_store, document_ids)
        return True
    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False


def display_collection_sidebar():
    """Display enhanced sidebar with collection management."""
    with st.sidebar:
        st.title("💬 Chat Controls")

        if st.button("🔄 Start New Chat", use_container_width=True):
            st.session_state.messages = []
            st.session_state.current_chat_id = None
            if st.session_state.chat_ready:
                st.rerun()

        st.divider()

        # Collection Management
        st.title("📚 Collections")
        collections = get_collections(st.session_state.db_conn)

        if st.button("➕ Create Collection", use_container_width=True):
            st.session_state.show_collection_dialog = True

        # Collection selection with document preview
        if not collections:
            return

        selected = st.selectbox(
            "Select Collection",
            options=["All Documents"] + [c['name'] for c in collections],
            key="collection_select"
        )
        if selected == "All Documents":
            return

        collection = next((c for c in collections if c['name'] == selected), None)
        if not collection:
            return

        documents = get_collection_documents(st.session_state.db_conn, collection['id'])
        if documents:
            st.markdown("### Documents in Collection")
            for doc in documents:
                with st.expander(f"📄 {doc['name']}", expanded=False):
                    st.caption(f"Uploaded: {doc['upload_date']}")
                    if st.button("Use for Chat", key=f"use_{doc['id']}"):
                        initialize_chat_for_document(doc['id'])


def display_collection_dialog():
    """Display the create collection dialog."""
    with st.sidebar:
        st.subheader("Create New Collection")
        name = st.text_input("Collection Name", key="sidebar_collection_name")
        description = st.text_area("Description", key="sidebar_collection_desc")

        col1, col2 = st.columns(2)
        with col1:
            if st.button("Create", key="sidebar_create_btn"):
                if name:
                    if create_collection(st.session_state.db_conn, name, description):
                        st.success(f"Collection '{name}' created!")
                        st.session_state.show_collection_dialog = False
                        st.rerun()
                else:
                    st.error("Please enter a collection name")

        with col2:
            if st.button("Cancel", key="sidebar_cancel_btn"):
                st.session_state.show_collection_dialog = False
                st.rerun()


def display_welcome_screen():
    """Display the welcome screen with getting started information and enhanced features."""
    st.title("🤖 Welcome to SYNAPTYX")
    st.markdown("### Your AI-powered RFP Analysis Assistant")

    # Check for existing documents
    documents = get_all_documents(st.session_state.db_conn)
    collections = get_collections(st.session_state.db_conn)

    col1, col2 = st.columns(2)

    with col1:
        st.markdown("""
        #### Getting Started:
        1. Upload your RFP documents using the sidebar
        2. Organize documents into collections
        3. Start analyzing with AI!

        You can:
        - Create multiple collections
        - Upload documents to specific collections
        - Search across all documents
        - Get AI-powered insights and summaries
        """)

        # Add action buttons if documents exist
        if documents:
            st.success(f"🎉 You have {len(documents)} documents ready for analysis!")
            col_a, col_b = st.columns(2)
            with col_a:
                if st.button("Start Chatting", use_container_width=True):
                    if initialize_chat_from_collection():
                        st.session_state.chat_ready = True
                        st.rerun()
            with col_b:
                if st.button("View Documents", use_container_width=True):
                    st.session_state.show_document_store = True
                    st.rerun()

    with col2:
        st.markdown("#### Example Questions:")
        examples = [
            "📊 Summarize the main points of the document",
            "📝 Draft a 'Why Us' section based on the document",
            "🎯 Extract key success metrics and outcomes",
            "💡 What are the innovative solutions mentioned?",
            "🤝 Analyze the partnership benefits described",
            "📈 Compare requirements across documents",
            "🔍 Find similar sections across RFPs"
        ]
        for example in examples:
            st.markdown(f"• {example}")

        # Add collection stats if they exist
        # NOTE(review): placed in the right-hand column; confirm intended layout.
        if collections:
            st.markdown("---")
            st.markdown("#### Your Collections:")
            for collection in collections[:3]:  # Show top 3 collections
                with st.container():
                    st.markdown(f"""
                    🗂️ **{collection['name']}**
                    Documents: {collection['doc_count']}
                    """)
            if len(collections) > 3:
                st.caption("... and more collections")


def display_chat_area():
    """Display the main chat interface or welcome screen with proper state management."""
    if not st.session_state.chat_ready:
        display_welcome_screen()
        return

    col1, col2 = st.columns([3, 1])

    with col1:
        display_chat_interface()

    with col2:
        # Document context panel
        st.markdown("### Current Context")
        if st.session_state.current_collection:
            st.info(f"📁 Using Collection: {st.session_state.current_collection['name']}")
        else:
            st.info("📚 Using All Documents")

        # Quick actions
        st.markdown("### Quick Actions")
        if st.button("🔄 New Chat", use_container_width=True):
            reset_chat_state()
            st.rerun()

        if st.button("📋 Export Chat", use_container_width=True):
            export_chat_history()

        # Example questions for quick reference
        with st.expander("💡 Question Examples", expanded=False):
            examples = [
                "Summarize main points",
                "Extract requirements",
                "Compare solutions",
                "Analyze pricing"
            ]
            for example in examples:
                if st.button(example, key=f"example_{example}"):
                    # Set the example as the current question
                    st.session_state.current_question = example
                    st.rerun()


def export_chat_history():
    """Offer the current chat history as a plain-text download.

    Each message is rendered as ``User: ...`` for ``HumanMessage`` instances
    and ``Assistant: ...`` for everything else. Nothing is shown when there
    are no messages.
    """
    if not st.session_state.messages:
        return

    chat_text = "\n\n".join(
        f"{'User' if isinstance(m, HumanMessage) else 'Assistant'}: {m.content}"
        for m in st.session_state.messages
    )

    st.download_button(
        "Download Chat History",
        chat_text,
        file_name=f"chat_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt",
        mime="text/plain"
    )


def _render_main_sidebar() -> None:
    """Render the sidebar used by main(): chat controls, collections, upload."""
    with st.sidebar:
        # Chat Controls Section
        st.title("💬 Chat Controls")

        # New Chat button
        if st.button("🔄 Start New Chat", use_container_width=True):
            reset_chat_state()
            st.rerun()

        st.divider()

        # Document Manager Section
        st.title("📚 Document Manager")

        # Collection Creation Button
        if st.button("➕ Create New Collection", use_container_width=True):
            st.session_state.show_collection_dialog = True

        # Collection Selection
        collections = get_collections(st.session_state.db_conn)
        active_collection = None
        if collections:
            selected = st.selectbox(
                "Select Collection",
                options=["All Documents"] + [c['name'] for c in collections],
                key="collection_select"
            )
            if selected != "All Documents":
                active_collection = next(
                    (c for c in collections if c['name'] == selected), None
                )

        # Always write the selection back so that choosing "All Documents"
        # clears a previously active collection.
        st.session_state.current_collection = active_collection
        if active_collection:
            display_collection_documents(active_collection['id'])

        # Upload Section
        # NOTE(review): placed inside the sidebar, matching the welcome text
        # ("Upload your RFP documents using the sidebar"); confirm.
        st.header("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload PDF documents",
            type=['pdf'],
            accept_multiple_files=True,
            help="Limit 200MB per file • PDF"
        )

        if uploaded_files:
            handle_document_upload(
                uploaded_files,
                collection_id=active_collection['id'] if active_collection else None
            )


def main():
    """Main application function."""
    # Page configuration
    st.set_page_config(
        layout="wide",
        page_title="SYNAPTYX - RFP Analysis Agent",
        initial_sidebar_state="expanded"
    )

    # Initialize database and session state
    if not initialize_database():
        st.error("Failed to initialize database. Please contact support.")
        return

    initialize_session_state()

    # The sidebar is rendered first so the top bar reflects the collection
    # selected in this run rather than the previous one. Sidebar and main
    # area are separate containers, so the on-screen layout is unaffected.
    _render_main_sidebar()

    # Top bar with larger logo
    col1, col2, col3 = st.columns([1.5, 4, 1])

    with col1:
        if os.path.exists("img/logo.png"):
            st.image("img/logo.png", width=200)  # Increased logo size
        else:
            st.info("Logo not found")

    with col2:
        st.title("SYNAPTYX - RFP Analysis Agent")

    with col3:
        if st.session_state.current_collection:
            st.info(f"📁 Active Collection: {st.session_state.current_collection['name']}")

    # Show collection creation dialog if triggered
    # NOTE(review): rendered outside the sidebar; confirm intended placement.
    if st.session_state.show_collection_dialog:
        show_collection_creation_dialog()

    # Main content area
    if st.session_state.chat_ready:
        display_chat_interface()
    else:
        display_welcome_screen()

    # Debug mode (if enabled)
    if st.session_state.debug_mode and st.sidebar.checkbox("Show Debug Info"):
        st.sidebar.write("Current State:", get_current_state())


def display_collection_documents(collection_id: int):
    """Display documents in the selected collection.

    Args:
        collection_id: Id of the collection whose documents are listed.
    """
    documents = get_collection_documents(st.session_state.db_conn, collection_id)

    if not documents:
        return

    st.markdown("### Documents in Collection")
    for doc in documents:
        with st.expander(f"📄 {doc['name']}", expanded=False):
            st.caption(f"Uploaded: {doc['upload_date']}")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("Preview", key=f"preview_{doc['id']}"):
                    st.text_area(
                        "Content Preview",
                        value=doc['content'][:500] + "..." if len(doc['content']) > 500 else doc['content'],
                        height=100,
                        disabled=True
                    )
            with col2:
                if st.button("Use for Chat", key=f"chat_{doc['id']}"):
                    # Rerun only on success so an error message from the
                    # initializer stays visible to the user.
                    if initialize_chat_from_collection():
                        st.rerun()


if __name__ == "__main__":
    main()