import gradio as gr import requests import json import os from pathlib import Path from collections import Counter from datetime import datetime try: import plotly.graph_objects as go PLOTLY_AVAILABLE = True except ImportError: PLOTLY_AVAILABLE = False go = None BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000") # Role-based access control permissions VALID_ROLES = ["viewer", "editor", "admin", "owner"] DEFAULT_ROLE = "viewer" def can_manage_rules(role: str) -> bool: """Check if role can manage rules (admin/owner only).""" return role in ["admin", "owner"] def can_ingest_documents(role: str) -> bool: """Check if role can ingest documents (editor/admin/owner).""" return role in ["editor", "admin", "owner"] def can_delete_documents(role: str) -> bool: """Check if role can delete documents (admin/owner only).""" return role in ["admin", "owner"] def can_view_analytics(role: str) -> bool: """Check if role can view analytics (all roles can view).""" return role in VALID_ROLES # All roles can view analytics def chat_with_agent(message, tenant_id, role, history): """ Send a message to the backend MCP agent and return the response. Uses streaming for real-time word-by-word updates. Args: message: User's message text tenant_id: Tenant ID for multi-tenant isolation history: Chat history (Gradio messages format) Yields: Updated chat history with agent response (streaming) """ if not message or not message.strip(): yield history return if not tenant_id or not tenant_id.strip(): error_msg = "Please enter a Tenant ID before sending a message." history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) yield history return # Add user message to history history.append({"role": "user", "content": message}) # Backend streaming endpoint backend_url = f"{BACKEND_BASE_URL}/agent/message/stream" # Prepare request payload payload = { "tenant_id": tenant_id.strip(), "message": message, "user_id": None, "conversation_history": [], "temperature": 0.0 } # Prepare headers with role headers = { "Content-Type": "application/json", "x-tenant-id": tenant_id.strip(), "x-user-role": role if role else DEFAULT_ROLE } try: # Make streaming request response = requests.post( backend_url, json=payload, headers=headers, stream=True, timeout=120 ) if response.status_code == 200: # Initialize assistant message assistant_message = "" history.append({"role": "assistant", "content": assistant_message}) yield history # Yield initial empty message # Stream tokens - use iter_lines for SSE format for line_bytes in response.iter_lines(): if line_bytes: try: line = line_bytes.decode('utf-8').strip() if not line: continue if line.startswith('data: '): data_str = line[6:] # Remove 'data: ' prefix try: data = json.loads(data_str) # Handle status messages if 'status' in data: status_msg = data.get('message', '') if status_msg: # Show status in the message temporarily history[-1] = {"role": "assistant", "content": f"⏳ {status_msg}"} yield history continue # Handle tokens token = data.get('token', '') if token: assistant_message += token # Update the last message in history history[-1] = {"role": "assistant", "content": assistant_message} yield history # Yield updated history immediately if data.get('done', False): break except json.JSONDecodeError: continue elif line.startswith('error:'): try: error_data = json.loads(line[6:]) error_msg = error_data.get('error', 'Unknown error') history[-1] = {"role": "assistant", "content": f"❌ Error: {error_msg}"} yield history break except: pass except UnicodeDecodeError: continue else: error_msg = f"Error {response.status_code}: {response.text}" history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.ConnectionError: error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000" history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.Timeout: error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly." history.append({"role": "assistant", "content": error_msg}) yield history except requests.exceptions.RequestException as e: error_msg = f"❌ Request Error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history except Exception as e: error_msg = f"❌ Unexpected Error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history def get_reasoning_trace(tenant_id: str, role: str, message: str): """ Fetch reasoning trace and tool traces for a message using the debug endpoint. Returns formatted markdown showing the reasoning path. """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." try: headers = { "Content-Type": "application/json", "x-tenant-id": tenant_id.strip(), "x-user-role": role if role else DEFAULT_ROLE } response = requests.post( f"{BACKEND_BASE_URL}/agent/debug", json={ "tenant_id": tenant_id.strip(), "message": message, "conversation_history": [], "temperature": 0.0 }, headers=headers, timeout=60 ) if response.status_code == 200: data = response.json() response_data = data.get("response", {}) reasoning_trace = response_data.get("reasoning_trace", []) tool_traces = response_data.get("tool_traces", []) decision = response_data.get("decision", {}) # Format reasoning trace trace_md = "## 🧠 Reasoning Path\n\n" for idx, step in enumerate(reasoning_trace, 1): step_name = step.get("step", "unknown") trace_md += f"### {idx}. {step_name.replace('_', ' ').title()}\n" if step.get("intent"): trace_md += f"- **Intent:** {step['intent']}\n" if step.get("match_count"): trace_md += f"- **Rule Matches:** {step['match_count']}\n" if step.get("hit_count"): trace_md += f"- **RAG Hits:** {step['hit_count']}\n" if step.get("latency_ms"): trace_md += f"- **Latency:** {step['latency_ms']}ms\n" if step.get("decision"): dec = step['decision'] trace_md += f"- **Tool:** {dec.get('tool', 'N/A')}\n" trace_md += f"- **Action:** {dec.get('action', 'N/A')}\n" trace_md += "\n" # Format tool traces if tool_traces: trace_md += "## ⚙️ Tool Invocations\n\n" for idx, tool in enumerate(tool_traces, 1): tool_name = tool.get("tool", tool.get("tool_name", "unknown")) latency = tool.get("latency_ms", tool.get("latency", 0)) status = tool.get("status", "success") trace_md += f"### {idx}. {tool_name}\n" trace_md += f"- **Status:** {status}\n" trace_md += f"- **Latency:** {latency}ms\n" if tool.get("result_count"): trace_md += f"- **Results:** {tool['result_count']}\n" trace_md += "\n" # Format decision if decision: trace_md += "## 🎯 Final Decision\n\n" trace_md += f"- **Tool:** {decision.get('tool', 'N/A')}\n" trace_md += f"- **Action:** {decision.get('action', 'N/A')}\n" if decision.get('reason'): trace_md += f"- **Reason:** {decision['reason']}\n" return trace_md else: return f"❌ Error {response.status_code}: {response.text}" except Exception as e: return f"❌ Error fetching reasoning trace: {str(e)}" def ingest_document( tenant_id: str, role: str, source_type: str, content: str, document_url: str, filename: str, doc_id: str, metadata_json: str ): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest documents." if not can_ingest_documents(role): return "❌ Access Denied: You need Editor, Admin, or Owner role to ingest documents." tenant_id = tenant_id.strip() payload_content = content or "" if source_type == "url" and document_url: payload_content = document_url.strip() metadata = {} if filename: metadata["filename"] = filename.strip() if document_url: metadata["url"] = document_url.strip() if doc_id: metadata["doc_id"] = doc_id.strip() if metadata_json: try: extra_metadata = json.loads(metadata_json) if isinstance(extra_metadata, dict): metadata.update(extra_metadata) else: return "❗ Metadata JSON must represent an object (key/value pairs)." except json.JSONDecodeError as exc: return f"❗ Invalid metadata JSON: {exc}" payload = { "action": "ingest_document", "tenant_id": tenant_id, "source_type": source_type, "content": payload_content, "metadata": metadata } try: headers = { "Content-Type": "application/json", "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-document", json=payload, headers=headers, timeout=60 ) if response.status_code == 200: data = response.json() return f"✅ Document ingested successfully.\n\n{data.get('message', '')}" return f"❌ Ingestion failed ({response.status_code}): {response.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ The ingestion request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during ingestion: {exc}" def ingest_file(tenant_id: str, role: str, file_obj): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest files." if file_obj is None: return "❗ Please select a file to upload." if not can_ingest_documents(role): return "❌ Access Denied: You need Editor, Admin, or Owner role to ingest files." tenant_id = tenant_id.strip() try: file_path = Path(file_obj.name) with open(file_path, "rb") as f: file_bytes = f.read() files = { "file": (file_path.name, file_bytes, "application/octet-stream") } headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-file", files=files, headers=headers, timeout=120 ) if response.status_code == 200: data = response.json() return f"✅ File ingested successfully.\n\n{data.get('message', '')}" return f"❌ File ingestion failed ({response.status_code}): {response.text}" except FileNotFoundError: return "❌ Could not read the uploaded file." except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ File ingestion timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during file ingestion: {exc}" def _format_rules_table(rules: list[str]) -> list[list]: return [[idx + 1, rule] for idx, rule in enumerate(rules)] def fetch_admin_rules(tenant_id: str, role: str) -> tuple[str, list[list]]: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.get( f"{BACKEND_BASE_URL}/admin/rules", headers=headers, timeout=30 ) if response.status_code == 200: rules = response.json().get("rules", []) if not rules: return "✅ No admin rules have been configured yet.", [] summary = f"### Current Rules ({len(rules)})" return summary, _format_rules_table(rules) return f"❌ Error {response.status_code}: {response.text}", [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] def extract_rules_from_file(file_path) -> str: """ Extract rules from uploaded file (TXT, PDF, DOC, DOCX). Returns the extracted text content. """ if file_path is None: return "" try: # Gradio File component returns file path as string if isinstance(file_path, str): file_path = Path(file_path) else: # Sometimes it's a file object with .name attribute file_path = Path(file_path.name if hasattr(file_path, 'name') else file_path) if not file_path.exists(): return f"❌ File not found: {file_path}" file_ext = file_path.suffix.lower() # Read file based on type if file_ext == '.txt' or file_ext == '.md': # Plain text files with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() return content elif file_ext == '.pdf': # PDF files - use PyPDF2 try: import PyPDF2 with open(file_path, 'rb') as f: pdf_reader = PyPDF2.PdfReader(f) content = [] for page in pdf_reader.pages: content.append(page.extract_text()) return '\n'.join(content) except ImportError: return "❌ PDF extraction requires PyPDF2. Install with: pip install PyPDF2" except Exception as e: return f"❌ Failed to extract text from PDF: {str(e)}" elif file_ext in ['.doc', '.docx']: # DOC/DOCX files - use python-docx try: from docx import Document doc = Document(file_path) content = [] for paragraph in doc.paragraphs: content.append(paragraph.text) return '\n'.join(content) except ImportError: return "❌ DOCX extraction requires python-docx. Install with: pip install python-docx" except Exception as e: return f"❌ Failed to extract text from DOCX: {str(e)}" else: return f"❌ Unsupported file type: {file_ext}. Supported: .txt, .pdf, .doc, .docx" except Exception as e: return f"❌ Error reading file: {str(e)}" def add_admin_rules(tenant_id: str, role: str, rules_text: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rules_text or not rules_text.strip(): return "❗ Provide at least one rule to upload." if not can_manage_rules(role): return "❌ Access Denied: You need Admin or Owner role to manage rules." tenant_id = tenant_id.strip() # Filter out comment lines (starting with #) and empty lines rules = [ rule.strip() for rule in rules_text.splitlines() if rule.strip() and not rule.strip().startswith("#") ] if not rules: return "❗ No valid rules detected. (Comment lines starting with # are ignored)" added = [] enhanced = [] errors = [] # Process rules in chunks to avoid timeout CHUNK_SIZE = 5 # Process 5 rules at a time total_rules = len(rules) if total_rules == 1: # Single rule - use regular endpoint try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules", params={"rule": rules[0], "enhance": "true"}, headers=headers, timeout=30 ) if resp.status_code == 200: data = resp.json() added.append(data.get("added_rule", rules[0])) if data.get("enhanced"): edge_cases = data.get("edge_cases", []) improvements = data.get("improvements", []) explanation = data.get("explanation", "") examples = data.get("examples", []) missing_patterns = data.get("missing_patterns", []) if explanation: enhanced.append(f"**💡 Explanation:** {explanation}") if examples: examples_list = "\n".join([f" • {ex}" for ex in examples[:5]]) enhanced.append(f"**📋 Examples:**\n{examples_list}") if missing_patterns: patterns_list = "\n".join([f" • {p}" for p in missing_patterns[:5]]) enhanced.append(f"**🔍 Suggested Patterns:**\n{patterns_list}") if edge_cases or improvements: enhanced.append(f"**{data.get('added_rule', rules[0])}**:") if improvements: enhanced.append(f" • Improvements: {', '.join(improvements[:3])}") if edge_cases: enhanced.append(f" • Edge cases identified: {len(edge_cases)}") else: errors.append(f"{rules[0]} -> {resp.status_code}: {resp.text}") except Exception as exc: errors.append(f"{rules[0]} -> {exc}") else: # Multiple rules - process in chunks for i in range(0, total_rules, CHUNK_SIZE): chunk = rules[i:i + CHUNK_SIZE] chunk_num = (i // CHUNK_SIZE) + 1 total_chunks = (total_rules + CHUNK_SIZE - 1) // CHUNK_SIZE try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules/bulk", json={"rules": chunk}, headers=headers, params={"enhance": "true"}, timeout=45 # Timeout per chunk (5 rules) ) if resp.status_code == 200: data = resp.json() chunk_added = data.get("added_rules", []) added.extend(chunk_added) if data.get("enhanced"): chunk_enhanced = data.get("enhancement_summary", []) enhanced.extend([f"[Chunk {chunk_num}/{total_chunks}] {e}" for e in chunk_enhanced]) # Add explanations for bulk rules if available if data.get("explanations"): for exp in data["explanations"][:3]: # Show first 3 explanations if exp.get("explanation"): enhanced.append(f"\n💡 **{exp.get('rule', 'Rule')} Explanation:** {exp['explanation']}") if exp.get("examples"): examples_list = "\n".join([f" • {ex}" for ex in exp['examples'][:3]]) enhanced.append(f"📋 **Examples:**\n{examples_list}") if exp.get("missing_patterns"): patterns_list = "\n".join([f" • {p}" for p in exp['missing_patterns'][:3]]) enhanced.append(f"🔍 **Suggested Patterns:**\n{patterns_list}") else: errors.append(f"Chunk {chunk_num}/{total_chunks} failed: {resp.status_code}: {resp.text}") except requests.exceptions.Timeout: errors.append(f"Chunk {chunk_num}/{total_chunks} timed out after 45s") except Exception as exc: errors.append(f"Chunk {chunk_num}/{total_chunks} error: {exc}") summary = [] if added: summary.append(f"✅ Added {len(added)}/{total_rules} rule(s):\n" + "\n".join([f"- {r}" for r in added[:10]])) if len(added) > 10: summary.append(f"... and {len(added) - 10} more") if enhanced: summary.append(f"\n🤖 LLM Enhancement Applied:\n" + "\n".join(enhanced[:5])) if len(enhanced) > 5: summary.append(f"... and {len(enhanced) - 5} more enhancements") if errors: summary.append("\n⚠️ Errors:\n" + "\n".join(errors)) return "\n\n".join(summary) if summary else "No rules were added." def delete_admin_rule(tenant_id: str, role: str, rule: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rule or not rule.strip(): return "❗ Provide the exact rule text to delete." if not can_manage_rules(role): return "❌ Access Denied: You need Admin or Owner role to delete rules." tenant_id = tenant_id.strip() rule = rule.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.delete( f"{BACKEND_BASE_URL}/admin/rules/{rule}", headers=headers, timeout=15 ) if resp.status_code == 200: return f"🗑️ Deleted rule: {rule}" return f"❌ Error {resp.status_code}: {resp.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Delete request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def add_rules_from_file(tenant_id: str, role: str, file_path): """ Extract rules from uploaded file and add them. """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", "👉 Click **Refresh Rules** to see existing entries.", [] if file_path is None: return "❗ Please select a file to upload.", "👉 Click **Refresh Rules** to see existing entries.", [] # Extract text from file extracted_text = extract_rules_from_file(file_path) if extracted_text.startswith("❌"): # Error occurred during extraction summary, rows = fetch_admin_rules(tenant_id) return extracted_text, summary, rows if not extracted_text or not extracted_text.strip(): summary, rows = fetch_admin_rules(tenant_id) return "❗ No text could be extracted from the file.", summary, rows # Add rules from extracted text status = add_admin_rules(tenant_id, role, extracted_text) summary, rows = fetch_admin_rules(tenant_id, role) return status, summary, rows def add_rules_and_refresh(tenant_id: str, role: str, rules_text: str): status = add_admin_rules(tenant_id, role, rules_text) summary, rows = fetch_admin_rules(tenant_id, role) return status, summary, rows def delete_rule_and_refresh(tenant_id: str, role: str, rule: str): status = delete_admin_rule(tenant_id, role, rule) summary, rows = fetch_admin_rules(tenant_id, role) return status, summary, rows def fetch_admin_analytics(tenant_id: str, role: str): """Fetch analytics data and return formatted results with visualizations.""" if not tenant_id or not tenant_id.strip(): error_msg = "❗ Tenant ID is required to view analytics." return error_msg, {}, None, None, None, None # All roles can view analytics (matching backend permissions) # No access check needed here tenant_id = tenant_id.strip() headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } overview_data = {} tool_usage_data = {} redflags_data = {} activity_data = {} error_msg = None # Fetch Overview try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers=headers, timeout=30 ) if resp.status_code == 200: overview_data = resp.json() else: error_msg = f"❌ Error fetching overview: {resp.status_code}" except Exception as e: error_msg = f"❌ Error: {str(e)}" # Fetch Tool Usage try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/tool-usage", headers=headers, timeout=30 ) if resp.status_code == 200: tool_usage_data = resp.json() except Exception: pass # Fetch Red Flags try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/redflags", headers=headers, timeout=30 ) if resp.status_code == 200: redflags_data = resp.json() except Exception: pass # Fetch Activity try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/activity", headers=headers, timeout=30 ) if resp.status_code == 200: activity_data = resp.json() except Exception: pass # Extract data for visualizations overview = overview_data.get("overview", {}) tool_usage = overview.get("tool_usage", tool_usage_data.get("tool_usage", {})) rag_quality = overview.get("rag_quality", {}) # Create tool usage bar chart tool_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] counts = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): tools.append(tool_name.replace(".", " ").title()) counts.append(stats.get("count", 0)) latencies.append(stats.get("avg_latency_ms", 0)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = go.Figure() fig.add_trace(go.Bar( x=tools, y=counts, name="Usage Count", marker_color=colors_list, text=counts, textposition='outside', hovertemplate='%{x}
Count: %{y}
' )) fig.update_layout( title={ "text": "Tool Usage Count", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Count", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) tool_chart = fig except Exception: tool_chart = None # Create latency chart latency_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): avg_latency = stats.get("avg_latency_ms", 0) if avg_latency > 0: tools.append(tool_name.replace(".", " ").title()) latencies.append(round(avg_latency, 2)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = go.Figure() fig.add_trace(go.Bar( x=tools, y=latencies, name="Avg Latency (ms)", marker_color=colors_list, text=[f"{l}ms" for l in latencies], textposition='outside', hovertemplate='%{x}
Avg Latency: %{y}ms' )) fig.update_layout( title={ "text": "Average Tool Latency", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Latency (ms)", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) latency_chart = fig except Exception: latency_chart = None # Create RAG quality metrics visualization rag_chart = None if rag_quality and PLOTLY_AVAILABLE: try: metrics = ["Avg Hits", "Avg Score", "Avg Top Score"] values = [ rag_quality.get("avg_hits_per_search", 0), rag_quality.get("avg_score", 0) * 100, # Convert to percentage rag_quality.get("avg_top_score", 0) * 100 ] fig = go.Figure(data=[go.Bar( x=metrics, y=values, marker_color=["#3b82f6", "#10b981", "#f59e0b"], text=[f"{v:.2f}" for v in values], textposition='outside', hovertemplate='%{x}
Value: %{y:.2f}' )]) fig.update_layout( title={ "text": "RAG Quality Metrics", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Metric", yaxis_title="Value", height=350, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) rag_chart = fig except Exception: rag_chart = None # Format summary text total_queries = overview.get("total_queries", activity_data.get("activity", {}).get("total_queries", 0)) active_users = overview.get("active_users", activity_data.get("activity", {}).get("active_users", 0)) redflag_count = overview.get("redflag_count", redflags_data.get("count", 0)) last_query = overview.get("last_query", activity_data.get("activity", {}).get("last_query")) # Calculate total tool usage total_tool_calls = sum(stats.get("count", 0) for stats in tool_usage.values()) total_success = sum(stats.get("success_count", 0) for stats in tool_usage.values()) total_errors = sum(stats.get("error_count", 0) for stats in tool_usage.values()) success_rate = (total_success / total_tool_calls * 100) if total_tool_calls > 0 else 0 summary_text = f""" #### 📈 Activity Metrics - **Total Queries:** `{total_queries}` - **Active Users:** `{active_users}` - **Red Flags:** `{redflag_count}` - **Last Query:** `{last_query if last_query else "N/A"}` --- #### 🔧 Tool Usage Overview - **Total Tool Calls:** `{total_tool_calls}` - **Successful Calls:** `{total_success}` ✅ - **Failed Calls:** `{total_errors}` {'⚠️' if total_errors > 0 else ''} - **Success Rate:** `{success_rate:.1f}%` {'🟢' if success_rate >= 95 else '🟡' if success_rate >= 80 else '🔴'} --- #### 🔍 RAG Quality Metrics - **Total Searches:** `{rag_quality.get("total_searches", 0)}` - **Avg Hits per Search:** `{rag_quality.get("avg_hits_per_search", 0):.2f}` - **Avg Relevance Score:** `{rag_quality.get("avg_score", 0):.3f}` - **Avg Top Score:** `{rag_quality.get("avg_top_score", 0):.3f}` - **Avg Search Latency:** `{rag_quality.get("avg_latency_ms", 0):.2f}ms` --- #### 📊 Tool Breakdown """ # Add individual tool stats to summary for tool_name, stats in sorted(tool_usage.items(), key=lambda x: x[1].get("count", 0), reverse=True): tool_display = tool_name.replace(".", " ").title() count = stats.get("count", 0) latency = stats.get("avg_latency_ms", 0) success = stats.get("success_count", 0) errors = stats.get("error_count", 0) status_icon = "✅" if errors == 0 else "⚠️" summary_text += f"- **{tool_display}** {status_icon}
└ {count} calls • {latency:.1f}ms avg • {success} success • {errors} errors\n" return summary_text, tool_usage, tool_chart, latency_chart, rag_chart, error_msg def list_documents(tenant_id: str, role: str, limit: int = 1000, offset: int = 0): """ List all documents for a tenant. Returns a tuple of (status_message, documents_list, total_count, stats_dict, chart_fig). """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [], 0, {}, None tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.get( f"{BACKEND_BASE_URL}/rag/list", params={"tenant_id": tenant_id, "limit": limit, "offset": offset}, headers=headers, timeout=30 ) if response.status_code == 200: data = response.json() documents = data.get("documents", []) total = data.get("total", 0) # Format documents for display and collect stats formatted_docs = [] type_counts = Counter() total_length = 0 for doc in documents: doc_id = doc.get("id", "N/A") text = doc.get("text", "") created_at = doc.get("created_at", "") preview = text[:200] + "..." if len(text) > 200 else text # Detect document type text_lower = text.lower() if "http://" in text_lower or "https://" in text_lower or "www." in text_lower: doc_type = "link" elif any(x in text_lower for x in ["q:", "question:", "faq", "frequently asked"]): doc_type = "faq" elif ".pdf" in text_lower or "pdf document" in text_lower: doc_type = "pdf" else: doc_type = "text" type_counts[doc_type] += 1 total_length += len(text) formatted_docs.append({ "ID": doc_id, "Type": doc_type, "Preview": preview, "Length": len(text), "Created": created_at[:10] if created_at else "N/A" }) # Create statistics dictionary stats = { "total": total, "types": dict(type_counts), "avg_length": total_length // total if total > 0 else 0, "total_chars": total_length } # Create pie chart for document types chart_fig = None if type_counts and PLOTLY_AVAILABLE: try: labels = list(type_counts.keys()) values = list(type_counts.values()) colors = { "text": "#3b82f6", # blue "pdf": "#ef4444", # red "faq": "#a855f7", # purple "link": "#06b6d4" # cyan } chart_colors = [colors.get(label, "#6b7280") for label in labels] fig = go.Figure(data=[go.Pie( labels=labels, values=values, hole=0.4, marker=dict(colors=chart_colors), textinfo='label+percent+value', textfont=dict(size=12), hovertemplate='%{label}
Count: %{value}
Percentage: %{percent}' )]) fig.update_layout( title={ "text": "Document Type Distribution", "x": 0.5, "xanchor": "center", "font": {"size": 16} }, height=400, showlegend=True, margin=dict(l=20, r=20, t=50, b=20) ) chart_fig = fig except Exception: chart_fig = None status = f"✅ Found {total} document(s)" return status, formatted_docs, total, stats, chart_fig else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [], 0, {}, None except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [], 0, {}, None except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [], 0, {}, None except Exception as exc: return f"❌ Unexpected error: {exc}", [], 0, {}, None def delete_document(tenant_id: str, role: str, document_id: int): """Delete a specific document by ID.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not document_id or document_id <= 0: return "❗ Invalid document ID." if not can_delete_documents(role): return "❌ Access Denied: You need Admin or Owner role to delete documents." tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete/{document_id}", params={"tenant_id": tenant_id}, headers=headers, timeout=30 ) if response.status_code == 200: return f"✅ Document {document_id} deleted successfully." elif response.status_code == 404: return f"❌ Document {document_id} not found or access denied." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def delete_all_documents(tenant_id: str, role: str): """Delete all documents for a tenant.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." tenant_id = tenant_id.strip() if not can_delete_documents(role): return "❌ Access Denied: You need Admin or Owner role to delete documents." try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete-all", params={"tenant_id": tenant_id}, headers=headers, timeout=60 ) if response.status_code == 200: data = response.json() deleted_count = data.get("deleted_count", 0) return f"✅ Deleted {deleted_count} document(s) successfully." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def search_knowledge_base(tenant_id: str, role: str, query: str): """Search the knowledge base using RAG semantic search.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] if not query or not query.strip(): return "❗ Please enter a search query.", [] tenant_id = tenant_id.strip() query = query.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE, "Content-Type": "application/json" } response = requests.post( f"{BACKEND_BASE_URL}/rag/search", json={"tenant_id": tenant_id, "query": query, "threshold": 0.3}, headers=headers, timeout=30 ) if response.status_code == 200: data = response.json() results = data.get("results", []) formatted_results = [] for idx, result in enumerate(results, 1): text = result.get("text", "") relevance = result.get("relevance", result.get("score", 0.0)) formatted_results.append({ "Rank": idx, "Text": text[:300] + "..." if len(text) > 300 else text, "Relevance": f"{relevance:.3f}" if relevance else "N/A" }) status = f"✅ Found {len(results)} result(s) for '{query}'" return status, formatted_results else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] # Create Gradio interface with gr.Blocks( title="IntegraChat — MCP Autonomous Agent", theme=gr.themes.Soft( primary_hue="cyan", secondary_hue="blue", neutral_hue="slate", font=("Inter", "system-ui", "sans-serif") ), css=""" /* Global improvements */ .gradio-container { font-family: 'Inter', system-ui, -apple-system, sans-serif; } /* Header styling */ .header-section { background: linear-gradient(135deg, #0f172a 0%, #1e293b 50%, #334155 100%); padding: 32px 24px; border-radius: 16px; margin-bottom: 24px; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2); border: 1px solid rgba(148, 163, 184, 0.1); } .header-section h1 { background: linear-gradient(135deg, #06b6d4 0%, #3b82f6 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; background-clip: text; font-size: 2.5rem; font-weight: 700; margin-bottom: 12px; } /* Input fields styling */ .input-container { background: rgba(255, 255, 255, 0.05); padding: 20px; border-radius: 12px; border: 1px solid rgba(148, 163, 184, 0.2); backdrop-filter: blur(10px); } /* Stat cards with better gradients */ .stat-card { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 24px; border-radius: 16px; color: white; text-align: center; box-shadow: 0 8px 24px rgba(102, 126, 234, 0.3); transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); border: 1px solid rgba(255, 255, 255, 0.1); } .stat-card:hover { transform: translateY(-4px) scale(1.02); box-shadow: 0 12px 32px rgba(102, 126, 234, 0.4); } .stat-card h3 { margin: 0 0 12px 0; font-size: 14px; opacity: 0.95; font-weight: 500; letter-spacing: 0.5px; text-transform: uppercase; } .stat-card strong { font-size: 32px; font-weight: 700; display: block; margin-top: 8px; } /* Enhanced summary box */ .summary-box { background: linear-gradient(135deg, #0f172a 0%, #1e293b 50%, #334155 100%); padding: 28px; border-radius: 16px; border: 1px solid rgba(148, 163, 184, 0.2); max-height: 500px; overflow-y: auto; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.3); color: #f1f5f9; backdrop-filter: blur(10px); } .summary-box::-webkit-scrollbar { width: 8px; } .summary-box::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.05); border-radius: 4px; } .summary-box::-webkit-scrollbar-thumb { background: rgba(148, 163, 184, 0.3); border-radius: 4px; } .summary-box::-webkit-scrollbar-thumb:hover { background: rgba(148, 163, 184, 0.5); } .summary-box h3, .summary-box h4 { margin-top: 0; margin-bottom: 16px; color: #ffffff; font-weight: 600; } .summary-box h4 { color: #e2e8f0; font-size: 16px; margin-top: 24px; margin-bottom: 12px; } .summary-box p { color: #f1f5f9; margin: 10px 0; line-height: 1.7; } .summary-box ul { margin: 12px 0; padding-left: 28px; color: #f1f5f9; } .summary-box li { margin: 10px 0; color: #f1f5f9; line-height: 1.7; } .summary-box code { background-color: rgba(0, 0, 0, 0.4); color: #00ff88; padding: 3px 8px; border-radius: 6px; font-family: 'Fira Code', 'Courier New', monospace; font-size: 13px; border: 1px solid rgba(148, 163, 184, 0.2); } .summary-box hr { border: none; border-top: 1px solid rgba(148, 163, 184, 0.2); margin: 20px 0; } .summary-box strong { color: #ffffff; font-weight: 600; } /* Chart titles */ .chart-title { margin-bottom: 12px; margin-top: 0; font-weight: 600; color: #1e293b; text-align: center; font-size: 18px; } /* Button enhancements */ button.primary { background: linear-gradient(135deg, #06b6d4 0%, #3b82f6 100%); border: none; box-shadow: 0 4px 12px rgba(6, 182, 212, 0.3); transition: all 0.3s ease; } button.primary:hover { transform: translateY(-2px); box-shadow: 0 6px 16px rgba(6, 182, 212, 0.4); } /* Tab styling */ .tab-nav { border-bottom: 2px solid rgba(148, 163, 184, 0.1); } /* Role badge styling */ .role-badge { display: inline-block; padding: 6px 12px; border-radius: 20px; font-size: 12px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; } .role-viewer { background: linear-gradient(135deg, #64748b 0%, #475569 100%); color: white; } .role-editor { background: linear-gradient(135deg, #06b6d4 0%, #0891b2 100%); color: white; } .role-admin { background: linear-gradient(135deg, #8b5cf6 0%, #7c3aed 100%); color: white; } .role-owner { background: linear-gradient(135deg, #f59e0b 0%, #d97706 100%); color: white; } /* Improved input styling */ input[type="text"], textarea, select { border-radius: 10px !important; border: 2px solid rgba(148, 163, 184, 0.2) !important; transition: all 0.3s ease !important; } input[type="text"]:focus, textarea:focus, select:focus { border-color: #06b6d4 !important; box-shadow: 0 0 0 3px rgba(6, 182, 212, 0.1) !important; } /* Card styling for sections */ .section-card { background: rgba(255, 255, 255, 0.02); padding: 24px; border-radius: 16px; border: 1px solid rgba(148, 163, 184, 0.1); margin-bottom: 20px; backdrop-filter: blur(10px); } /* Chatbot styling */ .chatbot { border-radius: 12px !important; border: 1px solid rgba(148, 163, 184, 0.2) !important; } """ ) as demo: with gr.Column(elem_classes=["header-section"]): gr.Markdown( """ # 🤖 IntegraChat — MCP Autonomous Agent **Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.** """ ) gr.Markdown( """
🔐 Role-Based Access Control: Features are automatically shown/hidden based on your role:
""" ) with gr.Row(elem_classes=["input-container"]): tenant_id_input = gr.Textbox( label="🏢 Tenant ID", placeholder="Enter your tenant ID (e.g., tenant123)", value="", interactive=True, scale=2, info="Required for all operations" ) role_input = gr.Dropdown( label="👤 User Role", choices=VALID_ROLES, value=DEFAULT_ROLE, interactive=True, scale=1, info="Select your role to access appropriate features" ) with gr.Tabs(): with gr.Tab("Chat"): # Access denied for Editor role - Editor should only see Document Ingestion chat_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Please switch to Owner or Admin role to access Chat functionality, or go to the Document Ingestion tab.

""", visible=False ) chat_content = gr.Column(visible=True) with chat_content: with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot( label="Chat with Agent", height=500, show_label=True, container=True, type="messages" ) with gr.Row(): message_input = gr.Textbox( label="Message", placeholder="Type your message here...", scale=4, show_label=False, container=False ) send_button = gr.Button("Send", variant="primary", scale=1) with gr.Column(scale=1): gr.Markdown( """
### 📝 Chat Instructions 1. Enter your **Tenant ID** and **Role** above 2. Ask a question or give a task to the agent 3. The MCP agent will automatically select tools (RAG, Web, etc.) ### ⚡ Features - ✨ Real-time streaming responses - 🧠 Multi-step planning & reasoning - 🔍 Automatic tool selection - 💾 Conversation memory - 📊 Reasoning visualization (see Debug tab)
""" ) # Reasoning trace viewer reasoning_trace_viewer = gr.Markdown( "💡 **Tip:** Use the Debug tab to view detailed reasoning traces for your messages.", visible=True ) # Event handlers for chat tab with streaming def send_message(message, tenant_id, role, history): # Clear message input immediately message_input_value = "" # Use streaming function which yields updates # Gradio will automatically handle the generator and update UI in real-time try: for updated_history in chat_with_agent(message, tenant_id, role, history): yield updated_history, message_input_value except Exception as e: # Fallback if streaming fails error_msg = f"Streaming error: {str(e)}" history.append({"role": "assistant", "content": error_msg}) yield history, message_input_value send_button.click( fn=send_message, inputs=[message_input, tenant_id_input, role_input, chatbot], outputs=[chatbot, message_input] ) message_input.submit( fn=send_message, inputs=[message_input, tenant_id_input, role_input, chatbot], outputs=[chatbot, message_input] ) # Function to update Chat tab visibility based on role (Editor sees access denied) def update_chat_visibility(role): is_editor = role == "editor" return ( gr.update(visible=is_editor), # Access denied message for Editor gr.update(visible=not is_editor), # Chat content for Owner/Admin ) role_input.change( fn=update_chat_visibility, inputs=[role_input], outputs=[chat_access_denied, chat_content] ) with gr.Tab("🔍 Debug & Reasoning"): gr.Markdown( """
### 🔍 Agent Reasoning Debugger View the complete reasoning path, tool invocations, and decision-making process for any message. **Features:** - 🧠 Step-by-step reasoning trace - ⚙️ Tool invocation timeline - 🎯 Final decision breakdown - 📊 Performance metrics
""" ) debug_message = gr.Textbox( label="Message to Debug", placeholder="Enter the same message you sent in Chat to see its reasoning path...", lines=2 ) debug_button = gr.Button("🔍 Analyze Reasoning", variant="primary") debug_output = gr.Markdown("👉 Enter a message and click 'Analyze Reasoning' to see the agent's reasoning path.") def analyze_reasoning(message, tenant_id, role): if not message or not message.strip(): return "❗ Please enter a message to analyze." if not tenant_id or not tenant_id.strip(): return "❗ Please enter a Tenant ID." return get_reasoning_trace(tenant_id, role, message) debug_button.click( fn=analyze_reasoning, inputs=[debug_message, tenant_id_input, role_input], outputs=[debug_output] ) debug_message.submit( fn=analyze_reasoning, inputs=[debug_message, tenant_id_input, role_input], outputs=[debug_output] ) with gr.Tab("📚 Document Ingestion"): gr.Markdown( """
### 📚 Knowledge Base Ingestion Ingest documents so the MCP agent can reference tenant-private knowledge. **📄 Supported Formats:** - **Raw text / URLs:** Use the fields below - **Files:** PDF, DOCX, TXT, Markdown - **Metadata:** Optional JSON metadata for better organization **⚠️ Note:** Editor role and above can ingest. Admin/Owner can delete.
""" ) ingestion_mode = gr.Radio( ["Raw Text", "URL", "File Upload"], value="Raw Text", label="Select Ingestion Mode" ) with gr.Row(): doc_filename = gr.Textbox(label="Filename (optional)") doc_id = gr.Textbox(label="Document ID (optional)") document_url = gr.Textbox( label="Document URL (for URL ingestion)", placeholder="https://example.com/policy", visible=False ) doc_content = gr.Textbox( label="Content / Notes", placeholder="Paste the document text here...", lines=8, visible=True ) metadata_json = gr.Textbox( label="Additional Metadata (JSON)", placeholder='{"department": "HR", "tags": ["policy", "benefits"]}' ) ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary") document_status = gr.Markdown("") def handle_ingest_document( tenant_id, role, mode, content, doc_url, filename, doc_id_value, metadata ): source_type = "raw_text" if mode == "Raw Text" else "url" result = ingest_document( tenant_id=tenant_id, role=role, source_type=source_type, content=content, document_url=doc_url, filename=filename, doc_id=doc_id_value, metadata_json=metadata ) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." return result ingest_doc_button.click( fn=handle_ingest_document, inputs=[ tenant_id_input, role_input, ingestion_mode, doc_content, document_url, doc_filename, doc_id, metadata_json ], outputs=document_status ) file_section = gr.Markdown("#### 📁 File Upload (PDF, DOCX, TXT, Markdown)", visible=False) file_upload = gr.File( label="Upload File", file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"], visible=False ) ingest_file_button = gr.Button("Upload & Ingest File", visible=False) def handle_file_ingestion(tenant_id, role, file_obj): result = ingest_file(tenant_id, role, file_obj) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." return result ingest_file_button.click( fn=handle_file_ingestion, inputs=[tenant_id_input, role_input, file_upload], outputs=document_status ) def toggle_source_fields(mode): show_text = mode == "Raw Text" show_url = mode == "URL" show_file = mode == "File Upload" return ( gr.update(visible=show_text), gr.update(visible=show_url), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=show_file), gr.update(visible=show_file), gr.update(visible=show_file), ) ingestion_mode.change( fn=toggle_source_fields, inputs=[ingestion_mode], outputs=[ doc_content, document_url, doc_filename, doc_id, ingest_doc_button, file_section, file_upload, ingest_file_button, ] ) with gr.Tab("📖 Knowledge Base Library"): # Access denied for Editor role kb_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Please switch to Owner or Admin role to access Knowledge Base Library.

""", visible=False ) kb_library_content = gr.Column(visible=True) with kb_library_content: gr.Markdown( """
### 📖 Knowledge Base Library View, search, and manage all ingested documents for your tenant with visual analytics. **Features:** - **📊 Statistics:** View document counts, types, and distribution - **🔍 Search:** Use semantic search to find relevant documents - **🔽 Filter:** Filter documents by type (text, PDF, FAQ, link) - **🗑️ Delete:** Remove individual documents or delete all at once (Admin/Owner only)
""" ) # Statistics Section with gr.Row(): kb_total_docs = gr.Markdown("### 📄 Total Documents\n**0**", elem_classes=["stat-card"]) kb_text_docs = gr.Markdown("### 📝 Text Documents\n**0**", elem_classes=["stat-card"]) kb_pdf_docs = gr.Markdown("### 📄 PDF Documents\n**0**", elem_classes=["stat-card"]) kb_faq_docs = gr.Markdown("### ❓ FAQ Documents\n**0**", elem_classes=["stat-card"]) kb_link_docs = gr.Markdown("### 🔗 Link Documents\n**0**", elem_classes=["stat-card"]) # Chart and Search Section with gr.Row(): with gr.Column(scale=1): kb_chart = gr.Plot(label="Document Type Distribution", show_label=True) kb_refresh_button = gr.Button("🔄 Refresh Documents", variant="primary", size="lg") kb_delete_all_button = gr.Button("🗑️ Delete All Documents", variant="stop") with gr.Column(scale=1): kb_search_query = gr.Textbox( label="🔍 Search Knowledge Base", placeholder="Enter a search query (e.g., 'admin', 'policy', 'FAQ')...", show_label=True ) kb_search_button = gr.Button("Search", variant="primary") kb_search_status = gr.Markdown("") kb_search_results = gr.Dataframe( headers=["Rank", "Text", "Relevance"], datatype=["number", "str", "str"], interactive=False, label="Search Results", wrap=True ) # Status and Filter Section kb_status = gr.Markdown("👉 Click **Refresh Documents** to load your knowledge base.") with gr.Row(): with gr.Column(scale=2): kb_filter_type = gr.Radio( ["all", "text", "pdf", "faq", "link"], value="all", label="Filter by Type", info="Filter documents by detected type" ) with gr.Column(scale=1): kb_avg_length = gr.Markdown("**Average Length:** 0 characters") # Documents Table kb_documents_table = gr.Dataframe( headers=["ID", "Type", "Preview", "Length", "Created"], datatype=["number", "str", "str", "number", "str"], interactive=False, label="Documents", wrap=True ) # Delete Section (Admin/Owner only) kb_delete_section = gr.Row() with kb_delete_section: kb_delete_id = gr.Number( label="Delete Document by ID", value=None, precision=0, info="Enter document ID to delete", scale=3 ) kb_delete_button = gr.Button("Delete Document", variant="stop", scale=1) kb_delete_status = gr.Markdown("") # Function to update KB tab visibility based on role def update_kb_visibility(role): can_delete = can_delete_documents(role) return ( gr.update(visible=can_delete), # Delete all button gr.update(visible=can_delete), # Delete section ) def refresh_documents(tenant_id, role, filter_type="all"): status, docs, total, stats, chart_fig = list_documents(tenant_id, role) # Filter documents by type if not "all" if filter_type != "all" and docs: filtered_docs = [doc for doc in docs if doc.get("Type", "").lower() == filter_type.lower()] docs = filtered_docs status = f"✅ Found {len(docs)} {filter_type} document(s) (out of {total} total)" # Update statistics cards type_counts = stats.get("types", {}) total_md = f"### 📄 Total Documents\n**{total}**" text_md = f"### 📝 Text Documents\n**{type_counts.get('text', 0)}**" pdf_md = f"### 📄 PDF Documents\n**{type_counts.get('pdf', 0)}**" faq_md = f"### ❓ FAQ Documents\n**{type_counts.get('faq', 0)}**" link_md = f"### 🔗 Link Documents\n**{type_counts.get('link', 0)}**" avg_length_md = f"**Average Length:** {stats.get('avg_length', 0):,} characters" status_msg = f"{status}\n\n**Total Documents:** {total} | **Total Characters:** {stats.get('total_chars', 0):,}" return ( status_msg, docs, total_md, text_md, pdf_md, faq_md, link_md, avg_length_md, chart_fig ) def filter_documents(tenant_id, role, filter_type): return refresh_documents(tenant_id, role, filter_type) def search_kb(tenant_id, role, query): status, results = search_knowledge_base(tenant_id, role, query) return status, results def delete_doc(tenant_id, role, doc_id): if doc_id is None or doc_id <= 0: return "❗ Please enter a valid document ID.", "", "", "", "", "", "", "", None result = delete_document(tenant_id, role, int(doc_id)) # Refresh document list after deletion return (result, *refresh_documents(tenant_id, role, "all")) def delete_all_docs(tenant_id, role): result = delete_all_documents(tenant_id, role) # Refresh document list after deletion return (result, *refresh_documents(tenant_id, role, "all")) kb_refresh_button.click( fn=refresh_documents, inputs=[tenant_id_input, role_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_filter_type.change( fn=filter_documents, inputs=[tenant_id_input, role_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_search_button.click( fn=search_kb, inputs=[tenant_id_input, role_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_search_query.submit( fn=search_kb, inputs=[tenant_id_input, role_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_delete_button.click( fn=delete_doc, inputs=[tenant_id_input, role_input, kb_delete_id], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_delete_all_button.click( fn=delete_all_docs, inputs=[tenant_id_input, role_input], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) # Update visibility when role changes def update_kb_full_visibility(role): is_editor = role == "editor" can_delete = can_delete_documents(role) return ( gr.update(visible=is_editor), # Access denied for Editor gr.update(visible=not is_editor), # KB content for Owner/Admin gr.update(visible=can_delete), # Delete all button gr.update(visible=can_delete), # Delete section ) role_input.change( fn=update_kb_full_visibility, inputs=[role_input], outputs=[kb_access_denied, kb_library_content, kb_delete_all_button, kb_delete_section] ) with gr.Tab("📊 Admin Analytics"): # Access denied message for non-admin/owner roles analytics_access_denied = gr.Markdown( """

