import gradio as gr import requests import json import os from pathlib import Path from collections import Counter from datetime import datetime try: import plotly.graph_objects as go PLOTLY_AVAILABLE = True except ImportError: PLOTLY_AVAILABLE = False go = None BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000") def chat_with_agent(message, tenant_id, history): """ Send a message to the backend MCP agent and return the response. Uses streaming for real-time word-by-word updates. Args: message: User's message text tenant_id: Tenant ID for multi-tenant isolation history: Chat history (Gradio messages format) Yields: Updated chat history with agent response (streaming) """ if not message or not message.strip(): yield history return if not tenant_id or not tenant_id.strip(): error_msg = "Please enter a Tenant ID before sending a message." history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) yield history return # Add user message to history history.append({"role": "user", "content": message}) # Backend streaming endpoint backend_url = f"{BACKEND_BASE_URL}/agent/message/stream" # Prepare request payload payload = { "tenant_id": tenant_id.strip(), "message": message, "user_id": None, "conversation_history": [], "temperature": 0.0 } try: # Make streaming request response = requests.post( backend_url, json=payload, headers={"Content-Type": "application/json"}, stream=True, timeout=120 ) if response.status_code == 200: # Initialize assistant message assistant_message = "" history.append({"role": "assistant", "content": assistant_message}) yield history # Yield initial empty message # Stream tokens - use iter_lines for SSE format for line_bytes in response.iter_lines(): if line_bytes: try: line = line_bytes.decode('utf-8').strip() if not line: continue if line.startswith('data: '): data_str = line[6:] # Remove 'data: ' prefix try: data = json.loads(data_str) # Handle status messages if 'status' in data: status_msg = data.get('message', '') if status_msg: # Show status in the message temporarily history[-1] = {"role": "assistant", "content": f"⏳ {status_msg}"} yield history continue # Handle tokens token = data.get('token', '') if token: assistant_message += token # Update the last message in history history[-1] = {"role": "assistant", "content": assistant_message} yield history # Yield updated history immediately if data.get('done', False): break except json.JSONDecodeError: continue elif line.startswith('error:'): try: error_data = json.loads(line[6:]) error_msg = error_data.get('error', 'Unknown error') history[-1] = {"role": "assistant", "content": f"❌ Error: {error_msg}"} yield history break except: pass except UnicodeDecodeError: continue else: error_msg = f"Error {response.status_code}: {response.text}" history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.ConnectionError: error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000" history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.Timeout: error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly." history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.RequestException as e: error_msg = f"❌ Request Error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history except Exception as e: error_msg = f"❌ Unexpected Error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history def ingest_document( tenant_id: str, source_type: str, content: str, document_url: str, filename: str, doc_id: str, metadata_json: str ): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest documents." tenant_id = tenant_id.strip() payload_content = content or "" if source_type == "url" and document_url: payload_content = document_url.strip() metadata = {} if filename: metadata["filename"] = filename.strip() if document_url: metadata["url"] = document_url.strip() if doc_id: metadata["doc_id"] = doc_id.strip() if metadata_json: try: extra_metadata = json.loads(metadata_json) if isinstance(extra_metadata, dict): metadata.update(extra_metadata) else: return "❗ Metadata JSON must represent an object (key/value pairs)." except json.JSONDecodeError as exc: return f"❗ Invalid metadata JSON: {exc}" payload = { "action": "ingest_document", "tenant_id": tenant_id, "source_type": source_type, "content": payload_content, "metadata": metadata } try: response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-document", json=payload, headers={"Content-Type": "application/json"}, timeout=60 ) if response.status_code == 200: data = response.json() return f"✅ Document ingested successfully.\n\n{data.get('message', '')}" return f"❌ Ingestion failed ({response.status_code}): {response.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ The ingestion request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during ingestion: {exc}" def ingest_file(tenant_id: str, file_obj): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest files." if file_obj is None: return "❗ Please select a file to upload." tenant_id = tenant_id.strip() try: file_path = Path(file_obj.name) with open(file_path, "rb") as f: file_bytes = f.read() files = { "file": (file_path.name, file_bytes, "application/octet-stream") } response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-file", files=files, headers={"x-tenant-id": tenant_id}, timeout=120 ) if response.status_code == 200: data = response.json() return f"✅ File ingested successfully.\n\n{data.get('message', '')}" return f"❌ File ingestion failed ({response.status_code}): {response.text}" except FileNotFoundError: return "❌ Could not read the uploaded file." except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ File ingestion timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during file ingestion: {exc}" def _format_rules_table(rules: list[str]) -> list[list]: return [[idx + 1, rule] for idx, rule in enumerate(rules)] def fetch_admin_rules(tenant_id: str) -> tuple[str, list[list]]: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] tenant_id = tenant_id.strip() try: response = requests.get( f"{BACKEND_BASE_URL}/admin/rules", headers={"x-tenant-id": tenant_id}, timeout=30 ) if response.status_code == 200: rules = response.json().get("rules", []) if not rules: return "✅ No admin rules have been configured yet.", [] summary = f"### Current Rules ({len(rules)})" return summary, _format_rules_table(rules) return f"❌ Error {response.status_code}: {response.text}", [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] def extract_rules_from_file(file_path) -> str: """ Extract rules from uploaded file (TXT, PDF, DOC, DOCX). Returns the extracted text content. """ if file_path is None: return "" try: # Gradio File component returns file path as string if isinstance(file_path, str): file_path = Path(file_path) else: # Sometimes it's a file object with .name attribute file_path = Path(file_path.name if hasattr(file_path, 'name') else file_path) if not file_path.exists(): return f"❌ File not found: {file_path}" file_ext = file_path.suffix.lower() # Read file based on type if file_ext == '.txt' or file_ext == '.md': # Plain text files with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() return content elif file_ext == '.pdf': # PDF files - use PyPDF2 try: import PyPDF2 with open(file_path, 'rb') as f: pdf_reader = PyPDF2.PdfReader(f) content = [] for page in pdf_reader.pages: content.append(page.extract_text()) return '\n'.join(content) except ImportError: return "❌ PDF extraction requires PyPDF2. Install with: pip install PyPDF2" except Exception as e: return f"❌ Failed to extract text from PDF: {str(e)}" elif file_ext in ['.doc', '.docx']: # DOC/DOCX files - use python-docx try: from docx import Document doc = Document(file_path) content = [] for paragraph in doc.paragraphs: content.append(paragraph.text) return '\n'.join(content) except ImportError: return "❌ DOCX extraction requires python-docx. Install with: pip install python-docx" except Exception as e: return f"❌ Failed to extract text from DOCX: {str(e)}" else: return f"❌ Unsupported file type: {file_ext}. Supported: .txt, .pdf, .doc, .docx" except Exception as e: return f"❌ Error reading file: {str(e)}" def add_admin_rules(tenant_id: str, rules_text: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rules_text or not rules_text.strip(): return "❗ Provide at least one rule to upload." tenant_id = tenant_id.strip() # Filter out comment lines (starting with #) and empty lines rules = [ rule.strip() for rule in rules_text.splitlines() if rule.strip() and not rule.strip().startswith("#") ] if not rules: return "❗ No valid rules detected. (Comment lines starting with # are ignored)" added = [] enhanced = [] errors = [] # Process rules in chunks to avoid timeout CHUNK_SIZE = 5 # Process 5 rules at a time total_rules = len(rules) if total_rules == 1: # Single rule - use regular endpoint try: resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules", params={"rule": rules[0], "enhance": "true"}, headers={"x-tenant-id": tenant_id}, timeout=30 ) if resp.status_code == 200: data = resp.json() added.append(data.get("added_rule", rules[0])) if data.get("enhanced"): edge_cases = data.get("edge_cases", []) improvements = data.get("improvements", []) if edge_cases or improvements: enhanced.append(f"**{data.get('added_rule', rules[0])}**:") if improvements: enhanced.append(f" • Improvements: {', '.join(improvements[:3])}") if edge_cases: enhanced.append(f" • Edge cases identified: {len(edge_cases)}") else: errors.append(f"{rules[0]} -> {resp.status_code}: {resp.text}") except Exception as exc: errors.append(f"{rules[0]} -> {exc}") else: # Multiple rules - process in chunks for i in range(0, total_rules, CHUNK_SIZE): chunk = rules[i:i + CHUNK_SIZE] chunk_num = (i // CHUNK_SIZE) + 1 total_chunks = (total_rules + CHUNK_SIZE - 1) // CHUNK_SIZE try: resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules/bulk", json={"rules": chunk}, headers={"x-tenant-id": tenant_id}, params={"enhance": "true"}, timeout=45 # Timeout per chunk (5 rules) ) if resp.status_code == 200: data = resp.json() chunk_added = data.get("added_rules", []) added.extend(chunk_added) if data.get("enhanced"): chunk_enhanced = data.get("enhancement_summary", []) enhanced.extend([f"[Chunk {chunk_num}/{total_chunks}] {e}" for e in chunk_enhanced]) else: errors.append(f"Chunk {chunk_num}/{total_chunks} failed: {resp.status_code}: {resp.text}") except requests.exceptions.Timeout: errors.append(f"Chunk {chunk_num}/{total_chunks} timed out after 45s") except Exception as exc: errors.append(f"Chunk {chunk_num}/{total_chunks} error: {exc}") summary = [] if added: summary.append(f"✅ Added {len(added)}/{total_rules} rule(s):\n" + "\n".join([f"- {r}" for r in added[:10]])) if len(added) > 10: summary.append(f"... and {len(added) - 10} more") if enhanced: summary.append(f"\n🤖 LLM Enhancement Applied:\n" + "\n".join(enhanced[:5])) if len(enhanced) > 5: summary.append(f"... and {len(enhanced) - 5} more enhancements") if errors: summary.append("\n⚠️ Errors:\n" + "\n".join(errors)) return "\n\n".join(summary) if summary else "No rules were added." def delete_admin_rule(tenant_id: str, rule: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rule or not rule.strip(): return "❗ Provide the exact rule text to delete." tenant_id = tenant_id.strip() rule = rule.strip() try: resp = requests.delete( f"{BACKEND_BASE_URL}/admin/rules/{rule}", headers={"x-tenant-id": tenant_id}, timeout=15 ) if resp.status_code == 200: return f"🗑️ Deleted rule: {rule}" return f"❌ Error {resp.status_code}: {resp.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Delete request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def add_rules_from_file(tenant_id: str, file_path): """ Extract rules from uploaded file and add them. """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", "👉 Click **Refresh Rules** to see existing entries.", [] if file_path is None: return "❗ Please select a file to upload.", "👉 Click **Refresh Rules** to see existing entries.", [] # Extract text from file extracted_text = extract_rules_from_file(file_path) if extracted_text.startswith("❌"): # Error occurred during extraction summary, rows = fetch_admin_rules(tenant_id) return extracted_text, summary, rows if not extracted_text or not extracted_text.strip(): summary, rows = fetch_admin_rules(tenant_id) return "❗ No text could be extracted from the file.", summary, rows # Add rules from extracted text status = add_admin_rules(tenant_id, extracted_text) summary, rows = fetch_admin_rules(tenant_id) return status, summary, rows def add_rules_and_refresh(tenant_id: str, rules_text: str): status = add_admin_rules(tenant_id, rules_text) summary, rows = fetch_admin_rules(tenant_id) return status, summary, rows def delete_rule_and_refresh(tenant_id: str, rule: str): status = delete_admin_rule(tenant_id, rule) summary, rows = fetch_admin_rules(tenant_id) return status, summary, rows def fetch_admin_analytics(tenant_id: str): """Fetch analytics data and return formatted results with visualizations.""" if not tenant_id or not tenant_id.strip(): error_msg = "❗ Tenant ID is required to view analytics." return error_msg, {}, None, None, None, None tenant_id = tenant_id.strip() headers = {"x-tenant-id": tenant_id} overview_data = {} tool_usage_data = {} redflags_data = {} activity_data = {} error_msg = None # Fetch Overview try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers=headers, timeout=30 ) if resp.status_code == 200: overview_data = resp.json() else: error_msg = f"❌ Error fetching overview: {resp.status_code}" except Exception as e: error_msg = f"❌ Error: {str(e)}" # Fetch Tool Usage try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/tool-usage", headers=headers, timeout=30 ) if resp.status_code == 200: tool_usage_data = resp.json() except Exception: pass # Fetch Red Flags try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/redflags", headers=headers, timeout=30 ) if resp.status_code == 200: redflags_data = resp.json() except Exception: pass # Fetch Activity try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/activity", headers=headers, timeout=30 ) if resp.status_code == 200: activity_data = resp.json() except Exception: pass # Extract data for visualizations overview = overview_data.get("overview", {}) tool_usage = overview.get("tool_usage", tool_usage_data.get("tool_usage", {})) rag_quality = overview.get("rag_quality", {}) # Create tool usage bar chart tool_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] counts = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): tools.append(tool_name.replace(".", " ").title()) counts.append(stats.get("count", 0)) latencies.append(stats.get("avg_latency_ms", 0)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = go.Figure() fig.add_trace(go.Bar( x=tools, y=counts, name="Usage Count", marker_color=colors_list, text=counts, textposition='outside', hovertemplate='%{x}
Count: %{y}
' )) fig.update_layout( title={ "text": "Tool Usage Count", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Count", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) tool_chart = fig except Exception: tool_chart = None # Create latency chart latency_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): avg_latency = stats.get("avg_latency_ms", 0) if avg_latency > 0: tools.append(tool_name.replace(".", " ").title()) latencies.append(round(avg_latency, 2)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = go.Figure() fig.add_trace(go.Bar( x=tools, y=latencies, name="Avg Latency (ms)", marker_color=colors_list, text=[f"{l}ms" for l in latencies], textposition='outside', hovertemplate='%{x}
Avg Latency: %{y}ms' )) fig.update_layout( title={ "text": "Average Tool Latency", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Latency (ms)", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) latency_chart = fig except Exception: latency_chart = None # Create RAG quality metrics visualization rag_chart = None if rag_quality and PLOTLY_AVAILABLE: try: metrics = ["Avg Hits", "Avg Score", "Avg Top Score"] values = [ rag_quality.get("avg_hits_per_search", 0), rag_quality.get("avg_score", 0) * 100, # Convert to percentage rag_quality.get("avg_top_score", 0) * 100 ] fig = go.Figure(data=[go.Bar( x=metrics, y=values, marker_color=["#3b82f6", "#10b981", "#f59e0b"], text=[f"{v:.2f}" for v in values], textposition='outside', hovertemplate='%{x}
Value: %{y:.2f}' )]) fig.update_layout( title={ "text": "RAG Quality Metrics", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Metric", yaxis_title="Value", height=350, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) rag_chart = fig except Exception: rag_chart = None # Format summary text total_queries = overview.get("total_queries", activity_data.get("activity", {}).get("total_queries", 0)) active_users = overview.get("active_users", activity_data.get("activity", {}).get("active_users", 0)) redflag_count = overview.get("redflag_count", redflags_data.get("count", 0)) last_query = overview.get("last_query", activity_data.get("activity", {}).get("last_query")) # Calculate total tool usage total_tool_calls = sum(stats.get("count", 0) for stats in tool_usage.values()) total_success = sum(stats.get("success_count", 0) for stats in tool_usage.values()) total_errors = sum(stats.get("error_count", 0) for stats in tool_usage.values()) success_rate = (total_success / total_tool_calls * 100) if total_tool_calls > 0 else 0 summary_text = f""" #### 📈 Activity Metrics - **Total Queries:** `{total_queries}` - **Active Users:** `{active_users}` - **Red Flags:** `{redflag_count}` - **Last Query:** `{last_query if last_query else "N/A"}` --- #### 🔧 Tool Usage Overview - **Total Tool Calls:** `{total_tool_calls}` - **Successful Calls:** `{total_success}` ✅ - **Failed Calls:** `{total_errors}` {'⚠️' if total_errors > 0 else ''} - **Success Rate:** `{success_rate:.1f}%` {'🟢' if success_rate >= 95 else '🟡' if success_rate >= 80 else '🔴'} --- #### 🔍 RAG Quality Metrics - **Total Searches:** `{rag_quality.get("total_searches", 0)}` - **Avg Hits per Search:** `{rag_quality.get("avg_hits_per_search", 0):.2f}` - **Avg Relevance Score:** `{rag_quality.get("avg_score", 0):.3f}` - **Avg Top Score:** `{rag_quality.get("avg_top_score", 0):.3f}` - **Avg Search Latency:** `{rag_quality.get("avg_latency_ms", 0):.2f}ms` --- #### 📊 Tool Breakdown """ # Add individual tool stats to summary for tool_name, stats in sorted(tool_usage.items(), key=lambda x: x[1].get("count", 0), reverse=True): tool_display = tool_name.replace(".", " ").title() count = stats.get("count", 0) latency = stats.get("avg_latency_ms", 0) success = stats.get("success_count", 0) errors = stats.get("error_count", 0) status_icon = "✅" if errors == 0 else "⚠️" summary_text += f"- **{tool_display}** {status_icon}
└ {count} calls • {latency:.1f}ms avg • {success} success • {errors} errors\n" return summary_text, tool_usage, tool_chart, latency_chart, rag_chart, error_msg def list_documents(tenant_id: str, limit: int = 1000, offset: int = 0): """ List all documents for a tenant. Returns a tuple of (status_message, documents_list, total_count, stats_dict, chart_fig). """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [], 0, {}, None tenant_id = tenant_id.strip() try: response = requests.get( f"{BACKEND_BASE_URL}/rag/list", params={"tenant_id": tenant_id, "limit": limit, "offset": offset}, headers={"x-tenant-id": tenant_id}, timeout=30 ) if response.status_code == 200: data = response.json() documents = data.get("documents", []) total = data.get("total", 0) # Format documents for display and collect stats formatted_docs = [] type_counts = Counter() total_length = 0 for doc in documents: doc_id = doc.get("id", "N/A") text = doc.get("text", "") created_at = doc.get("created_at", "") preview = text[:200] + "..." if len(text) > 200 else text # Detect document type text_lower = text.lower() if "http://" in text_lower or "https://" in text_lower or "www." in text_lower: doc_type = "link" elif any(x in text_lower for x in ["q:", "question:", "faq", "frequently asked"]): doc_type = "faq" elif ".pdf" in text_lower or "pdf document" in text_lower: doc_type = "pdf" else: doc_type = "text" type_counts[doc_type] += 1 total_length += len(text) formatted_docs.append({ "ID": doc_id, "Type": doc_type, "Preview": preview, "Length": len(text), "Created": created_at[:10] if created_at else "N/A" }) # Create statistics dictionary stats = { "total": total, "types": dict(type_counts), "avg_length": total_length // total if total > 0 else 0, "total_chars": total_length } # Create pie chart for document types chart_fig = None if type_counts and PLOTLY_AVAILABLE: try: labels = list(type_counts.keys()) values = list(type_counts.values()) colors = { "text": "#3b82f6", # blue "pdf": "#ef4444", # red "faq": "#a855f7", # purple "link": "#06b6d4" # cyan } chart_colors = [colors.get(label, "#6b7280") for label in labels] fig = go.Figure(data=[go.Pie( labels=labels, values=values, hole=0.4, marker=dict(colors=chart_colors), textinfo='label+percent+value', textfont=dict(size=12), hovertemplate='%{label}
Count: %{value}
Percentage: %{percent}' )]) fig.update_layout( title={ "text": "Document Type Distribution", "x": 0.5, "xanchor": "center", "font": {"size": 16} }, height=400, showlegend=True, margin=dict(l=20, r=20, t=50, b=20) ) chart_fig = fig except Exception: chart_fig = None status = f"✅ Found {total} document(s)" return status, formatted_docs, total, stats, chart_fig else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [], 0, {}, None except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [], 0, {}, None except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [], 0, {}, None except Exception as exc: return f"❌ Unexpected error: {exc}", [], 0, {}, None def delete_document(tenant_id: str, document_id: int): """Delete a specific document by ID.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not document_id or document_id <= 0: return "❗ Invalid document ID." tenant_id = tenant_id.strip() try: response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete/{document_id}", params={"tenant_id": tenant_id}, headers={"x-tenant-id": tenant_id}, timeout=30 ) if response.status_code == 200: return f"✅ Document {document_id} deleted successfully." elif response.status_code == 404: return f"❌ Document {document_id} not found or access denied." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def delete_all_documents(tenant_id: str): """Delete all documents for a tenant.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." tenant_id = tenant_id.strip() try: response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete-all", params={"tenant_id": tenant_id}, headers={"x-tenant-id": tenant_id}, timeout=60 ) if response.status_code == 200: data = response.json() deleted_count = data.get("deleted_count", 0) return f"✅ Deleted {deleted_count} document(s) successfully." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def search_knowledge_base(tenant_id: str, query: str): """Search the knowledge base using RAG semantic search.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] if not query or not query.strip(): return "❗ Please enter a search query.", [] tenant_id = tenant_id.strip() query = query.strip() try: response = requests.post( f"{BACKEND_BASE_URL}/rag/search", json={"tenant_id": tenant_id, "query": query, "threshold": 0.3}, headers={"x-tenant-id": tenant_id, "Content-Type": "application/json"}, timeout=30 ) if response.status_code == 200: data = response.json() results = data.get("results", []) formatted_results = [] for idx, result in enumerate(results, 1): text = result.get("text", "") relevance = result.get("relevance", result.get("score", 0.0)) formatted_results.append({ "Rank": idx, "Text": text[:300] + "..." if len(text) > 300 else text, "Relevance": f"{relevance:.3f}" if relevance else "N/A" }) status = f"✅ Found {len(results)} result(s) for '{query}'" return status, formatted_results else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] # Create Gradio interface with gr.Blocks( title="IntegraChat — MCP Autonomous Agent", theme=gr.themes.Soft(), css=""" .stat-card { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 12px; color: white; text-align: center; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); transition: transform 0.2s; } .stat-card:hover { transform: translateY(-2px); box-shadow: 0 6px 12px rgba(0, 0, 0, 0.15); } .stat-card h3 { margin: 0 0 10px 0; font-size: 14px; opacity: 0.9; } .stat-card strong { font-size: 24px; font-weight: bold; } .summary-box { background: linear-gradient(135deg, #1f2937 0%, #111827 100%); padding: 24px; border-radius: 12px; border: 2px solid #374151; max-height: 500px; overflow-y: auto; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3); color: #f9fafb; } .summary-box h3, .summary-box h4 { margin-top: 0; margin-bottom: 12px; color: #ffffff; font-weight: 600; } .summary-box h4 { color: #e5e7eb; font-size: 16px; margin-top: 20px; margin-bottom: 10px; } .summary-box p { color: #f3f4f6; margin: 8px 0; line-height: 1.6; } .summary-box ul { margin: 10px 0; padding-left: 24px; color: #f3f4f6; } .summary-box li { margin: 8px 0; color: #f3f4f6; line-height: 1.6; } .summary-box code { background-color: #000000; color: #00ff00; padding: 2px 6px; border-radius: 4px; font-family: 'Courier New', monospace; font-size: 13px; border: 1px solid #374151; } .summary-box hr { border: none; border-top: 1px solid #4b5563; margin: 16px 0; } .summary-box strong { color: #ffffff; } .chart-title { margin-bottom: 8px; margin-top: 0; font-weight: 600; color: #1f2937; text-align: center; } """ ) as demo: gr.Markdown( """ # 🤖 IntegraChat — MCP Autonomous Agent **Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.** Enter your Tenant ID to chat with the MCP-powered agent or ingest documents into the enterprise knowledge base. """ ) tenant_id_input = gr.Textbox( label="Tenant ID", placeholder="Enter your tenant ID (e.g., tenant123)", value="", interactive=True ) with gr.Tabs(): with gr.Tab("Chat"): with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot( label="Chat with Agent", height=500, show_label=True, container=True, type="messages" ) with gr.Row(): message_input = gr.Textbox( label="Message", placeholder="Type your message here...", scale=4, show_label=False, container=False ) send_button = gr.Button("Send", variant="primary", scale=1) with gr.Column(scale=1): gr.Markdown( """ ### 📝 Chat Instructions 1. Enter your **Tenant ID** above 2. Ask a question or give a task to the agent 3. The MCP agent will automatically select tools (RAG, Web, etc.) ### ⚙️ Backend Configuration The agent connects to the FastAPI backend at `http://localhost:8000/agent/message` """ ) # Event handlers for chat tab with streaming def send_message(message, tenant_id, history): # Clear message input immediately message_input_value = "" # Use streaming function which yields updates # Gradio will automatically handle the generator and update UI in real-time try: for updated_history in chat_with_agent(message, tenant_id, history): yield updated_history, message_input_value except Exception as e: # Fallback if streaming fails error_msg = f"Streaming error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history, message_input_value send_button.click( fn=send_message, inputs=[message_input, tenant_id_input, chatbot], outputs=[chatbot, message_input] ) message_input.submit( fn=send_message, inputs=[message_input, tenant_id_input, chatbot], outputs=[chatbot, message_input] ) with gr.Tab("Document Ingestion"): gr.Markdown( """ ### 📚 Knowledge Base Ingestion Ingest documents so the MCP agent can reference tenant-private knowledge. - **Raw text / URLs:** Use the fields below. - **Files (PDF, DOCX, TXT, MD):** Use the file upload section. """ ) ingestion_mode = gr.Radio( ["Raw Text", "URL", "File Upload"], value="Raw Text", label="Select Ingestion Mode" ) with gr.Row(): doc_filename = gr.Textbox(label="Filename (optional)") doc_id = gr.Textbox(label="Document ID (optional)") document_url = gr.Textbox( label="Document URL (for URL ingestion)", placeholder="https://example.com/policy", visible=False ) doc_content = gr.Textbox( label="Content / Notes", placeholder="Paste the document text here...", lines=8, visible=True ) metadata_json = gr.Textbox( label="Additional Metadata (JSON)", placeholder='{"department": "HR", "tags": ["policy", "benefits"]}' ) ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary") document_status = gr.Markdown("") def handle_ingest_document( tenant_id, mode, content, doc_url, filename, doc_id_value, metadata ): source_type = "raw_text" if mode == "Raw Text" else "url" result = ingest_document( tenant_id=tenant_id, source_type=source_type, content=content, document_url=doc_url, filename=filename, doc_id=doc_id_value, metadata_json=metadata ) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." return result ingest_doc_button.click( fn=handle_ingest_document, inputs=[ tenant_id_input, ingestion_mode, doc_content, document_url, doc_filename, doc_id, metadata_json ], outputs=document_status ) file_section = gr.Markdown("#### 📁 File Upload (PDF, DOCX, TXT, Markdown)", visible=False) file_upload = gr.File( label="Upload File", file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"], visible=False ) ingest_file_button = gr.Button("Upload & Ingest File", visible=False) def handle_file_ingestion(tenant_id, file_obj): result = ingest_file(tenant_id, file_obj) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." return result ingest_file_button.click( fn=handle_file_ingestion, inputs=[tenant_id_input, file_upload], outputs=document_status ) def toggle_source_fields(mode): show_text = mode == "Raw Text" show_url = mode == "URL" show_file = mode == "File Upload" return ( gr.update(visible=show_text), gr.update(visible=show_url), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=show_file), gr.update(visible=show_file), gr.update(visible=show_file), ) ingestion_mode.change( fn=toggle_source_fields, inputs=[ingestion_mode], outputs=[ doc_content, document_url, doc_filename, doc_id, ingest_doc_button, file_section, file_upload, ingest_file_button, ] ) with gr.Tab("Knowledge Base Library"): gr.Markdown( """ ### 📚 Knowledge Base Library View, search, and manage all ingested documents for your tenant with visual analytics. - **📊 Statistics:** View document counts, types, and distribution - **🔍 Search:** Use semantic search to find relevant documents - **🔽 Filter:** Filter documents by type (text, PDF, FAQ, link) - **🗑️ Delete:** Remove individual documents or delete all at once """ ) # Statistics Section with gr.Row(): kb_total_docs = gr.Markdown("### 📄 Total Documents\n**0**", elem_classes=["stat-card"]) kb_text_docs = gr.Markdown("### 📝 Text Documents\n**0**", elem_classes=["stat-card"]) kb_pdf_docs = gr.Markdown("### 📄 PDF Documents\n**0**", elem_classes=["stat-card"]) kb_faq_docs = gr.Markdown("### ❓ FAQ Documents\n**0**", elem_classes=["stat-card"]) kb_link_docs = gr.Markdown("### 🔗 Link Documents\n**0**", elem_classes=["stat-card"]) # Chart and Search Section with gr.Row(): with gr.Column(scale=1): kb_chart = gr.Plot(label="Document Type Distribution", show_label=True) kb_refresh_button = gr.Button("🔄 Refresh Documents", variant="primary", size="lg") kb_delete_all_button = gr.Button("🗑️ Delete All Documents", variant="stop") with gr.Column(scale=1): kb_search_query = gr.Textbox( label="🔍 Search Knowledge Base", placeholder="Enter a search query (e.g., 'admin', 'policy', 'FAQ')...", show_label=True ) kb_search_button = gr.Button("Search", variant="primary") kb_search_status = gr.Markdown("") kb_search_results = gr.Dataframe( headers=["Rank", "Text", "Relevance"], datatype=["number", "str", "str"], interactive=False, label="Search Results", wrap=True ) # Status and Filter Section kb_status = gr.Markdown("👉 Click **Refresh Documents** to load your knowledge base.") with gr.Row(): with gr.Column(scale=2): kb_filter_type = gr.Radio( ["all", "text", "pdf", "faq", "link"], value="all", label="Filter by Type", info="Filter documents by detected type" ) with gr.Column(scale=1): kb_avg_length = gr.Markdown("**Average Length:** 0 characters") # Documents Table kb_documents_table = gr.Dataframe( headers=["ID", "Type", "Preview", "Length", "Created"], datatype=["number", "str", "str", "number", "str"], interactive=False, label="Documents", wrap=True ) # Delete Section with gr.Row(): kb_delete_id = gr.Number( label="Delete Document by ID", value=None, precision=0, info="Enter document ID to delete", scale=3 ) kb_delete_button = gr.Button("Delete Document", variant="stop", scale=1) kb_delete_status = gr.Markdown("") def refresh_documents(tenant_id, filter_type="all"): status, docs, total, stats, chart_fig = list_documents(tenant_id) # Filter documents by type if not "all" if filter_type != "all" and docs: filtered_docs = [doc for doc in docs if doc.get("Type", "").lower() == filter_type.lower()] docs = filtered_docs status = f"✅ Found {len(docs)} {filter_type} document(s) (out of {total} total)" # Update statistics cards type_counts = stats.get("types", {}) total_md = f"### 📄 Total Documents\n**{total}**" text_md = f"### 📝 Text Documents\n**{type_counts.get('text', 0)}**" pdf_md = f"### 📄 PDF Documents\n**{type_counts.get('pdf', 0)}**" faq_md = f"### ❓ FAQ Documents\n**{type_counts.get('faq', 0)}**" link_md = f"### 🔗 Link Documents\n**{type_counts.get('link', 0)}**" avg_length_md = f"**Average Length:** {stats.get('avg_length', 0):,} characters" status_msg = f"{status}\n\n**Total Documents:** {total} | **Total Characters:** {stats.get('total_chars', 0):,}" return ( status_msg, docs, total_md, text_md, pdf_md, faq_md, link_md, avg_length_md, chart_fig ) def filter_documents(tenant_id, filter_type): return refresh_documents(tenant_id, filter_type) def search_kb(tenant_id, query): status, results = search_knowledge_base(tenant_id, query) return status, results def delete_doc(tenant_id, doc_id): if doc_id is None or doc_id <= 0: return "❗ Please enter a valid document ID.", "", "", "", "", "", "", "", None result = delete_document(tenant_id, int(doc_id)) # Refresh document list after deletion return (result, *refresh_documents(tenant_id, "all")) def delete_all_docs(tenant_id): result = delete_all_documents(tenant_id) # Refresh document list after deletion return (result, *refresh_documents(tenant_id, "all")) kb_refresh_button.click( fn=refresh_documents, inputs=[tenant_id_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_filter_type.change( fn=filter_documents, inputs=[tenant_id_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_search_button.click( fn=search_kb, inputs=[tenant_id_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_search_query.submit( fn=search_kb, inputs=[tenant_id_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_delete_button.click( fn=delete_doc, inputs=[tenant_id_input, kb_delete_id], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_delete_all_button.click( fn=delete_all_docs, inputs=[tenant_id_input], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) with gr.Tab("Admin Analytics"): gr.Markdown( """ # 📊 Admin Analytics Dashboard Comprehensive tenant-level analytics with visual insights, performance metrics, and detailed tool usage statistics. """ ) # Refresh Button at Top with gr.Row(): analytics_refresh = gr.Button("🔄 Fetch Analytics Snapshot", variant="primary", size="lg") gr.Markdown("") # Statistics Cards gr.Markdown("### 📈 Key Metrics") with gr.Row(): analytics_total_queries = gr.Markdown("### 📊 Total Queries\n**0**", elem_classes=["stat-card"]) analytics_active_users = gr.Markdown("### 👥 Active Users\n**0**", elem_classes=["stat-card"]) analytics_redflags = gr.Markdown("### 🚩 Red Flags\n**0**", elem_classes=["stat-card"]) analytics_rag_searches = gr.Markdown("### 🔍 RAG Searches\n**0**", elem_classes=["stat-card"]) # Charts Section gr.Markdown("### 📊 Performance Charts") with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 📈 Tool Usage Count", elem_classes=["chart-title"]) analytics_tool_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("#### ⚡ Average Tool Latency", elem_classes=["chart-title"]) analytics_latency_chart = gr.Plot(label="", show_label=False) # RAG Quality and Summary Section with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 🔍 RAG Quality Metrics", elem_classes=["chart-title"]) analytics_rag_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("### 📋 Analytics Summary") analytics_summary = gr.Markdown( "👉 Click **Fetch Analytics Snapshot** to load data.", elem_classes=["summary-box"] ) # Tool Usage Details Table gr.Markdown("### 🔧 Detailed Tool Usage") analytics_tool_table = gr.Dataframe( headers=["Tool", "Count", "Avg Latency (ms)", "Success", "Errors", "Total Tokens"], datatype=["str", "number", "number", "number", "number", "number"], interactive=False, label="", wrap=True ) analytics_error = gr.Markdown("", visible=False) def format_analytics(tenant_id): summary, tool_usage, tool_chart, latency_chart, rag_chart, error = fetch_admin_analytics(tenant_id) if error: return ( error, "", "", "", "", None, None, None, [] ) # Extract overview data - fetch_admin_analytics already fetched it, but we need it again for cards overview_data = {} try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers={"x-tenant-id": tenant_id}, timeout=30 ) if resp.status_code == 200: data = resp.json() # The API returns {"overview": {...}} or direct overview object overview_data = data.get("overview", data) if isinstance(data, dict) else {} # Debug: print to see what we're getting print(f"DEBUG: Overview data keys: {overview_data.keys() if isinstance(overview_data, dict) else 'Not a dict'}") except Exception as e: print(f"Error fetching overview: {e}") pass # Extract values with proper fallbacks - handle both nested and flat structures if isinstance(overview_data, dict): total_queries = overview_data.get("total_queries", 0) active_users = overview_data.get("active_users", 0) redflag_count = overview_data.get("redflag_count", 0) rag_quality = overview_data.get("rag_quality", {}) rag_searches = rag_quality.get("total_searches", 0) if isinstance(rag_quality, dict) else 0 else: total_queries = 0 active_users = 0 redflag_count = 0 rag_quality = {} rag_searches = 0 # Format statistics cards queries_md = f"### 📊 Total Queries\n**{total_queries}**" users_md = f"### 👥 Active Users\n**{active_users}**" redflags_md = f"### 🚩 Red Flags\n**{redflag_count}**" rag_md = f"### 🔍 RAG Searches\n**{rag_searches}**" # Format tool usage table tool_table_data = [] for tool_name, stats in tool_usage.items(): tool_table_data.append({ "Tool": tool_name.replace(".", " ").title(), "Count": stats.get("count", 0), "Avg Latency (ms)": round(stats.get("avg_latency_ms", 0), 2), "Success": stats.get("success_count", 0), "Errors": stats.get("error_count", 0), "Total Tokens": stats.get("total_tokens", 0) }) return ( summary, queries_md, users_md, redflags_md, rag_md, tool_chart, latency_chart, rag_chart, tool_table_data ) analytics_refresh.click( fn=format_analytics, inputs=[tenant_id_input], outputs=[ analytics_summary, analytics_total_queries, analytics_active_users, analytics_redflags, analytics_rag_searches, analytics_tool_chart, analytics_latency_chart, analytics_rag_chart, analytics_tool_table ] ) with gr.Tab("Admin Rules & Compliance"): gr.Markdown( """ ### 🛡️ Admin Rules & Regulations Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.). **Upload Methods:** - **Text Input:** Enter one rule per line in the text box - **File Upload:** Upload rules from TXT, PDF, DOC, or DOCX files **Features:** - Rules are automatically enhanced by LLM (identifies edge cases, improves patterns) - Comment lines (starting with #) are automatically ignored - Use the delete box to remove an exact rule - Refresh anytime to view the latest rule set """ ) rules_summary = gr.Markdown("👉 Click **Refresh Rules** to see existing entries.") rules_table = gr.Dataframe( headers=["#", "Rule"], datatype=["number", "str"], interactive=False, value=[] ) rules_status = gr.Markdown("") with gr.Row(): refresh_rules_button = gr.Button("Refresh Rules", variant="secondary") gr.Markdown("") with gr.Row(): with gr.Column(scale=1): rules_input = gr.Textbox( label="Rules / Regulations (Text Input)", placeholder="Enter one rule per line...", lines=6 ) upload_rules_button = gr.Button("Upload / Append Rules", variant="primary") with gr.Column(scale=1): gr.Markdown("**OR**") rules_file_upload = gr.File( label="Upload Rules File", file_types=[".txt", ".pdf", ".doc", ".docx"], type="filepath" ) upload_file_button = gr.Button("Upload Rules from File", variant="primary") delete_rule_input = gr.Textbox( label="Delete Rule", placeholder="Enter the exact rule text to remove..." ) delete_rule_button = gr.Button("Delete Rule", variant="stop") refresh_rules_button.click( fn=fetch_admin_rules, inputs=[tenant_id_input], outputs=[rules_summary, rules_table] ) upload_rules_button.click( fn=add_rules_and_refresh, inputs=[tenant_id_input, rules_input], outputs=[rules_status, rules_summary, rules_table] ) upload_file_button.click( fn=add_rules_from_file, inputs=[tenant_id_input, rules_file_upload], outputs=[rules_status, rules_summary, rules_table] ) delete_rule_button.click( fn=delete_rule_and_refresh, inputs=[tenant_id_input, delete_rule_input], outputs=[rules_status, rules_summary, rules_table] ) gr.Markdown( """ --- **Built with [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for the MCP Hackathon** """ ) if __name__ == "__main__": import os # For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1 # HF Spaces sets SPACE_ID environment variable server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1" demo.launch( server_name=server_name, server_port=7860, share=False )