cryogenic22's picture
Update app.py
786a572 verified
# app.py
import streamlit as st
import os
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
from threading import Lock
from dataclasses import dataclass
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder # MessagesPlaceholder from here
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage # Remove MessagesPlaceholder from here
from langchain.chains import ConversationalRetrievalChain
from langchain_core.runnables import RunnablePassthrough
from langchain.memory import ConversationBufferMemory
from utils.database import (
# Base database operations
create_connection,
create_tables,
get_all_documents,
force_recreate_collections_tables,
# Collection operations
get_collections,
create_collection,
add_document_to_collection,
remove_from_collection,
update_collection,
get_collection_documents,
handle_document_upload,
verify_database_tables,
initialize_qa_system,
get_embeddings_model,
# Search functionality
search_documents
)
# Component imports
from components.chat import display_chat_interface
from components.collection_manager import (
display_enhanced_collections,
show_collection_creation_dialog
)
# Optional analytics import if you're using it
from utils.analytics import display_analytics_dashboard
from components.chat import display_chat_interface
from components.document_store import display_documents_tab
# Create locks for thread-safe operations
# NOTE(review): conn_lock is not acquired anywhere in the visible part of this
# file — confirm whether other modules import and use it.
conn_lock = Lock()
# When the /data directory is absent, run the project's directory bootstrap.
# Any failure (including a falsy result from setup_directories) is reported in
# the UI and the script run is halted via st.stop().
if not os.path.exists('/data'):
    try:
        # Imported lazily so the setup module is only needed on this path.
        from setup import setup_directories
        if not setup_directories():
            raise Exception("Failed to set up directories")
    except Exception as e:
        st.error(f"Setup error: {e}")
        st.stop()
def handle_chat_state_change():
    """Rebuild the chat system when a reinitialization has been requested.

    Nothing happens unless the chat is already marked ready *and* the
    ``reinitialize_chat`` flag is set. After the rebuild call the flag is
    cleared so the work is not repeated on the next run.
    """
    state = st.session_state
    if not state.get('chat_ready'):
        return
    if not state.get('reinitialize_chat'):
        return
    # Re-index against the current document selection, then clear the flag.
    initialize_chat_from_collection()
    state.reinitialize_chat = False
def verify_database_tables(conn):
    """Check that the database schema is usable, repairing it when possible.

    This definition replaces the ``verify_database_tables`` name imported
    from ``utils.database`` at the top of the file.

    The list of tables is read from ``sqlite_master``. Missing required
    tables are logged. If the ``collections`` table is absent,
    ``force_recreate_collections_tables`` is asked to rebuild it; on success
    ``st.session_state.show_debug`` is set to True.

    Args:
        conn: Open database connection exposing ``cursor()``.

    Returns:
        bool: False when the ``collections`` table is missing and cannot be
        recreated, or when any exception occurs (both are reported with
        ``st.error``); True otherwise.
    """
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            existing_tables = {row[0] for row in cursor.fetchall()}
        finally:
            # Release the cursor even if the query fails.
            cursor.close()

        required_tables = {
            'documents', 'queries', 'annotations',
            'collections', 'document_collections', 'chats', 'chat_messages'
        }
        missing_tables = required_tables - existing_tables
        if missing_tables:
            # Previously this set was computed and discarded; surface it in
            # the log so an incomplete schema does not go unnoticed.
            import logging
            logging.getLogger(__name__).warning(
                "Missing database tables: %s", ", ".join(sorted(missing_tables))
            )

        # Only the collections table triggers an automatic repair.
        if 'collections' in missing_tables:
            if force_recreate_collections_tables(conn):
                st.session_state.show_debug = True
            else:
                st.error("Failed to create required database tables.")
                return False
        return True
    except Exception as e:
        st.error(f"Error verifying database tables: {e}")
        return False
