import gradio as gr
import requests
import json
import os
from pathlib import Path
from collections import Counter
from datetime import datetime
try:
import plotly.graph_objects as go
PLOTLY_AVAILABLE = True
except ImportError:
PLOTLY_AVAILABLE = False
go = None
BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000")
def chat_with_agent(message, tenant_id, history):
    """
    Send a message to the backend MCP agent and stream back the reply.

    Args:
        message: User's message text.
        tenant_id: Tenant ID for multi-tenant isolation.
        history: Chat history (Gradio "messages" format); mutated in place.

    Yields:
        The updated chat history after each streamed status/token update.
    """
    # Ignore empty messages outright
    if not message or not message.strip():
        yield history
        return
    # A tenant ID is mandatory for every backend call
    if not tenant_id or not tenant_id.strip():
        error_msg = "Please enter a Tenant ID before sending a message."
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": error_msg})
        yield history
        return
    # Add user message to history
    history.append({"role": "user", "content": message})
    # Backend streaming endpoint
    backend_url = f"{BACKEND_BASE_URL}/agent/message/stream"
    # Prepare request payload
    payload = {
        "tenant_id": tenant_id.strip(),
        "message": message,
        "user_id": None,
        "conversation_history": [],
        "temperature": 0.0
    }
    try:
        response = requests.post(
            backend_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            stream=True,
            timeout=120
        )
        if response.status_code == 200:
            # Placeholder assistant message that is overwritten as tokens arrive
            assistant_message = ""
            history.append({"role": "assistant", "content": assistant_message})
            yield history  # Yield initial empty message
            # The backend speaks SSE: one "data: {json}" line per event
            for line_bytes in response.iter_lines():
                if not line_bytes:
                    continue
                try:
                    line = line_bytes.decode('utf-8').strip()
                except UnicodeDecodeError:
                    continue
                if not line:
                    continue
                if line.startswith('data: '):
                    data_str = line[6:]  # Remove 'data: ' prefix
                    try:
                        data = json.loads(data_str)
                    except json.JSONDecodeError:
                        # Skip malformed events instead of aborting the stream
                        continue
                    # Status events are shown transiently in place of the reply
                    if 'status' in data:
                        status_msg = data.get('message', '')
                        if status_msg:
                            history[-1] = {"role": "assistant", "content": f"⏳ {status_msg}"}
                            yield history
                        continue
                    # Token events extend the assistant reply incrementally
                    token = data.get('token', '')
                    if token:
                        assistant_message += token
                        history[-1] = {"role": "assistant", "content": assistant_message}
                        yield history  # Yield updated history immediately
                    if data.get('done', False):
                        break
                elif line.startswith('error:'):
                    # Error events carry a JSON body after the 'error:' prefix.
                    # BUGFIX: was a bare `except:` that swallowed every error,
                    # including ones unrelated to JSON parsing.
                    try:
                        error_data = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    error_msg = error_data.get('error', 'Unknown error')
                    history[-1] = {"role": "assistant", "content": f"❌ Error: {error_msg}"}
                    yield history
                    break
        else:
            error_msg = f"Error {response.status_code}: {response.text}"
            history.append({"role": "assistant", "content": error_msg})
            yield history
    except requests.exceptions.ConnectionError:
        error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000"
        history.append({"role": "assistant", "content": error_msg})
        yield history
    except requests.exceptions.Timeout:
        error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly."
        history.append({"role": "assistant", "content": error_msg})
        yield history
    except requests.exceptions.RequestException as e:
        error_msg = f"❌ Request Error: {str(e)}"
        history.append({"role": "assistant", "content": error_msg})
        yield history
    except Exception as e:
        error_msg = f"❌ Unexpected Error: {str(e)}"
        history.append({"role": "assistant", "content": error_msg})
        yield history
