# app.py import streamlit as st import asyncio from pathlib import Path import tempfile import time from typing import Dict, List import pandas as pd from core.document_processor import DocumentProcessor from core.embeddings import DocumentEmbedder from core.vector_store import FAISSVectorStore from modules.qa_module import EnhancedQAModule # Page configuration st.set_page_config( page_title="SYNAPTYX - AI Accelerator", page_icon="🧠", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS st.markdown(""" """, unsafe_allow_html=True) # Initialize session state if 'processed_docs' not in st.session_state: st.session_state.processed_docs = 0 if 'total_chunks' not in st.session_state: st.session_state.total_chunks = 0 if 'demo' not in st.session_state: st.session_state.demo = None if 'history' not in st.session_state: st.session_state.history = [] class SynaptyxDemo: def __init__(self): self.embedder = DocumentEmbedder() self.vector_store = FAISSVectorStore() self.qa_module = EnhancedQAModule() self.doc_processor = DocumentProcessor() async def process_document(self, file) -> Dict: try: # Create temp file path with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.name).suffix) as tmp_file: tmp_file.write(file.getbuffer()) temp_path = tmp_file.name # Process document doc_content = self.doc_processor.process_document(temp_path) # Process for vector store chunks, embeddings, metadata = self.embedder.process_documents([{ "content": doc_content["content"], "source": file.name }]) # Add to vector store self.vector_store.add_documents(chunks, embeddings, metadata) return { "status": "success", "chunks": len(chunks), "metadata": doc_content["metadata"] } except Exception as e: return {"status": "error", "error": str(e)} finally: Path(temp_path).unlink(missing_ok=True) async def query(self, question: str, k: int = 5) -> Dict: try: # Get relevant documents relevant_docs = self.vector_store.similarity_search( question, self.embedder, k=k ) # Get answer answer = await self.qa_module.process(question, relevant_docs) return { "status": "success", "answer": answer["answer"], "confidence": answer["confidence"], "sources": answer["sources"] } except Exception as e: return {"status": "error", "error": str(e)} def main(): # Initialize demo instance if not exists if st.session_state.demo is None: st.session_state.demo = SynaptyxDemo() # Sidebar with st.sidebar: st.image("https://via.placeholder.com/150?text=SYNAPTYX", width=150) st.markdown("### 🧠 SYNAPTYX") st.markdown("#### AI Accelerator Platform") st.markdown("---") st.markdown("### 📊 Analytics") st.markdown(f"Documents Processed: {st.session_state.processed_docs}") st.markdown(f"Total Chunks: {st.session_state.total_chunks}") st.markdown("---") st.markdown("### 🔧 Settings") k_value = st.slider("Number of relevant chunks", 1, 10, 5) # Clear history button if st.button("🗑️ Clear History"): st.session_state.history = [] st.success("History cleared!") # Main content st.markdown("