def initialize_database():
    """Open (or reuse) the database connection and confirm its tables exist.

    On the first call the database file ``rfp_analysis.db`` is placed under
    ``/data`` when that directory exists, otherwise under ``./data`` in the
    current working directory. The connection is stored in
    ``st.session_state.db_conn`` and reused on later calls.

    Returns:
        bool: True when a connection is available and
        ``verify_database_tables`` reports success. False when the file or
        connection cannot be created, when verification fails, or when any
        exception is raised (reported with ``st.error``).
    """
    try:
        if 'db_conn' in st.session_state:
            # Connection already exists: only re-check the schema.
            return bool(verify_database_tables(st.session_state.db_conn))

        # Get storage path
        if os.path.exists('/data'):
            base_path = Path('/data')
        else:
            base_path = Path(os.getcwd()) / 'data'
        base_path.mkdir(parents=True, exist_ok=True)

        db_path = base_path / 'rfp_analysis.db'
        try:
            # Fail early with a clear message if the location is not writable.
            db_path.touch(exist_ok=True)
        except Exception as e:
            st.error(f"Failed to create database file: {e}")
            return False

        conn = create_connection(str(db_path))
        if conn is None:
            return False

        # Create all tables (includes collection tables)
        create_tables(conn)
        st.session_state.db_conn = conn
        # The verification result used to be discarded, so a broken schema
        # was still reported as a successful initialization.
        return bool(verify_database_tables(conn))
    except Exception as e:
        st.error(f"Database initialization error: {e}")
        return False
@dataclass
class SessionStateDefaults:
    """Initial values used when seeding Streamlit session state.

    Field order is part of the generated ``__init__`` signature and is
    therefore kept stable.
    """
    # --- collections ---
    show_collection_dialog: bool = False      # creation dialog hidden at start
    selected_collection: Optional[Dict] = None
    current_collection: Optional[Dict] = None

    # --- chat ---
    chat_ready: bool = False
    # None stands in for "empty list"; a mutable default is not allowed here.
    messages: Optional[List] = None
    current_chat_id: Optional[int] = None

    # --- document processing ---
    vector_store: Optional[Any] = None
    qa_system: Optional[Any] = None

    # --- flags ---
    reinitialize_chat: bool = False
    debug_mode: bool = False
def initialize_session_state() -> None:
    """Seed ``st.session_state`` with every key the app expects.

    Keys that are already present keep their value. Missing keys receive
    the default from ``SessionStateDefaults``, with ``messages`` falling
    back to a new empty list when its default is None.
    """
    defaults = SessionStateDefaults()
    initial_messages = defaults.messages if defaults.messages is not None else []

    # (state key, default value, human-readable description)
    catalog = (
        ('show_collection_dialog', defaults.show_collection_dialog,
         'Controls visibility of collection creation dialog'),
        ('selected_collection', defaults.selected_collection,
         'Currently selected collection for viewing/editing'),
        ('current_collection', defaults.current_collection,
         'Active collection for chat/analysis'),
        ('chat_ready', defaults.chat_ready,
         'Indicates if chat system is initialized and ready'),
        ('messages', initial_messages,
         'List of chat messages in the current session'),
        ('current_chat_id', defaults.current_chat_id,
         'ID of the current chat session'),
        ('vector_store', defaults.vector_store,
         'FAISS vector store for document embeddings'),
        ('qa_system', defaults.qa_system,
         'Initialized QA system for chat'),
        ('reinitialize_chat', defaults.reinitialize_chat,
         'Flag to trigger chat system reinitialization'),
        ('debug_mode', defaults.debug_mode,
         'Enable/disable debug information'),
    )

    for key, default_value, _description in catalog:
        if key in st.session_state:
            continue
        st.session_state[key] = default_value
def reset_chat_state() -> None:
    """Clear the chat-related session keys (messages, chat id, readiness, QA system)."""
    cleared_values = (
        ('messages', []),
        ('current_chat_id', None),
        ('chat_ready', False),
        ('qa_system', None),
    )
    for key, value in cleared_values:
        st.session_state[key] = value