🔒 Access Denied

Analytics is available to all roles.

If you're seeing this message, there may be a configuration issue.

""", visible=False ) # Analytics content (visible for admin/owner) analytics_content = gr.Column(visible=True) with analytics_content: gr.Markdown( """
# 📊 Admin Analytics Dashboard Comprehensive tenant-level analytics with visual insights, performance metrics, and detailed tool usage statistics. **🔒 Access:** Admin and Owner roles only
""" ) # Refresh Button at Top with gr.Row(): analytics_refresh = gr.Button("🔄 Fetch Analytics Snapshot", variant="primary", size="lg") gr.Markdown("") # Statistics Cards gr.Markdown("### 📈 Key Metrics") with gr.Row(): analytics_total_queries = gr.Markdown("### 📊 Total Queries\n**0**", elem_classes=["stat-card"]) analytics_active_users = gr.Markdown("### 👥 Active Users\n**0**", elem_classes=["stat-card"]) analytics_redflags = gr.Markdown("### 🚩 Red Flags\n**0**", elem_classes=["stat-card"]) analytics_rag_searches = gr.Markdown("### 🔍 RAG Searches\n**0**", elem_classes=["stat-card"]) # Charts Section gr.Markdown("### 📊 Performance Charts") with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 📈 Tool Usage Count", elem_classes=["chart-title"]) analytics_tool_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("#### ⚡ Average Tool Latency", elem_classes=["chart-title"]) analytics_latency_chart = gr.Plot(label="", show_label=False) # RAG Quality and Summary Section with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 🔍 RAG Quality Metrics", elem_classes=["chart-title"]) analytics_rag_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("### 📋 Analytics Summary") analytics_summary = gr.Markdown( "👉 Click **Fetch Analytics Snapshot** to load data.", elem_classes=["summary-box"] ) # Tool Usage Details Table gr.Markdown("### 🔧 Detailed Tool Usage") analytics_tool_table = gr.Dataframe( headers=["Tool", "Count", "Avg Latency (ms)", "Success", "Errors", "Total Tokens"], datatype=["str", "number", "number", "number", "number", "number"], interactive=False, label="", wrap=True ) analytics_error = gr.Markdown("", visible=False) def format_analytics(tenant_id, role): summary, tool_usage, tool_chart, latency_chart, rag_chart, error = fetch_admin_analytics(tenant_id, role) if error: return ( error, "", "", "", "", None, None, None, [] ) # Extract overview data - fetch_admin_analytics already fetched it, but we need it again for cards overview_data = {} try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers=headers, timeout=30 ) if resp.status_code == 200: data = resp.json() # The API returns {"overview": {...}} or direct overview object overview_data = data.get("overview", data) if isinstance(data, dict) else {} # Debug: print to see what we're getting print(f"DEBUG: Overview data keys: {overview_data.keys() if isinstance(overview_data, dict) else 'Not a dict'}") except Exception as e: print(f"Error fetching overview: {e}") pass # Extract values with proper fallbacks - handle both nested and flat structures if isinstance(overview_data, dict): total_queries = overview_data.get("total_queries", 0) active_users = overview_data.get("active_users", 0) redflag_count = overview_data.get("redflag_count", 0) rag_quality = overview_data.get("rag_quality", {}) rag_searches = rag_quality.get("total_searches", 0) if isinstance(rag_quality, dict) else 0 else: total_queries = 0 active_users = 0 redflag_count = 0 rag_quality = {} rag_searches = 0 # Format statistics cards queries_md = f"### 📊 Total Queries\n**{total_queries}**" users_md = f"### 👥 Active Users\n**{active_users}**" redflags_md = f"### 🚩 Red Flags\n**{redflag_count}**" rag_md = f"### 🔍 RAG Searches\n**{rag_searches}**" # Format tool usage table tool_table_data = [] for tool_name, stats in tool_usage.items(): tool_table_data.append({ "Tool": tool_name.replace(".", " ").title(), "Count": stats.get("count", 0), "Avg Latency (ms)": round(stats.get("avg_latency_ms", 0), 2), "Success": stats.get("success_count", 0), "Errors": stats.get("error_count", 0), "Total Tokens": stats.get("total_tokens", 0) }) return ( summary, queries_md, users_md, redflags_md, rag_md, tool_chart, latency_chart, rag_chart, tool_table_data ) analytics_refresh.click( fn=format_analytics, inputs=[tenant_id_input, role_input], outputs=[ analytics_summary, analytics_total_queries, analytics_active_users, analytics_redflags, analytics_rag_searches, analytics_tool_chart, analytics_latency_chart, analytics_rag_chart, analytics_tool_table ] ) # Function to update Analytics tab visibility based on role (all roles can view) def update_analytics_visibility(role): has_access = can_view_analytics(role) # All roles can view now return ( gr.update(visible=False), # No access denied message gr.update(visible=True), # Analytics content visible for all ) # Update visibility when role changes role_input.change( fn=update_analytics_visibility, inputs=[role_input], outputs=[analytics_access_denied, analytics_content] ) with gr.Tab("🛡️ Admin Rules & Compliance"): # Access denied for Editor role rules_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Admin Rules & Compliance is restricted to Admin and Owner roles only.

""", visible=False ) rules_content = gr.Column(visible=True) with rules_content: gr.Markdown( """
### 🛡️ Admin Rules & Regulations Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.). **📤 Upload Methods:** - **Text Input:** Enter one rule per line in the text box - **File Upload:** Upload rules from TXT, PDF, DOC, or DOCX files **✨ Features:** - 🤖 Rules are automatically enhanced by LLM (identifies edge cases, improves patterns) - 💬 Comment lines (starting with #) are automatically ignored - 🗑️ Use the delete box to remove an exact rule - 🔄 Refresh anytime to view the latest rule set **🔒 Access:** Admin and Owner roles only
""" ) rules_summary = gr.Markdown("👉 Click **Refresh Rules** to see existing entries.") rules_table = gr.Dataframe( headers=["#", "Rule"], datatype=["number", "str"], interactive=False, value=[] ) rules_status = gr.Markdown("") with gr.Row(): refresh_rules_button = gr.Button("Refresh Rules", variant="secondary") gr.Markdown("") with gr.Row(): with gr.Column(scale=1): rules_input = gr.Textbox( label="Rules / Regulations (Text Input)", placeholder="Enter one rule per line...", lines=6 ) upload_rules_button = gr.Button("Upload / Append Rules", variant="primary") with gr.Column(scale=1): gr.Markdown("**OR**") rules_file_upload = gr.File( label="Upload Rules File", file_types=[".txt", ".pdf", ".doc", ".docx"], type="filepath" ) upload_file_button = gr.Button("Upload Rules from File", variant="primary") delete_rule_input = gr.Textbox( label="Delete Rule", placeholder="Enter the exact rule text to remove..." ) delete_rule_button = gr.Button("Delete Rule", variant="stop") refresh_rules_button.click( fn=fetch_admin_rules, inputs=[tenant_id_input, role_input], outputs=[rules_summary, rules_table] ) upload_rules_button.click( fn=add_rules_and_refresh, inputs=[tenant_id_input, role_input, rules_input], outputs=[rules_status, rules_summary, rules_table] ) upload_file_button.click( fn=add_rules_from_file, inputs=[tenant_id_input, role_input, rules_file_upload], outputs=[rules_status, rules_summary, rules_table] ) delete_rule_button.click( fn=delete_rule_and_refresh, inputs=[tenant_id_input, role_input, delete_rule_input], outputs=[rules_status, rules_summary, rules_table] ) # Function to update Admin Rules tab visibility based on role def update_rules_visibility(role): is_editor = role == "editor" has_access = can_manage_rules(role) return ( gr.update(visible=is_editor or not has_access), # Access denied for Editor or non-admin gr.update(visible=has_access and not is_editor), # Rules content for Admin/Owner only ) role_input.change( fn=update_rules_visibility, inputs=[role_input], outputs=[rules_access_denied, rules_content] ) gr.Markdown( """

Built with ❤️ using Model Context Protocol (MCP)

Enterprise-Grade MCP Autonomous Agent Platform

""" ) if __name__ == "__main__": import os # For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1 # HF Spaces sets SPACE_ID environment variable server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1" demo.launch( server_name=server_name, server_port=7860, share=False )