import gradio as gr
import requests
import json
import os
import sys
from pathlib import Path
from collections import Counter
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

# Plotly is optional: analytics charts are skipped gracefully when it is absent.
try:
    import plotly.graph_objects as go
    PLOTLY_AVAILABLE = True
except ImportError:
    PLOTLY_AVAILABLE = False
    go = None

BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000")

# Role-based access control permissions
VALID_ROLES = ["viewer", "editor", "admin", "owner"]
DEFAULT_ROLE = "viewer"


def can_manage_rules(role: str) -> bool:
    """Check if role can manage rules (admin/owner only)."""
    return role in ["admin", "owner"]


def can_ingest_documents(role: str) -> bool:
    """Check if role can ingest documents (editor/admin/owner)."""
    return role in ["editor", "admin", "owner"]


def can_delete_documents(role: str) -> bool:
    """Check if role can delete documents (admin/owner only)."""
    return role in ["admin", "owner"]


def can_view_analytics(role: str) -> bool:
    """Check if role can view analytics (all roles can view)."""
    return role in VALID_ROLES  # All roles can view analytics


def convert_history_to_tuples(history):
    """
    Convert history from dict format to tuple format for Gradio 4.20.0 compatibility.

    Input format:  [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
    Output format: [("user message", "assistant response"), ...]

    Fix: a user message that never received an assistant reply (two
    consecutive "user" entries) is now kept as ("msg", "") instead of
    being silently dropped.
    """
    if not history:
        return []
    # Already tuple-shaped -> pass through untouched.
    if isinstance(history[0], (tuple, list)) and len(history[0]) == 2:
        return history
    result = []
    current_user = None
    current_assistant = None
    for item in history:
        if isinstance(item, dict):
            if item.get("role") == "user":
                # Flush any pending pair first — including an unanswered
                # user message, which gets an empty assistant slot.
                if current_user is not None:
                    result.append((current_user, current_assistant or ""))
                current_user = item.get("content", "")
                current_assistant = None
            elif item.get("role") == "assistant":
                current_assistant = item.get("content", "")
        elif isinstance(item, (tuple, list)) and len(item) == 2:
            # Already in tuple format
            result.append(tuple(item))
    # Add the last pair if it exists
    if current_user is not None:
        result.append((current_user, current_assistant or ""))
    return result


def append_to_history(history, role, content):
    """
    Append a message to history in tuple format for Gradio 4.20.0.

    A "user" message opens a new (user, "") pair; an "assistant" message
    fills the reply slot of the last pair (or opens an ("", content) pair
    when no user message exists yet).
    """
    history = convert_history_to_tuples(history)
    if role == "user":
        history.append((content, ""))
    elif role == "assistant":
        if history and len(history[-1]) == 2:
            user_msg = history[-1][0]
            history[-1] = (user_msg, content)
        else:
            history.append(("", content))
    return history


def update_last_assistant_message(history, content):
    """
    Update the last assistant message in history (tuple format).

    Used while streaming to replace the assistant half of the final pair.
    """
    history = convert_history_to_tuples(history)
    if history and len(history[-1]) == 2:
        user_msg = history[-1][0]
        history[-1] = (user_msg, content)
    return history
def chat_with_agent(message, tenant_id, role, history):
    """
    Stream a chat message to the backend MCP agent and yield history updates.

    Posts to /agent/message/stream and consumes the SSE-style response,
    yielding the growing chat history token-by-token so the UI animates
    character-by-character.

    Args:
        message: User's message text.
        tenant_id: Tenant ID for multi-tenant isolation.
        role: User role forwarded in the x-user-role header (defaults to viewer).
        history: Chat history (tuple format or Gradio messages format).

    Yields:
        Updated chat history as a list of (user, assistant) tuples.
    """
    # Convert history to tuple format for Gradio 4.20.0 compatibility
    history = convert_history_to_tuples(history)

    if not message or not message.strip():
        yield history
        return

    if not tenant_id or not tenant_id.strip():
        error_msg = "Please enter a Tenant ID before sending a message."
        history = append_to_history(history, "user", message)
        history = append_to_history(history, "assistant", error_msg)
        yield history
        return

    # Add user message to history
    history = append_to_history(history, "user", message)

    backend_url = f"{BACKEND_BASE_URL}/agent/message/stream"
    payload = {
        "tenant_id": tenant_id.strip(),
        "message": message,
        "user_id": None,
        "conversation_history": [],
        "temperature": 0.0
    }
    headers = {
        "Content-Type": "application/json",
        "x-tenant-id": tenant_id.strip(),
        "x-user-role": role if role else DEFAULT_ROLE
    }

    try:
        response = requests.post(
            backend_url,
            json=payload,
            headers=headers,
            stream=True,
            timeout=120
        )

        if response.status_code == 200:
            assistant_message = ""
            history = append_to_history(history, "assistant", assistant_message)
            yield history  # Yield initial empty message so the bubble appears

            for line_bytes in response.iter_lines():
                if not line_bytes:
                    continue
                try:
                    line = line_bytes.decode('utf-8').strip()
                except UnicodeDecodeError:
                    continue
                if not line:
                    continue

                if line.startswith('data: '):
                    try:
                        data = json.loads(line[6:])  # Remove 'data: ' prefix
                    except json.JSONDecodeError:
                        continue

                    # Status messages are shown temporarily in the bubble.
                    if 'status' in data:
                        status_msg = data.get('message', '')
                        if status_msg:
                            history = update_last_assistant_message(history, f"⏳ {status_msg}")
                            yield history
                        continue

                    # Tokens arrive character-by-character for smooth streaming.
                    token = data.get('token', '')
                    if token:
                        assistant_message += token
                        history = update_last_assistant_message(history, assistant_message)
                        yield history

                    if data.get('done', False):
                        break

                elif line.startswith('error:'):
                    try:
                        error_data = json.loads(line[6:])
                    except json.JSONDecodeError:
                        # FIX: was a bare `except: pass` that swallowed every
                        # exception type; only malformed JSON is expected here.
                        continue
                    error_msg = error_data.get('error', 'Unknown error')
                    history = update_last_assistant_message(history, f"❌ Error: {error_msg}")
                    yield history
                    break
        else:
            error_msg = f"Error {response.status_code}: {response.text}"
            history = append_to_history(history, "assistant", error_msg)
            yield history

    except requests.exceptions.ConnectionError:
        error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000"
        history = append_to_history(history, "assistant", error_msg)
        yield history
    except requests.exceptions.Timeout:
        error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly."
        history = append_to_history(history, "assistant", error_msg)
        yield history
    except requests.exceptions.RequestException as e:
        error_msg = f"❌ Request Error: {str(e)}"
        history = append_to_history(history, "assistant", error_msg)
        yield history
    except Exception as e:
        error_msg = f"❌ Unexpected Error: {str(e)}"
        history = append_to_history(history, "assistant", error_msg)
        yield history