def ingest_document(
    tenant_id: str,
    source_type: str,
    content: str,
    document_url: str,
    filename: str,
    doc_id: str,
    metadata_json: str
):
    """
    Send a raw-text or URL document to the backend ingestion endpoint.

    Returns a human-readable status string (✅ on success, ❌/❗ otherwise).
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required to ingest documents."
    tenant_id = tenant_id.strip()

    # In URL mode the (stripped) URL itself becomes the document content
    if source_type == "url" and document_url:
        payload_content = document_url.strip()
    else:
        payload_content = content or ""

    # Collect only the metadata fields that were actually provided
    metadata = {}
    for key, value in (("filename", filename), ("url", document_url), ("doc_id", doc_id)):
        if value:
            metadata[key] = value.strip()

    # Merge optional user-supplied JSON metadata (must be an object)
    if metadata_json:
        try:
            extra_metadata = json.loads(metadata_json)
        except json.JSONDecodeError as exc:
            return f"❗ Invalid metadata JSON: {exc}"
        if not isinstance(extra_metadata, dict):
            return "❗ Metadata JSON must represent an object (key/value pairs)."
        metadata.update(extra_metadata)

    payload = {
        "action": "ingest_document",
        "tenant_id": tenant_id,
        "source_type": source_type,
        "content": payload_content,
        "metadata": metadata
    }
    try:
        response = requests.post(
            f"{BACKEND_BASE_URL}/rag/ingest-document",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=60
        )
        if response.status_code != 200:
            return f"❌ Ingestion failed ({response.status_code}): {response.text}"
        data = response.json()
        return f"✅ Document ingested successfully.\n\n{data.get('message', '')}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach the backend. Make sure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ The ingestion request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error during ingestion: {exc}"
def ingest_file(tenant_id: str, file_obj):
    """
    Upload a local file (as selected in the Gradio File widget) to the
    backend file-ingestion endpoint. Returns a status string.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required to ingest files."
    if file_obj is None:
        return "❗ Please select a file to upload."
    tenant_id = tenant_id.strip()
    try:
        upload_path = Path(file_obj.name)
        # Read the whole file up front; the backend receives it as multipart
        raw_bytes = upload_path.read_bytes()
        response = requests.post(
            f"{BACKEND_BASE_URL}/rag/ingest-file",
            files={"file": (upload_path.name, raw_bytes, "application/octet-stream")},
            headers={"x-tenant-id": tenant_id},
            timeout=120
        )
        if response.status_code != 200:
            return f"❌ File ingestion failed ({response.status_code}): {response.text}"
        data = response.json()
        return f"✅ File ingested successfully.\n\n{data.get('message', '')}"
    except FileNotFoundError:
        return "❌ Could not read the uploaded file."
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach the backend. Make sure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ File ingestion timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error during file ingestion: {exc}"
def _format_rules_table(rules: list[str]) -> list[list]:
return [[idx + 1, rule] for idx, rule in enumerate(rules)]
def fetch_admin_rules(tenant_id: str) -> tuple[str, list[list]]:
    """
    Fetch the tenant's admin rules from the backend.

    Returns:
        (summary_markdown, table_rows) — rows are [index, rule] pairs.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required.", []
    tenant_id = tenant_id.strip()
    try:
        resp = requests.get(
            f"{BACKEND_BASE_URL}/admin/rules",
            headers={"x-tenant-id": tenant_id},
            timeout=30
        )
        if resp.status_code != 200:
            return f"❌ Error {resp.status_code}: {resp.text}", []
        rules = resp.json().get("rules", [])
        if not rules:
            return "✅ No admin rules have been configured yet.", []
        return f"### Current Rules ({len(rules)})", _format_rules_table(rules)
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running.", []
    except requests.exceptions.Timeout:
        return "⏱️ Request timed out. Please try again.", []
    except Exception as exc:
        return f"❌ Unexpected error: {exc}", []
def extract_rules_from_file(file_path) -> str:
    """
    Extract rule text from an uploaded file (TXT/MD, PDF, DOC/DOCX).

    Returns the extracted text on success, an empty string when no file was
    given, or an error message starting with '❌'.
    """
    if file_path is None:
        return ""
    try:
        # Gradio may hand us a plain path string or an object with .name
        if isinstance(file_path, str):
            file_path = Path(file_path)
        else:
            file_path = Path(getattr(file_path, 'name', file_path))
        if not file_path.exists():
            return f"❌ File not found: {file_path}"
        file_ext = file_path.suffix.lower()
        # Dispatch on extension
        if file_ext in ('.txt', '.md'):
            # Plain text files
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as handle:
                return handle.read()
        if file_ext == '.pdf':
            # PDF files require the optional PyPDF2 dependency
            try:
                import PyPDF2
                with open(file_path, 'rb') as handle:
                    reader = PyPDF2.PdfReader(handle)
                    pages = [page.extract_text() for page in reader.pages]
                return '\n'.join(pages)
            except ImportError:
                return "❌ PDF extraction requires PyPDF2. Install with: pip install PyPDF2"
            except Exception as e:
                return f"❌ Failed to extract text from PDF: {str(e)}"
        if file_ext in ['.doc', '.docx']:
            # Word documents require the optional python-docx dependency
            try:
                from docx import Document
                paragraphs = [p.text for p in Document(file_path).paragraphs]
                return '\n'.join(paragraphs)
            except ImportError:
                return "❌ DOCX extraction requires python-docx. Install with: pip install python-docx"
            except Exception as e:
                return f"❌ Failed to extract text from DOCX: {str(e)}"
        return f"❌ Unsupported file type: {file_ext}. Supported: .txt, .pdf, .doc, .docx"
    except Exception as e:
        return f"❌ Error reading file: {str(e)}"
def add_admin_rules(tenant_id: str, rules_text: str) -> str:
    """
    Add one or more admin rules for a tenant via the backend.

    Rules are given one per line in *rules_text*; blank lines and lines
    starting with '#' are ignored. A single rule is sent to the regular
    /admin/rules endpoint; multiple rules go to /admin/rules/bulk in
    chunks of CHUNK_SIZE to keep each request under its timeout. Both
    paths request LLM enhancement (enhance=true).

    Returns:
        A markdown summary listing added rules, enhancement notes, and
        any per-rule/per-chunk errors.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    if not rules_text or not rules_text.strip():
        return "❗ Provide at least one rule to upload."
    tenant_id = tenant_id.strip()
    # Filter out comment lines (starting with #) and empty lines
    rules = [
        rule.strip()
        for rule in rules_text.splitlines()
        if rule.strip() and not rule.strip().startswith("#")
    ]
    if not rules:
        return "❗ No valid rules detected. (Comment lines starting with # are ignored)"
    added = []     # rules confirmed added by the backend
    enhanced = []  # markdown lines describing LLM enhancements
    errors = []    # per-rule / per-chunk failure descriptions
    # Process rules in chunks to avoid timeout
    CHUNK_SIZE = 5  # Process 5 rules at a time
    total_rules = len(rules)
    if total_rules == 1:
        # Single rule - use regular endpoint
        try:
            resp = requests.post(
                f"{BACKEND_BASE_URL}/admin/rules",
                params={"rule": rules[0], "enhance": "true"},
                headers={"x-tenant-id": tenant_id},
                timeout=30
            )
            if resp.status_code == 200:
                data = resp.json()
                added.append(data.get("added_rule", rules[0]))
                # Surface any LLM-enhancement details returned by the backend
                if data.get("enhanced"):
                    edge_cases = data.get("edge_cases", [])
                    improvements = data.get("improvements", [])
                    if edge_cases or improvements:
                        enhanced.append(f"**{data.get('added_rule', rules[0])}**:")
                        if improvements:
                            enhanced.append(f" • Improvements: {', '.join(improvements[:3])}")
                        if edge_cases:
                            enhanced.append(f" • Edge cases identified: {len(edge_cases)}")
            else:
                errors.append(f"{rules[0]} -> {resp.status_code}: {resp.text}")
        except Exception as exc:
            errors.append(f"{rules[0]} -> {exc}")
    else:
        # Multiple rules - process in chunks
        for i in range(0, total_rules, CHUNK_SIZE):
            chunk = rules[i:i + CHUNK_SIZE]
            chunk_num = (i // CHUNK_SIZE) + 1
            # Ceiling division: number of chunks overall
            total_chunks = (total_rules + CHUNK_SIZE - 1) // CHUNK_SIZE
            try:
                resp = requests.post(
                    f"{BACKEND_BASE_URL}/admin/rules/bulk",
                    json={"rules": chunk},
                    headers={"x-tenant-id": tenant_id},
                    params={"enhance": "true"},
                    timeout=45  # Timeout per chunk (5 rules)
                )
                if resp.status_code == 200:
                    data = resp.json()
                    chunk_added = data.get("added_rules", [])
                    added.extend(chunk_added)
                    if data.get("enhanced"):
                        chunk_enhanced = data.get("enhancement_summary", [])
                        enhanced.extend([f"[Chunk {chunk_num}/{total_chunks}] {e}" for e in chunk_enhanced])
                else:
                    errors.append(f"Chunk {chunk_num}/{total_chunks} failed: {resp.status_code}: {resp.text}")
            except requests.exceptions.Timeout:
                errors.append(f"Chunk {chunk_num}/{total_chunks} timed out after 45s")
            except Exception as exc:
                errors.append(f"Chunk {chunk_num}/{total_chunks} error: {exc}")
    # Build the markdown summary: added rules first, then enhancements, then errors.
    # Added/enhanced lists are truncated (10 / 5 entries) to keep output readable.
    summary = []
    if added:
        summary.append(f"✅ Added {len(added)}/{total_rules} rule(s):\n" + "\n".join([f"- {r}" for r in added[:10]]))
        if len(added) > 10:
            summary.append(f"... and {len(added) - 10} more")
    if enhanced:
        summary.append(f"\n🤖 LLM Enhancement Applied:\n" + "\n".join(enhanced[:5]))
        if len(enhanced) > 5:
            summary.append(f"... and {len(enhanced) - 5} more enhancements")
    if errors:
        summary.append("\n⚠️ Errors:\n" + "\n".join(errors))
    return "\n\n".join(summary) if summary else "No rules were added."
def delete_admin_rule(tenant_id: str, rule: str) -> str:
    """
    Delete a single admin rule (matched by its exact text) for a tenant.

    Args:
        tenant_id: Tenant identifier, sent via the x-tenant-id header.
        rule: Exact rule text to delete.

    Returns:
        A human-readable status string.
    """
    from urllib.parse import quote

    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    if not rule or not rule.strip():
        return "❗ Provide the exact rule text to delete."
    tenant_id = tenant_id.strip()
    rule = rule.strip()
    try:
        # BUGFIX: the rule text was interpolated raw into the URL path;
        # rules containing spaces, '/', '?', '#', etc. corrupted the route.
        # Percent-encode every reserved character (safe='').
        resp = requests.delete(
            f"{BACKEND_BASE_URL}/admin/rules/{quote(rule, safe='')}",
            headers={"x-tenant-id": tenant_id},
            timeout=15
        )
        if resp.status_code == 200:
            return f"🗑️ Deleted rule: {rule}"
        return f"❌ Error {resp.status_code}: {resp.text}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ Delete request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error: {exc}"
def add_rules_from_file(tenant_id: str, file_path):
    """
    Extract rules from an uploaded file, add them, and refresh the rules view.

    Returns:
        (status_message, rules_summary_markdown, rules_table_rows)
    """
    placeholder = "👉 Click **Refresh Rules** to see existing entries."
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required.", placeholder, []
    if file_path is None:
        return "❗ Please select a file to upload.", placeholder, []
    extracted_text = extract_rules_from_file(file_path)
    if extracted_text.startswith("❌"):
        # Extraction failed: report the error but still show current rules
        summary, rows = fetch_admin_rules(tenant_id)
        return extracted_text, summary, rows
    if not extracted_text.strip():
        summary, rows = fetch_admin_rules(tenant_id)
        return "❗ No text could be extracted from the file.", summary, rows
    status = add_admin_rules(tenant_id, extracted_text)
    summary, rows = fetch_admin_rules(tenant_id)
    return status, summary, rows
def add_rules_and_refresh(tenant_id: str, rules_text: str):
    """Add the given rules, then return the refreshed rules view alongside the status."""
    outcome = add_admin_rules(tenant_id, rules_text)
    overview, table = fetch_admin_rules(tenant_id)
    return outcome, overview, table
def delete_rule_and_refresh(tenant_id: str, rule: str):
    """Delete a rule, then return the refreshed rules view alongside the status."""
    outcome = delete_admin_rule(tenant_id, rule)
    overview, table = fetch_admin_rules(tenant_id)
    return outcome, overview, table
def fetch_admin_analytics(tenant_id: str):
    """
    Fetch analytics for a tenant and build summary text plus charts.

    Returns a 6-tuple:
        (summary_markdown, tool_usage_dict, tool_usage_chart,
         latency_chart, rag_quality_chart, error_message_or_None)

    Charts are plotly Figures, or None when plotly is unavailable or the
    corresponding data is missing.
    """
    if not tenant_id or not tenant_id.strip():
        error_msg = "❗ Tenant ID is required to view analytics."
        return error_msg, {}, None, None, None, None
    tenant_id = tenant_id.strip()
    headers = {"x-tenant-id": tenant_id}

    overview_data = {}
    error_msg = None

    # Fetch Overview - failures here are surfaced to the caller
    try:
        resp = requests.get(
            f"{BACKEND_BASE_URL}/analytics/overview",
            headers=headers,
            timeout=30
        )
        if resp.status_code == 200:
            overview_data = resp.json()
        else:
            error_msg = f"❌ Error fetching overview: {resp.status_code}"
    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"

    def _fetch_optional(endpoint: str) -> dict:
        # Best-effort GET for a secondary analytics endpoint; {} on any failure.
        try:
            r = requests.get(
                f"{BACKEND_BASE_URL}/analytics/{endpoint}",
                headers=headers,
                timeout=30
            )
            if r.status_code == 200:
                return r.json()
        except Exception:
            pass
        return {}

    tool_usage_data = _fetch_optional("tool-usage")
    redflags_data = _fetch_optional("redflags")
    activity_data = _fetch_optional("activity")

    # Prefer the overview payload; fall back to the dedicated endpoints
    overview = overview_data.get("overview", {})
    tool_usage = overview.get("tool_usage", tool_usage_data.get("tool_usage", {}))
    rag_quality = overview.get("rag_quality", {})

    # Per-tool bar colors, shared by the count and latency charts
    color_map = {
        "rag": "#3b82f6",
        "rag.search": "#2563eb",
        "rag.ingest": "#1d4ed8",
        "rag.list": "#1e40af",
        "web.search": "#06b6d4",
        "admin": "#a855f7",
        "llm": "#10b981"
    }

    def _bar_figure(title, x, y, colors, text, name, hover, yaxis_title,
                    xaxis_title="Tool", height=380):
        # Build one consistently-styled bar chart.
        fig = go.Figure()
        fig.add_trace(go.Bar(
            x=x,
            y=y,
            name=name,
            marker_color=colors,
            text=text,
            textposition='outside',
            hovertemplate=hover
        ))
        fig.update_layout(
            title={
                "text": title,
                "x": 0.5,
                "xanchor": "center",
                "font": {"size": 16, "color": "#1f2937"}
            },
            xaxis_title=xaxis_title,
            yaxis_title=yaxis_title,
            height=height,
            showlegend=False,
            margin=dict(l=50, r=20, t=60, b=50),
            plot_bgcolor="rgba(0,0,0,0)",
            paper_bgcolor="rgba(0,0,0,0)",
            font=dict(color="#374151", size=12),
            xaxis=dict(gridcolor="rgba(0,0,0,0.1)"),
            yaxis=dict(gridcolor="rgba(0,0,0,0.1)")
        )
        return fig

    # Tool usage count chart
    tool_chart = None
    if tool_usage and PLOTLY_AVAILABLE:
        try:
            tools, counts, colors_list = [], [], []
            for tool_name, stats in tool_usage.items():
                tools.append(tool_name.replace(".", " ").title())
                counts.append(stats.get("count", 0))
                colors_list.append(color_map.get(tool_name, "#6b7280"))
            if tools:
                # BUGFIX: plotly hover templates use <br> as the line
                # separator; the previous literal was broken across lines.
                tool_chart = _bar_figure(
                    "Tool Usage Count", tools, counts, colors_list, counts,
                    "Usage Count", '%{x}<br>Count: %{y}<br>', "Count"
                )
        except Exception:
            tool_chart = None

    # Average latency chart (only tools with a positive average latency)
    latency_chart = None
    if tool_usage and PLOTLY_AVAILABLE:
        try:
            tools, latencies, colors_list = [], [], []
            for tool_name, stats in tool_usage.items():
                avg_latency = stats.get("avg_latency_ms", 0)
                if avg_latency > 0:
                    tools.append(tool_name.replace(".", " ").title())
                    latencies.append(round(avg_latency, 2))
                    colors_list.append(color_map.get(tool_name, "#6b7280"))
            if tools:
                latency_chart = _bar_figure(
                    "Average Tool Latency", tools, latencies, colors_list,
                    [f"{l}ms" for l in latencies], "Avg Latency (ms)",
                    '%{x}<br>Avg Latency: %{y}ms', "Latency (ms)"
                )
        except Exception:
            latency_chart = None

    # RAG quality chart (scores scaled to percentages for readability)
    rag_chart = None
    if rag_quality and PLOTLY_AVAILABLE:
        try:
            metrics = ["Avg Hits", "Avg Score", "Avg Top Score"]
            values = [
                rag_quality.get("avg_hits_per_search", 0),
                rag_quality.get("avg_score", 0) * 100,  # Convert to percentage
                rag_quality.get("avg_top_score", 0) * 100
            ]
            rag_chart = _bar_figure(
                "RAG Quality Metrics", metrics, values,
                ["#3b82f6", "#10b981", "#f59e0b"],
                [f"{v:.2f}" for v in values], None,
                '%{x}<br>Value: %{y:.2f}', "Value",
                xaxis_title="Metric", height=350
            )
        except Exception:
            rag_chart = None

    # Headline numbers, with the activity/redflags endpoints as fallback
    total_queries = overview.get("total_queries", activity_data.get("activity", {}).get("total_queries", 0))
    active_users = overview.get("active_users", activity_data.get("activity", {}).get("active_users", 0))
    redflag_count = overview.get("redflag_count", redflags_data.get("count", 0))
    last_query = overview.get("last_query", activity_data.get("activity", {}).get("last_query"))
    # Aggregate tool-call totals and success rate
    total_tool_calls = sum(stats.get("count", 0) for stats in tool_usage.values())
    total_success = sum(stats.get("success_count", 0) for stats in tool_usage.values())
    total_errors = sum(stats.get("error_count", 0) for stats in tool_usage.values())
    success_rate = (total_success / total_tool_calls * 100) if total_tool_calls > 0 else 0
    summary_text = f"""
#### 📈 Activity Metrics
- **Total Queries:** `{total_queries}`
- **Active Users:** `{active_users}`
- **Red Flags:** `{redflag_count}`
- **Last Query:** `{last_query if last_query else "N/A"}`
---
#### 🔧 Tool Usage Overview
- **Total Tool Calls:** `{total_tool_calls}`
- **Successful Calls:** `{total_success}` ✅
- **Failed Calls:** `{total_errors}` {'⚠️' if total_errors > 0 else ''}
- **Success Rate:** `{success_rate:.1f}%` {'🟢' if success_rate >= 95 else '🟡' if success_rate >= 80 else '🔴'}
---
#### 🔍 RAG Quality Metrics
- **Total Searches:** `{rag_quality.get("total_searches", 0)}`
- **Avg Hits per Search:** `{rag_quality.get("avg_hits_per_search", 0):.2f}`
- **Avg Relevance Score:** `{rag_quality.get("avg_score", 0):.3f}`
- **Avg Top Score:** `{rag_quality.get("avg_top_score", 0):.3f}`
- **Avg Search Latency:** `{rag_quality.get("avg_latency_ms", 0):.2f}ms`
---
#### 📊 Tool Breakdown
"""
    # Per-tool breakdown, busiest tools first
    for tool_name, stats in sorted(tool_usage.items(), key=lambda x: x[1].get("count", 0), reverse=True):
        tool_display = tool_name.replace(".", " ").title()
        count = stats.get("count", 0)
        latency = stats.get("avg_latency_ms", 0)
        success = stats.get("success_count", 0)
        errors = stats.get("error_count", 0)
        status_icon = "✅" if errors == 0 else "⚠️"
        # BUGFIX: this line was a string literal broken across source lines;
        # rebuilt with an explicit \n separator between the two rows.
        summary_text += (
            f"- **{tool_display}** {status_icon}\n"
            f"  └ {count} calls • {latency:.1f}ms avg • {success} success • {errors} errors\n"
        )
    return summary_text, tool_usage, tool_chart, latency_chart, rag_chart, error_msg
def list_documents(tenant_id: str, limit: int = 1000, offset: int = 0):
    """
    List all documents for a tenant.

    Args:
        tenant_id: Tenant identifier (sent both as a query param and header).
        limit: Maximum number of documents to fetch.
        offset: Pagination offset.

    Returns:
        Tuple of (status_message, documents_list, total_count, stats_dict,
        chart_fig) where chart_fig is a plotly pie chart or None.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required.", [], 0, {}, None
    tenant_id = tenant_id.strip()
    try:
        response = requests.get(
            f"{BACKEND_BASE_URL}/rag/list",
            params={"tenant_id": tenant_id, "limit": limit, "offset": offset},
            headers={"x-tenant-id": tenant_id},
            timeout=30
        )
        if response.status_code == 200:
            data = response.json()
            documents = data.get("documents", [])
            total = data.get("total", 0)
            # Format documents for display and collect stats
            formatted_docs = []
            type_counts = Counter()
            total_length = 0
            for doc in documents:
                doc_id = doc.get("id", "N/A")
                text = doc.get("text", "")
                created_at = doc.get("created_at", "")
                preview = text[:200] + "..." if len(text) > 200 else text
                # Heuristic type detection based on the document text
                text_lower = text.lower()
                if "http://" in text_lower or "https://" in text_lower or "www." in text_lower:
                    doc_type = "link"
                elif any(x in text_lower for x in ["q:", "question:", "faq", "frequently asked"]):
                    doc_type = "faq"
                elif ".pdf" in text_lower or "pdf document" in text_lower:
                    doc_type = "pdf"
                else:
                    doc_type = "text"
                type_counts[doc_type] += 1
                total_length += len(text)
                formatted_docs.append({
                    "ID": doc_id,
                    "Type": doc_type,
                    "Preview": preview,
                    "Length": len(text),
                    "Created": created_at[:10] if created_at else "N/A"
                })
            # Aggregate statistics for the stats panel
            stats = {
                "total": total,
                "types": dict(type_counts),
                "avg_length": total_length // total if total > 0 else 0,
                "total_chars": total_length
            }
            # Pie chart of the document-type distribution
            chart_fig = None
            if type_counts and PLOTLY_AVAILABLE:
                try:
                    labels = list(type_counts.keys())
                    values = list(type_counts.values())
                    colors = {
                        "text": "#3b82f6",   # blue
                        "pdf": "#ef4444",    # red
                        "faq": "#a855f7",    # purple
                        "link": "#06b6d4"    # cyan
                    }
                    chart_colors = [colors.get(label, "#6b7280") for label in labels]
                    fig = go.Figure(data=[go.Pie(
                        labels=labels,
                        values=values,
                        hole=0.4,
                        marker=dict(colors=chart_colors),
                        textinfo='label+percent+value',
                        textfont=dict(size=12),
                        # BUGFIX: plotly hover templates need <br> line breaks;
                        # the previous literal was broken across source lines.
                        hovertemplate='%{label}<br>Count: %{value}<br>Percentage: %{percent}'
                    )])
                    fig.update_layout(
                        title={
                            "text": "Document Type Distribution",
                            "x": 0.5,
                            "xanchor": "center",
                            "font": {"size": 16}
                        },
                        height=400,
                        showlegend=True,
                        margin=dict(l=20, r=20, t=50, b=20)
                    )
                    chart_fig = fig
                except Exception:
                    chart_fig = None
            status = f"✅ Found {total} document(s)"
            return status, formatted_docs, total, stats, chart_fig
        else:
            error_msg = f"❌ Error {response.status_code}: {response.text}"
            return error_msg, [], 0, {}, None
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running.", [], 0, {}, None
    except requests.exceptions.Timeout:
        return "⏱️ Request timed out. Please try again.", [], 0, {}, None
    except Exception as exc:
        return f"❌ Unexpected error: {exc}", [], 0, {}, None
def delete_document(tenant_id: str, document_id: int):
    """Delete a single document by ID for the given tenant; returns a status string."""
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    if not document_id or document_id <= 0:
        return "❗ Invalid document ID."
    tenant_id = tenant_id.strip()
    try:
        resp = requests.delete(
            f"{BACKEND_BASE_URL}/rag/delete/{document_id}",
            params={"tenant_id": tenant_id},
            headers={"x-tenant-id": tenant_id},
            timeout=30
        )
        if resp.status_code == 200:
            return f"✅ Document {document_id} deleted successfully."
        if resp.status_code == 404:
            return f"❌ Document {document_id} not found or access denied."
        # Prefer a structured error body when the backend returned JSON
        is_json = resp.headers.get("content-type", "").startswith("application/json")
        details = resp.json() if is_json else {}
        reason = details.get("detail", details.get("error", resp.text))
        return f"❌ Error {resp.status_code}: {reason}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ Request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error: {exc}"
def delete_all_documents(tenant_id: str):
    """Delete every document belonging to the tenant; returns a status string."""
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required."
    tenant_id = tenant_id.strip()
    try:
        resp = requests.delete(
            f"{BACKEND_BASE_URL}/rag/delete-all",
            params={"tenant_id": tenant_id},
            headers={"x-tenant-id": tenant_id},
            timeout=60
        )
        if resp.status_code == 200:
            deleted_count = resp.json().get("deleted_count", 0)
            return f"✅ Deleted {deleted_count} document(s) successfully."
        # Prefer a structured error body when the backend returned JSON
        is_json = resp.headers.get("content-type", "").startswith("application/json")
        details = resp.json() if is_json else {}
        reason = details.get("detail", details.get("error", resp.text))
        return f"❌ Error {resp.status_code}: {reason}"
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running."
    except requests.exceptions.Timeout:
        return "⏱️ Request timed out. Please try again."
    except Exception as exc:
        return f"❌ Unexpected error: {exc}"
def search_knowledge_base(tenant_id: str, query: str):
    """
    Run a semantic (RAG) search against the tenant's knowledge base.

    Returns:
        (status_message, results_table) — rows carry Rank/Text/Relevance.
    """
    if not tenant_id or not tenant_id.strip():
        return "❗ Tenant ID is required.", []
    if not query or not query.strip():
        return "❗ Please enter a search query.", []
    tenant_id = tenant_id.strip()
    query = query.strip()
    try:
        resp = requests.post(
            f"{BACKEND_BASE_URL}/rag/search",
            json={"tenant_id": tenant_id, "query": query, "threshold": 0.3},
            headers={"x-tenant-id": tenant_id, "Content-Type": "application/json"},
            timeout=30
        )
        if resp.status_code != 200:
            return f"❌ Error {resp.status_code}: {resp.text}", []
        hits = resp.json().get("results", [])
        table = []
        for rank, hit in enumerate(hits, 1):
            snippet = hit.get("text", "")
            # The backend may report either "relevance" or "score"
            score = hit.get("relevance", hit.get("score", 0.0))
            table.append({
                "Rank": rank,
                "Text": snippet[:300] + "..." if len(snippet) > 300 else snippet,
                "Relevance": f"{score:.3f}" if score else "N/A"
            })
        return f"✅ Found {len(hits)} result(s) for '{query}'", table
    except requests.exceptions.ConnectionError:
        return "❌ Could not reach backend. Ensure the FastAPI server is running.", []
    except requests.exceptions.Timeout:
        return "⏱️ Request timed out. Please try again.", []
    except Exception as exc:
        return f"❌ Unexpected error: {exc}", []
# Create Gradio interface
with gr.Blocks(
title="IntegraChat — MCP Autonomous Agent",
theme=gr.themes.Soft(),
css="""
.stat-card {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
padding: 20px;
border-radius: 12px;
color: white;
text-align: center;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
transition: transform 0.2s;
}
.stat-card:hover {
transform: translateY(-2px);
box-shadow: 0 6px 12px rgba(0, 0, 0, 0.15);
}
.stat-card h3 {
margin: 0 0 10px 0;
font-size: 14px;
opacity: 0.9;
}
.stat-card strong {
font-size: 24px;
font-weight: bold;
}
.summary-box {
background: linear-gradient(135deg, #1f2937 0%, #111827 100%);
padding: 24px;
border-radius: 12px;
border: 2px solid #374151;
max-height: 500px;
overflow-y: auto;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3);
color: #f9fafb;
}
.summary-box h3, .summary-box h4 {
margin-top: 0;
margin-bottom: 12px;
color: #ffffff;
font-weight: 600;
}
.summary-box h4 {
color: #e5e7eb;
font-size: 16px;
margin-top: 20px;
margin-bottom: 10px;
}
.summary-box p {
color: #f3f4f6;
margin: 8px 0;
line-height: 1.6;
}
.summary-box ul {
margin: 10px 0;
padding-left: 24px;
color: #f3f4f6;
}
.summary-box li {
margin: 8px 0;
color: #f3f4f6;
line-height: 1.6;
}
.summary-box code {
background-color: #000000;
color: #00ff00;
padding: 2px 6px;
border-radius: 4px;
font-family: 'Courier New', monospace;
font-size: 13px;
border: 1px solid #374151;
}
.summary-box hr {
border: none;
border-top: 1px solid #4b5563;
margin: 16px 0;
}
.summary-box strong {
color: #ffffff;
}
.chart-title {
margin-bottom: 8px;
margin-top: 0;
font-weight: 600;
color: #1f2937;
text-align: center;
}
"""
) as demo:
gr.Markdown(
"""
# 🤖 IntegraChat — MCP Autonomous Agent
**Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.**
Enter your Tenant ID to chat with the MCP-powered agent or ingest documents into the enterprise knowledge base.
"""
)
tenant_id_input = gr.Textbox(
label="Tenant ID",
placeholder="Enter your tenant ID (e.g., tenant123)",
value="",
interactive=True
)
with gr.Tabs():
with gr.Tab("Chat"):
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="Chat with Agent",
height=500,
show_label=True,
container=True,
type="messages"
)
with gr.Row():
message_input = gr.Textbox(
label="Message",
placeholder="Type your message here...",
scale=4,
show_label=False,
container=False
)
send_button = gr.Button("Send", variant="primary", scale=1)
with gr.Column(scale=1):
gr.Markdown(
"""
### 📝 Chat Instructions
1. Enter your **Tenant ID** above
2. Ask a question or give a task to the agent
3. The MCP agent will automatically select tools (RAG, Web, etc.)
### ⚙️ Backend Configuration
The agent connects to the FastAPI backend at `http://localhost:8000/agent/message`
"""
)
# Event handlers for chat tab with streaming
def send_message(message, tenant_id, history):
# Clear message input immediately
message_input_value = ""
# Use streaming function which yields updates
# Gradio will automatically handle the generator and update UI in real-time
try:
for updated_history in chat_with_agent(message, tenant_id, history):
yield updated_history, message_input_value
except Exception as e:
# Fallback if streaming fails
error_msg = f"Streaming error: {str(e)}"
history.append({"role": "assistant", "content": error_msg})
yield history, message_input_value
send_button.click(
fn=send_message,
inputs=[message_input, tenant_id_input, chatbot],
outputs=[chatbot, message_input]
)
message_input.submit(
fn=send_message,
inputs=[message_input, tenant_id_input, chatbot],
outputs=[chatbot, message_input]
)
with gr.Tab("Document Ingestion"):
gr.Markdown(
"""
### 📚 Knowledge Base Ingestion
Ingest documents so the MCP agent can reference tenant-private knowledge.
- **Raw text / URLs:** Use the fields below.
- **Files (PDF, DOCX, TXT, MD):** Use the file upload section.
"""
)
ingestion_mode = gr.Radio(
["Raw Text", "URL", "File Upload"],
value="Raw Text",
label="Select Ingestion Mode"
)
with gr.Row():
doc_filename = gr.Textbox(label="Filename (optional)")
doc_id = gr.Textbox(label="Document ID (optional)")
document_url = gr.Textbox(
label="Document URL (for URL ingestion)",
placeholder="https://example.com/policy",
visible=False
)
doc_content = gr.Textbox(
label="Content / Notes",
placeholder="Paste the document text here...",
lines=8,
visible=True
)
metadata_json = gr.Textbox(
label="Additional Metadata (JSON)",
placeholder='{"department": "HR", "tags": ["policy", "benefits"]}'
)
ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary")
document_status = gr.Markdown("")
def handle_ingest_document(
    tenant_id,
    mode,
    content,
    doc_url,
    filename,
    doc_id_value,
    metadata
):
    """Ingest a raw-text or URL document and return a status markdown string.

    Maps the radio-button mode onto the backend source type, forwards all
    form fields to ingest_document, and appends a navigation tip when the
    returned status indicates success.
    """
    if mode == "Raw Text":
        source_type = "raw_text"
    else:
        source_type = "url"
    status = ingest_document(
        tenant_id=tenant_id,
        source_type=source_type,
        content=content,
        document_url=doc_url,
        filename=filename,
        doc_id=doc_id_value,
        metadata_json=metadata
    )
    # Success messages contain a check mark; nudge the user toward the library tab.
    if "✅" in status:
        status += "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents."
    return status
ingest_doc_button.click(
fn=handle_ingest_document,
inputs=[
tenant_id_input,
ingestion_mode,
doc_content,
document_url,
doc_filename,
doc_id,
metadata_json
],
outputs=document_status
)
file_section = gr.Markdown("#### 📁 File Upload (PDF, DOCX, TXT, Markdown)", visible=False)
file_upload = gr.File(
label="Upload File",
file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"],
visible=False
)
ingest_file_button = gr.Button("Upload & Ingest File", visible=False)
def handle_file_ingestion(tenant_id, file_obj):
    """Ingest an uploaded file and return a status string (tip appended on success)."""
    outcome = ingest_file(tenant_id, file_obj)
    if "✅" not in outcome:
        return outcome
    # Point the user at the library tab after a successful upload.
    return outcome + "\n\n💡 **Tip:** Go to the 'Knowledge Base Library' tab to view your ingested documents."
ingest_file_button.click(
fn=handle_file_ingestion,
inputs=[tenant_id_input, file_upload],
outputs=document_status
)
def toggle_source_fields(mode):
    """Return visibility updates for the eight ingestion widgets.

    The tuple order matches the `ingestion_mode.change` outputs wiring:
    content box, URL box, filename, doc ID, text/URL ingest button,
    file-section header, file picker, file ingest button.
    """
    file_mode = mode == "File Upload"
    visibilities = (
        mode == "Raw Text",  # doc_content
        mode == "URL",       # document_url
        not file_mode,       # doc_filename
        not file_mode,       # doc_id
        not file_mode,       # ingest_doc_button
        file_mode,           # file_section
        file_mode,           # file_upload
        file_mode,           # ingest_file_button
    )
    return tuple(gr.update(visible=flag) for flag in visibilities)
ingestion_mode.change(
fn=toggle_source_fields,
inputs=[ingestion_mode],
outputs=[
doc_content,
document_url,
doc_filename,
doc_id,
ingest_doc_button,
file_section,
file_upload,
ingest_file_button,
]
)
with gr.Tab("Knowledge Base Library"):
gr.Markdown(
"""
### 📚 Knowledge Base Library
View, search, and manage all ingested documents for your tenant with visual analytics.
- **📊 Statistics:** View document counts, types, and distribution
- **🔍 Search:** Use semantic search to find relevant documents
- **🔽 Filter:** Filter documents by type (text, PDF, FAQ, link)
- **🗑️ Delete:** Remove individual documents or delete all at once
"""
)
# Statistics Section
with gr.Row():
kb_total_docs = gr.Markdown("### 📄 Total Documents\n**0**", elem_classes=["stat-card"])
kb_text_docs = gr.Markdown("### 📝 Text Documents\n**0**", elem_classes=["stat-card"])
kb_pdf_docs = gr.Markdown("### 📄 PDF Documents\n**0**", elem_classes=["stat-card"])
kb_faq_docs = gr.Markdown("### ❓ FAQ Documents\n**0**", elem_classes=["stat-card"])
kb_link_docs = gr.Markdown("### 🔗 Link Documents\n**0**", elem_classes=["stat-card"])
# Chart and Search Section
with gr.Row():
with gr.Column(scale=1):
kb_chart = gr.Plot(label="Document Type Distribution", show_label=True)
kb_refresh_button = gr.Button("🔄 Refresh Documents", variant="primary", size="lg")
kb_delete_all_button = gr.Button("🗑️ Delete All Documents", variant="stop")
with gr.Column(scale=1):
kb_search_query = gr.Textbox(
label="🔍 Search Knowledge Base",
placeholder="Enter a search query (e.g., 'admin', 'policy', 'FAQ')...",
show_label=True
)
kb_search_button = gr.Button("Search", variant="primary")
kb_search_status = gr.Markdown("")
kb_search_results = gr.Dataframe(
headers=["Rank", "Text", "Relevance"],
datatype=["number", "str", "str"],
interactive=False,
label="Search Results",
wrap=True
)
# Status and Filter Section
kb_status = gr.Markdown("👉 Click **Refresh Documents** to load your knowledge base.")
with gr.Row():
with gr.Column(scale=2):
kb_filter_type = gr.Radio(
["all", "text", "pdf", "faq", "link"],
value="all",
label="Filter by Type",
info="Filter documents by detected type"
)
with gr.Column(scale=1):
kb_avg_length = gr.Markdown("**Average Length:** 0 characters")
# Documents Table
kb_documents_table = gr.Dataframe(
headers=["ID", "Type", "Preview", "Length", "Created"],
datatype=["number", "str", "str", "number", "str"],
interactive=False,
label="Documents",
wrap=True
)
# Delete Section
with gr.Row():
kb_delete_id = gr.Number(
label="Delete Document by ID",
value=None,
precision=0,
info="Enter document ID to delete",
scale=3
)
kb_delete_button = gr.Button("Delete Document", variant="stop", scale=1)
kb_delete_status = gr.Markdown("")
def refresh_documents(tenant_id, filter_type="all"):
    """Load the tenant's documents and build every Knowledge Base widget value.

    Returns a 9-tuple matching the refresh/filter outputs wiring:
    (status markdown, table rows, total card, text card, pdf card,
    faq card, link card, average-length markdown, chart figure).
    """
    status, docs, total, stats, chart_fig = list_documents(tenant_id)
    # Optionally narrow the table to one document type; the stat cards
    # below intentionally keep showing the unfiltered totals.
    if filter_type != "all" and docs:
        wanted = filter_type.lower()
        docs = [d for d in docs if d.get("Type", "").lower() == wanted]
        status = f"✅ Found {len(docs)} {filter_type} document(s) (out of {total} total)"
    counts = stats.get("types", {})
    total_card = f"### 📄 Total Documents\n**{total}**"
    text_card = f"### 📝 Text Documents\n**{counts.get('text', 0)}**"
    pdf_card = f"### 📄 PDF Documents\n**{counts.get('pdf', 0)}**"
    faq_card = f"### ❓ FAQ Documents\n**{counts.get('faq', 0)}**"
    link_card = f"### 🔗 Link Documents\n**{counts.get('link', 0)}**"
    avg_length_md = f"**Average Length:** {stats.get('avg_length', 0):,} characters"
    status_msg = f"{status}\n\n**Total Documents:** {total} | **Total Characters:** {stats.get('total_chars', 0):,}"
    return (
        status_msg, docs, total_card, text_card, pdf_card, faq_card,
        link_card, avg_length_md, chart_fig
    )
def filter_documents(tenant_id, filter_type):
    """Alias used by the filter radio's change event; delegates to the refresher."""
    return refresh_documents(tenant_id, filter_type)
def search_kb(tenant_id, query):
    """Run a semantic search and return (status markdown, result rows)."""
    # search_knowledge_base already returns the (status, results) pair.
    return search_knowledge_base(tenant_id, query)
def delete_doc(tenant_id, doc_id):
    """Delete one document by ID, then refresh the Knowledge Base view.

    Returns a 10-tuple matching kb_delete_button's outputs:
    (delete status, library status, table rows, total/text/pdf/faq/link
    stat cards, average-length markdown, chart figure).
    """
    if doc_id is None or doc_id <= 0:
        # Bug fix: this branch previously returned only 9 values while the
        # click handler is wired to 10 output components, which made Gradio
        # raise an output-count error on invalid input. Pad to 10 values
        # (empty list for the Dataframe, None for the Plot).
        return ("❗ Please enter a valid document ID.", "", [], "", "", "", "", "", "", None)
    result = delete_document(tenant_id, int(doc_id))
    # Refresh the document list so the table, cards, and chart stay in sync.
    return (result, *refresh_documents(tenant_id, "all"))
def delete_all_docs(tenant_id):
    """Delete every document for the tenant, then refresh the library view."""
    outcome = delete_all_documents(tenant_id)
    # Re-query so the table, stat cards, and chart reflect the emptied store.
    refreshed = refresh_documents(tenant_id, "all")
    return (outcome,) + tuple(refreshed)
kb_refresh_button.click(
fn=refresh_documents,
inputs=[tenant_id_input, kb_filter_type],
outputs=[
kb_status, kb_documents_table, kb_total_docs, kb_text_docs,
kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart
]
)
kb_filter_type.change(
fn=filter_documents,
inputs=[tenant_id_input, kb_filter_type],
outputs=[
kb_status, kb_documents_table, kb_total_docs, kb_text_docs,
kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart
]
)
kb_search_button.click(
fn=search_kb,
inputs=[tenant_id_input, kb_search_query],
outputs=[kb_search_status, kb_search_results]
)
kb_search_query.submit(
fn=search_kb,
inputs=[tenant_id_input, kb_search_query],
outputs=[kb_search_status, kb_search_results]
)
kb_delete_button.click(
fn=delete_doc,
inputs=[tenant_id_input, kb_delete_id],
outputs=[
kb_delete_status, kb_status, kb_documents_table, kb_total_docs,
kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart
]
)
kb_delete_all_button.click(
fn=delete_all_docs,
inputs=[tenant_id_input],
outputs=[
kb_delete_status, kb_status, kb_documents_table, kb_total_docs,
kb_text_docs, kb_pdf_docs, kb_faq_docs, kb_link_docs, kb_avg_length, kb_chart
]
)
with gr.Tab("Admin Analytics"):
gr.Markdown(
"""
# 📊 Admin Analytics Dashboard
Comprehensive tenant-level analytics with visual insights, performance metrics, and detailed tool usage statistics.
"""
)
# Refresh Button at Top
with gr.Row():
analytics_refresh = gr.Button("🔄 Fetch Analytics Snapshot", variant="primary", size="lg")
gr.Markdown("")
# Statistics Cards
gr.Markdown("### 📈 Key Metrics")
with gr.Row():
analytics_total_queries = gr.Markdown("### 📊 Total Queries\n**0**", elem_classes=["stat-card"])
analytics_active_users = gr.Markdown("### 👥 Active Users\n**0**", elem_classes=["stat-card"])
analytics_redflags = gr.Markdown("### 🚩 Red Flags\n**0**", elem_classes=["stat-card"])
analytics_rag_searches = gr.Markdown("### 🔍 RAG Searches\n**0**", elem_classes=["stat-card"])
# Charts Section
gr.Markdown("### 📊 Performance Charts")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### 📈 Tool Usage Count", elem_classes=["chart-title"])
analytics_tool_chart = gr.Plot(label="", show_label=False)
with gr.Column(scale=1):
gr.Markdown("#### ⚡ Average Tool Latency", elem_classes=["chart-title"])
analytics_latency_chart = gr.Plot(label="", show_label=False)
# RAG Quality and Summary Section
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("#### 🔍 RAG Quality Metrics", elem_classes=["chart-title"])
analytics_rag_chart = gr.Plot(label="", show_label=False)
with gr.Column(scale=1):
gr.Markdown("### 📋 Analytics Summary")
analytics_summary = gr.Markdown(
"👉 Click **Fetch Analytics Snapshot** to load data.",
elem_classes=["summary-box"]
)
# Tool Usage Details Table
gr.Markdown("### 🔧 Detailed Tool Usage")
analytics_tool_table = gr.Dataframe(
headers=["Tool", "Count", "Avg Latency (ms)", "Success", "Errors", "Total Tokens"],
datatype=["str", "number", "number", "number", "number", "number"],
interactive=False,
label="",
wrap=True
)
analytics_error = gr.Markdown("", visible=False)
def format_analytics(tenant_id):
    """Assemble every Admin Analytics widget value for one tenant.

    Calls fetch_admin_analytics for the summary/charts, then queries the
    backend overview endpoint for the headline stat cards.

    Returns a 9-tuple matching analytics_refresh's outputs:
    (summary md, queries card, users card, red-flags card, RAG card,
    tool chart, latency chart, RAG chart, tool-usage table rows).
    """
    summary, tool_usage, tool_chart, latency_chart, rag_chart, error = fetch_admin_analytics(tenant_id)
    if error:
        # Keep the tuple length (9) aligned with the click handler's outputs.
        return (error, "", "", "", "", None, None, None, [])
    # NOTE(review): fetch_admin_analytics already hits the backend, but it does
    # not expose the overview payload, so the stat cards re-fetch it here.
    overview_data = {}
    try:
        resp = requests.get(
            f"{BACKEND_BASE_URL}/analytics/overview",
            headers={"x-tenant-id": tenant_id},
            timeout=30
        )
        if resp.status_code == 200:
            data = resp.json()
            # The API returns {"overview": {...}} or the overview object directly.
            overview_data = data.get("overview", data) if isinstance(data, dict) else {}
    except Exception as e:
        # Best-effort: stat cards fall back to zeros if the overview call fails.
        # (Removed leftover DEBUG print of the payload keys and a redundant `pass`.)
        print(f"Error fetching overview: {e}")
    # overview_data is guaranteed to be a dict here (it is initialized to {}
    # and only ever reassigned a dict above), so the former non-dict fallback
    # branch was dead code; missing keys simply default to zero.
    total_queries = overview_data.get("total_queries", 0)
    active_users = overview_data.get("active_users", 0)
    redflag_count = overview_data.get("redflag_count", 0)
    rag_quality = overview_data.get("rag_quality", {})
    rag_searches = rag_quality.get("total_searches", 0) if isinstance(rag_quality, dict) else 0
    # Format the statistics cards.
    queries_md = f"### 📊 Total Queries\n**{total_queries}**"
    users_md = f"### 👥 Active Users\n**{active_users}**"
    redflags_md = f"### 🚩 Red Flags\n**{redflag_count}**"
    rag_md = f"### 🔍 RAG Searches\n**{rag_searches}**"
    # Build the detailed tool-usage table rows.
    tool_table_data = []
    for tool_name, stats in tool_usage.items():
        tool_table_data.append({
            "Tool": tool_name.replace(".", " ").title(),
            "Count": stats.get("count", 0),
            "Avg Latency (ms)": round(stats.get("avg_latency_ms", 0), 2),
            "Success": stats.get("success_count", 0),
            "Errors": stats.get("error_count", 0),
            "Total Tokens": stats.get("total_tokens", 0)
        })
    return (
        summary, queries_md, users_md, redflags_md, rag_md,
        tool_chart, latency_chart, rag_chart, tool_table_data
    )
analytics_refresh.click(
fn=format_analytics,
inputs=[tenant_id_input],
outputs=[
analytics_summary,
analytics_total_queries,
analytics_active_users,
analytics_redflags,
analytics_rag_searches,
analytics_tool_chart,
analytics_latency_chart,
analytics_rag_chart,
analytics_tool_table
]
)
with gr.Tab("Admin Rules & Compliance"):
gr.Markdown(
"""
### 🛡️ Admin Rules & Regulations
Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.).
**Upload Methods:**
- **Text Input:** Enter one rule per line in the text box
- **File Upload:** Upload rules from TXT, PDF, DOC, or DOCX files
**Features:**
- Rules are automatically enhanced by LLM (identifies edge cases, improves patterns)
- Comment lines (starting with #) are automatically ignored
- Use the delete box to remove an exact rule
- Refresh anytime to view the latest rule set
"""
)
rules_summary = gr.Markdown("👉 Click **Refresh Rules** to see existing entries.")
rules_table = gr.Dataframe(
headers=["#", "Rule"],
datatype=["number", "str"],
interactive=False,
value=[]
)
rules_status = gr.Markdown("")
with gr.Row():
refresh_rules_button = gr.Button("Refresh Rules", variant="secondary")
gr.Markdown("")
with gr.Row():
with gr.Column(scale=1):
rules_input = gr.Textbox(
label="Rules / Regulations (Text Input)",
placeholder="Enter one rule per line...",
lines=6
)
upload_rules_button = gr.Button("Upload / Append Rules", variant="primary")
with gr.Column(scale=1):
gr.Markdown("**OR**")
rules_file_upload = gr.File(
label="Upload Rules File",
file_types=[".txt", ".pdf", ".doc", ".docx"],
type="filepath"
)
upload_file_button = gr.Button("Upload Rules from File", variant="primary")
delete_rule_input = gr.Textbox(
label="Delete Rule",
placeholder="Enter the exact rule text to remove..."
)
delete_rule_button = gr.Button("Delete Rule", variant="stop")
refresh_rules_button.click(
fn=fetch_admin_rules,
inputs=[tenant_id_input],
outputs=[rules_summary, rules_table]
)
upload_rules_button.click(
fn=add_rules_and_refresh,
inputs=[tenant_id_input, rules_input],
outputs=[rules_status, rules_summary, rules_table]
)
upload_file_button.click(
fn=add_rules_from_file,
inputs=[tenant_id_input, rules_file_upload],
outputs=[rules_status, rules_summary, rules_table]
)
delete_rule_button.click(
fn=delete_rule_and_refresh,
inputs=[tenant_id_input, delete_rule_input],
outputs=[rules_status, rules_summary, rules_table]
)
gr.Markdown(
"""
---
**Built with [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for the MCP Hackathon**
"""
)
if __name__ == "__main__":
    # For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1.
    # HF Spaces sets the SPACE_ID environment variable.
    # (The previous local `import os` was redundant — os is imported at module level.)
    server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1"
    demo.launch(
        server_name=server_name,
        server_port=7860,
        share=False
    )