IntegraChat / app.py
nothingworry's picture
feat: add admin rules ingestion UI and persistent backend storage
e4abb85
raw
history blame
23.3 kB
import gradio as gr
import requests
import json
import os
from pathlib import Path
BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000")
def chat_with_agent(message, tenant_id, history):
"""
Send a message to the backend MCP agent and return the response.
Args:
message: User's message text
tenant_id: Tenant ID for multi-tenant isolation
history: Chat history (Gradio messages format)
Returns:
Updated chat history with agent response
"""
if not message or not message.strip():
return history
if not tenant_id or not tenant_id.strip():
error_msg = "Please enter a Tenant ID before sending a message."
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
return history
# Backend API endpoint
backend_url = f"{BACKEND_BASE_URL}/agent/message"
# Prepare request payload (matching backend API format)
payload = {
"tenant_id": tenant_id.strip(),
"message": message,
"user_id": None,
"conversation_history": [],
"temperature": 0.0
}
# Prepare headers
headers = {
"Content-Type": "application/json"
}
try:
# Send POST request to backend
# Increased timeout to 120 seconds for complex agent operations
# (RAG search, web search, LLM calls can take time)
response = requests.post(
backend_url,
json=payload,
headers=headers,
timeout=120
)
# Check if request was successful
if response.status_code == 200:
response_data = response.json()
# Backend returns response in "text" field
agent_response = response_data.get("text", "No response received from agent.")
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": agent_response})
else:
error_msg = f"Error {response.status_code}: {response.text}"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
except requests.exceptions.ConnectionError:
error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
except requests.exceptions.Timeout:
error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly."
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
except requests.exceptions.RequestException as e:
error_msg = f"❌ Request Error: {str(e)}"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
except Exception as e:
error_msg = f"❌ Unexpected Error: {str(e)}"
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": error_msg})
return history
def ingest_document(
tenant_id: str,
source_type: str,
content: str,
document_url: str,
filename: str,
doc_id: str,
metadata_json: str
):
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required to ingest documents."
tenant_id = tenant_id.strip()
payload_content = content or ""
if source_type == "url" and document_url:
payload_content = document_url.strip()
metadata = {}
if filename:
metadata["filename"] = filename.strip()
if document_url:
metadata["url"] = document_url.strip()
if doc_id:
metadata["doc_id"] = doc_id.strip()
if metadata_json:
try:
extra_metadata = json.loads(metadata_json)
if isinstance(extra_metadata, dict):
metadata.update(extra_metadata)
else:
return "❗ Metadata JSON must represent an object (key/value pairs)."
except json.JSONDecodeError as exc:
return f"❗ Invalid metadata JSON: {exc}"
payload = {
"action": "ingest_document",
"tenant_id": tenant_id,
"source_type": source_type,
"content": payload_content,
"metadata": metadata
}
try:
response = requests.post(
f"{BACKEND_BASE_URL}/rag/ingest-document",
json=payload,
headers={"Content-Type": "application/json"},
timeout=60
)
if response.status_code == 200:
data = response.json()
return f"βœ… Document ingested successfully.\n\n{data.get('message', '')}"
return f"❌ Ingestion failed ({response.status_code}): {response.text}"
except requests.exceptions.ConnectionError:
return "❌ Could not reach the backend. Make sure the FastAPI server is running."
except requests.exceptions.Timeout:
return "⏱️ The ingestion request timed out. Please try again."
except Exception as exc:
return f"❌ Unexpected error during ingestion: {exc}"
def ingest_file(tenant_id: str, file_obj):
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required to ingest files."
if file_obj is None:
return "❗ Please select a file to upload."
tenant_id = tenant_id.strip()
try:
file_path = Path(file_obj.name)
with open(file_path, "rb") as f:
file_bytes = f.read()
files = {
"file": (file_path.name, file_bytes, "application/octet-stream")
}
response = requests.post(
f"{BACKEND_BASE_URL}/rag/ingest-file",
files=files,
headers={"x-tenant-id": tenant_id},
timeout=120
)
if response.status_code == 200:
data = response.json()
return f"βœ… File ingested successfully.\n\n{data.get('message', '')}"
return f"❌ File ingestion failed ({response.status_code}): {response.text}"
except FileNotFoundError:
return "❌ Could not read the uploaded file."
except requests.exceptions.ConnectionError:
return "❌ Could not reach the backend. Make sure the FastAPI server is running."
except requests.exceptions.Timeout:
return "⏱️ File ingestion timed out. Please try again."
except Exception as exc:
return f"❌ Unexpected error during file ingestion: {exc}"
def _format_rules_table(rules: list[str]) -> list[list]:
return [[idx + 1, rule] for idx, rule in enumerate(rules)]
def fetch_admin_rules(tenant_id: str) -> tuple[str, list[list]]:
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required.", []
tenant_id = tenant_id.strip()
try:
response = requests.get(
f"{BACKEND_BASE_URL}/admin/rules",
headers={"x-tenant-id": tenant_id},
timeout=30
)
if response.status_code == 200:
rules = response.json().get("rules", [])
if not rules:
return "βœ… No admin rules have been configured yet.", []
summary = f"### Current Rules ({len(rules)})"
return summary, _format_rules_table(rules)
return f"❌ Error {response.status_code}: {response.text}", []
except requests.exceptions.ConnectionError:
return "❌ Could not reach backend. Ensure the FastAPI server is running.", []
except requests.exceptions.Timeout:
return "⏱️ Request timed out. Please try again.", []
except Exception as exc:
return f"❌ Unexpected error: {exc}", []
def add_admin_rules(tenant_id: str, rules_text: str) -> str:
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required."
if not rules_text or not rules_text.strip():
return "❗ Provide at least one rule to upload."
tenant_id = tenant_id.strip()
rules = [rule.strip() for rule in rules_text.splitlines() if rule.strip()]
if not rules:
return "❗ No valid rules detected."
added = []
errors = []
for rule in rules:
try:
resp = requests.post(
f"{BACKEND_BASE_URL}/admin/rules",
params={"rule": rule},
headers={"x-tenant-id": tenant_id},
timeout=15
)
if resp.status_code == 200:
added.append(rule)
else:
errors.append(f"{rule} -> {resp.status_code}: {resp.text}")
except Exception as exc:
errors.append(f"{rule} -> {exc}")
summary = []
if added:
summary.append(f"βœ… Added {len(added)} rule(s):\n" + "\n".join([f"- {r}" for r in added]))
if errors:
summary.append("⚠️ Errors:\n" + "\n".join(errors))
return "\n\n".join(summary) if summary else "No rules were added."
def delete_admin_rule(tenant_id: str, rule: str) -> str:
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required."
if not rule or not rule.strip():
return "❗ Provide the exact rule text to delete."
tenant_id = tenant_id.strip()
rule = rule.strip()
try:
resp = requests.delete(
f"{BACKEND_BASE_URL}/admin/rules/{rule}",
headers={"x-tenant-id": tenant_id},
timeout=15
)
if resp.status_code == 200:
return f"πŸ—‘οΈ Deleted rule: {rule}"
return f"❌ Error {resp.status_code}: {resp.text}"
except requests.exceptions.ConnectionError:
return "❌ Could not reach backend. Ensure the FastAPI server is running."
except requests.exceptions.Timeout:
return "⏱️ Delete request timed out. Please try again."
except Exception as exc:
return f"❌ Unexpected error: {exc}"
def add_rules_and_refresh(tenant_id: str, rules_text: str):
status = add_admin_rules(tenant_id, rules_text)
summary, rows = fetch_admin_rules(tenant_id)
return status, summary, rows
def delete_rule_and_refresh(tenant_id: str, rule: str):
status = delete_admin_rule(tenant_id, rule)
summary, rows = fetch_admin_rules(tenant_id)
return status, summary, rows
def fetch_admin_analytics(tenant_id: str) -> str:
if not tenant_id or not tenant_id.strip():
return "❗ Tenant ID is required to view analytics."
tenant_id = tenant_id.strip()
headers = {"x-tenant-id": tenant_id}
sections = []
endpoints = [
("Overview", "/analytics/overview"),
("Tool Usage", "/analytics/tool-usage"),
("Red Flags", "/analytics/redflags"),
("Activity", "/analytics/activity"),
]
for label, path in endpoints:
try:
resp = requests.get(
f"{BACKEND_BASE_URL}{path}",
headers=headers,
timeout=30
)
if resp.status_code == 200:
data = resp.json()
pretty = json.dumps(data, indent=2)
sections.append(f"### {label}\n```json\n{pretty}\n```")
else:
sections.append(f"### {label}\n❌ Error {resp.status_code}: {resp.text}")
except requests.exceptions.ConnectionError:
sections.append(f"### {label}\n❌ Could not reach backend. Is the FastAPI server running?")
except requests.exceptions.Timeout:
sections.append(f"### {label}\n⏱️ Request timed out. Please try again.")
except Exception as exc:
sections.append(f"### {label}\n❌ Unexpected error: {exc}")
return "\n\n".join(sections) if sections else "No analytics available."
# Create Gradio interface
with gr.Blocks(title="IntegraChat β€” MCP Autonomous Agent", theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# πŸ€– IntegraChat β€” MCP Autonomous Agent
**Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.**
Enter your Tenant ID to chat with the MCP-powered agent or ingest documents into the enterprise knowledge base.
"""
)
tenant_id_input = gr.Textbox(
label="Tenant ID",
placeholder="Enter your tenant ID (e.g., tenant123)",
value="",
interactive=True
)
with gr.Tabs():
with gr.Tab("Chat"):
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="Chat with Agent",
height=500,
show_label=True,
container=True,
type="messages"
)
with gr.Row():
message_input = gr.Textbox(
label="Message",
placeholder="Type your message here...",
scale=4,
show_label=False,
container=False
)
send_button = gr.Button("Send", variant="primary", scale=1)
with gr.Column(scale=1):
gr.Markdown(
"""
### πŸ“ Chat Instructions
1. Enter your **Tenant ID** above
2. Ask a question or give a task to the agent
3. The MCP agent will automatically select tools (RAG, Web, etc.)
### βš™οΈ Backend Configuration
The agent connects to the FastAPI backend at `http://localhost:8000/agent/message`
"""
)
# Event handlers for chat tab
def send_message(message, tenant_id, history):
updated_history = chat_with_agent(message, tenant_id, history)
return updated_history, "" # Clear message input after sending
send_button.click(
fn=send_message,
inputs=[message_input, tenant_id_input, chatbot],
outputs=[chatbot, message_input]
)
message_input.submit(
fn=send_message,
inputs=[message_input, tenant_id_input, chatbot],
outputs=[chatbot, message_input]
)
with gr.Tab("Document Ingestion"):
gr.Markdown(
"""
### πŸ“š Knowledge Base Ingestion
Ingest documents so the MCP agent can reference tenant-private knowledge.
- **Raw text / URLs:** Use the fields below.
- **Files (PDF, DOCX, TXT, MD):** Use the file upload section.
"""
)
ingestion_mode = gr.Radio(
["Raw Text", "URL", "File Upload"],
value="Raw Text",
label="Select Ingestion Mode"
)
with gr.Row():
doc_filename = gr.Textbox(label="Filename (optional)")
doc_id = gr.Textbox(label="Document ID (optional)")
document_url = gr.Textbox(
label="Document URL (for URL ingestion)",
placeholder="https://example.com/policy",
visible=False
)
doc_content = gr.Textbox(
label="Content / Notes",
placeholder="Paste the document text here...",
lines=8,
visible=True
)
metadata_json = gr.Textbox(
label="Additional Metadata (JSON)",
placeholder='{"department": "HR", "tags": ["policy", "benefits"]}'
)
ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary")
document_status = gr.Markdown("")
def handle_ingest_document(
tenant_id,
mode,
content,
doc_url,
filename,
doc_id_value,
metadata
):
source_type = "raw_text" if mode == "Raw Text" else "url"
return ingest_document(
tenant_id=tenant_id,
source_type=source_type,
content=content,
document_url=doc_url,
filename=filename,
doc_id=doc_id_value,
metadata_json=metadata
)
ingest_doc_button.click(
fn=handle_ingest_document,
inputs=[
tenant_id_input,
ingestion_mode,
doc_content,
document_url,
doc_filename,
doc_id,
metadata_json
],
outputs=document_status
)
file_section = gr.Markdown("#### πŸ“ File Upload (PDF, DOCX, TXT, Markdown)", visible=False)
file_upload = gr.File(
label="Upload File",
file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"],
visible=False
)
ingest_file_button = gr.Button("Upload & Ingest File", visible=False)
def handle_file_ingestion(tenant_id, file_obj):
return ingest_file(tenant_id, file_obj)
ingest_file_button.click(
fn=handle_file_ingestion,
inputs=[tenant_id_input, file_upload],
outputs=document_status
)
def toggle_source_fields(mode):
show_text = mode == "Raw Text"
show_url = mode == "URL"
show_file = mode == "File Upload"
return (
gr.update(visible=show_text),
gr.update(visible=show_url),
gr.update(visible=not show_file),
gr.update(visible=not show_file),
gr.update(visible=not show_file),
gr.update(visible=show_file),
gr.update(visible=show_file),
gr.update(visible=show_file),
)
ingestion_mode.change(
fn=toggle_source_fields,
inputs=[ingestion_mode],
outputs=[
doc_content,
document_url,
doc_filename,
doc_id,
ingest_doc_button,
file_section,
file_upload,
ingest_file_button,
]
)
with gr.Tab("Admin Analytics"):
gr.Markdown(
"""
### πŸ“Š Admin Analytics
Review tenant-level analytics generated by the IntegraChat backend.
- **Overview:** Total queries, active users, red-flag count.
- **Tool Usage:** How often RAG, Web, and Admin tools are invoked.
- **Red Flags:** Recent governance events for this tenant.
- **Activity:** Summary of tenant activity metrics.
"""
)
analytics_refresh = gr.Button("Fetch Analytics Snapshot", variant="primary")
analytics_output = gr.Markdown("πŸ‘‰ Click the button to load analytics for the current tenant.")
analytics_refresh.click(
fn=fetch_admin_analytics,
inputs=[tenant_id_input],
outputs=analytics_output
)
with gr.Tab("Admin Rules & Compliance"):
gr.Markdown(
"""
### πŸ›‘οΈ Admin Rules & Regulations
Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.).
- Enter one rule per line to upload multiple at once.
- Use the delete box to remove an exact rule.
- Refresh anytime to view the latest rule set.
"""
)
rules_summary = gr.Markdown("πŸ‘‰ Click **Refresh Rules** to see existing entries.")
rules_table = gr.Dataframe(
headers=["#", "Rule"],
datatype=["number", "str"],
interactive=False,
value=[]
)
rules_status = gr.Markdown("")
with gr.Row():
refresh_rules_button = gr.Button("Refresh Rules", variant="secondary")
gr.Markdown("")
rules_input = gr.Textbox(
label="Rules / Regulations",
placeholder="Enter one rule per line...",
lines=6
)
upload_rules_button = gr.Button("Upload / Append Rules", variant="primary")
delete_rule_input = gr.Textbox(
label="Delete Rule",
placeholder="Enter the exact rule text to remove..."
)
delete_rule_button = gr.Button("Delete Rule", variant="stop")
refresh_rules_button.click(
fn=fetch_admin_rules,
inputs=[tenant_id_input],
outputs=[rules_summary, rules_table]
)
upload_rules_button.click(
fn=add_rules_and_refresh,
inputs=[tenant_id_input, rules_input],
outputs=[rules_status, rules_summary, rules_table]
)
delete_rule_button.click(
fn=delete_rule_and_refresh,
inputs=[tenant_id_input, delete_rule_input],
outputs=[rules_status, rules_summary, rules_table]
)
gr.Markdown(
"""
---
**Built with [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for the MCP Hackathon**
"""
)
if __name__ == "__main__":
import os
# For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1
# HF Spaces sets SPACE_ID environment variable
server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1"
demo.launch(
server_name=server_name,
server_port=7860,
share=False
)