def reset_collection_state() -> None:
    """Clear the collection-related session keys and hide the creation dialog."""
    cleared_values = (
        ('selected_collection', None),
        ('current_collection', None),
        ('show_collection_dialog', False),
    )
    for key, value in cleared_values:
        st.session_state[key] = value
def get_current_state() -> Dict[str, Any]:
    """Return a plain-dict snapshot of the session state.

    Entries whose key starts with an underscore are left out. The snapshot
    is what the debug panel in the sidebar prints.
    """
    snapshot: Dict[str, Any] = {}
    for key, value in st.session_state.items():
        if key.startswith('_'):
            # Underscore-prefixed keys are treated as internal and skipped.
            continue
        snapshot[key] = value
    return snapshot
def display_top_bar():
    """Render the header row: logo, application title, and active collection."""
    logo_column, title_column, status_column = st.columns([1, 3, 1])

    with logo_column:
        if not os.path.exists("img/logo.png"):
            st.warning("Logo not found at img/logo.png")
        else:
            st.image("img/logo.png", width=100)

    with title_column:
        st.title("Synaptyx.AI - RFP Analysis Agent")

    with status_column:
        # Only shown once a collection has been activated.
        active = st.session_state.current_collection
        if active:
            st.info(f"πŸ“ Active Collection: {active['name']}")
def initialize_chat_from_existing():
    """Build the chat system from every document stored in the database.

    Does nothing when ``chat_ready`` is already set. Otherwise all documents
    are split into chunks, embedded into a FAISS vector store, and the store
    plus a QA system are saved in session state before ``chat_ready`` is set.

    Returns:
        bool: True when the chat system was built by this call. False when
        the chat was already ready, when no documents exist, or when an
        error occurred (reported with ``st.error``).
    """
    if st.session_state.get('chat_ready'):
        return False
    try:
        documents = get_all_documents(st.session_state.db_conn)
        if not documents:
            return False

        embeddings = get_embeddings_model()
        # The splitter was previously referenced without ever being created,
        # which raised NameError on every call. The settings mirror those
        # used by initialize_chat_from_collection.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = []
        for doc in documents:
            for chunk in text_splitter.split_text(doc['content']):
                chunks.append({
                    'content': chunk,
                    'metadata': {'source': doc['name'], 'document_id': doc['id']}
                })

        vector_store = FAISS.from_texts(
            [chunk['content'] for chunk in chunks],
            embeddings,
            [chunk['metadata'] for chunk in chunks]
        )
        st.session_state.vector_store = vector_store
        st.session_state.qa_system = initialize_qa_system(vector_store)
        st.session_state.chat_ready = True
        return True
    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False
def initialize_chat_from_collection():
    """Build the vector store and QA system for the active document set.

    Documents come from ``st.session_state.current_collection`` when one is
    set, otherwise from the whole database. The resulting vector store and
    QA system are saved in session state. ``chat_ready`` is NOT changed
    here; callers set it after a successful return.

    Returns:
        bool: True when a vector store and QA system were stored. False when
        there are no documents or an error occurred (reported with
        ``st.error``).
    """
    try:
        if st.session_state.get('current_collection'):
            documents = get_collection_documents(
                st.session_state.db_conn,
                st.session_state.current_collection['id']
            )
        else:
            documents = get_all_documents(st.session_state.db_conn)

        if not documents:
            return False

        document_ids = [doc['id'] for doc in documents]

        # `get_existing_vector_store` is neither defined nor imported in this
        # module. Calling it directly raised NameError, which the broad
        # except below turned into a permanent initialization failure.
        # Reuse is therefore attempted only when such a helper is actually
        # available at module level.
        # NOTE(review): confirm where the reuse helper is meant to live and
        # import it explicitly.
        reuse_lookup = globals().get('get_existing_vector_store')
        vector_store = reuse_lookup(document_ids) if callable(reuse_lookup) else None
        if vector_store:
            st.session_state.vector_store = vector_store
            st.session_state.qa_system = initialize_qa_system(vector_store)
            return True

        # No reusable store: chunk, embed and index the documents from scratch.
        with st.spinner("Initializing chat system..."):
            embeddings = get_embeddings_model()
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=50,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )
            chunks = []
            for doc in documents:
                for chunk in text_splitter.split_text(doc['content']):
                    chunks.append({
                        'content': chunk,
                        'metadata': {'source': doc['name'], 'document_id': doc['id']}
                    })
            vector_store = FAISS.from_texts(
                [chunk['content'] for chunk in chunks],
                embeddings,
                [chunk['metadata'] for chunk in chunks]
            )
            st.session_state.vector_store = vector_store
            st.session_state.qa_system = initialize_qa_system(vector_store)
            return True
    except Exception as e:
        st.error(f"Error initializing chat: {e}")
        return False