def get_reasoning_trace(tenant_id: str, role: str, message: str):
    """
    Fetch reasoning trace and tool traces for a message using the debug endpoint.
    Returns formatted markdown showing the reasoning path.

    Calls POST {BACKEND_BASE_URL}/agent/debug and renders three sections from
    the JSON reply: reasoning steps, tool invocations, and the final decision.
    Any failure is returned as an error string rather than raised.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    try:
        headers = {
            "Content-Type": "application/json",
            "x-tenant-id": tenant_id.strip(),
            "x-user-role": role if role else DEFAULT_ROLE
        }
        response = requests.post(
            f"{BACKEND_BASE_URL}/agent/debug",
            json={
                "tenant_id": tenant_id.strip(),
                "message": message,
                "conversation_history": [],
                "temperature": 0.0
            },
            headers=headers,
            timeout=60
        )
        if response.status_code == 200:
            data = response.json()
            response_data = data.get("response", {})
            reasoning_trace = response_data.get("reasoning_trace", [])
            tool_traces = response_data.get("tool_traces", [])
            decision = response_data.get("decision", {})

            # Format reasoning trace with latency predictions and context hints.
            # NOTE(review): fields are rendered only when truthy, so a genuine
            # 0/0.0 value (e.g. top_score == 0.0) is omitted — confirm intended.
            trace_md = "## 🧠 Reasoning Path\n\n"
            for idx, step in enumerate(reasoning_trace, 1):
                step_name = step.get("step", "unknown")
                trace_md += f"### {idx}. {step_name.replace('_', ' ').title()}\n"
                if step.get("intent"):
                    trace_md += f"- **Intent:** {step['intent']}\n"
                if step.get("match_count"):
                    trace_md += f"- **Rule Matches:** {step['match_count']}\n"
                if step.get("hit_count"):
                    trace_md += f"- **RAG Hits:** {step['hit_count']}\n"
                if step.get("top_score"):
                    trace_md += f"- **Top RAG Score:** {step['top_score']:.3f}\n"
                if step.get("latency_ms"):
                    trace_md += f"- **Actual Latency:** {step['latency_ms']}ms\n"
                if step.get("decision"):
                    dec = step['decision']
                    trace_md += f"- **Tool:** {dec.get('tool', 'N/A')}\n"
                    trace_md += f"- **Action:** {dec.get('action', 'N/A')}\n"
                    # Show latency prediction if available
                    if dec.get('tool_input') and isinstance(dec['tool_input'], dict):
                        est_latency = dec['tool_input'].get('_estimated_latency_ms')
                        if est_latency:
                            trace_md += f"- **⚡ Estimated Latency:** {est_latency}ms\n"
                trace_md += "\n"

            # Format tool traces with schema information
            if tool_traces:
                trace_md += "## ⚙️ Tool Invocations\n\n"
                for idx, tool in enumerate(tool_traces, 1):
                    tool_name = tool.get("tool", tool.get("tool_name", "unknown"))
                    # NOTE(review): rebinds `response` (the HTTP response) to the
                    # per-tool payload; safe only because the HTTP response is not
                    # used again on this branch.
                    response = tool.get("response", {})
                    latency = tool.get("latency_ms", response.get("latency_ms", 0))
                    status = tool.get("status", "success")
                    trace_md += f"### {idx}. {tool_name.upper()}\n"
                    trace_md += f"- **Status:** {status}\n"
                    trace_md += f"- **Latency:** {latency}ms\n"
                    # Show latency prediction vs actual
                    if isinstance(response, dict) and response.get("latency_ms"):
                        actual = response["latency_ms"]
                        trace_md += f"- **⚡ Actual vs Estimated:** {actual}ms\n"
                    # Show schema-validated output structure per tool type.
                    if isinstance(response, dict):
                        if tool_name == "rag" and "results" in response:
                            trace_md += f"- **📊 Schema:** Valid RAG output\n"
                            trace_md += f"- **Results:** {len(response.get('results', []))} chunks\n"
                            trace_md += f"- **Top Score:** {response.get('top_score', 0):.3f}\n"
                        elif tool_name == "web" and "results" in response:
                            trace_md += f"- **📊 Schema:** Valid Web output\n"
                            trace_md += f"- **Results:** {len(response.get('results', []))} items\n"
                        elif tool_name == "admin" and "violations" in response:
                            trace_md += f"- **📊 Schema:** Valid Admin output\n"
                            trace_md += f"- **Violations:** {len(response.get('violations', []))}\n"
                        elif tool_name == "llm" and "text" in response:
                            trace_md += f"- **📊 Schema:** Valid LLM output\n"
                            trace_md += f"- **Tokens:** {response.get('tokens_used', 0)}\n"
                    if tool.get("result_count"):
                        trace_md += f"- **Result Count:** {tool['result_count']}\n"
                    trace_md += "\n"

            # Format decision with context-aware routing and latency info
            if decision:
                trace_md += "## 🎯 Final Decision\n\n"
                trace_md += f"- **Tool:** {decision.get('tool', 'N/A')}\n"
                trace_md += f"- **Action:** {decision.get('action', 'N/A')}\n"
                if decision.get('reason'):
                    reason = decision['reason']
                    trace_md += f"- **Reason:** {reason}\n"
                    # Extract and highlight context-aware routing hints by
                    # keyword-matching the free-text reason string.
                    if "context:" in reason.lower():
                        trace_md += "\n### 🧠 Context-Aware Routing:\n"
                        if "skip web" in reason.lower() or "rag high" in reason.lower():
                            trace_md += "- ⚡ **RAG high score → Web search skipped**\n"
                        if "skip rag" in reason.lower() or "memory" in reason.lower():
                            trace_md += "- 💾 **Relevant memory available → RAG skipped**\n"
                        if "skip reasoning" in reason.lower() or "critical" in reason.lower():
                            trace_md += "- 🚨 **Critical violation → Agent reasoning skipped**\n"
                    # Extract latency estimates embedded in the reason text.
                    if "latency:" in reason.lower() or "est." in reason.lower():
                        import re
                        latency_match = re.search(r'latency[:\s]+(\d+)ms', reason, re.IGNORECASE)
                        if latency_match:
                            est_latency = latency_match.group(1)
                            trace_md += f"\n### ⚡ Latency Prediction:\n"
                            trace_md += f"- **Estimated Total Latency:** {est_latency}ms\n"
                # Show tool sequence with latency estimates
                if decision.get('tool_input') and isinstance(decision['tool_input'], dict):
                    steps = decision['tool_input'].get('steps', [])
                    if steps:
                        trace_md += "\n### 📋 Tool Execution Plan:\n"
                        total_est_latency = 0
                        for step_idx, step in enumerate(steps, 1):
                            if isinstance(step, dict):
                                if "parallel" in step:
                                    trace_md += f"{step_idx}. **Parallel Execution:** RAG + Web\n"
                                    total_est_latency += max(90, 800)  # Max of RAG and Web
                                elif step.get("tool"):
                                    tool = step["tool"]
                                    est_lat = step.get("input", {}).get("_estimated_latency_ms", 0)
                                    if est_lat:
                                        total_est_latency += est_lat
                                        trace_md += f"{step_idx}. **{tool.upper()}** (est. {est_lat}ms)\n"
                                    else:
                                        trace_md += f"{step_idx}. **{tool.upper()}**\n"
                        if total_est_latency > 0:
                            trace_md += f"\n- **Total Estimated Latency:** {total_est_latency}ms\n"
            return trace_md
        else:
            return f"❌ Error {response.status_code}: {response.text}"
    except Exception as e:
        return f"❌ Error fetching reasoning trace: {str(e)}"
def ingest_document(
    tenant_id: str,
    role: str,
    source_type: str,
    content: str,
    document_url: str,
    filename: str,
    doc_id: str,
    metadata_json: str
):
    """
    Send a document to the backend /rag/ingest-document endpoint.

    Args:
        tenant_id: Tenant ID (required).
        role: User role; must pass can_ingest_documents (editor/admin/owner).
        source_type: "url" sends document_url as the content, anything else sends `content`.
        content: Raw document text.
        document_url: Optional source URL (also stored in metadata).
        filename: Optional filename stored in metadata.
        doc_id: Optional document id stored in metadata.
        metadata_json: Optional JSON object string merged into metadata.

    Returns:
        A human-readable status/markdown string (success, validation error,
        permission error, or transport error) — never raises.
    """
    # Debug: Log the role value
    print(f"🔍 DEBUG: ingest_document received role='{role}' (type: {type(role)})", file=sys.stderr)

    if not BACKEND_BASE_URL:
        return "❌ Backend URL is not configured. Please set BACKEND_BASE_URL environment variable or ensure it defaults to http://localhost:8000"
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required to ingest documents."

    # Normalise the role once; an empty role falls back to the default.
    if not role or not role.strip():
        role = DEFAULT_ROLE
        print(f"⚠️ WARNING: Role was empty/None in ingest_document, defaulting to '{role}'", file=sys.stderr)
    role = role.strip()

    if not can_ingest_documents(role):
        return "❌ Access Denied: You need Editor, Admin, or Owner role to ingest documents."

    tenant_id = tenant_id.strip()

    payload_content = content or ""
    if source_type == "url" and document_url:
        payload_content = document_url.strip()

    metadata = {}
    if filename:
        metadata["filename"] = filename.strip()
    if document_url:
        metadata["url"] = document_url.strip()
    if doc_id:
        metadata["doc_id"] = doc_id.strip()
    if metadata_json:
        try:
            extra_metadata = json.loads(metadata_json)
            if isinstance(extra_metadata, dict):
                metadata.update(extra_metadata)
            else:
                return "❗ Metadata JSON must represent an object (key/value pairs)."
        except json.JSONDecodeError as exc:
            return f"❗ Invalid metadata JSON: {exc}"

    payload = {
        "action": "ingest_document",
        "tenant_id": tenant_id,
        "source_type": source_type,
        "content": payload_content,
        "metadata": metadata
    }

    try:
        # `role` is already normalised above, so it is used directly in the
        # header (the previous re-derivation of `final_role` was redundant).
        print(f"🔍 DEBUG: Sending request with role='{role}' in x-user-role header", file=sys.stderr)
        headers = {
            "Content-Type": "application/json",
            "x-tenant-id": tenant_id,
            "x-user-role": role
        }
        response = requests.post(
            f"{BACKEND_BASE_URL}/rag/ingest-document",
            json=payload,
            headers=headers,
            timeout=60
        )

        if response.status_code == 200:
            data = response.json()
            message = f"✅ Document ingested successfully.\n\n{data.get('message', '')}"
            # Display extracted metadata if available
            extracted_metadata = data.get('extracted_metadata', {})
            if extracted_metadata:
                message += "\n\n### 🤖 AI-Generated Metadata:\n"
                if extracted_metadata.get('title'):
                    message += f"- **Title:** {extracted_metadata['title']}\n"
                if extracted_metadata.get('summary'):
                    message += f"- **Summary:** {extracted_metadata['summary'][:200]}...\n"
                if extracted_metadata.get('tags'):
                    tags = ', '.join(extracted_metadata['tags'][:5])
                    message += f"- **Tags:** {tags}\n"
                if extracted_metadata.get('topics'):
                    topics = ', '.join(extracted_metadata['topics'][:3])
                    message += f"- **Topics:** {topics}\n"
                if extracted_metadata.get('quality_score'):
                    quality = extracted_metadata['quality_score']
                    # Render a 10-segment bar, filled proportionally to the score.
                    quality_bar = "█" * int(quality * 10) + "░" * (10 - int(quality * 10))
                    message += f"- **Quality Score:** {quality:.2f} {quality_bar}\n"
                if extracted_metadata.get('detected_date'):
                    message += f"- **Detected Date:** {extracted_metadata['detected_date']}\n"
                if extracted_metadata.get('extraction_method'):
                    method = extracted_metadata['extraction_method'].upper()
                    message += f"- **Extraction Method:** {method}\n"
            return message
        elif response.status_code == 403:
            # Permission denied - show clear message
            try:
                error_data = response.json()
                error_detail = error_data.get('detail', response.text)
            except ValueError:
                # FIX: was a bare `except:`; .json() raises a ValueError
                # subclass when the body is not JSON.
                error_detail = response.text
            return f"🔒 **Permission Denied (403):**\n\n{error_detail}\n\n**Solution:** Change your **User Role** dropdown (top right) from 'viewer' to 'editor', 'admin', or 'owner' and try again."
        return f"❌ Ingestion failed ({response.status_code}): {response.text}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach the backend. Make sure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ The ingestion request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error during ingestion: {exc}"
def ingest_file(tenant_id: str, role: str, file_obj):
    """
    Upload a local file to the backend /rag/ingest-file endpoint.

    Validates tenant, file selection, and role (editor/admin/owner), then
    posts the raw bytes as multipart form data. Returns a status/markdown
    string describing the outcome; never raises.
    """
    if not BACKEND_BASE_URL:
        return "❌ Backend URL is not configured. Please set BACKEND_BASE_URL environment variable or ensure it defaults to http://localhost:8000"
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required to ingest files."
    if file_obj is None:
        return "❗ Please select a file to upload."
    if not can_ingest_documents(role):
        return "❌ Access Denied: You need Editor, Admin, or Owner role to ingest files."

    tenant_id = tenant_id.strip()

    try:
        upload_path = Path(file_obj.name)
        raw_bytes = upload_path.read_bytes()

        multipart = {"file": (upload_path.name, raw_bytes, "application/octet-stream")}
        request_headers = {
            "x-tenant-id": tenant_id,
            "x-user-role": role if role else DEFAULT_ROLE
        }
        reply = requests.post(
            f"{BACKEND_BASE_URL}/rag/ingest-file",
            files=multipart,
            headers=request_headers,
            timeout=120
        )

        if reply.status_code != 200:
            return f"❌ File ingestion failed ({reply.status_code}): {reply.text}"

        body = reply.json()
        # Assemble the success message from parts; joined output is identical
        # to sequential string concatenation.
        parts = [f"✅ File ingested successfully.\n\n{body.get('message', '')}"]

        meta = body.get('extracted_metadata', {})
        if meta:
            parts.append("\n\n### 🤖 AI-Generated Metadata:\n")
            if meta.get('title'):
                parts.append(f"- **Title:** {meta['title']}\n")
            if meta.get('summary'):
                parts.append(f"- **Summary:** {meta['summary'][:200]}...\n")
            if meta.get('tags'):
                tags = ', '.join(meta['tags'][:5])
                parts.append(f"- **Tags:** {tags}\n")
            if meta.get('topics'):
                topics = ', '.join(meta['topics'][:3])
                parts.append(f"- **Topics:** {topics}\n")
            if meta.get('quality_score'):
                quality = meta['quality_score']
                # 10-segment bar filled proportionally to the score.
                quality_bar = "█" * int(quality * 10) + "░" * (10 - int(quality * 10))
                parts.append(f"- **Quality Score:** {quality:.2f} {quality_bar}\n")
            if meta.get('detected_date'):
                parts.append(f"- **Detected Date:** {meta['detected_date']}\n")
            if meta.get('extraction_method'):
                method = meta['extraction_method'].upper()
                parts.append(f"- **Extraction Method:** {method}\n")
        return "".join(parts)
    except FileNotFoundError:
        return "❌ Could not read the uploaded file."
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach the backend. Make sure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ File ingestion timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error during file ingestion: {exc}"
except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ File ingestion timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during file ingestion: {exc}" def _format_rules_table(rules: list[str]) -> list[list]: return [[idx + 1, rule] for idx, rule in enumerate(rules)] def fetch_admin_rules(tenant_id: str, role: str) -> tuple[str, list[list]]: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.get( f"{BACKEND_BASE_URL}/admin/rules", headers=headers, timeout=30 ) if response.status_code == 200: rules = response.json().get("rules", []) if not rules: return "✅ No admin rules have been configured yet.", [] summary = f"### Current Rules ({len(rules)})" return summary, _format_rules_table(rules) return f"❌ Error {response.status_code}: {response.text}", [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] def extract_rules_from_file(file_path) -> str: """ Extract rules from uploaded file (TXT, PDF, DOC, DOCX). Returns the extracted text content. 
""" if file_path is None: return "" try: # Gradio File component returns file path as string if isinstance(file_path, str): file_path = Path(file_path) else: # Sometimes it's a file object with .name attribute file_path = Path(file_path.name if hasattr(file_path, 'name') else file_path) if not file_path.exists(): return f"❌ File not found: {file_path}" file_ext = file_path.suffix.lower() # Read file based on type if file_ext == '.txt' or file_ext == '.md': # Plain text files with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() return content elif file_ext == '.pdf': # PDF files - use PyPDF2 try: import PyPDF2 with open(file_path, 'rb') as f: pdf_reader = PyPDF2.PdfReader(f) content = [] for page in pdf_reader.pages: content.append(page.extract_text()) return '\n'.join(content) except ImportError: return "❌ PDF extraction requires PyPDF2. Install with: pip install PyPDF2" except Exception as e: return f"❌ Failed to extract text from PDF: {str(e)}" elif file_ext in ['.doc', '.docx']: # DOC/DOCX files - use python-docx try: from docx import Document doc = Document(file_path) content = [] for paragraph in doc.paragraphs: content.append(paragraph.text) return '\n'.join(content) except ImportError: return "❌ DOCX extraction requires python-docx. Install with: pip install python-docx" except Exception as e: return f"❌ Failed to extract text from DOCX: {str(e)}" else: return f"❌ Unsupported file type: {file_ext}. Supported: .txt, .pdf, .doc, .docx" except Exception as e: return f"❌ Error reading file: {str(e)}" def add_admin_rules(tenant_id: str, role: str, rules_text: str, enhance: bool = True) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rules_text or not rules_text.strip(): return "❗ Provide at least one rule to upload." if not can_manage_rules(role): return "❌ Access Denied: You need Admin or Owner role to manage rules." 
tenant_id = tenant_id.strip() # Filter out comment lines (starting with #) and empty lines rules = [ rule.strip() for rule in rules_text.splitlines() if rule.strip() and not rule.strip().startswith("#") ] if not rules: return "❗ No valid rules detected. (Comment lines starting with # are ignored)" added = [] enhanced = [] errors = [] # Process rules in chunks to avoid timeout CHUNK_SIZE = 5 # Process 5 rules at a time total_rules = len(rules) if total_rules == 1: # Single rule - use regular endpoint try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules", params={"rule": rules[0], "enhance": "true" if enhance else "false"}, headers=headers, timeout=60 if enhance else 15 ) if resp.status_code == 200: data = resp.json() added.append(data.get("added_rule", rules[0])) if data.get("enhanced"): edge_cases = data.get("edge_cases", []) improvements = data.get("improvements", []) explanation = data.get("explanation", "") examples = data.get("examples", []) missing_patterns = data.get("missing_patterns", []) if explanation: enhanced.append(f"**💡 Explanation:** {explanation}") if examples: examples_list = "\n".join([f" • {ex}" for ex in examples[:5]]) enhanced.append(f"**📋 Examples:**\n{examples_list}") if missing_patterns: patterns_list = "\n".join([f" • {p}" for p in missing_patterns[:5]]) enhanced.append(f"**🔍 Suggested Patterns:**\n{patterns_list}") if edge_cases or improvements: enhanced.append(f"**{data.get('added_rule', rules[0])}**:") if improvements: enhanced.append(f" • Improvements: {', '.join(improvements[:3])}") if edge_cases: enhanced.append(f" • Edge cases identified: {len(edge_cases)}") else: errors.append(f"{rules[0]} -> {resp.status_code}: {resp.text}") except Exception as exc: errors.append(f"{rules[0]} -> {exc}") else: # Multiple rules - process in chunks for i in range(0, total_rules, CHUNK_SIZE): chunk = rules[i:i + CHUNK_SIZE] chunk_num = (i // CHUNK_SIZE) + 1 
total_chunks = (total_rules + CHUNK_SIZE - 1) // CHUNK_SIZE try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules/bulk", json={"rules": chunk}, headers=headers, params={"enhance": "true" if enhance else "false"}, timeout=180 if enhance else 30 # Timeout per chunk (5 rules × 30s per rule + buffer if enhance, else quick) ) if resp.status_code == 200: data = resp.json() chunk_added = data.get("added_rules", []) added.extend(chunk_added) if data.get("enhanced"): chunk_enhanced = data.get("enhancement_summary", []) enhanced.extend([f"[Chunk {chunk_num}/{total_chunks}] {e}" for e in chunk_enhanced]) # Add explanations for bulk rules if available if data.get("explanations"): for exp in data["explanations"][:3]: # Show first 3 explanations if exp.get("explanation"): enhanced.append(f"\n💡 **{exp.get('rule', 'Rule')} Explanation:** {exp['explanation']}") if exp.get("examples"): examples_list = "\n".join([f" • {ex}" for ex in exp['examples'][:3]]) enhanced.append(f"📋 **Examples:**\n{examples_list}") if exp.get("missing_patterns"): patterns_list = "\n".join([f" • {p}" for p in exp['missing_patterns'][:3]]) enhanced.append(f"🔍 **Suggested Patterns:**\n{patterns_list}") else: errors.append(f"Chunk {chunk_num}/{total_chunks} failed: {resp.status_code}: {resp.text}") except requests.exceptions.Timeout: errors.append(f"Chunk {chunk_num}/{total_chunks} timed out after 180s. Try adding rules without enhancement (set enhance=false) or add fewer rules at once.") except Exception as exc: errors.append(f"Chunk {chunk_num}/{total_chunks} error: {exc}") summary = [] if added: summary.append(f"✅ Added {len(added)}/{total_rules} rule(s):\n" + "\n".join([f"- {r}" for r in added[:10]])) if len(added) > 10: summary.append(f"... and {len(added) - 10} more") if enhanced: summary.append(f"\n🤖 LLM Enhancement Applied:\n" + "\n".join(enhanced[:5])) if len(enhanced) > 5: summary.append(f"... 
def delete_admin_rule(tenant_id: str, role: str, rule: str) -> str:
    """
    Delete a single admin rule, identified by exact text or 1-based table index.

    A purely numeric `rule` is first resolved against the current rules list;
    the rule text is URL-encoded before the DELETE call. Requires Admin or
    Owner role. Returns a human-readable status string; never raises.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    if not rule or not rule.strip():
        return "❗ Provide the exact rule text to delete."
    if not can_manage_rules(role):
        return "❌ Access Denied: You need Admin or Owner role to delete rules."

    tenant_id = tenant_id.strip()
    rule = rule.strip()
    request_headers = {
        "x-tenant-id": tenant_id,
        "x-user-role": role if role else DEFAULT_ROLE
    }

    # A bare number refers to a row in the rules table; resolve it to text.
    if rule.isdigit():
        try:
            listing = requests.get(
                f"{BACKEND_BASE_URL}/admin/rules",
                headers=request_headers,
                timeout=15
            )
            if listing.status_code == 200:
                all_rules = listing.json().get("rules", [])
                position = int(rule) - 1  # Convert to 0-based index
                if not (0 <= position < len(all_rules)):
                    return f"❌ Invalid rule number. Please enter a number between 1 and {len(all_rules)}, or enter the full rule text."
                rule = all_rules[position]  # Use the actual rule text
        except Exception as e:
            return f"❌ Error fetching rules: {e}"

    try:
        # URL-encode the rule text so special characters survive the path segment.
        import urllib.parse
        encoded = urllib.parse.quote(rule, safe='')
        reply = requests.delete(
            f"{BACKEND_BASE_URL}/admin/rules/{encoded}",
            headers=request_headers,
            timeout=15
        )
        if reply.status_code == 200:
            return f"🗑️ Deleted rule: {rule}"
        if reply.status_code == 404:
            return f"❌ Rule not found: '{rule}'. Please check the rules table and enter the exact rule text (or rule number)."
        return f"❌ Error {reply.status_code}: {reply.text}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ Delete request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error: {exc}"
