Spaces:
Build error
Build error
| # app.py | |
| import streamlit as st | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| from threading import Lock | |
| from utils.database import ( | |
| # Base database operations | |
| create_connection, | |
| create_tables, | |
| get_all_documents, | |
| force_recreate_collections_tables, | |
| # Collection operations | |
| get_collections, | |
| create_collection, | |
| add_document_to_collection, | |
| remove_from_collection, | |
| update_collection, | |
| get_collection_documents, | |
| handle_document_upload, | |
| verify_database_tables, | |
| initialize_qa_system, | |
| get_embeddings_model, | |
| # Search functionality | |
| search_documents | |
| ) | |
| # Component imports | |
| from components.chat import display_chat_interface | |
| from components.collection_manager import ( | |
| display_enhanced_collections, | |
| show_collection_creation_dialog | |
| ) | |
| # Optional analytics import if you're using it | |
| from utils.analytics import display_analytics_dashboard | |
| from components.chat import display_chat_interface | |
| from components.document_store import display_documents_tab | |
# Module-level lock intended for thread-safe operations.
conn_lock = Lock()

# Bootstrap the storage directories when the /data path is not present.
if not os.path.exists('/data'):
    try:
        from setup import setup_directories

        directories_ready = setup_directories()
        if not directories_ready:
            raise Exception("Failed to set up directories")
    except Exception as setup_error:
        # Show the failure in the UI and halt this script run.
        st.error(f"Setup error: {setup_error}")
        st.stop()
def verify_database_tables(conn):
    """Verify that all required tables exist.

    Reads the table names recorded in ``sqlite_master`` and compares them
    with the set of tables this app requires. When the ``collections`` table
    is absent, ``force_recreate_collections_tables`` is asked to rebuild it
    and the table list is read again before the final check.

    Args:
        conn: Open database connection exposing ``cursor()``.

    Returns:
        bool: True when every required table is present. False when a table
        is still missing, the rebuild failed, or an error occurred; in those
        cases a message is shown through Streamlit.
    """
    required_tables = {
        'documents', 'queries', 'annotations',
        'collections', 'document_collections', 'chats', 'chat_messages'
    }

    def _list_tables():
        """Return the set of table names currently in the database."""
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            return {row[0] for row in cursor.fetchall()}
        finally:
            cursor.close()

    try:
        tables = _list_tables()

        # If collections table doesn't exist, force recreate it
        if 'collections' not in tables:
            if not force_recreate_collections_tables(conn):
                st.error("Failed to create required database tables.")
                return False
            st.session_state.show_debug = True
            # The rebuild changed the schema, so refresh the listing.
            tables = _list_tables()

        missing_tables = required_tables - tables
        if missing_tables:
            missing_names = ', '.join(sorted(missing_tables))
            st.error(f"Missing required database tables: {missing_names}")
            return False
        return True
    except Exception as e:
        st.error(f"Error verifying database tables: {e}")
        return False
def initialize_database():
    """Initialize database with persistent storage.

    On the first call in a session, the database file ``rfp_analysis.db`` is
    placed under ``/data`` when that path exists, otherwise under ``./data``
    in the current working directory. A connection is opened, the tables are
    created, and the connection is stored in ``st.session_state.db_conn``.
    On later calls the stored connection is reused and only re-verified.

    Returns:
        bool: True when a connection is available and table verification
        succeeded; False otherwise.
    """
    try:
        if 'db_conn' in st.session_state:
            # Verify tables exist in existing connection
            return bool(verify_database_tables(st.session_state.db_conn))

        # Get storage path
        if os.path.exists('/data'):
            base_path = Path('/data')
        else:
            base_path = Path(os.getcwd()) / 'data'

        # Ensure data directory exists
        base_path.mkdir(parents=True, exist_ok=True)

        # Create database path
        db_path = base_path / 'rfp_analysis.db'
        try:
            # Ensure file can be created
            db_path.touch(exist_ok=True)
        except OSError as e:
            st.error(f"Failed to create database file: {e}")
            return False

        # Create connection
        conn = create_connection(str(db_path))
        if conn is None:
            return False

        # Create all tables (includes collection tables)
        create_tables(conn)
        st.session_state.db_conn = conn

        # Report the verification outcome instead of discarding it.
        return bool(verify_database_tables(conn))
    except Exception as e:
        st.error(f"Database initialization error: {e}")
        return False