def display_collection_sidebar():
    """Render the sidebar: chat controls, collection picker and document list.

    Choosing "Use for Chat" on a document activates the collection that
    contains it and initializes the chat through
    ``initialize_chat_from_collection``.
    """
    with st.sidebar:
        st.title("πŸ’¬ Chat Controls")
        if st.button("πŸ”„ Start New Chat", use_container_width=True):
            st.session_state.messages = []
            st.session_state.current_chat_id = None
            if st.session_state.chat_ready:
                st.rerun()
        st.divider()

        # Collection Management
        st.title("πŸ“š Collections")
        collections = get_collections(st.session_state.db_conn)
        if st.button("βž• Create Collection", use_container_width=True):
            st.session_state.show_collection_dialog = True

        if not collections:
            return

        # Collection selection with document preview
        selected = st.selectbox(
            "Select Collection",
            options=["All Documents"] + [c['name'] for c in collections],
            key="collection_select"
        )
        if selected == "All Documents":
            return

        collection = next((c for c in collections if c['name'] == selected), None)
        if not collection:
            return

        documents = get_collection_documents(st.session_state.db_conn, collection['id'])
        if not documents:
            return

        st.markdown("### Documents in Collection")
        for doc in documents:
            with st.expander(f"πŸ“„ {doc['name']}", expanded=False):
                st.caption(f"Uploaded: {doc['upload_date']}")
                if st.button("Use for Chat", key=f"use_{doc['id']}"):
                    # The original called `initialize_chat_for_document`,
                    # which is not defined or imported in this file and so
                    # raised NameError on click. Use the collection-based
                    # initializer the rest of the app relies on.
                    st.session_state.current_collection = collection
                    if initialize_chat_from_collection():
                        st.session_state.chat_ready = True
                        st.rerun()
def display_collection_dialog():
    """Render the sidebar form for creating a collection.

    "Create" requires a non-empty name. When ``create_collection`` reports
    success the dialog is closed and the app reruns. "Cancel" closes the
    dialog and reruns.
    """
    with st.sidebar:
        st.subheader("Create New Collection")
        name = st.text_input("Collection Name", key="sidebar_collection_name")
        description = st.text_area("Description", key="sidebar_collection_desc")

        create_column, cancel_column = st.columns(2)

        with create_column:
            create_clicked = st.button("Create", key="sidebar_create_btn")
            if create_clicked and not name:
                st.error("Please enter a collection name")
            elif create_clicked:
                created = create_collection(st.session_state.db_conn, name, description)
                if created:
                    st.success(f"Collection '{name}' created!")
                    st.session_state.show_collection_dialog = False
                    st.rerun()

        with cancel_column:
            if st.button("Cancel", key="sidebar_cancel_btn"):
                st.session_state.show_collection_dialog = False
                st.rerun()