def add_rules_from_file(tenant_id: str, role: str, file_path, enhance: bool = True):
    """
    Extract rules from an uploaded file and add them.

    Returns (status message, rules summary markdown, rules table rows).

    BUG FIX: the two error paths previously called fetch_admin_rules(tenant_id)
    without the required `role` argument, raising TypeError; both calls now
    pass the role.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required.", "👉 Click **Refresh Rules** to see existing entries.", []
    if file_path is None:
        return "❗ Please select a file to upload.", "👉 Click **Refresh Rules** to see existing entries.", []

    # Extract text from file
    extracted_text = extract_rules_from_file(file_path)
    if extracted_text.startswith("❌"):
        # Error occurred during extraction
        summary, rows = fetch_admin_rules(tenant_id, role)
        return extracted_text, summary, rows
    if not extracted_text or not extracted_text.strip():
        summary, rows = fetch_admin_rules(tenant_id, role)
        return "❗ No text could be extracted from the file.", summary, rows

    # Add rules from extracted text, then refresh the table
    status = add_admin_rules(tenant_id, role, extracted_text, enhance=enhance)
    summary, rows = fetch_admin_rules(tenant_id, role)
    return status, summary, rows


def add_rules_and_refresh(tenant_id: str, role: str, rules_text: str, enhance: bool = True):
    """Add rules from the textbox, then return refreshed (status, summary, rows)."""
    status = add_admin_rules(tenant_id, role, rules_text, enhance=enhance)
    summary, rows = fetch_admin_rules(tenant_id, role)
    return status, summary, rows


def delete_rule_and_refresh(tenant_id: str, role: str, rule: str):
    """Delete a rule, then return refreshed (status, summary, rows)."""
    status = delete_admin_rule(tenant_id, role, rule)
    summary, rows = fetch_admin_rules(tenant_id, role)
    return status, summary, rows


def fetch_admin_analytics(tenant_id: str, role: str):
    """Fetch analytics data and return formatted results with visualizations."""
    if not tenant_id or not tenant_id.strip():
        error_msg = "❗ Tenant ID is required to view analytics."
return error_msg, {}, None, None, None, None # All roles can view analytics (matching backend permissions) # No access check needed here tenant_id = tenant_id.strip() headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } overview_data = {} tool_usage_data = {} redflags_data = {} activity_data = {} error_msg = None # Fetch Overview try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers=headers, timeout=30 ) if resp.status_code == 200: overview_data = resp.json() else: error_msg = f"❌ Error fetching overview: {resp.status_code}" except Exception as e: error_msg = f"❌ Error: {str(e)}" # Fetch Tool Usage try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/tool-usage", headers=headers, timeout=30 ) if resp.status_code == 200: tool_usage_data = resp.json() except Exception: pass # Fetch Red Flags try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/redflags", headers=headers, timeout=30 ) if resp.status_code == 200: redflags_data = resp.json() except Exception: pass # Fetch Activity try: resp = requests.get( f"{BACKEND_BASE_URL}/analytics/activity", headers=headers, timeout=30 ) if resp.status_code == 200: activity_data = resp.json() except Exception: pass # Extract data for visualizations overview = overview_data.get("overview", {}) tool_usage = overview.get("tool_usage", tool_usage_data.get("tool_usage", {})) rag_quality = overview.get("rag_quality", {}) # Create tool usage bar chart tool_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] counts = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): tools.append(tool_name.replace(".", " ").title()) counts.append(stats.get("count", 0)) latencies.append(stats.get("avg_latency_ms", 0)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = 
go.Figure() fig.add_trace(go.Bar( x=tools, y=counts, name="Usage Count", marker_color=colors_list, text=counts, textposition='outside', hovertemplate='%{x}
Count: %{y}
' )) fig.update_layout( title={ "text": "Tool Usage Count", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Count", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) tool_chart = fig except Exception: tool_chart = None # Create latency chart latency_chart = None if tool_usage and PLOTLY_AVAILABLE: try: tools = [] latencies = [] colors_list = [] color_map = { "rag": "#3b82f6", "rag.search": "#2563eb", "rag.ingest": "#1d4ed8", "rag.list": "#1e40af", "web.search": "#06b6d4", "admin": "#a855f7", "llm": "#10b981" } for tool_name, stats in tool_usage.items(): avg_latency = stats.get("avg_latency_ms", 0) if avg_latency > 0: tools.append(tool_name.replace(".", " ").title()) latencies.append(round(avg_latency, 2)) colors_list.append(color_map.get(tool_name, "#6b7280")) if tools: fig = go.Figure() fig.add_trace(go.Bar( x=tools, y=latencies, name="Avg Latency (ms)", marker_color=colors_list, text=[f"{l}ms" for l in latencies], textposition='outside', hovertemplate='%{x}
Avg Latency: %{y}ms' )) fig.update_layout( title={ "text": "Average Tool Latency", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Tool", yaxis_title="Latency (ms)", height=380, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) latency_chart = fig except Exception: latency_chart = None # Create RAG quality metrics visualization rag_chart = None if rag_quality and PLOTLY_AVAILABLE: try: metrics = ["Avg Hits", "Avg Score", "Avg Top Score"] values = [ rag_quality.get("avg_hits_per_search", 0), rag_quality.get("avg_score", 0) * 100, # Convert to percentage rag_quality.get("avg_top_score", 0) * 100 ] fig = go.Figure(data=[go.Bar( x=metrics, y=values, marker_color=["#3b82f6", "#10b981", "#f59e0b"], text=[f"{v:.2f}" for v in values], textposition='outside', hovertemplate='%{x}
Value: %{y:.2f}' )]) fig.update_layout( title={ "text": "RAG Quality Metrics", "x": 0.5, "xanchor": "center", "font": {"size": 16, "color": "#1f2937"} }, xaxis_title="Metric", yaxis_title="Value", height=350, showlegend=False, margin=dict(l=50, r=20, t=60, b=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", font=dict(color="#374151", size=12), xaxis=dict(gridcolor="rgba(0,0,0,0.1)"), yaxis=dict(gridcolor="rgba(0,0,0,0.1)") ) rag_chart = fig except Exception: rag_chart = None # Format summary text total_queries = overview.get("total_queries", activity_data.get("activity", {}).get("total_queries", 0)) active_users = overview.get("active_users", activity_data.get("activity", {}).get("active_users", 0)) redflag_count = overview.get("redflag_count", redflags_data.get("count", 0)) last_query = overview.get("last_query", activity_data.get("activity", {}).get("last_query")) # Calculate total tool usage total_tool_calls = sum(stats.get("count", 0) for stats in tool_usage.values()) total_success = sum(stats.get("success_count", 0) for stats in tool_usage.values()) total_errors = sum(stats.get("error_count", 0) for stats in tool_usage.values()) success_rate = (total_success / total_tool_calls * 100) if total_tool_calls > 0 else 0 summary_text = f""" #### 📈 Activity Metrics - **Total Queries:** `{total_queries}` - **Active Users:** `{active_users}` - **Red Flags:** `{redflag_count}` - **Last Query:** `{last_query if last_query else "N/A"}` --- #### 🔧 Tool Usage Overview - **Total Tool Calls:** `{total_tool_calls}` - **Successful Calls:** `{total_success}` ✅ - **Failed Calls:** `{total_errors}` {'⚠️' if total_errors > 0 else ''} - **Success Rate:** `{success_rate:.1f}%` {'🟢' if success_rate >= 95 else '🟡' if success_rate >= 80 else '🔴'} --- #### 🔍 RAG Quality Metrics - **Total Searches:** `{rag_quality.get("total_searches", 0)}` - **Avg Hits per Search:** `{rag_quality.get("avg_hits_per_search", 0):.2f}` - **Avg Relevance Score:** `{rag_quality.get("avg_score", 
0):.3f}` - **Avg Top Score:** `{rag_quality.get("avg_top_score", 0):.3f}` - **Avg Search Latency:** `{rag_quality.get("avg_latency_ms", 0):.2f}ms` --- #### 📊 Tool Breakdown """ # Add individual tool stats to summary for tool_name, stats in sorted(tool_usage.items(), key=lambda x: x[1].get("count", 0), reverse=True): tool_display = tool_name.replace(".", " ").title() count = stats.get("count", 0) latency = stats.get("avg_latency_ms", 0) success = stats.get("success_count", 0) errors = stats.get("error_count", 0) status_icon = "✅" if errors == 0 else "⚠️" summary_text += f"- **{tool_display}** {status_icon}
└ {count} calls • {latency:.1f}ms avg • {success} success • {errors} errors\n" return summary_text, tool_usage, tool_chart, latency_chart, rag_chart, error_msg def list_documents(tenant_id: str, role: str, limit: int = 1000, offset: int = 0): """ List all documents for a tenant. Returns a tuple of (status_message, documents_list, total_count, stats_dict, chart_fig). """ if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [], 0, {}, None tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.get( f"{BACKEND_BASE_URL}/rag/list", params={"tenant_id": tenant_id, "limit": limit, "offset": offset}, headers=headers, timeout=30 ) if response.status_code == 200: data = response.json() documents = data.get("documents", []) total = data.get("total", 0) # Format documents for display and collect stats formatted_docs = [] type_counts = Counter() total_length = 0 for doc in documents: doc_id = doc.get("id", "N/A") text = doc.get("text", "") created_at = doc.get("created_at", "") preview = text[:200] + "..." if len(text) > 200 else text # Detect document type text_lower = text.lower() if "http://" in text_lower or "https://" in text_lower or "www." 
in text_lower: doc_type = "link" elif any(x in text_lower for x in ["q:", "question:", "faq", "frequently asked"]): doc_type = "faq" elif ".pdf" in text_lower or "pdf document" in text_lower: doc_type = "pdf" else: doc_type = "text" type_counts[doc_type] += 1 total_length += len(text) # Format as list for Gradio Dataframe (list of lists) formatted_docs.append([ doc_id, doc_type, preview, len(text), created_at[:10] if created_at else "N/A" ]) # Create statistics dictionary stats = { "total": total, "types": dict(type_counts), "avg_length": total_length // total if total > 0 else 0, "total_chars": total_length } # Create pie chart for document types chart_fig = None if type_counts and PLOTLY_AVAILABLE: try: labels = list(type_counts.keys()) values = list(type_counts.values()) colors = { "text": "#3b82f6", # blue "pdf": "#ef4444", # red "faq": "#a855f7", # purple "link": "#06b6d4" # cyan } chart_colors = [colors.get(label, "#6b7280") for label in labels] fig = go.Figure(data=[go.Pie( labels=labels, values=values, hole=0.4, marker=dict(colors=chart_colors), textinfo='label+percent+value', textfont=dict(size=12), hovertemplate='%{label}
Count: %{value}
Percentage: %{percent}' )]) fig.update_layout( title={ "text": "Document Type Distribution", "x": 0.5, "xanchor": "center", "font": {"size": 16} }, height=400, showlegend=True, margin=dict(l=20, r=20, t=50, b=20) ) chart_fig = fig except Exception: chart_fig = None status = f"✅ Found {total} document(s)" return status, formatted_docs, total, stats, chart_fig else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [], 0, {}, None except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [], 0, {}, None except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [], 0, {}, None except Exception as exc: return f"❌ Unexpected error: {exc}", [], 0, {}, None def delete_document(tenant_id: str, role: str, document_id: int): """Delete a specific document by ID.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not document_id or document_id <= 0: return "❗ Invalid document ID." if not can_delete_documents(role): return "❌ Access Denied: You need Admin or Owner role to delete documents." tenant_id = tenant_id.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete/{document_id}", params={"tenant_id": tenant_id}, headers=headers, timeout=30 ) if response.status_code == 200: return f"✅ Document {document_id} deleted successfully." elif response.status_code == 404: return f"❌ Document {document_id} not found or access denied." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." 
except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def delete_all_documents(tenant_id: str, role: str): """Delete all documents for a tenant.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." tenant_id = tenant_id.strip() if not can_delete_documents(role): return "❌ Access Denied: You need Admin or Owner role to delete documents." try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } response = requests.delete( f"{BACKEND_BASE_URL}/rag/delete-all", params={"tenant_id": tenant_id}, headers=headers, timeout=60 ) if response.status_code == 200: data = response.json() deleted_count = data.get("deleted_count", 0) return f"✅ Deleted {deleted_count} document(s) successfully." else: error_data = response.json() if response.headers.get("content-type", "").startswith("application/json") else {} error_msg = error_data.get("detail", error_data.get("error", response.text)) return f"❌ Error {response.status_code}: {error_msg}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again." 
except Exception as exc: return f"❌ Unexpected error: {exc}" def search_knowledge_base(tenant_id: str, role: str, query: str): """Search the knowledge base using RAG semantic search with cross-encoder re-ranking.""" if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] if not query or not query.strip(): return "❗ Please enter a search query.", [] tenant_id = tenant_id.strip() query = query.strip() try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE, "Content-Type": "application/json" } response = requests.post( f"{BACKEND_BASE_URL}/rag/search", json={"tenant_id": tenant_id, "query": query, "threshold": 0.3}, headers=headers, timeout=30 ) if response.status_code == 200: data = response.json() results = data.get("results", []) formatted_results = [] for idx, result in enumerate(results, 1): text = result.get("text", "") relevance = result.get("relevance", result.get("score", 0.0)) formatted_results.append({ "Rank": idx, "Text": text[:300] + "..." if len(text) > 300 else text, "Relevance": f"{relevance:.3f}" if relevance else "N/A" }) status = f"✅ Found {len(results)} result(s) for '{query}' (re-ranked with cross-encoder)" return status, formatted_results else: error_msg = f"❌ Error {response.status_code}: {response.text}" return error_msg, [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] # Create Gradio interface # Note: some Gradio versions (especially older ones) do not support the `theme` argument # on `Blocks`. To keep the Docker image compatible across environments, we rely on # custom CSS for styling instead of passing a `theme` kwarg here. 
with gr.Blocks( title="IntegraChat — MCP Autonomous Agent", css=""" /* Global dark theme with simpler, basic colors */ body, .gradio-container { font-family: 'Inter', system-ui, -apple-system, sans-serif; background: #020617; color: #e5e7eb; } /* Remove default card backgrounds so our custom sections stand out */ .gradio-container .block { background: transparent; } /* Header styling */ .header-section { background: #020617; padding: 28px 24px; border-radius: 18px; margin-bottom: 24px; box-shadow: 0 18px 60px rgba(15, 23, 42, 0.9); border: 1px solid rgba(148, 163, 184, 0.25); } .header-section h1 { color: #e5e7eb; font-size: 2.4rem; font-weight: 700; margin-bottom: 8px; letter-spacing: 0.02em; } .header-section p { color: #cbd5f5; font-size: 0.98rem; max-width: 720px; } /* Input fields strip */ .input-container { background: #020617; padding: 18px 20px 22px 20px; border-radius: 14px; border: 1px solid rgba(148, 163, 184, 0.35); backdrop-filter: blur(18px); box-shadow: 0 12px 40px rgba(15, 23, 42, 0.9); margin-bottom: 18px; } /* Tenant / role cards */ .tenant-card, .role-card { background: #020617; border-radius: 14px; padding: 16px 16px 14px 16px; border: 1px solid rgba(148, 163, 184, 0.6); box-shadow: 0 8px 26px rgba(15, 23, 42, 0.9); display: flex; flex-direction: column; transition: border-color 0.15s ease, box-shadow 0.15s ease, transform 0.15s ease; } # .tenant-card:hover, # .role-card:hover { # border-color: #38bdf8; # box-shadow: 0 12px 36px rgba(56, 189, 248, 0.35); # transform: translateY(-1px); # } .field-label-pill { display: inline-flex; align-items: center; gap: 8px; padding: 6px 12px; border-radius: 999px; background: #0f172a; color: #e5e7eb; font-size: 0.8rem; font-weight: 600; letter-spacing: 0.08em; text-transform: uppercase; border: 1px solid #38bdf8; } .field-label-pill span.icon { font-size: 1rem; } .field-label-subtitle { margin-top: 4px; margin-bottom: 4px; color: #9ca3af; font-size: 0.8rem; } /* Reduce spacing for dropdown in role card */ 
.role-card .field-label-subtitle { margin-bottom: 6px; } .role-card select, .role-card .gradio-dropdown { margin-top: 2px; } /* Left/right columns in Chat tab */ .chat-row > .col:nth-child(1) { min-width: 0; } /* Stat cards */ .stat-card { background: #020617; padding: 22px; border-radius: 16px; color: white; text-align: left; box-shadow: 0 12px 32px rgba(15, 23, 42, 0.9); transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; border: 1px solid rgba(248, 250, 252, 0.25); } .stat-card:hover { transform: translateY(-3px) scale(1.01); box-shadow: 0 16px 40px rgba(15, 23, 42, 0.95); border-color: #38bdf8; } .stat-card h3 { margin: 0 0 6px 0; font-size: 0.78rem; opacity: 0.9; font-weight: 600; letter-spacing: 0.16em; text-transform: uppercase; } .stat-card strong { font-size: 1.8rem; font-weight: 700; display: block; margin-top: 8px; } /* Summary / debug panel */ .summary-box { background: #020617; padding: 24px; border-radius: 18px; border: 1px solid rgba(148, 163, 184, 0.7); max-height: 520px; overflow-y: auto; box-shadow: 0 18px 48px rgba(15, 23, 42, 0.95); color: #e5e7eb; backdrop-filter: blur(18px); transition: border-color 0.15s ease, box-shadow 0.15s ease, transform 0.15s ease; } .summary-box::-webkit-scrollbar { width: 8px; } .summary-box::-webkit-scrollbar-track { background: rgba(15, 23, 42, 1); border-radius: 999px; } .summary-box::-webkit-scrollbar-thumb { background: rgba(148, 163, 184, 0.7); border-radius: 999px; } .summary-box:hover { border-color: #38bdf8; box-shadow: 0 22px 60px rgba(15, 23, 42, 1); transform: translateY(-1px); } .summary-box h3, .summary-box h4 { margin-top: 0; margin-bottom: 12px; color: #f9fafb; font-weight: 600; } .summary-box p, .summary-box li { color: #e5e7eb; margin: 8px 0; line-height: 1.7; } .summary-box code { background-color: rgba(15, 23, 42, 0.9); color: #22c55e; padding: 3px 7px; border-radius: 6px; font-family: 'Fira Code', 'Courier New', monospace; font-size: 0.78rem; border: 1px solid rgba(148, 
163, 184, 0.45); } /* Chart titles / section headings */ .chart-title { margin-bottom: 8px; margin-top: 0; font-weight: 600; color: #e5e7eb; text-align: center; font-size: 1rem; } /* Primary buttons */ button.primary { background: #0ea5e9; border: none; box-shadow: 0 8px 26px rgba(15, 23, 42, 0.9); transition: transform 0.15s ease, box-shadow 0.15s ease, filter 0.15s ease; border-radius: 999px; font-weight: 600; } button.primary:hover { transform: translateY(-1px); filter: brightness(1.08); box-shadow: 0 12px 32px rgba(15, 23, 42, 1); } /* Tabs */ .tab-nav { border-bottom: 1px solid rgba(148, 163, 184, 0.35); } /* Role badges */ .role-badge { display: inline-block; padding: 5px 11px; border-radius: 999px; font-size: 0.7rem; font-weight: 600; text-transform: uppercase; letter-spacing: 0.1em; } .role-viewer { background: linear-gradient(135deg, #64748b 0%, #475569 100%); color: white; } .role-editor { background: linear-gradient(135deg, #06b6d4 0%, #0891b2 100%); color: white; } .role-admin { background: linear-gradient(135deg, #8b5cf6 0%, #7c3aed 100%); color: white; } .role-owner { background: linear-gradient(135deg, #f59e0b 0%, #d97706 100%); color: white; } /* Inputs */ input[type="text"], textarea, select { border-radius: 10px !important; border: 1px solid rgba(148, 163, 184, 0.5) !important; background: rgba(15, 23, 42, 0.92) !important; color: #e5e7eb !important; transition: border-color 0.2s ease, box-shadow 0.2s ease, background 0.2s ease !important; } input[type="text"]::placeholder, textarea::placeholder { color: rgba(148, 163, 184, 0.65) !important; } input[type="text"]:focus, textarea:focus, select:focus { border-color: #06b6d4 !important; box-shadow: 0 0 0 1px rgba(6, 182, 212, 0.65) !important; background: rgba(15, 23, 42, 1) !important; } /* Reduce spacing in dropdown menu items */ .gradio-dropdown ul, .gradio-dropdown .dropdown-menu, select option { padding: 4px 8px !important; margin: 0 !important; } /* Reduce gap between dropdown and label */ 
.role-card .gradio-dropdown { margin-top: 4px !important; } /* Generic section card */ .section-card { background: #020617; padding: 22px; border-radius: 16px; border: 1px solid rgba(148, 163, 184, 0.4); margin-bottom: 18px; backdrop-filter: blur(14px); box-shadow: 0 14px 40px rgba(15, 23, 42, 0.95); transition: border-color 0.15s ease, box-shadow 0.15s ease, transform 0.15s ease; } .section-card:hover { border-color: #38bdf8; box-shadow: 0 18px 52px rgba(15, 23, 42, 1); transform: translateY(-1px); } /* Chatbot + message bubbles */ .chatbot { border-radius: 18px !important; border: 1px solid rgba(148, 163, 184, 0.7) !important; background: #020617 !important; box-shadow: 0 18px 60px rgba(15, 23, 42, 1); } /* Keep Gradio's default layout, only adjust colors lightly */ .chatbot .message.user { background: #0ea5e9; color: #0b1020; } .chatbot .message.bot { background: #020617; border-color: rgba(148, 163, 184, 0.8); color: #e5e7eb; } .chatbot .message.error { background: rgba(239, 68, 68, 0.18); border-color: rgba(248, 113, 113, 0.9); } """ ) as demo: with gr.Column(elem_classes=["header-section"]): gr.Markdown( """ # 🤖 IntegraChat — MCP Autonomous Agent **Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.** """ ) gr.Markdown( """
🔐 Role-Based Access Control: Features are automatically shown/hidden based on your role:
""" ) with gr.Row(elem_classes=["input-container"]): with gr.Column(scale=2, elem_classes=["tenant-card"]): gr.Markdown( """
🏢 Tenant ID
Required for all operations. Use a unique ID per customer / environment.
""" ) tenant_id_input = gr.Textbox( label="", placeholder="Enter your tenant ID (e.g., tenant123)", value="", interactive=True, scale=2, show_label=False, ) with gr.Column(scale=1, elem_classes=["role-card"]): gr.Markdown( """
👤 User Role
Select your role to automatically unlock the right capabilities.
""" ) role_input = gr.Dropdown( label="", choices=VALID_ROLES, value=DEFAULT_ROLE, interactive=True, scale=1, show_label=False, ) with gr.Tabs(): with gr.Tab("Chat"): # Access denied for Editor role - Editor should only see Document Ingestion chat_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Please switch to Owner or Admin role to access Chat functionality, or go to the Document Ingestion tab.

""", visible=False ) chat_content = gr.Column(visible=True) with chat_content: # Two-column layout: chat on the left, guidance panel on the right with gr.Row(elem_classes=["chat-row"]): with gr.Column(scale=2, elem_classes=["section-card"]): chatbot = gr.Chatbot( label="Chat with Agent", height=500, show_label=True, container=True, elem_classes=["chatbot"] ) with gr.Row(): message_input = gr.Textbox( label="Message", placeholder="Type your message here...", scale=4, show_label=False, container=False ) send_button = gr.Button("Send", variant="primary", scale=1) with gr.Column(scale=1, elem_classes=["summary-box"]): gr.Markdown( """ ### 📝 Chat Instructions 1. Enter your **Tenant ID** and **Role** above 2. Ask a question or give a task to the agent 3. The MCP agent will automatically select tools (RAG, Web, etc.) ### ⚡ Features - ✨ Real-time character-by-character streaming responses - 🚀 Query caching for faster repeated queries - 🔍 Query expansion for ambiguous terms (Al→AI, ML→machine learning) - 🌐 Multi-query web search with parallel execution - 🧠 Multi-step planning & reasoning - 🔍 Automatic tool selection with latency prediction - 🧠 Context-aware routing (intelligent tool skipping) - 💾 Conversation memory - 📊 Reasoning visualization (see Debug tab) - ⚡ Per-tool latency estimates (RAG: 60-120ms, Web: 400-1800ms) - 📋 Schema-validated tool outputs - 🛡️ Enhanced error handling with actionable messages """ ) # Reasoning trace viewer reasoning_trace_viewer = gr.Markdown( "💡 **Tip:** Use the Debug tab to view detailed reasoning traces for your messages.", visible=True ) # Event handlers for chat tab with streaming def send_message(message, tenant_id, role, history): # Clear message input immediately message_input_value = "" # Use streaming function which yields updates # Gradio will automatically handle the generator and update UI in real-time try: for updated_history in chat_with_agent(message, tenant_id, role, history): yield updated_history, message_input_value 
except Exception as e: # Fallback if streaming fails error_msg = f"Streaming error: {str(e)}" history = convert_history_to_tuples(history) history = append_to_history(history, "assistant", error_msg) yield history, message_input_value send_button.click( fn=send_message, inputs=[message_input, tenant_id_input, role_input, chatbot], outputs=[chatbot, message_input] ) message_input.submit( fn=send_message, inputs=[message_input, tenant_id_input, role_input, chatbot], outputs=[chatbot, message_input] ) # Function to update Chat tab visibility based on role (Editor sees access denied) def update_chat_visibility(role): is_editor = role == "editor" return ( gr.update(visible=is_editor), # Access denied message for Editor gr.update(visible=not is_editor), # Chat content for Owner/Admin ) role_input.change( fn=update_chat_visibility, inputs=[role_input], outputs=[chat_access_denied, chat_content] ) with gr.Tab("🔍 Debug & Reasoning"): gr.Markdown( """
### 🔍 Agent Reasoning Debugger View the complete reasoning path, tool invocations, and decision-making process for any message. **Features:** - 🧠 Step-by-step reasoning trace - ⚙️ Tool invocation timeline with schema-validated outputs - ⚡ Per-tool latency predictions (RAG: 60-120ms, Web: 400-1800ms, Admin: <20ms) - 🧠 Context-aware routing hints (skip web if RAG high, skip RAG if memory available) - 📊 Tool output schemas for easier debugging - 🎯 Final decision breakdown with estimated latency - 📊 Performance metrics
""" ) debug_message = gr.Textbox( label="Message to Debug", placeholder="Enter the same message you sent in Chat to see its reasoning path...", lines=2 ) debug_button = gr.Button("🔍 Analyze Reasoning", variant="primary") debug_output = gr.Markdown("👉 Enter a message and click 'Analyze Reasoning' to see the agent's reasoning path.") def analyze_reasoning(message, tenant_id, role): if not message or not message.strip(): return "❗ Please enter a message to analyze." if not tenant_id or not tenant_id.strip(): return "❗ Please enter a Tenant ID." return get_reasoning_trace(tenant_id, role, message) debug_button.click( fn=analyze_reasoning, inputs=[debug_message, tenant_id_input, role_input], outputs=[debug_output] ) debug_message.submit( fn=analyze_reasoning, inputs=[debug_message, tenant_id_input, role_input], outputs=[debug_output] ) with gr.Tab("📚 Document Ingestion"): gr.Markdown( """
### 📚 Knowledge Base Ingestion Ingest documents so the MCP agent can reference tenant-private knowledge. **📄 Supported Formats:** - **Raw text / URLs:** Use the fields below - **Files:** PDF, DOCX, TXT, Markdown - **Metadata:** Optional JSON metadata for better organization **🤖 AI-Generated Metadata (Automatic):** - ✨ **Title extraction** from filename, content, or URL - 📝 **Summary generation** (2-3 sentences via LLM) - 🏷️ **Tags extraction** (5-8 relevant tags) - 📚 **Topics identification** (3-5 main themes) - 📅 **Date detection** (multiple formats) - ⭐ **Quality score** (0.0-1.0 based on structure and completeness) - 🔄 **Intelligent fallback** when LLM is unavailable **⚠️ Note:** Editor role and above can ingest. Admin/Owner can delete.
""" ) ingestion_mode = gr.Radio( ["Raw Text", "URL", "File Upload"], value="Raw Text", label="Select Ingestion Mode" ) with gr.Row(): doc_filename = gr.Textbox(label="Filename (optional)") doc_id = gr.Textbox(label="Document ID (optional)") document_url = gr.Textbox( label="Document URL (for URL ingestion)", placeholder="https://example.com/policy", visible=False ) doc_content = gr.Textbox( label="Content / Notes", placeholder="Paste the document text here...", lines=8, visible=True ) metadata_json = gr.Textbox( label="Additional Metadata (JSON)", placeholder='{"department": "HR", "tags": ["policy", "benefits"]}' ) ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary") document_status = gr.Markdown("") def handle_ingest_document( tenant_id, role, mode, content, doc_url, filename, doc_id_value, metadata ): # Debug: Log the role value received print(f"🔍 DEBUG: handle_ingest_document received role='{role}' (type: {type(role)})", file=sys.stderr) # Ensure role is not None or empty if not role or role.strip() == "": role = DEFAULT_ROLE print(f"⚠️ WARNING: Role was empty/None, defaulting to '{role}'", file=sys.stderr) source_type = "raw_text" if mode == "Raw Text" else "url" result = ingest_document( tenant_id=tenant_id, role=role.strip() if role else DEFAULT_ROLE, source_type=source_type, content=content, document_url=doc_url, filename=filename, doc_id=doc_id_value, metadata_json=metadata ) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." 
return result ingest_doc_button.click( fn=handle_ingest_document, inputs=[ tenant_id_input, role_input, ingestion_mode, doc_content, document_url, doc_filename, doc_id, metadata_json ], outputs=document_status ) file_section = gr.Markdown("#### 📁 File Upload (PDF, DOCX, TXT, Markdown)", visible=False) file_upload = gr.File( label="Upload File", file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"], visible=False ) ingest_file_button = gr.Button("Upload & Ingest File", visible=False) def handle_file_ingestion(tenant_id, role, file_obj): result = ingest_file(tenant_id, role, file_obj) # Add note about refreshing Knowledge Base Library if "✅" in result: result += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents." return result ingest_file_button.click( fn=handle_file_ingestion, inputs=[tenant_id_input, role_input, file_upload], outputs=document_status ) def toggle_source_fields(mode): show_text = mode == "Raw Text" show_url = mode == "URL" show_file = mode == "File Upload" return ( gr.update(visible=show_text), gr.update(visible=show_url), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=show_file), gr.update(visible=show_file), gr.update(visible=show_file), ) ingestion_mode.change( fn=toggle_source_fields, inputs=[ingestion_mode], outputs=[ doc_content, document_url, doc_filename, doc_id, ingest_doc_button, file_section, file_upload, ingest_file_button, ] ) with gr.Tab("📖 Knowledge Base Library"): # Access denied for Editor role kb_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Please switch to Owner or Admin role to access Knowledge Base Library.

""", visible=False ) # Set initial visibility based on default role # Editor should NOT see Knowledge Base Library content initial_is_editor = (DEFAULT_ROLE or "").lower().strip() == "editor" kb_access_denied.visible = initial_is_editor # Show access denied for editor kb_library_content = gr.Column(visible=not initial_is_editor) with kb_library_content: gr.Markdown( """
### 📖 Knowledge Base Library View, search, and manage all ingested documents for your tenant with visual analytics. **Features:** - **📊 Statistics:** View document counts, types, and distribution - **🔍 Search:** Use semantic search with cross-encoder re-ranking for better results - **🤖 AI Metadata:** Documents include auto-extracted title, summary, tags, topics, and quality scores - **🔽 Filter:** Filter documents by type (text, PDF, FAQ, link) - **🗑️ Delete:** Remove individual documents or delete all at once (Admin/Owner only)
""" ) # Statistics Section with gr.Row(): kb_total_docs = gr.Markdown("### 📄 Total Documents\n**0**", elem_classes=["stat-card"]) kb_text_docs = gr.Markdown("### 📝 Text Documents\n**0**", elem_classes=["stat-card"]) kb_pdf_docs = gr.Markdown("### 📄 PDF Documents\n**0**", elem_classes=["stat-card"]) kb_faq_docs = gr.Markdown("### ❓ FAQ Documents\n**0**", elem_classes=["stat-card"]) kb_link_docs = gr.Markdown("### 🔗 Link Documents\n**0**", elem_classes=["stat-card"]) # Chart and Search Section with gr.Row(): with gr.Column(scale=1): kb_chart = gr.Plot(label="Document Type Distribution", show_label=True) kb_refresh_button = gr.Button("🔄 Refresh Documents", variant="primary", size="lg") kb_delete_all_button = gr.Button("🗑️ Delete All Documents", variant="stop") with gr.Column(scale=1): kb_search_query = gr.Textbox( label="🔍 Search Knowledge Base", placeholder="Enter a search query (e.g., 'admin', 'policy', 'FAQ')...", show_label=True ) kb_search_button = gr.Button("Search", variant="primary") kb_search_status = gr.Markdown("") kb_search_results = gr.Dataframe( headers=["Rank", "Text", "Relevance"], datatype=["number", "str", "str"], interactive=False, label="Search Results", wrap=True ) # Status and Filter Section kb_status = gr.Markdown("👉 Click **Refresh Documents** to load your knowledge base.") with gr.Row(): with gr.Column(scale=2): kb_filter_type = gr.Radio( ["all", "text", "pdf", "faq", "link"], value="all", label="Filter by Type", info="Filter documents by detected type" ) with gr.Column(scale=1): kb_avg_length = gr.Markdown("**Average Length:** 0 characters") # Documents Table kb_documents_table = gr.Dataframe( headers=["ID", "Type", "Preview", "Length", "Created"], datatype=["number", "str", "str", "number", "str"], interactive=False, label="Documents", wrap=True ) # Delete Section (Admin/Owner only) kb_delete_section = gr.Row() with kb_delete_section: kb_delete_id = gr.Number( label="Delete Document by ID", value=None, precision=0, info="Enter document ID to 
delete", scale=3 ) kb_delete_button = gr.Button("Delete Document", variant="stop", scale=1) kb_delete_status = gr.Markdown("") # Function to update KB tab visibility based on role def update_kb_visibility(role): can_delete = can_delete_documents(role) return ( gr.update(visible=can_delete), # Delete all button gr.update(visible=can_delete), # Delete section ) def refresh_documents(tenant_id, role, filter_type="all"): status, docs, total, stats, chart_fig = list_documents(tenant_id, role) # Filter documents by type if not "all" # docs is now a list of lists: [ID, Type, Preview, Length, Created] if filter_type != "all" and docs: filtered_docs = [doc for doc in docs if len(doc) > 1 and doc[1].lower() == filter_type.lower()] docs = filtered_docs status = f"✅ Found {len(docs)} {filter_type} document(s) (out of {total} total)" # Update statistics cards type_counts = stats.get("types", {}) total_md = f"### 📄 Total Documents\n**{total}**" text_md = f"### 📝 Text Documents\n**{type_counts.get('text', 0)}**" pdf_md = f"### 📄 PDF Documents\n**{type_counts.get('pdf', 0)}**" faq_md = f"### ❓ FAQ Documents\n**{type_counts.get('faq', 0)}**" link_md = f"### 🔗 Link Documents\n**{type_counts.get('link', 0)}**" avg_length_md = f"**Average Length:** {stats.get('avg_length', 0):,} characters" status_msg = f"{status}\n\n**Total Documents:** {total} | **Total Characters:** {stats.get('total_chars', 0):,}" return ( status_msg, docs, total_md, text_md, pdf_md, faq_md, link_md, avg_length_md, chart_fig ) def filter_documents(tenant_id, role, filter_type): return refresh_documents(tenant_id, role, filter_type) def search_kb(tenant_id, role, query): status, results = search_knowledge_base(tenant_id, role, query) return status, results def delete_doc(tenant_id, role, doc_id): if doc_id is None or doc_id <= 0: return "❗ Please enter a valid document ID.", "", "", "", "", "", "", "", None result = delete_document(tenant_id, role, int(doc_id)) # Refresh document list after deletion return (result, 
*refresh_documents(tenant_id, role, "all")) def delete_all_docs(tenant_id, role): result = delete_all_documents(tenant_id, role) # Refresh document list after deletion return (result, *refresh_documents(tenant_id, role, "all")) kb_refresh_button.click( fn=refresh_documents, inputs=[tenant_id_input, role_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_filter_type.change( fn=filter_documents, inputs=[tenant_id_input, role_input, kb_filter_type], outputs=[ kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_search_button.click( fn=search_kb, inputs=[tenant_id_input, role_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_search_query.submit( fn=search_kb, inputs=[tenant_id_input, role_input, kb_search_query], outputs=[kb_search_status, kb_search_results] ) kb_delete_button.click( fn=delete_doc, inputs=[tenant_id_input, role_input, kb_delete_id], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) kb_delete_all_button.click( fn=delete_all_docs, inputs=[tenant_id_input, role_input], outputs=[ kb_delete_status, kb_status, kb_documents_table, kb_total_docs, kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart ] ) # Update visibility when role changes def update_kb_full_visibility(role): # Normalize role to lowercase for comparison role_lower = (role or DEFAULT_ROLE).lower().strip() is_editor = role_lower == "editor" can_delete = can_delete_documents(role_lower) return ( gr.update(visible=is_editor), # Access denied for Editor gr.update(visible=not is_editor), # KB content for Owner/Admin/Viewer gr.update(visible=can_delete), # Delete all button gr.update(visible=can_delete), # Delete section ) role_input.change( 
fn=update_kb_full_visibility, inputs=[role_input], outputs=[kb_access_denied, kb_library_content, kb_delete_all_button, kb_delete_section] ) with gr.Tab("📊 Admin Analytics"): # Access denied message for non-admin/owner roles analytics_access_denied = gr.Markdown( """

🔒 Access Denied

Analytics is available to all roles.

If you're seeing this message, there may be a configuration issue.

""", visible=False ) # Analytics content (visible for admin/owner) analytics_content = gr.Column(visible=True) with analytics_content: gr.Markdown( """
# 📊 Admin Analytics Dashboard Comprehensive tenant-level analytics with visual insights, performance metrics, and detailed tool usage statistics. **🔒 Access:** Admin and Owner roles only
""" ) # Refresh Button at Top with gr.Row(): analytics_refresh = gr.Button("🔄 Fetch Analytics Snapshot", variant="primary", size="lg") gr.Markdown("") # Statistics Cards gr.Markdown("### 📈 Key Metrics") with gr.Row(): analytics_total_queries = gr.Markdown("### 📊 Total Queries\n**0**", elem_classes=["stat-card"]) analytics_active_users = gr.Markdown("### 👥 Active Users\n**0**", elem_classes=["stat-card"]) analytics_redflags = gr.Markdown("### 🚩 Red Flags\n**0**", elem_classes=["stat-card"]) analytics_rag_searches = gr.Markdown("### 🔍 RAG Searches\n**0**", elem_classes=["stat-card"]) # Charts Section gr.Markdown("### 📊 Performance Charts") with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 📈 Tool Usage Count", elem_classes=["chart-title"]) analytics_tool_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("#### ⚡ Average Tool Latency", elem_classes=["chart-title"]) analytics_latency_chart = gr.Plot(label="", show_label=False) # RAG Quality and Summary Section with gr.Row(): with gr.Column(scale=1): gr.Markdown("#### 🔍 RAG Quality Metrics", elem_classes=["chart-title"]) analytics_rag_chart = gr.Plot(label="", show_label=False) with gr.Column(scale=1): gr.Markdown("### 📋 Analytics Summary") analytics_summary = gr.Markdown( "👉 Click **Fetch Analytics Snapshot** to load data.", elem_classes=["summary-box"] ) # Tool Usage Details Table gr.Markdown("### 🔧 Detailed Tool Usage") analytics_tool_table = gr.Dataframe( headers=["Tool", "Count", "Avg Latency (ms)", "Success", "Errors", "Total Tokens"], datatype=["str", "number", "number", "number", "number", "number"], interactive=False, label="", wrap=True ) analytics_error = gr.Markdown("", visible=False) def format_analytics(tenant_id, role): summary, tool_usage, tool_chart, latency_chart, rag_chart, error = fetch_admin_analytics(tenant_id, role) if error: return ( error, "", "", "", "", None, None, None, [] ) # Extract overview data - fetch_admin_analytics already fetched it, but we need it 
again for cards overview_data = {} try: headers = { "x-tenant-id": tenant_id, "x-user-role": role if role else DEFAULT_ROLE } resp = requests.get( f"{BACKEND_BASE_URL}/analytics/overview", headers=headers, timeout=30 ) if resp.status_code == 200: data = resp.json() # The API returns {"overview": {...}} or direct overview object overview_data = data.get("overview", data) if isinstance(data, dict) else {} # Debug: print to see what we're getting print(f"DEBUG: Overview data keys: {overview_data.keys() if isinstance(overview_data, dict) else 'Not a dict'}") except Exception as e: print(f"Error fetching overview: {e}") pass # Extract values with proper fallbacks - handle both nested and flat structures if isinstance(overview_data, dict): total_queries = overview_data.get("total_queries", 0) active_users = overview_data.get("active_users", 0) redflag_count = overview_data.get("redflag_count", 0) rag_quality = overview_data.get("rag_quality", {}) rag_searches = rag_quality.get("total_searches", 0) if isinstance(rag_quality, dict) else 0 else: total_queries = 0 active_users = 0 redflag_count = 0 rag_quality = {} rag_searches = 0 # Format statistics cards queries_md = f"### 📊 Total Queries\n**{total_queries}**" users_md = f"### 👥 Active Users\n**{active_users}**" redflags_md = f"### 🚩 Red Flags\n**{redflag_count}**" rag_md = f"### 🔍 RAG Searches\n**{rag_searches}**" # Format tool usage table tool_table_data = [] for tool_name, stats in tool_usage.items(): tool_table_data.append({ "Tool": tool_name.replace(".", " ").title(), "Count": stats.get("count", 0), "Avg Latency (ms)": round(stats.get("avg_latency_ms", 0), 2), "Success": stats.get("success_count", 0), "Errors": stats.get("error_count", 0), "Total Tokens": stats.get("total_tokens", 0) }) return ( summary, queries_md, users_md, redflags_md, rag_md, tool_chart, latency_chart, rag_chart, tool_table_data ) analytics_refresh.click( fn=format_analytics, inputs=[tenant_id_input, role_input], outputs=[ analytics_summary, 
analytics_total_queries, analytics_active_users, analytics_redflags, analytics_rag_searches, analytics_tool_chart, analytics_latency_chart, analytics_rag_chart, analytics_tool_table ] ) # Function to update Analytics tab visibility based on role (all roles can view) def update_analytics_visibility(role): has_access = can_view_analytics(role) # All roles can view now return ( gr.update(visible=False), # No access denied message gr.update(visible=True), # Analytics content visible for all ) # Update visibility when role changes role_input.change( fn=update_analytics_visibility, inputs=[role_input], outputs=[analytics_access_denied, analytics_content] ) with gr.Tab("🛡️ Admin Rules & Compliance"): # Access denied for Editor role rules_access_denied = gr.Markdown( """

🔒 Access Denied

Editor role can only access Document Ingestion.

Admin Rules & Compliance is restricted to Admin and Owner roles only.

""", visible=False ) rules_content = gr.Column(visible=True) with rules_content: gr.Markdown( """
### 🛡️ Admin Rules & Regulations Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.). **📤 Upload Methods:** - **Text Input:** Enter one rule per line in the text box - **File Upload:** Upload rules from TXT, PDF, DOC, or DOCX files **✨ Features:** - 🤖 Rules are automatically enhanced by LLM (identifies edge cases, improves patterns) - 💬 Comment lines (starting with #) are automatically ignored - 🗑️ Use the delete box to remove an exact rule - 🔄 Refresh anytime to view the latest rule set **🔒 Access:** Admin and Owner roles only
""" ) rules_summary = gr.Markdown("👉 Click **Refresh Rules** to see existing entries.") rules_table = gr.Dataframe( headers=["#", "Rule"], datatype=["number", "str"], interactive=False, value=[] ) rules_status = gr.Markdown("") with gr.Row(): refresh_rules_button = gr.Button("Refresh Rules", variant="secondary") gr.Markdown("") with gr.Row(): with gr.Column(scale=1): rules_input = gr.Textbox( label="Rules / Regulations (Text Input)", placeholder="Enter one rule per line...", lines=6 ) enhance_rules_checkbox = gr.Checkbox( label="🤖 Enable LLM Enhancement (slower but provides better patterns and explanations)", value=True, info="Uncheck to add rules quickly without LLM enhancement" ) upload_rules_button = gr.Button("Upload / Append Rules", variant="primary") with gr.Column(scale=1): gr.Markdown("**OR**") rules_file_upload = gr.File( label="Upload Rules File", file_types=[".txt", ".pdf", ".doc", ".docx"], type="filepath" ) enhance_file_checkbox = gr.Checkbox( label="🤖 Enable LLM Enhancement", value=True, info="Uncheck to add rules quickly without LLM enhancement" ) upload_file_button = gr.Button("Upload Rules from File", variant="primary") delete_rule_input = gr.Textbox( label="Delete Rule", placeholder="Enter rule number (e.g., 1) or the full rule text to remove..." 
) delete_rule_button = gr.Button("Delete Rule", variant="stop") refresh_rules_button.click( fn=fetch_admin_rules, inputs=[tenant_id_input, role_input], outputs=[rules_summary, rules_table] ) upload_rules_button.click( fn=add_rules_and_refresh, inputs=[tenant_id_input, role_input, rules_input, enhance_rules_checkbox], outputs=[rules_status, rules_summary, rules_table] ) upload_file_button.click( fn=lambda tenant_id, role, file_path, enhance: add_rules_from_file(tenant_id, role, file_path, enhance), inputs=[tenant_id_input, role_input, rules_file_upload, enhance_file_checkbox], outputs=[rules_status, rules_summary, rules_table] ) delete_rule_button.click( fn=delete_rule_and_refresh, inputs=[tenant_id_input, role_input, delete_rule_input], outputs=[rules_status, rules_summary, rules_table] ) # Function to update Admin Rules tab visibility based on role def update_rules_visibility(role): is_editor = role == "editor" has_access = can_manage_rules(role) return ( gr.update(visible=is_editor or not has_access), # Access denied for Editor or non-admin gr.update(visible=has_access and not is_editor), # Rules content for Admin/Owner only ) role_input.change( fn=update_rules_visibility, inputs=[role_input], outputs=[rules_access_denied, rules_content] ) gr.Markdown( """

Built with ❤️ using Model Context Protocol (MCP)

Enterprise-Grade MCP Autonomous Agent Platform

""" ) if __name__ == "__main__": import os import threading import time import requests # Detect environment # - HF Spaces sets SPACE_ID # - Docker entrypoint script manages services, so don't auto-start here is_hf_space = os.getenv("SPACE_ID") is not None is_docker = os.path.exists("/.dockerenv") or os.getenv("DOCKER_CONTAINER") == "1" # For Hugging Face Spaces or Docker, bind to 0.0.0.0; for local dev, use 127.0.0.1 server_name = "0.0.0.0" if (is_hf_space or is_docker) else "127.0.0.1" # Start backend services if running in HF Spaces (but NOT in Docker - entrypoint handles that) if is_hf_space and not is_docker: def start_mcp_server(): """Start MCP server in a background process.""" try: import sys import subprocess # Use subprocess.Popen to run in background and surface logs in HF Spaces subprocess.Popen( [ sys.executable, "-m", "uvicorn", "backend.mcp_server.server:app", "--host", "0.0.0.0", "--port", os.getenv("MCP_PORT", "8900"), "--log-level", "info", ] ) except Exception as e: print(f"Warning: Could not start MCP server: {e}") def start_fastapi_server(): """Start FastAPI server in a background process.""" try: import sys import subprocess # Use subprocess.Popen to run in background and surface logs in HF Spaces subprocess.Popen( [ sys.executable, "-m", "uvicorn", "backend.api.main:app", "--host", "0.0.0.0", "--port", os.getenv("API_PORT", "8000"), "--log-level", "info", ] ) except Exception as e: print(f"Warning: Could not start FastAPI server: {e}") # Start services in background threads print("Starting backend services...") mcp_thread = threading.Thread(target=start_mcp_server, daemon=True) api_thread = threading.Thread(target=start_fastapi_server, daemon=True) mcp_thread.start() time.sleep(4) # Give MCP server time to start api_thread.start() # Give FastAPI extra time to start on first cold boot (model downloads etc.) time.sleep(10) demo.launch( server_name=server_name, server_port=7860, share=False, show_error=True )