def initialize_session_state():
    """Seed ``st.session_state`` with defaults for keys that are not set yet.

    Keys that already hold a value are left untouched.
    """
    # Built on every call so the list and dict defaults are fresh objects.
    defaults = {
        'chat_ready': False,
        'current_collection': None,
        'current_document': None,
        'processed_files': [],
        'vector_stores': {},
        'selected_collection': None,
        'show_collection_dialog': False,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            setattr(st.session_state, key, value)
def display_top_bar():
    """Render the header row: logo, application title, and active collection."""
    logo_path = "img/logo.png"
    logo_col, title_col, status_col = st.columns([1, 3, 1])

    with logo_col:
        if not os.path.exists(logo_path):
            st.warning("Logo not found at img/logo.png")
        else:
            st.image(logo_path, width=100)

    with title_col:
        st.title("Synaptyx.AI - RFP Analysis Agent")

    with status_col:
        # Only shown once a collection has been selected.
        if st.session_state.current_collection:
            active_name = st.session_state.current_collection['name']
            st.info(f"π Active Collection: {active_name}")
def initialize_chat_from_existing():
    """Initialize chat system from existing documents in database.

    Splits the ``content`` of every stored document into chunks, builds a
    FAISS vector store from the chunks (each tagged with the document's
    ``name`` and ``id``), creates the QA system on top of it, and marks the
    session as chat-ready.

    Returns:
        bool: True only when this call built the vector store and QA system.
        False when chat was already ready, the database holds no documents,
        the documents produced no chunks, or an error occurred (the error is
        shown through Streamlit).
    """
    # NOTE(review): `text_splitter` and `FAISS` are not defined or imported in
    # the visible part of this file. Unless they are provided elsewhere, the
    # calls below raise NameError, which the handler reports as an
    # initialization error. Confirm and add the missing imports.
    if st.session_state.get('chat_ready'):
        return False

    try:
        documents = get_all_documents(st.session_state.db_conn)
        if not documents:
            return False

        texts = []
        metadatas = []
        for doc in documents:
            for chunk in text_splitter.split_text(doc['content']):
                texts.append(chunk)
                # One metadata dict per chunk, so entries are never shared.
                metadatas.append({'source': doc['name'], 'document_id': doc['id']})

        if not texts:
            # Nothing to index, so leave chat disabled rather than build an
            # empty store.
            return False

        # Initialize vector store and QA system with existing documents
        embeddings = get_embeddings_model()
        vector_store = FAISS.from_texts(texts, embeddings, metadatas)
        st.session_state.vector_store = vector_store
        st.session_state.qa_system = initialize_qa_system(vector_store)
        st.session_state.chat_ready = True
        return True
    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False
def display_collection_sidebar():
    """Render the sidebar: collection creation button, PDF upload, and dialog.

    The upload widgets are shown only when at least one collection exists.
    Files are handed to ``handle_document_upload`` once a real collection
    (not the placeholder entry) is chosen.
    """
    placeholder = "Select Collection..."

    with st.sidebar:
        st.title("π Document Manager")

        # The button only raises a flag; the dialog itself is drawn below.
        if st.button("β Create New Collection", use_container_width=True):
            st.session_state.show_collection_dialog = True

        st.header("Upload Documents")

        collections = get_collections(st.session_state.db_conn)
        if collections:
            collection_names = [c['name'] for c in collections]
            upload_to = st.selectbox(
                "Upload to",
                options=[placeholder] + collection_names,
                key="upload_destination"
            )
            uploaded_files = st.file_uploader(
                "Upload PDF documents",
                type=['pdf'],
                accept_multiple_files=True,
                help="Limit 200MB per file β’ PDF"
            )

            if uploaded_files and upload_to != placeholder:
                # Resolve the chosen name to the first collection carrying it.
                collection_id = None
                for candidate in collections:
                    if candidate['name'] == upload_to:
                        collection_id = candidate['id']
                        break
                if collection_id:
                    handle_document_upload(uploaded_files, collection_id=collection_id)

        # Show collection creation dialog if triggered
        if st.session_state.get('show_collection_dialog', False):
            show_collection_creation_dialog()
def display_collection_dialog():
    """Render the sidebar form for creating a collection.

    "Create" requires a non-empty name, calls ``create_collection``, and on
    success hides the dialog and reruns the script. "Cancel" hides the dialog
    and reruns.
    """
    with st.sidebar:
        st.subheader("Create New Collection")
        name = st.text_input("Collection Name", key="sidebar_collection_name")
        description = st.text_area("Description", key="sidebar_collection_desc")

        create_col, cancel_col = st.columns(2)

        with create_col:
            if st.button("Create", key="sidebar_create_btn"):
                if not name:
                    st.error("Please enter a collection name")
                elif create_collection(st.session_state.db_conn, name, description):
                    st.success(f"Collection '{name}' created!")
                    st.session_state.show_collection_dialog = False
                    st.rerun()

        with cancel_col:
            if st.button("Cancel", key="sidebar_cancel_btn"):
                st.session_state.show_collection_dialog = False
                st.rerun()
def display_welcome_screen():
    """Render the landing view: getting-started steps and example questions."""
    st.title("π€ Welcome to SYNAPTYX")
    st.markdown("### Your AI-powered RFP Analysis Assistant")

    intro_col, examples_col = st.columns(2)

    with intro_col:
        st.markdown("""
        #### Getting Started:
        1. Upload your RFP documents using the sidebar
        2. Organize documents into collections
        3. Start analyzing with AI!
        You can:
        - Create multiple collections
        - Upload documents to specific collections
        - Search across all documents
        - Get AI-powered insights and summaries
        """)

    with examples_col:
        st.markdown("#### Example Questions:")
        sample_questions = (
            "π Summarize the main points of the document",
            "π Draft a 'Why Us' section based on the document",
            "π― Extract key success metrics and outcomes",
            "π‘ What are the innovative solutions mentioned?",
            "π€ Analyze the partnership benefits described",
            "π Compare requirements across documents",
            "π Find similar sections across RFPs",
        )
        # One markdown call per question, each rendered as its own element.
        for question in sample_questions:
            st.markdown(f"β’ {question}")
def display_chat_area():
    """Show the chat interface once chat is ready, otherwise the welcome screen."""
    if st.session_state.chat_ready:
        display_chat_interface()
        return
    display_welcome_screen()
def main():
    """Application entry point: configure the page, set up state, render the UI.

    Halts the script run early when no database connection was stored in the
    session, because the sidebar reads ``st.session_state.db_conn``.
    """
    st.set_page_config(layout="wide")

    # Initialize database and session state
    initialize_database()
    initialize_session_state()

    # Without a stored connection the sidebar would fail on attribute access,
    # so report the problem and stop instead.
    if 'db_conn' not in st.session_state:
        st.error("Database connection is unavailable; the application cannot load.")
        st.stop()

    # Display sidebar
    display_collection_sidebar()

    # Main content area tabs
    chat_tab, collections_tab = st.tabs(["Chat", "Collections"])
    with chat_tab:
        if st.session_state.chat_ready:
            display_chat_interface()
        else:
            display_welcome_screen()
    with collections_tab:
        display_enhanced_collections()
# Script entry point.
if __name__ == "__main__":
    main()