def display_welcome_screen():
    """Render the landing page shown while the chat is not ready.

    Left column: getting-started text plus, when documents exist, buttons to
    start chatting or view documents. Right column: example questions.
    Below both: a summary of up to three collections.
    """
    st.title("πŸ€– Welcome to SYNAPTYX")
    st.markdown("### Your AI-powered RFP Analysis Assistant")

    # Look up what already exists so the page can adapt.
    db_conn = st.session_state.db_conn
    documents = get_all_documents(db_conn)
    collections = get_collections(db_conn)

    intro_column, examples_column = st.columns(2)

    with intro_column:
        st.markdown("""
#### Getting Started:
1. Upload your RFP documents using the sidebar
2. Organize documents into collections
3. Start analyzing with AI!
You can:
- Create multiple collections
- Upload documents to specific collections
- Search across all documents
- Get AI-powered insights and summaries
""")
        # Action buttons only make sense once something has been uploaded.
        if documents:
            st.success(f"πŸŽ‰ You have {len(documents)} documents ready for analysis!")
            chat_column, docs_column = st.columns(2)
            with chat_column:
                if st.button("Start Chatting", use_container_width=True):
                    if initialize_chat_from_collection():
                        st.session_state.chat_ready = True
                        st.rerun()
            with docs_column:
                if st.button("View Documents", use_container_width=True):
                    st.session_state.show_document_store = True
                    st.rerun()

    with examples_column:
        st.markdown("#### Example Questions:")
        sample_questions = (
            "πŸ“Š Summarize the main points of the document",
            "πŸ“ Draft a 'Why Us' section based on the document",
            "🎯 Extract key success metrics and outcomes",
            "πŸ’‘ What are the innovative solutions mentioned?",
            "🀝 Analyze the partnership benefits described",
            "πŸ“ˆ Compare requirements across documents",
            "πŸ” Find similar sections across RFPs",
        )
        for question in sample_questions:
            st.markdown(f"β€’ {question}")

    # Collection summary, limited to the first three entries.
    if collections:
        st.markdown("---")
        st.markdown("#### Your Collections:")
        for collection in collections[:3]:
            with st.container():
                st.markdown(f"""
πŸ—‚οΈ **{collection['name']}**
Documents: {collection['doc_count']}
""")
        if len(collections) > 3:
            st.caption("... and more collections")
def display_chat_area():
    """Render the chat view with its context panel, or the welcome screen.

    While ``chat_ready`` is false the welcome screen is shown. Otherwise the
    chat interface sits in a wide column next to a narrow panel showing the
    active context, quick actions, and clickable example questions.
    """
    state = st.session_state
    if not state.chat_ready:
        display_welcome_screen()
        return

    chat_column, context_column = st.columns([3, 1])

    with chat_column:
        display_chat_interface()

    with context_column:
        # Document context panel
        st.markdown("### Current Context")
        active = state.current_collection
        if active:
            st.info(f"πŸ“ Using Collection: {active['name']}")
        else:
            st.info("πŸ“š Using All Documents")

        # Quick actions
        st.markdown("### Quick Actions")
        if st.button("πŸ”„ New Chat", use_container_width=True):
            reset_chat_state()
            st.rerun()
        if st.button("πŸ“‹ Export Chat", use_container_width=True):
            export_chat_history()

        # Example questions for quick reference
        with st.expander("πŸ’‘ Question Examples", expanded=False):
            quick_prompts = (
                "Summarize main points",
                "Extract requirements",
                "Compare solutions",
                "Analyze pricing",
            )
            for prompt in quick_prompts:
                if st.button(prompt, key=f"example_{prompt}"):
                    # Store the chosen example as the current question.
                    state.current_question = prompt
                    st.rerun()
def export_chat_history():
    """Offer the current conversation as a downloadable text file.

    Each message becomes a ``User: ...`` or ``Assistant: ...`` entry, with
    entries separated by a blank line. Nothing is rendered when there are
    no messages.
    """
    history = st.session_state.messages
    if not history:
        return

    entries = []
    for message in history:
        # Anything that is not a HumanMessage is labelled as the assistant.
        speaker = 'User' if isinstance(message, HumanMessage) else 'Assistant'
        entries.append(f"{speaker}: {message.content}")
    chat_text = "\n\n".join(entries)

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    st.download_button(
        "Download Chat History",
        chat_text,
        file_name=f"chat_export_{timestamp}.txt",
        mime="text/plain"
    )
