Spaces:
Sleeping
Sleeping
| """Streamlit UI for Agentic RAG System.""" | |
| import streamlit as st | |
| import json | |
| import os | |
| import sys | |
| import asyncio | |
| from pathlib import Path | |
| from typing import Optional | |
# Put the directory two levels above this file on the import path so the
# `src.*` imports that follow can be resolved.
parent_dir = Path(__file__).parent.parent
if all(entry != str(parent_dir) for entry in sys.path):
    sys.path.insert(0, str(parent_dir))
| # Import orchestrator and memory directly | |
| from src.core.orchestrator import get_orchestrator | |
| from src.memory.long_term_memory import LongTermMemory | |
| from src.retrieval.vector_store import get_vector_store | |
# Page configuration. Streamlit expects this to be the first Streamlit
# command executed by the script, so it stays at module level ahead of any
# other `st.*` call.
st.set_page_config(
    page_title="Agentic RAG System",
    page_icon="🤖",  # was mis-encoded ("π€") in the source
    layout="wide",
)
def get_orchestrator_instance():
    """Return the orchestrator supplied by ``get_orchestrator()``.

    This wrapper applies no caching of its own; whether repeated calls
    yield the same object depends on ``get_orchestrator`` (defined in
    ``src.core.orchestrator``, not visible here).
    """
    orchestrator = get_orchestrator()
    return orchestrator
def get_vector_store_instance():
    """Return the vector store supplied by ``get_vector_store()``.

    This wrapper applies no caching of its own; whether repeated calls
    yield the same object depends on ``get_vector_store`` (defined in
    ``src.retrieval.vector_store``, not visible here).
    """
    vector_store = get_vector_store()
    return vector_store
def add_text_documents(texts: list, metadatas: Optional[list] = None):
    """Add text documents to the vector store.

    Args:
        texts: Document strings to index.
        metadatas: Optional list of metadata dicts, one per entry in
            ``texts``. When omitted, every document gets its own empty dict.

    Returns:
        Whatever ``vector_store.add_documents`` returns (the callers in this
        file treat it as a list of document ids).
    """
    vector_store = get_vector_store_instance()
    if metadatas is None:
        # Build a distinct dict per document. `[{}] * n` would repeat one
        # shared dict, so a mutation by the store would affect every entry.
        metadatas = [{} for _ in texts]
    return vector_store.add_documents(texts, metadatas)
