Spaces:
Build error
Build error
| # app.py | |
| import streamlit as st | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime | |
| from threading import Lock | |
| from utils.database import ( | |
| # Base database operations | |
| create_connection, | |
| create_tables, | |
| get_all_documents, | |
| # Collection operations | |
| get_collections, | |
| create_collection, | |
| add_document_to_collection, | |
| remove_from_collection, | |
| update_collection, | |
| get_collection_documents, | |
| handle_document_upload, | |
| # Search functionality | |
| search_documents | |
| ) | |
| from components.chat import display_chat_interface | |
# Module-level lock intended for guarding shared resources across threads.
# NOTE(review): nothing in the visible part of this file acquires it; confirm
# whether another module relies on it before removing.
conn_lock = Lock()


def verify_database_tables(conn):
    """Check that the ``collections`` table exists, recreating tables if not.

    The table names recorded in ``sqlite_master`` are written to the page.
    When ``collections`` is missing, a warning is shown, ``create_tables`` is
    called on the same connection, and the refreshed table list is written.

    Args:
        conn: Open database connection exposing ``cursor()``. The query reads
            ``sqlite_master``, so an SQLite connection is expected.

    Returns:
        None. Any exception is reported through ``st.error`` instead of being
        propagated to the caller.
    """
    list_tables_sql = "SELECT name FROM sqlite_master WHERE type='table'"

    def _table_names(cursor):
        # Single place that runs the listing query and flattens the rows.
        cursor.execute(list_tables_sql)
        return [row[0] for row in cursor.fetchall()]

    try:
        cursor = conn.cursor()
        try:
            table_names = _table_names(cursor)
            st.write("Existing tables:", table_names)

            if 'collections' not in table_names:
                st.warning("Collections table not found. Recreating tables...")
                create_tables(conn)
                # Query again so the page shows the state after recreation.
                st.write("Tables after recreation:", _table_names(cursor))
        finally:
            # Release the cursor on both the success and the failure path.
            cursor.close()
    except Exception as e:
        st.error(f"Error verifying tables: {e}")
def initialize_database():
    """Open the application database once per session and verify its tables.

    When ``st.session_state`` already holds ``db_conn`` the stored connection
    is only re-verified. Otherwise the database file ``rfp_analysis.db`` is
    placed under ``/data`` if that path exists, else under ``<cwd>/data``; a
    connection is created, tables are created and verified, and the
    connection is stored in ``st.session_state.db_conn``.

    Returns:
        bool: True when a connection is available in session state, False when
        the file or connection could not be created or any other exception
        occurred (the exception is reported through ``st.error``).
    """
    try:
        # Fast path: a connection from an earlier run is already stored.
        if 'db_conn' in st.session_state:
            verify_database_tables(st.session_state.db_conn)
            return True

        # Pick the storage root and make sure it exists.
        storage_root = Path('/data') if os.path.exists('/data') else Path(os.getcwd()) / 'data'
        storage_root.mkdir(parents=True, exist_ok=True)
        database_file = storage_root / 'rfp_analysis.db'

        # Creating the file up front surfaces permission problems early.
        try:
            database_file.touch(exist_ok=True)
        except Exception as touch_error:
            st.error(f"Failed to create database file: {touch_error}")
            return False

        connection = create_connection(str(database_file))
        if connection is None:
            return False

        # Build the schema, remember the connection, then confirm the tables.
        create_tables(connection)
        st.session_state.db_conn = connection
        verify_database_tables(connection)
        return True
    except Exception as init_error:
        st.error(f"Database initialization error: {init_error}")
        return False
def initialize_session_state():
    """Seed ``st.session_state`` with defaults for keys that are not yet set.

    Keys that already exist keep their current value. The defaults are
    ``chat_ready=False``, ``current_collection=None``, ``processed_files=[]``
    and ``vector_stores={}``.
    """
    # Built inside the function so the list and dict are fresh on every call.
    initial_values = {
        'chat_ready': False,
        'current_collection': None,
        'processed_files': [],
        'vector_stores': {},
    }
    for state_key, initial in initial_values.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = initial
def display_top_bar():
    """Render the header row: logo, application title and active collection.

    The row is split into three columns with width ratio 1:3:1. A warning is
    shown in place of the logo when ``img/logo.png`` does not exist, and the
    active-collection badge is shown only when
    ``st.session_state.current_collection`` is truthy.
    """
    logo_path = "img/logo.png"
    logo_slot, title_slot, status_slot = st.columns([1, 3, 1])

    with logo_slot:
        if not os.path.exists(logo_path):
            st.warning("Logo not found at img/logo.png")
        else:
            st.image(logo_path, width=100)

    with title_slot:
        st.title("Synaptyx.AI - RFP Analysis Agent")

    with status_slot:
        active = st.session_state.current_collection
        if active:
            # NOTE(review): the leading glyph looks like a mis-decoded emoji;
            # it is kept exactly as found.
            st.info(f"π Active Collection: {active['name']}")