def main():
    """Application entry point.

    Configures the page, initializes the database and session state, then
    renders the header, the sidebar (chat controls, collection picker,
    uploader), the optional collection dialog, and either the chat
    interface or the welcome screen.
    """
    st.set_page_config(
        layout="wide",
        page_title="SYNAPTYX - RFP Analysis Agent",
        initial_sidebar_state="expanded"
    )

    # Nothing else can work without the database.
    database_ready = initialize_database()
    if not database_ready:
        st.error("Failed to initialize database. Please contact support.")
        return
    initialize_session_state()

    state = st.session_state

    # Header row with the larger logo
    logo_column, title_column, status_column = st.columns([1.5, 4, 1])
    with logo_column:
        if not os.path.exists("img/logo.png"):
            st.info("Logo not found")
        else:
            st.image("img/logo.png", width=200)
    with title_column:
        st.title("SYNAPTYX - RFP Analysis Agent")
    with status_column:
        if state.current_collection:
            st.info(f"πŸ“ Active Collection: {state.current_collection['name']}")

    with st.sidebar:
        # Chat controls
        st.title("πŸ’¬ Chat Controls")
        if st.button("πŸ”„ Start New Chat", use_container_width=True):
            reset_chat_state()
            st.rerun()
        st.divider()

        # Document manager
        st.title("πŸ“š Document Manager")
        if st.button("βž• Create New Collection", use_container_width=True):
            state.show_collection_dialog = True

        collections = get_collections(state.db_conn)
        if collections:
            collection_names = [c['name'] for c in collections]
            selected = st.selectbox(
                "Select Collection",
                options=["All Documents"] + collection_names,
                key="collection_select"
            )
            if selected != "All Documents":
                collection = next((c for c in collections if c['name'] == selected), None)
                if collection:
                    state.current_collection = collection
                    display_collection_documents(collection['id'])

        # Upload section
        st.header("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload PDF documents",
            type=['pdf'],
            accept_multiple_files=True,
            help="Limit 200MB per file β€’ PDF"
        )
        if uploaded_files:
            # Attach uploads to the active collection when there is one.
            active = state.current_collection
            target_collection_id = active.get('id') if active else None
            handle_document_upload(uploaded_files, collection_id=target_collection_id)

    # Collection creation dialog, if requested
    if state.show_collection_dialog:
        show_collection_creation_dialog()

    # Main content area
    if not state.chat_ready:
        display_welcome_screen()
    else:
        display_chat_interface()

    # Debug panel; the checkbox is only rendered when debug mode is on.
    if state.debug_mode and st.sidebar.checkbox("Show Debug Info"):
        st.sidebar.write("Current State:", get_current_state())
def display_collection_documents(collection_id: int):
    """List a collection's documents with preview and chat actions.

    Each document gets an expander showing its upload date, a "Preview"
    button (first 500 characters of the content) and a "Use for Chat"
    button that initializes the chat system.

    Args:
        collection_id: Identifier passed to ``get_collection_documents``.
    """
    documents = get_collection_documents(st.session_state.db_conn, collection_id)
    if not documents:
        return

    st.markdown("### Documents in Collection")
    for doc in documents:
        with st.expander(f"πŸ“„ {doc['name']}", expanded=False):
            st.caption(f"Uploaded: {doc['upload_date']}")
            preview_column, chat_column = st.columns(2)
            with preview_column:
                if st.button("Preview", key=f"preview_{doc['id']}"):
                    content = doc['content']
                    # Truncate long documents and mark the cut with an ellipsis.
                    preview = content[:500] + "..." if len(content) > 500 else content
                    st.text_area(
                        "Content Preview",
                        value=preview,
                        height=100,
                        disabled=True
                    )
            with chat_column:
                if st.button("Use for Chat", key=f"chat_{doc['id']}"):
                    # Previously the result was ignored and chat_ready was
                    # never set, so the chat view never appeared and a rerun
                    # wiped any error message. Mirror the welcome screen's
                    # "Start Chatting" button instead.
                    if initialize_chat_from_collection():
                        st.session_state.chat_ready = True
                        st.rerun()
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()