def add_file_documents(file_paths: list, chunk_size: int = 1000):
    """Add documents from text files to the vector store.

    Each file is read as UTF-8. Files longer than ``chunk_size`` characters
    are split into consecutive, non-overlapping chunks and each chunk is
    indexed with a 1-based ``chunk`` number in its metadata.

    Args:
        file_paths: Paths (``str`` or ``Path``) of the files to read. Paths
            that do not exist are skipped silently.
        chunk_size: Maximum number of characters per chunk; must be positive.

    Returns:
        The ids returned by ``add_text_documents``, or ``[]`` when nothing
        could be read.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # Reject a bad chunk size up front: 0 used to fail inside the per-file
    # handler and a negative value silently dropped the whole file.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    all_documents = []
    all_metadatas = []

    for file_path in file_paths:
        file_path = Path(file_path)
        if not file_path.exists():
            continue

        try:
            content = file_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as e:
            # Best effort: report the unreadable file and carry on.
            st.error(f"Error reading {file_path}: {e}")
            continue

        if not content.strip():
            # An empty file carries nothing worth indexing.
            continue

        source = str(file_path.name)
        if len(content) > chunk_size:
            # Split large documents into chunks
            for number, start in enumerate(range(0, len(content), chunk_size), start=1):
                all_documents.append(content[start:start + chunk_size])
                all_metadatas.append({
                    "source": source,
                    "chunk": number,
                    "type": "file",
                })
        else:
            all_documents.append(content)
            all_metadatas.append({
                "source": source,
                "type": "file",
            })

    if not all_documents:
        return []
    return add_text_documents(all_documents, all_metadatas)
def add_from_directory(directory: str, extensions: Optional[list] = None):
    """Add all text files from a directory (searched recursively).

    Args:
        directory: Directory to search.
        extensions: File-name suffixes to match, e.g. ``['.txt', '.md']``.
            Defaults to ``['.txt', '.md', '.py', '.json']``.

    Returns:
        The ids returned by ``add_file_documents``, or ``[]`` when the
        directory does not exist or contains no matching files.
    """
    if extensions is None:
        extensions = ['.txt', '.md', '.py', '.json']

    directory = Path(directory)
    if not directory.is_dir():
        return []

    # A set removes files matched by more than one suffix; `is_file()` drops
    # directories whose names happen to end with a listed suffix.
    matches = {
        path
        for ext in extensions
        for path in directory.glob(f"**/*{ext}")
        if path.is_file()
    }
    if not matches:
        return []

    # Sort so documents are added in a stable order from run to run.
    return add_file_documents([str(path) for path in sorted(matches)])
def query_api(
    query: str,
    tier: str,
    session_id: Optional[str] = None,
    timeout: float = 120.0,
) -> dict:
    """Query the orchestrator directly (no HTTP needed).

    ``orchestrator.process_query`` is a coroutine. When no event loop is
    running in the calling thread it is driven with ``asyncio.run``;
    otherwise it is driven on a fresh loop in a worker thread, because a
    running loop cannot be re-entered.

    Args:
        query: The user's question.
        tier: Processing tier forwarded to the orchestrator.
        session_id: Optional session identifier forwarded to the orchestrator.
        timeout: Seconds to wait for the worker thread before giving up
            (only applies when a loop is already running).

    Returns:
        The orchestrator's response, or ``{"success": False, "error": ...}``
        if anything goes wrong. This function does not raise.
    """
    import concurrent.futures

    try:
        orchestrator = get_orchestrator_instance()

        def process():
            """Build the coroutine for this request."""
            return orchestrator.process_query(
                query=query,
                tier=tier,
                session_id=session_id,
            )

        def run_in_new_loop():
            """Create a new event loop in this thread and run the query on it."""
            new_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(new_loop)
            try:
                return new_loop.run_until_complete(process())
            finally:
                new_loop.close()

        # Only the loop probe is guarded: a RuntimeError raised by the query
        # itself must not be mistaken for "no running loop".
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop, safe to use asyncio.run
            return asyncio.run(process())

        # A loop is already running in this thread (this also covers uvloop
        # and other non-patchable loops), so use a separate thread.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(run_in_new_loop)
            try:
                return future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                if future.done():
                    # The query itself raised a timeout; report it as-is.
                    raise
                return {
                    "success": False,
                    "error": f"Query timed out after {timeout:g} seconds",
                }
        finally:
            # Do not wait for the worker: leaving a `with` block would block
            # until the query finished and defeat the timeout above.
            executor.shutdown(wait=False)
    except Exception as e:
        return {"success": False, "error": str(e)}
def get_agent_status() -> dict:
    """Return the orchestrator's agent status.

    Any exception raised while obtaining the orchestrator or querying it is
    converted into ``{"error": <message>}`` instead of propagating.
    """
    try:
        status = get_orchestrator_instance().get_agent_status()
    except Exception as exc:
        status = {"error": str(exc)}
    return status
def get_system_info() -> dict:
    """Return the orchestrator's system information.

    Any exception raised while obtaining the orchestrator or querying it is
    converted into ``{"error": <message>}`` instead of propagating.
    """
    try:
        info = get_orchestrator_instance().get_system_info()
    except Exception as exc:
        info = {"error": str(exc)}
    return info
def _new_session_id() -> str:
    """Return a fresh random session identifier."""
    import uuid

    return str(uuid.uuid4())


def _sample_documents() -> list:
    """Return the built-in sample documents as ``{"text", "metadata"}`` dicts."""
    return [
        {
            "text": """
            Oracle Exadata is a database machine that combines hardware and software
            to provide high-performance database solutions. When migrating Exadata
            workloads to the cloud, it's important to consider compatibility,
            performance, and feature parity.
            """,
            "metadata": {"source": "exadata_migration_guide", "type": "documentation"},
        },
        {
            "text": """
            Cloud migration strategies for Oracle Exadata include:
            1. Lift and shift - moving workloads with minimal changes
            2. Replatforming - adapting to cloud-native services
            3. Refactoring - redesigning for cloud architecture
            Each approach has different trade-offs in terms of effort, cost, and feature availability.
            """,
            "metadata": {"source": "migration_strategies", "type": "guide"},
        },
        {
            "text": """
            Oracle Cloud Infrastructure (OCI) provides Exadata Cloud Service which
            maintains full feature compatibility with on-premises Exadata. This
            service offers the same architecture and capabilities, making it ideal
            for migrations requiring minimal changes.
            """,
            "metadata": {"source": "oci_exadata", "type": "cloud_service"},
        },
        {
            "text": """
            Oracle AI Database services on AWS provide customers with a simplified path
            to migrate Oracle Exadata workloads. These services run on AWS infrastructure
            and offer managed database solutions that maintain Oracle compatibility while
            leveraging AWS cloud capabilities. The services include automated migration tools,
            performance optimization, and seamless integration with AWS services.
            """,
            "metadata": {"source": "oracle_aws_services", "type": "cloud_service"},
        },
    ]


def _render_sidebar() -> str:
    """Draw the sidebar and return the tier chosen by the user."""
    with st.sidebar:
        st.header("Configuration")

        # Tier selection
        tier = st.selectbox(
            "Select Tier",
            ["basic", "agent", "advanced"],
            help="Basic: Simple RAG | Agent: With Tools | Advanced: Multi-Agent",
        )

        st.markdown("---")
        st.header("System Status")

        # System info
        if st.button("Refresh Status"):
            system_info = get_system_info()
            if "error" not in system_info:
                st.json(system_info)
            else:
                st.error(f"Error: {system_info['error']}")

        # Agent status
        agent_status = get_agent_status()
        if "error" not in agent_status:
            st.subheader("Agents")
            for agent_name, status in agent_status.get("agents", {}).items():
                with st.expander(agent_name.upper()):
                    st.json(status)
        else:
            # Surface the failure instead of leaving the section blank.
            st.warning(f"Agent status unavailable: {agent_status['error']}")

        st.markdown("---")
        st.markdown(f"**Session ID:** `{st.session_state.session_id}`")

        if st.button("New Session"):
            st.session_state.session_id = _new_session_id()
            st.session_state.conversation_history = []
            # The id shown just above is stale now; redraw with the new one.
            st.rerun()

    return tier


def _render_history(history: list) -> None:
    """Show every past query/response pair in its own expander."""
    if not history:
        return

    st.subheader("Conversation History")
    for number, entry in enumerate(history, start=1):
        query_text = entry["query"]
        # Only add an ellipsis when the title is actually truncated.
        preview = query_text if len(query_text) <= 50 else f"{query_text[:50]}..."
        with st.expander(f"Query {number}: {preview}"):
            col1, col2 = st.columns([1, 3])
            with col1:
                st.markdown("**Query:**")
                st.text(query_text)
                st.markdown(f"**Tier:** {entry['tier']}")
            with col2:
                st.markdown("**Response:**")
                if entry["response"].get("success"):
                    st.success(entry["response"].get("answer", "No answer"))
                else:
                    st.error(entry["response"].get("error", "Unknown error"))


def _render_response(response: dict) -> None:
    """Show the answer, sources and metadata of a just-processed query."""
    if not response.get("success"):
        st.error(f"❌ Error: {response.get('error', 'Unknown error')}")
        return

    st.success("✅ Query processed successfully!")
    st.markdown("### Answer:")
    st.markdown(response.get("answer", "No answer provided"))

    # Show sources if available
    if response.get("sources"):
        with st.expander("Sources"):
            for source in response["sources"]:
                st.json(source)

    # Show metadata
    with st.expander("Response Metadata"):
        metadata = {k: v for k, v in response.items() if k not in ["answer", "sources"]}
        st.json(metadata)


def _render_chat_tab(tier: str) -> None:
    """Draw the chat tab: history, query box, and the latest response."""
    st.header("Query Interface")

    # Reserve the history area now and fill it after the submission has been
    # handled, so the newest entry appears without a rerun. A rerun here
    # would erase the response rendered below before it could be read.
    history_area = st.container()

    # Query input
    st.markdown("---")
    query = st.text_area(
        "Enter your query:",
        height=100,
        placeholder="Ask a question...",
    )

    col1, _ = st.columns([1, 4])
    with col1:
        submit_button = st.button("Submit", type="primary", use_container_width=True)

    if submit_button:
        if query and query.strip():
            with st.spinner("Processing query..."):
                response = query_api(
                    query=query,
                    tier=tier,
                    session_id=st.session_state.session_id,
                )

            # Add to conversation history
            st.session_state.conversation_history.append({
                "query": query,
                "tier": tier,
                "response": response,
            })

            # Display response
            _render_response(response)
        else:
            st.warning("Please enter a query")

    with history_area:
        _render_history(st.session_state.conversation_history)


def _render_system_info_tab() -> None:
    """Draw the system-information tab."""
    st.header("System Information")

    if not st.button("Refresh System Info"):
        return

    system_info = get_system_info()
    if "error" in system_info:
        st.error(f"Error: {system_info['error']}")
        return

    st.json(system_info)

    # Display key metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric(
            "Documents",
            system_info.get("vector_store", {}).get("document_count", 0),
        )
    with col2:
        tools_count = sum(1 for v in system_info.get("tools", {}).values() if v)
        st.metric("Active Tools", tools_count)
    with col3:
        st.metric("Model", system_info.get("model", "Unknown"))


def _render_memory_tab() -> None:
    """Draw the memory tab: load or clear long-term memories of a session."""
    st.header("Memory Management")

    session_id = st.text_input(
        "Session ID",
        value=st.session_state.session_id,
    )

    if st.button("Load Memory"):
        try:
            long_term_memory = LongTermMemory()
            memories = long_term_memory.get_session_memories(session_id, limit=50)
            st.metric("Memories", len(memories))

            if memories:
                st.subheader("Memory Entries")
                for memory in memories:
                    memory_id = memory.get('id', 'Unknown')
                    with st.expander(f"Memory: {str(memory_id)[:8]}..."):
                        st.text(memory.get("content", ""))
                        st.json(memory.get("metadata", {}))
        except Exception as e:
            st.error(f"Error loading memory: {e}")

    if st.button("Clear Memory", type="secondary"):
        try:
            long_term_memory = LongTermMemory()
            deleted_count = long_term_memory.delete_session_memories(session_id)
            st.success(f"Deleted {deleted_count} memories")
        except Exception as e:
            st.error(f"Error clearing memory: {e}")


def _show_document_count(slot) -> None:
    """Write the current vector-store document count into ``slot``."""
    try:
        system_info = get_system_info()
        if "error" in system_info:
            slot.warning(f"Could not fetch document count: {system_info.get('error')}")
        else:
            doc_count = system_info.get("vector_store", {}).get("document_count", 0)
            slot.metric("Documents in Vector Store", doc_count)
    except Exception as e:
        slot.warning(f"Could not fetch document count: {e}")


def _render_documents_tab() -> None:
    """Draw the document-management tab."""
    st.header("Document Management")
    st.markdown("Add documents to the vector store for RAG queries.")

    # Placeholder for the document count. It is filled at the end of this
    # function so it reflects anything added during this run; no rerun is
    # needed, and the success/warning messages below stay on screen.
    count_slot = st.empty()

    st.markdown("---")

    # Option 1: Add sample documents
    st.subheader("Quick Start: Add Sample Documents")
    st.markdown("Add pre-configured sample documents about Oracle Exadata migration.")

    if st.button("Add Sample Documents", type="primary"):
        try:
            sample_docs = _sample_documents()
            documents = [doc["text"].strip() for doc in sample_docs]
            metadatas = [doc["metadata"] for doc in sample_docs]
            ids = add_text_documents(documents, metadatas)
            st.success(f"✅ Added {len(ids)} sample documents successfully!")
        except Exception as e:
            st.error(f"Error adding sample documents: {e}")

    st.markdown("---")

    # Option 2: Add text directly
    st.subheader("Add Text Documents")
    text_input = st.text_area(
        "Enter document text:",
        height=200,
        placeholder="Paste your document text here...",
    )

    col1, _ = st.columns([1, 4])
    with col1:
        add_text_button = st.button("Add Text", type="primary")

    if add_text_button and text_input:
        try:
            ids = add_text_documents([text_input])
            st.success(f"✅ Document added successfully! (ID: {ids[0] if ids else 'N/A'})")
        except Exception as e:
            st.error(f"Error adding document: {e}")

    st.markdown("---")

    # Option 3: Upload files
    st.subheader("Upload Text Files")
    uploaded_files = st.file_uploader(
        "Choose text files to upload",
        type=["txt", "md", "py", "json"],
        accept_multiple_files=True,
    )

    if uploaded_files:
        if st.button("Add Uploaded Files", type="primary"):
            try:
                import tempfile

                with tempfile.TemporaryDirectory() as tmpdir:
                    file_paths = []
                    for uploaded_file in uploaded_files:
                        # Keep only the final path component so a crafted
                        # name cannot point outside the temporary directory.
                        safe_name = Path(uploaded_file.name).name
                        file_path = os.path.join(tmpdir, safe_name)
                        with open(file_path, "wb") as f:
                            f.write(uploaded_file.getbuffer())
                        file_paths.append(file_path)

                    # Read the files while the temporary directory still exists.
                    ids = add_file_documents(file_paths)

                st.success(
                    f"✅ Added {len(ids)} document(s) from {len(uploaded_files)} file(s) successfully!"
                )
            except Exception as e:
                st.error(f"Error adding files: {e}")

    st.markdown("---")

    # Option 4: Add from directory
    st.subheader("Add Documents from Directory")
    st.markdown("Add all text files from a directory (e.g., `data/sample_documents`)")

    directory_path = st.text_input(
        "Directory path:",
        value="data/sample_documents",
        placeholder="data/sample_documents",
    )

    if st.button("Add from Directory"):
        if directory_path:
            try:
                ids = add_from_directory(directory_path)
                if ids:
                    st.success(f"✅ Added {len(ids)} document(s) from directory successfully!")
                else:
                    st.warning("⚠️ No documents found in the specified directory")
            except Exception as e:
                st.error(f"Error adding from directory: {e}")
        else:
            st.warning("Please enter a directory path")

    st.markdown("---")

    # Instructions
    with st.expander("📖 How to Add Documents"):
        st.markdown("""
        ### Methods to Add Documents:
        1. **Sample Documents**: Click "Add Sample Documents" to add pre-configured example documents.
        2. **Text Input**: Paste text directly into the text area and click "Add Text".
        3. **File Upload**: Upload `.txt`, `.md`, `.py`, or `.json` files using the file uploader.
        4. **Directory**: Specify a directory path containing text files to add all files at once.

        ### Command Line Alternative:
        You can also add documents using the command line:
        ```bash
        # Add sample documents
        python scripts/add_documents.py --sample-docs
        # Add specific files
        python scripts/add_documents.py --file doc1.txt doc2.txt
        # Add from directory
        python scripts/add_documents.py --directory data/sample_documents
        # Add text directly
        python scripts/add_documents.py --text "Your document text here"
        ```

        ### Tips:
        - Documents are automatically chunked if they're too large
        - Each document can have metadata (source, type, etc.)
        - After adding documents, queries will search through them
        - The vector store uses semantic search to find relevant documents
        """)

    # Fill the count last so it includes documents added above.
    _show_document_count(count_slot)


def main():
    """Main Streamlit app.

    Initialises per-session state (session id and conversation history),
    draws the sidebar, and then renders the four tabs: chat, system info,
    memory and documents.
    """
    st.title("🤖 Agentic RAG System")
    st.markdown("Production-ready RAG system with multiple agents and MCP servers")

    # Initialize session state
    if "session_id" not in st.session_state:
        st.session_state.session_id = _new_session_id()
    if "conversation_history" not in st.session_state:
        st.session_state.conversation_history = []

    # Sidebar
    tier = _render_sidebar()

    # Main content area
    chat_tab, info_tab, memory_tab, docs_tab = st.tabs(
        ["💬 Chat", "📊 System Info", "🧠 Memory", "📚 Documents"]
    )

    with chat_tab:
        _render_chat_tab(tier)
    with info_tab:
        _render_system_info_tab()
    with memory_tab:
        _render_memory_tab()
    with docs_tab:
        _render_documents_tab()
# Launch the UI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()