def display_collection_sidebar():
    """Render the sidebar: collection picker, PDF upload and collection info.

    Side effects on ``st.session_state``:
      * ``show_collection_dialog`` is set to True when the create button is
        pressed.
      * ``current_collection`` is set to the selected collection dict, or to
        None when "All Documents" is selected.
      * After an upload is handled, ``processed_files`` and ``chat_ready`` are
        updated and the script is rerun.
    """
    # NOTE(review): the non-ASCII prefixes in the UI strings below look like
    # mis-decoded emoji; they are kept exactly as found.
    state = st.session_state

    with st.sidebar:
        st.title("π Document Manager")

        # --- Collection selection ---
        st.header("Collections")
        available = get_collections(state.db_conn)

        if st.button("β Create New Collection"):
            state.show_collection_dialog = True

        names = ["All Documents"]
        names.extend(entry['name'] for entry in available)
        choice = st.selectbox(
            "Select Collection",
            names,
            index=0,
            key="collection_selector"
        )

        # Map the chosen label back to its collection record (None for "all").
        if choice == "All Documents":
            state.current_collection = None
        else:
            state.current_collection = next(
                (entry for entry in available if entry['name'] == choice),
                None
            )

        # --- Upload ---
        st.header("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload PDF documents",
            type=['pdf'],
            accept_multiple_files=True,
            help="Limit 200MB per file β’ PDF"
        )

        if uploaded_files:
            # Only process a selection that differs from the last handled one.
            needs_processing = (
                'processed_files' not in state
                or uploaded_files != state.processed_files
            )
            if needs_processing:
                with st.spinner("Processing documents..."):
                    active_for_upload = state.current_collection
                    collection_id = active_for_upload['id'] if active_for_upload else None

                    # The keyword is passed only when a collection id is set.
                    upload_params = {}
                    if collection_id:
                        upload_params['collection_id'] = collection_id

                    handle_document_upload(uploaded_files, **upload_params)
                    state.processed_files = uploaded_files
                    state.chat_ready = True
                    # Rerun so the main area switches to the chat interface.
                    st.rerun()

        # --- Details of the selected collection ---
        collection = state.current_collection
        if collection:
            st.header("Collection Info")
            documents = get_collection_documents(state.db_conn, collection['id'])
            st.markdown(f"""
**Collection:** {collection['name']}
**Documents:** {len(documents)}
**Created:** {collection['created_at']}
""")
            if documents:
                st.subheader("Documents in Collection")
                for document in documents:
                    st.write(f"π {document['name']}")
def display_collection_dialog():
    """Render the "create collection" form inside the sidebar.

    Pressing "Create" with a non-empty name calls ``create_collection``; when
    that returns a truthy value the dialog flag is cleared and the script is
    rerun. An empty name produces an error message. Pressing "Cancel" clears
    the dialog flag and reruns.
    """
    with st.sidebar:
        st.subheader("Create New Collection")
        name = st.text_input("Collection Name")
        description = st.text_area("Description")

        create_slot, cancel_slot = st.columns(2)

        with create_slot:
            if st.button("Create"):
                if not name:
                    st.error("Please enter a collection name")
                elif create_collection(st.session_state.db_conn, name, description):
                    st.success(f"Collection '{name}' created!")
                    st.session_state.show_collection_dialog = False
                    st.rerun()

        with cancel_slot:
            if st.button("Cancel"):
                st.session_state.show_collection_dialog = False
                st.rerun()
def display_welcome_screen():
    """Render the landing page: a getting-started guide and sample questions.

    The page has a title, a subtitle and two equal columns; the left column
    holds the guide and the right column lists example prompts, one markdown
    line each.
    """
    # NOTE(review): the non-ASCII prefixes in the strings below look like
    # mis-decoded emoji and bullets; they are kept exactly as found.
    st.title("π€ Welcome to SYNAPTYX")
    st.markdown("### Your AI-powered RFP Analysis Assistant")

    getting_started = """
#### Getting Started:
1. Upload your RFP documents using the sidebar
2. Organize documents into collections
3. Start analyzing with AI!
You can:
- Create multiple collections
- Upload documents to specific collections
- Search across all documents
- Get AI-powered insights and summaries
"""
    sample_prompts = (
        "π Summarize the main points of the document",
        "π Draft a 'Why Us' section based on the document",
        "π― Extract key success metrics and outcomes",
        "π‘ What are the innovative solutions mentioned?",
        "π€ Analyze the partnership benefits described",
        "π Compare requirements across documents",
        "π Find similar sections across RFPs",
    )

    guide_slot, examples_slot = st.columns(2)

    with guide_slot:
        st.markdown(getting_started)

    with examples_slot:
        st.markdown("#### Example Questions:")
        for prompt in sample_prompts:
            st.markdown(f"β’ {prompt}")
def display_chat_area():
    """Render the chat interface once chat is ready, else the welcome screen.

    The choice is driven by ``st.session_state.chat_ready``.
    """
    if st.session_state.chat_ready:
        display_chat_interface()
    else:
        display_welcome_screen()
def main():
    """Application entry point: configure the page and render every section.

    Steps:
      1. Configure the Streamlit page (wide layout, expanded sidebar).
      2. Initialise the database; stop with an error message on failure.
      3. Initialise session-state defaults.
      4. Render the sidebar, top bar, optional create-collection dialog and
         the chat area.
    """
    st.set_page_config(
        layout="wide",
        page_title="SYNAPTYX - RFP Analysis Agent",
        initial_sidebar_state="expanded"
    )

    # Initialize database and session state
    if not initialize_database():
        st.error("Failed to initialize database. Please check storage system.")
        return
    initialize_session_state()

    # The sidebar is rendered first because it writes
    # st.session_state.current_collection, which the top bar reads. Rendering
    # the top bar first would show the previously selected collection until
    # the next interaction. The sidebar draws only inside st.sidebar, so the
    # order of elements in the main area is unaffected.
    display_collection_sidebar()

    # Display the top navigation bar
    display_top_bar()

    # Show collection creation dialog if triggered
    if st.session_state.get('show_collection_dialog', False):
        display_collection_dialog()

    # Display chat area
    display_chat_area()


if __name__ == "__main__":
    main()