Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import uuid | |
| import json | |
| from datetime import datetime | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Core AgenticRAG imports with fallbacks | |
| try: | |
| from smolagents import CodeAgent, GradioUI, HfApiModel, tool, Tool | |
| from smolagents.tools import DuckDuckGoSearchTool | |
| SMOLAGENTS_AVAILABLE = True | |
| except ImportError: | |
| logger.warning("smolagents not available - using fallback implementation") | |
| SMOLAGENTS_AVAILABLE = False | |
| # Enterprise RAG stack imports | |
| try: | |
| # Vector store and embeddings (MTEB leaderboard models) | |
| from sentence_transformers import SentenceTransformer | |
| import chromadb | |
| from chromadb.config import Settings | |
| # Document processing | |
| from unstructured.partition.auto import partition | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.docstore.document import Document | |
| # Data processing | |
| import pandas as pd | |
| import numpy as np | |
| # Web search and APIs | |
| import requests | |
| from duckduckgo_search import DDGS | |
| ENTERPRISE_DEPS_AVAILABLE = True | |
| logger.info("β Enterprise dependencies loaded") | |
| except ImportError as e: | |
| ENTERPRISE_DEPS_AVAILABLE = False | |
| logger.warning(f"Enterprise dependencies missing: {e}") | |
| class EnterpriseDocumentRetriever(Tool): | |
| """ | |
| Enterprise-grade document retrieval tool using ChromaDB and MTEB models | |
| Following AgenticRAG architecture patterns | |
| """ | |
| name = "document_retriever" | |
| description = """ | |
| Retrieves relevant documents from the enterprise knowledge base using semantic similarity. | |
| Uses state-of-the-art embeddings from MTEB leaderboard for high accuracy retrieval. | |
| """ | |
| inputs = { | |
| "query": { | |
| "type": "string", | |
| "description": "The search query. Should be semantically close to target documents." | |
| }, | |
| "max_results": { | |
| "type": "integer", | |
| "description": "Maximum number of documents to retrieve (default: 5)" | |
| } | |
| } | |
| output_type = "string" | |
| def __init__(self): | |
| super().__init__() | |
| self.setup_complete = False | |
| self.documents = {} | |
| self.collection = None | |
| self.embedding_model = None | |
| self.session_id = str(uuid.uuid4()) | |
| if ENTERPRISE_DEPS_AVAILABLE: | |
| self._initialize_system() | |
| def _initialize_system(self): | |
| """Initialize ChromaDB and MTEB embedding model""" | |
| try: | |
| # Initialize ChromaDB with persistence | |
| self.chroma_client = chromadb.PersistentClient( | |
| path="./enterprise_vectordb", | |
| settings=Settings( | |
| anonymized_telemetry=False, | |
| allow_reset=True | |
| ) | |
| ) | |
| # Create enterprise collection | |
| self.collection = self.chroma_client.get_or_create_collection( | |
| name="enterprise_documents", | |
| metadata={"description": "Enterprise RAG knowledge base"} | |
| ) | |
| # Initialize MTEB leaderboard embedding model | |
| embedding_models = [ | |
| "BAAI/bge-base-en-v1.5", # Top MTEB model | |
| "sentence-transformers/all-MiniLM-L6-v2", # Fallback | |
| "sentence-transformers/all-mpnet-base-v2" # Alternative | |
| ] | |
| for model_name in embedding_models: | |
| try: | |
| self.embedding_model = SentenceTransformer(model_name) | |
| logger.info(f"β Loaded embedding model: {model_name}") | |
| break | |
| except Exception as e: | |
| logger.warning(f"Failed to load {model_name}: {e}") | |
| continue | |
| if self.embedding_model: | |
| self.setup_complete = True | |
| logger.info("β Enterprise retrieval system initialized") | |
| else: | |
| raise Exception("No embedding model could be loaded") | |
| except Exception as e: | |
| logger.error(f"β Failed to initialize retrieval system: {e}") | |
| self.setup_complete = False | |
| def add_documents(self, files: List[str]) -> Dict[str, Any]: | |
| """Process and add documents to vector store""" | |
| if not self.setup_complete: | |
| return {"success": False, "error": "System not initialized"} | |
| results = { | |
| "processed": 0, | |
| "total_chunks": 0, | |
| "errors": [], | |
| "documents": [] | |
| } | |
| for file_path in files: | |
| try: | |
| # Extract text using unstructured | |
| elements = partition(filename=file_path) | |
| text_content = "\n\n".join([str(element) for element in elements]) | |
| if len(text_content.strip()) < 100: | |
| results["errors"].append(f"{Path(file_path).name}: No substantial content") | |
| continue | |
| # Advanced chunking | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=512, | |
| chunk_overlap=50, | |
| separators=["\n\n", "\n", ". ", " ", ""] | |
| ) | |
| chunks = text_splitter.split_text(text_content) | |
| if chunks: | |
| # Generate embeddings | |
| embeddings = self.embedding_model.encode(chunks).tolist() | |
| # Prepare metadata | |
| metadatas = [] | |
| ids = [] | |
| for i, chunk in enumerate(chunks): | |
| chunk_id = f"{Path(file_path).name}_{i}_{uuid.uuid4().hex[:8]}" | |
| ids.append(chunk_id) | |
| metadatas.append({ | |
| "filename": Path(file_path).name, | |
| "chunk_index": i, | |
| "file_path": file_path, | |
| "chunk_size": len(chunk), | |
| "session_id": self.session_id, | |
| "added_at": datetime.now().isoformat() | |
| }) | |
| # Add to ChromaDB | |
| self.collection.add( | |
| documents=chunks, | |
| embeddings=embeddings, | |
| metadatas=metadatas, | |
| ids=ids | |
| ) | |
| results["processed"] += 1 | |
| results["total_chunks"] += len(chunks) | |
| results["documents"].append({ | |
| "filename": Path(file_path).name, | |
| "chunks": len(chunks), | |
| "size": len(text_content) | |
| }) | |
| logger.info(f"β Processed {Path(file_path).name}: {len(chunks)} chunks") | |
| except Exception as e: | |
| results["errors"].append(f"{Path(file_path).name}: {str(e)}") | |
| logger.error(f"Error processing {file_path}: {e}") | |
| return results | |
| def forward(self, query: str, max_results: int = 5) -> str: | |
| """Retrieve relevant documents using semantic search""" | |
| if not self.setup_complete: | |
| return "β Document retrieval system not available. Please check configuration." | |
| try: | |
| # Generate query embedding | |
| query_embedding = self.embedding_model.encode([query]).tolist()[0] | |
| # Search ChromaDB | |
| results = self.collection.query( | |
| query_embeddings=[query_embedding], | |
| n_results=max_results, | |
| include=["documents", "metadatas", "distances"] | |
| ) | |
| if not results["documents"] or not results["documents"][0]: | |
| return f"No relevant documents found for query: '{query}'" | |
| # Format results | |
| formatted_results = [] | |
| for i, (doc, metadata, distance) in enumerate(zip( | |
| results["documents"][0], | |
| results["metadatas"][0], | |
| results["distances"][0] | |
| )): | |
| similarity = 1 - distance | |
| if similarity > 0.3: # Similarity threshold | |
| formatted_results.append({ | |
| "content": doc, | |
| "filename": metadata.get("filename", "Unknown"), | |
| "similarity": similarity, | |
| "rank": i + 1 | |
| }) | |
| if not formatted_results: | |
| return f"No sufficiently relevant documents found for query: '{query}'" | |
| # Create response | |
| response = f"π **Retrieved {len(formatted_results)} relevant documents for: '{query}'**\n\n" | |
| for result in formatted_results: | |
| content = result["content"] | |
| if len(content) > 400: | |
| content = content[:400] + "..." | |
| response += f"**π {result['filename']}** (Similarity: {result['similarity']:.3f})\n" | |
| response += f"{content}\n\n---\n\n" | |
| return response | |
| except Exception as e: | |
| logger.error(f"Retrieval error: {e}") | |
| return f"β Error during document retrieval: {str(e)}" | |
| class EnterpriseWebSearchTool(Tool): | |
| """Advanced web search tool for current information""" | |
| name = "web_search" | |
| description = "Search the web for current information and recent developments" | |
| inputs = { | |
| "query": { | |
| "type": "string", | |
| "description": "The search query" | |
| } | |
| } | |
| output_type = "string" | |
| def forward(self, query: str) -> str: | |
| try: | |
| with DDGS() as ddgs: | |
| results = list(ddgs.text(query, max_results=5)) | |
| if not results: | |
| return f"No web search results found for: {query}" | |
| response = f"π **Web search results for: '{query}'**\n\n" | |
| for i, result in enumerate(results, 1): | |
| title = result.get('title', 'No title') | |
| snippet = result.get('body', 'No description') | |
| url = result.get('href', 'No URL') | |
| if len(snippet) > 200: | |
| snippet = snippet[:200] + "..." | |
| response += f"**{i}. {title}**\n" | |
| response += f"{snippet}\n" | |
| response += f"π {url}\n\n---\n\n" | |
| return response | |
| except Exception as e: | |
| return f"β Web search error: {str(e)}" | |
| class WeatherTool(Tool): | |
| """Weather information tool""" | |
| name = "weather_info" | |
| description = "Get current weather information for any location" | |
| inputs = { | |
| "location": { | |
| "type": "string", | |
| "description": "Location to get weather for" | |
| } | |
| } | |
| output_type = "string" | |
| def forward(self, location: str) -> str: | |
| # Mock weather data for demo | |
| return f""" | |
| π€οΈ **Weather for {location}** | |
| Temperature: 22Β°C (72Β°F) | |
| Condition: Partly Cloudy | |
| Humidity: 65% | |
| Wind: 8 mph NW | |
| *Note: This is demo weather data. Connect to a real weather API for production use.* | |
| """ | |
| class EnterpriseRAGAgent: | |
| """ | |
| Main Enterprise RAG Agent using AgenticRAG architecture | |
| """ | |
| def __init__(self): | |
| self.document_retriever = EnterpriseDocumentRetriever() | |
| self.web_search_tool = EnterpriseWebSearchTool() | |
| self.weather_tool = WeatherTool() | |
| # Initialize agent based on available dependencies | |
| if SMOLAGENTS_AVAILABLE: | |
| self._init_smolagents() | |
| else: | |
| self._init_fallback_agent() | |
| def _init_smolagents(self): | |
| """Initialize with smolagents (preferred)""" | |
| try: | |
| # Use HfApiModel for best results (Facebook RAG, DataGemma models) | |
| model = HfApiModel( | |
| model_id="microsoft/DialoGPT-medium", # Fallback model | |
| token=os.getenv("HF_TOKEN") | |
| ) | |
| self.agent = CodeAgent( | |
| model=model, | |
| tools=[ | |
| self.document_retriever, | |
| self.web_search_tool, | |
| self.weather_tool | |
| ], | |
| add_base_tools=True, | |
| planning_interval=3 # Enable planning | |
| ) | |
| self.agent_type = "smolagents" | |
| logger.info("β Initialized smolagents CodeAgent") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize smolagents: {e}") | |
| self._init_fallback_agent() | |
| def _init_fallback_agent(self): | |
| """Fallback agent implementation""" | |
| self.agent_type = "fallback" | |
| logger.info("β Initialized fallback agent") | |
| def process_documents(self, files): | |
| """Process uploaded documents""" | |
| if not files: | |
| return "β No files provided for processing" | |
| file_paths = [file.name for file in files] | |
| results = self.document_retriever.add_documents(file_paths) | |
| if results["processed"] == 0: | |
| return f"β No documents were processed successfully.\nErrors: {results['errors']}" | |
| response = f""" | |
| β **Document Processing Complete** | |
| π **Results Summary:** | |
| β’ **Processed:** {results['processed']} documents | |
| β’ **Total chunks:** {results['total_chunks']} searchable segments | |
| β’ **Processing method:** Unstructured + ChromaDB + MTEB embeddings | |
| π **Processed Documents:** | |
| """ | |
| for doc in results["documents"]: | |
| response += f"β’ **{doc['filename']}** - {doc['chunks']} chunks ({doc['size']:,} characters)\n" | |
| if results["errors"]: | |
| response += f"\nβ οΈ **Errors ({len(results['errors'])}):**\n" | |
| for error in results["errors"][:3]: | |
| response += f"β’ {error}\n" | |
| return response | |
| def query(self, message: str, history: List = None) -> str: | |
| """Process user query through the agent""" | |
| if not message.strip(): | |
| return "Please provide a question or query." | |
| try: | |
| if self.agent_type == "smolagents": | |
| # Use smolagents CodeAgent | |
| enhanced_query = f""" | |
| You are an enterprise AI assistant with access to multiple information sources. | |
| User Query: {message} | |
| Use your available tools strategically: | |
| 1. For questions about uploaded documents, use the document_retriever tool | |
| 2. For current events or recent information, use the web_search tool | |
| 3. For weather queries, use the weather_info tool | |
| 4. Combine multiple sources when appropriate | |
| Provide comprehensive, well-sourced answers with citations. | |
| """ | |
| response = self.agent.run(enhanced_query) | |
| return response | |
| else: | |
| # Fallback implementation | |
| return self._fallback_query(message) | |
| except Exception as e: | |
| logger.error(f"Query processing error: {e}") | |
| return f"β Error processing query: {str(e)}" | |
| def _fallback_query(self, message: str) -> str: | |
| """Fallback query processing""" | |
| # Simple routing logic | |
| if any(word in message.lower() for word in ['document', 'file', 'upload', 'pdf']): | |
| return self.document_retriever.forward(message) | |
| elif any(word in message.lower() for word in ['weather', 'temperature', 'forecast']): | |
| return self.weather_tool.forward("New York") # Default location | |
| elif any(word in message.lower() for word in ['search', 'current', 'recent', 'news']): | |
| return self.web_search_tool.forward(message) | |
| else: | |
| # Try document retrieval first | |
| doc_results = self.document_retriever.forward(message) | |
| if "No relevant documents" not in doc_results: | |
| return doc_results | |
| else: | |
| return self.web_search_tool.forward(message) | |
| def get_system_status(self) -> str: | |
| """Get comprehensive system status""" | |
| try: | |
| doc_count = self.document_retriever.collection.count() if self.document_retriever.collection else 0 | |
| except: | |
| doc_count = 0 | |
| return f""" | |
| π€ **Enterprise AgenticRAG System Status** | |
| **Agent Type:** {self.agent_type.title()} | |
| **Dependencies:** {"β Full" if ENTERPRISE_DEPS_AVAILABLE else "β οΈ Limited"} | |
| **Document Store:** {doc_count} chunks indexed | |
| **Vector DB:** {"β ChromaDB Active" if self.document_retriever.setup_complete else "β Not Available"} | |
| **Embedding Model:** {"β MTEB Model Loaded" if self.document_retriever.embedding_model else "β Not Available"} | |
| **Available Tools:** | |
| β’ π Document Retrieval (ChromaDB + MTEB) | |
| β’ π Web Search (DuckDuckGo) | |
| β’ π€οΈ Weather Information | |
| β’ π§ Agentic Planning & Reasoning | |
| **Enterprise Features:** | |
| β’ Multi-format document processing | |
| β’ Semantic similarity search | |
| β’ Agent-based query routing | |
| β’ Source attribution | |
| β’ Real-time information access | |
| """ | |
| # Initialize the enterprise RAG system | |
| enterprise_rag = EnterpriseRAGAgent() | |
| def upload_and_process(files): | |
| """Handle document upload and processing""" | |
| return enterprise_rag.process_documents(files) | |
| def chat_with_agent(message, history): | |
| """Handle chat interactions""" | |
| return enterprise_rag.query(message, history) | |
| def get_status(): | |
| """Get system status""" | |
| return enterprise_rag.get_system_status() | |
| # Create Gradio interface | |
| def create_interface(): | |
| """Create the enterprise Gradio interface""" | |
| custom_css = """ | |
| .enterprise-header { | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| padding: 2rem; | |
| border-radius: 15px; | |
| text-align: center; | |
| margin-bottom: 2rem; | |
| } | |
| .status-panel { | |
| background: #f8f9fa; | |
| border: 2px solid #e9ecef; | |
| border-radius: 10px; | |
| padding: 1.5rem; | |
| } | |
| """ | |
| with gr.Blocks( | |
| title="Enterprise AgenticRAG System", | |
| theme=gr.themes.Soft(), | |
| css=custom_css | |
| ) as interface: | |
| # Header | |
| gr.HTML(""" | |
| <div class="enterprise-header"> | |
| <h1>π Enterprise AgenticRAG System</h1> | |
| <p>Production-grade Retrieval-Augmented Generation with Agent Planning</p> | |
| <p><strong>ChromaDB β’ MTEB Embeddings β’ Multi-Tool Reasoning β’ Real-time Search</strong></p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| # Main content | |
| with gr.Column(scale=3): | |
| with gr.Tab("π Document Processing"): | |
| gr.Markdown(""" | |
| ### Enterprise Document Processing | |
| **Advanced pipeline:** Unstructured extraction β Semantic chunking β ChromaDB indexing β MTEB embeddings | |
| """) | |
| file_upload = gr.File( | |
| file_count="multiple", | |
| file_types=[".pdf", ".docx", ".txt", ".md", ".html", ".json"], | |
| label="Upload Enterprise Documents", | |
| height=150 | |
| ) | |
| process_btn = gr.Button("βοΈ Process Documents", variant="primary", size="lg") | |
| processing_results = gr.Markdown(label="Processing Results") | |
| process_btn.click( | |
| fn=upload_and_process, | |
| inputs=[file_upload], | |
| outputs=[processing_results] | |
| ) | |
| with gr.Tab("π€ Agentic Chat"): | |
| gr.Markdown(""" | |
| ### Chat with Enterprise Agent | |
| **Intelligent routing:** Document retrieval β’ Web search β’ Multi-step reasoning β’ Source attribution | |
| """) | |
| if SMOLAGENTS_AVAILABLE and enterprise_rag.agent_type == "smolagents": | |
| # Use GradioUI for smolagents | |
| try: | |
| gradio_ui = GradioUI(enterprise_rag.agent) | |
| gradio_ui.render() | |
| except: | |
| # Fallback to ChatInterface | |
| gr.ChatInterface( | |
| fn=chat_with_agent, | |
| title="Enterprise Agent Chat", | |
| examples=[ | |
| "What information do you have about Jimmy?", | |
| "Search for recent AI developments", | |
| "Analyze the uploaded documents", | |
| "What's the weather in London?", | |
| "Compare information across multiple sources" | |
| ] | |
| ) | |
| else: | |
| # Fallback ChatInterface | |
| gr.ChatInterface( | |
| fn=chat_with_agent, | |
| title="Enterprise Agent Chat", | |
| examples=[ | |
| "What information do you have about Jimmy?", | |
| "Search for recent AI developments", | |
| "Analyze the uploaded documents", | |
| "What's the weather in London?", | |
| "Compare information across multiple sources" | |
| ] | |
| ) | |
| with gr.Tab("π API Integration"): | |
| gr.Markdown(""" | |
| ### Enterprise API Access | |
| **REST Endpoint:** `/api/v1/query` | |
| **Request:** | |
| ```json | |
| { | |
| "query": "Your question here", | |
| "max_results": 5, | |
| "use_web_search": true | |
| } | |
| ``` | |
| **Response:** | |
| ```json | |
| { | |
| "answer": "Agent response", | |
| "sources": [{"type": "document", "filename": "doc.pdf"}], | |
| "processing_time": 1.23, | |
| "agent_steps": ["retrieve", "analyze", "synthesize"] | |
| } | |
| ``` | |
| **Authentication:** Set `ENTERPRISE_API_KEY` environment variable | |
| """) | |
| # Sidebar | |
| with gr.Column(scale=1): | |
| with gr.Group(): | |
| gr.Markdown("### π System Status") | |
| status_display = gr.Markdown( | |
| value=get_status(), | |
| elem_classes=["status-panel"] | |
| ) | |
| refresh_btn = gr.Button("π Refresh Status", size="sm") | |
| refresh_btn.click(fn=get_status, outputs=[status_display]) | |
| with gr.Group(): | |
| gr.Markdown(""" | |
| ### π― Enterprise Architecture | |
| **Agent Framework:** | |
| β’ smolagents CodeAgent | |
| β’ Multi-tool orchestration | |
| β’ Planning & reasoning | |
| **Vector Database:** | |
| β’ ChromaDB persistence | |
| β’ MTEB embeddings | |
| β’ Semantic similarity | |
| **Document Processing:** | |
| β’ Unstructured extraction | |
| β’ Intelligent chunking | |
| β’ Multi-format support | |
| **Real-time Data:** | |
| β’ Web search integration | |
| β’ Current information | |
| β’ Source attribution | |
| """) | |
| with gr.Group(): | |
| gr.Markdown(""" | |
| ### π‘ Usage Guide | |
| **1. Upload Documents** | |
| β’ PDF, DOCX, TXT, HTML, JSON | |
| β’ Automatic text extraction | |
| β’ Semantic indexing | |
| **2. Ask Questions** | |
| β’ Natural language queries | |
| β’ Multi-source answers | |
| β’ Cited responses | |
| **3. Agent Features** | |
| β’ Intelligent tool selection | |
| β’ Multi-step reasoning | |
| β’ Context awareness | |
| β’ Source verification | |
| """) | |
| # Footer | |
| gr.HTML(""" | |
| <div style="text-align: center; margin-top: 2rem; padding: 1.5rem; background: #f1f3f4; border-radius: 10px;"> | |
| <p><strong>Enterprise AgenticRAG System</strong> β’ Built on Hugging Face Enterprise Stack</p> | |
| <p>π’ smolagents β’ ποΈ ChromaDB β’ π§ MTEB Embeddings β’ π Multi-source Intelligence</p> | |
| </div> | |
| """) | |
| return interface | |
| # Launch the application | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| demo.queue(max_size=20) | |
| demo.launch( | |
| share=False, | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_error=True, | |
| show_api=True | |
| ) |