import gradio as gr import requests import json import os from pathlib import Path BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000") def chat_with_agent(message, tenant_id, history): """ Send a message to the backend MCP agent and return the response. Args: message: User's message text tenant_id: Tenant ID for multi-tenant isolation history: Chat history (Gradio messages format) Returns: Updated chat history with agent response """ if not message or not message.strip(): return history if not tenant_id or not tenant_id.strip(): error_msg = "Please enter a Tenant ID before sending a message." history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) return history # Backend API endpoint backend_url = f"{BACKEND_BASE_URL}/agent/message" # Prepare request payload (matching backend API format) payload = { "tenant_id": tenant_id.strip(), "message": message, "user_id": None, "conversation_history": [], "temperature": 0.0 } # Prepare headers headers = { "Content-Type": "application/json" } try: # Send POST request to backend # Increased timeout to 120 seconds for complex agent operations # (RAG search, web search, LLM calls can take time) response = requests.post( backend_url, json=payload, headers=headers, timeout=120 ) # Check if request was successful if response.status_code == 200: response_data = response.json() # Backend returns response in "text" field agent_response = response_data.get("text", "No response received from agent.") history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": agent_response}) else: error_msg = f"Error {response.status_code}: {response.text}" history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) except requests.exceptions.ConnectionError: error_msg = "❌ Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000" history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) except requests.exceptions.Timeout: error_msg = "⏱️ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly." history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) except requests.exceptions.RequestException as e: error_msg = f"❌ Request Error: {str(e)}" history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) except Exception as e: error_msg = f"❌ Unexpected Error: {str(e)}" history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": error_msg}) return history def ingest_document( tenant_id: str, source_type: str, content: str, document_url: str, filename: str, doc_id: str, metadata_json: str ): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest documents." tenant_id = tenant_id.strip() payload_content = content or "" if source_type == "url" and document_url: payload_content = document_url.strip() metadata = {} if filename: metadata["filename"] = filename.strip() if document_url: metadata["url"] = document_url.strip() if doc_id: metadata["doc_id"] = doc_id.strip() if metadata_json: try: extra_metadata = json.loads(metadata_json) if isinstance(extra_metadata, dict): metadata.update(extra_metadata) else: return "❗ Metadata JSON must represent an object (key/value pairs)." except json.JSONDecodeError as exc: return f"❗ Invalid metadata JSON: {exc}" payload = { "action": "ingest_document", "tenant_id": tenant_id, "source_type": source_type, "content": payload_content, "metadata": metadata } try: response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-document", json=payload, headers={"Content-Type": "application/json"}, timeout=60 ) if response.status_code == 200: data = response.json() return f"✅ Document ingested successfully.\n\n{data.get('message', '')}" return f"❌ Ingestion failed ({response.status_code}): {response.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ The ingestion request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during ingestion: {exc}" def ingest_file(tenant_id: str, file_obj): if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to ingest files." if file_obj is None: return "❗ Please select a file to upload." tenant_id = tenant_id.strip() try: file_path = Path(file_obj.name) with open(file_path, "rb") as f: file_bytes = f.read() files = { "file": (file_path.name, file_bytes, "application/octet-stream") } response = requests.post( f"{BACKEND_BASE_URL}/rag/ingest-file", files=files, headers={"x-tenant-id": tenant_id}, timeout=120 ) if response.status_code == 200: data = response.json() return f"✅ File ingested successfully.\n\n{data.get('message', '')}" return f"❌ File ingestion failed ({response.status_code}): {response.text}" except FileNotFoundError: return "❌ Could not read the uploaded file." except requests.exceptions.ConnectionError: return "❌ Could not reach the backend. Make sure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ File ingestion timed out. Please try again." except Exception as exc: return f"❌ Unexpected error during file ingestion: {exc}" def _format_rules_table(rules: list[str]) -> list[list]: return [[idx + 1, rule] for idx, rule in enumerate(rules)] def fetch_admin_rules(tenant_id: str) -> tuple[str, list[list]]: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required.", [] tenant_id = tenant_id.strip() try: response = requests.get( f"{BACKEND_BASE_URL}/admin/rules", headers={"x-tenant-id": tenant_id}, timeout=30 ) if response.status_code == 200: rules = response.json().get("rules", []) if not rules: return "✅ No admin rules have been configured yet.", [] summary = f"### Current Rules ({len(rules)})" return summary, _format_rules_table(rules) return f"❌ Error {response.status_code}: {response.text}", [] except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running.", [] except requests.exceptions.Timeout: return "⏱️ Request timed out. Please try again.", [] except Exception as exc: return f"❌ Unexpected error: {exc}", [] def add_admin_rules(tenant_id: str, rules_text: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rules_text or not rules_text.strip(): return "❗ Provide at least one rule to upload." tenant_id = tenant_id.strip() rules = [rule.strip() for rule in rules_text.splitlines() if rule.strip()] if not rules: return "❗ No valid rules detected." added = [] errors = [] for rule in rules: try: resp = requests.post( f"{BACKEND_BASE_URL}/admin/rules", params={"rule": rule}, headers={"x-tenant-id": tenant_id}, timeout=15 ) if resp.status_code == 200: added.append(rule) else: errors.append(f"{rule} -> {resp.status_code}: {resp.text}") except Exception as exc: errors.append(f"{rule} -> {exc}") summary = [] if added: summary.append(f"✅ Added {len(added)} rule(s):\n" + "\n".join([f"- {r}" for r in added])) if errors: summary.append("⚠️ Errors:\n" + "\n".join(errors)) return "\n\n".join(summary) if summary else "No rules were added." def delete_admin_rule(tenant_id: str, rule: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required." if not rule or not rule.strip(): return "❗ Provide the exact rule text to delete." tenant_id = tenant_id.strip() rule = rule.strip() try: resp = requests.delete( f"{BACKEND_BASE_URL}/admin/rules/{rule}", headers={"x-tenant-id": tenant_id}, timeout=15 ) if resp.status_code == 200: return f"🗑️ Deleted rule: {rule}" return f"❌ Error {resp.status_code}: {resp.text}" except requests.exceptions.ConnectionError: return "❌ Could not reach backend. Ensure the FastAPI server is running." except requests.exceptions.Timeout: return "⏱️ Delete request timed out. Please try again." except Exception as exc: return f"❌ Unexpected error: {exc}" def add_rules_and_refresh(tenant_id: str, rules_text: str): status = add_admin_rules(tenant_id, rules_text) summary, rows = fetch_admin_rules(tenant_id) return status, summary, rows def delete_rule_and_refresh(tenant_id: str, rule: str): status = delete_admin_rule(tenant_id, rule) summary, rows = fetch_admin_rules(tenant_id) return status, summary, rows def fetch_admin_analytics(tenant_id: str) -> str: if not tenant_id or not tenant_id.strip(): return "❗ Tenant ID is required to view analytics." tenant_id = tenant_id.strip() headers = {"x-tenant-id": tenant_id} sections = [] endpoints = [ ("Overview", "/analytics/overview"), ("Tool Usage", "/analytics/tool-usage"), ("Red Flags", "/analytics/redflags"), ("Activity", "/analytics/activity"), ] for label, path in endpoints: try: resp = requests.get( f"{BACKEND_BASE_URL}{path}", headers=headers, timeout=30 ) if resp.status_code == 200: data = resp.json() pretty = json.dumps(data, indent=2) sections.append(f"### {label}\n```json\n{pretty}\n```") else: sections.append(f"### {label}\n❌ Error {resp.status_code}: {resp.text}") except requests.exceptions.ConnectionError: sections.append(f"### {label}\n❌ Could not reach backend. Is the FastAPI server running?") except requests.exceptions.Timeout: sections.append(f"### {label}\n⏱️ Request timed out. Please try again.") except Exception as exc: sections.append(f"### {label}\n❌ Unexpected error: {exc}") return "\n\n".join(sections) if sections else "No analytics available." # Create Gradio interface with gr.Blocks(title="IntegraChat — MCP Autonomous Agent", theme=gr.themes.Soft()) as demo: gr.Markdown( """ # 🤖 IntegraChat — MCP Autonomous Agent **Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.** Enter your Tenant ID to chat with the MCP-powered agent or ingest documents into the enterprise knowledge base. """ ) tenant_id_input = gr.Textbox( label="Tenant ID", placeholder="Enter your tenant ID (e.g., tenant123)", value="", interactive=True ) with gr.Tabs(): with gr.Tab("Chat"): with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot( label="Chat with Agent", height=500, show_label=True, container=True, type="messages" ) with gr.Row(): message_input = gr.Textbox( label="Message", placeholder="Type your message here...", scale=4, show_label=False, container=False ) send_button = gr.Button("Send", variant="primary", scale=1) with gr.Column(scale=1): gr.Markdown( """ ### 📝 Chat Instructions 1. Enter your **Tenant ID** above 2. Ask a question or give a task to the agent 3. The MCP agent will automatically select tools (RAG, Web, etc.) ### ⚙️ Backend Configuration The agent connects to the FastAPI backend at `http://localhost:8000/agent/message` """ ) # Event handlers for chat tab def send_message(message, tenant_id, history): updated_history = chat_with_agent(message, tenant_id, history) return updated_history, "" # Clear message input after sending send_button.click( fn=send_message, inputs=[message_input, tenant_id_input, chatbot], outputs=[chatbot, message_input] ) message_input.submit( fn=send_message, inputs=[message_input, tenant_id_input, chatbot], outputs=[chatbot, message_input] ) with gr.Tab("Document Ingestion"): gr.Markdown( """ ### 📚 Knowledge Base Ingestion Ingest documents so the MCP agent can reference tenant-private knowledge. - **Raw text / URLs:** Use the fields below. - **Files (PDF, DOCX, TXT, MD):** Use the file upload section. """ ) ingestion_mode = gr.Radio( ["Raw Text", "URL", "File Upload"], value="Raw Text", label="Select Ingestion Mode" ) with gr.Row(): doc_filename = gr.Textbox(label="Filename (optional)") doc_id = gr.Textbox(label="Document ID (optional)") document_url = gr.Textbox( label="Document URL (for URL ingestion)", placeholder="https://example.com/policy", visible=False ) doc_content = gr.Textbox( label="Content / Notes", placeholder="Paste the document text here...", lines=8, visible=True ) metadata_json = gr.Textbox( label="Additional Metadata (JSON)", placeholder='{"department": "HR", "tags": ["policy", "benefits"]}' ) ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary") document_status = gr.Markdown("") def handle_ingest_document( tenant_id, mode, content, doc_url, filename, doc_id_value, metadata ): source_type = "raw_text" if mode == "Raw Text" else "url" return ingest_document( tenant_id=tenant_id, source_type=source_type, content=content, document_url=doc_url, filename=filename, doc_id=doc_id_value, metadata_json=metadata ) ingest_doc_button.click( fn=handle_ingest_document, inputs=[ tenant_id_input, ingestion_mode, doc_content, document_url, doc_filename, doc_id, metadata_json ], outputs=document_status ) file_section = gr.Markdown("#### 📁 File Upload (PDF, DOCX, TXT, Markdown)", visible=False) file_upload = gr.File( label="Upload File", file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"], visible=False ) ingest_file_button = gr.Button("Upload & Ingest File", visible=False) def handle_file_ingestion(tenant_id, file_obj): return ingest_file(tenant_id, file_obj) ingest_file_button.click( fn=handle_file_ingestion, inputs=[tenant_id_input, file_upload], outputs=document_status ) def toggle_source_fields(mode): show_text = mode == "Raw Text" show_url = mode == "URL" show_file = mode == "File Upload" return ( gr.update(visible=show_text), gr.update(visible=show_url), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=not show_file), gr.update(visible=show_file), gr.update(visible=show_file), gr.update(visible=show_file), ) ingestion_mode.change( fn=toggle_source_fields, inputs=[ingestion_mode], outputs=[ doc_content, document_url, doc_filename, doc_id, ingest_doc_button, file_section, file_upload, ingest_file_button, ] ) with gr.Tab("Admin Analytics"): gr.Markdown( """ ### 📊 Admin Analytics Review tenant-level analytics generated by the IntegraChat backend. - **Overview:** Total queries, active users, red-flag count. - **Tool Usage:** How often RAG, Web, and Admin tools are invoked. - **Red Flags:** Recent governance events for this tenant. - **Activity:** Summary of tenant activity metrics. """ ) analytics_refresh = gr.Button("Fetch Analytics Snapshot", variant="primary") analytics_output = gr.Markdown("👉 Click the button to load analytics for the current tenant.") analytics_refresh.click( fn=fetch_admin_analytics, inputs=[tenant_id_input], outputs=analytics_output ) with gr.Tab("Admin Rules & Compliance"): gr.Markdown( """ ### 🛡️ Admin Rules & Regulations Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.). - Enter one rule per line to upload multiple at once. - Use the delete box to remove an exact rule. - Refresh anytime to view the latest rule set. """ ) rules_summary = gr.Markdown("👉 Click **Refresh Rules** to see existing entries.") rules_table = gr.Dataframe( headers=["#", "Rule"], datatype=["number", "str"], interactive=False, value=[] ) rules_status = gr.Markdown("") with gr.Row(): refresh_rules_button = gr.Button("Refresh Rules", variant="secondary") gr.Markdown("") rules_input = gr.Textbox( label="Rules / Regulations", placeholder="Enter one rule per line...", lines=6 ) upload_rules_button = gr.Button("Upload / Append Rules", variant="primary") delete_rule_input = gr.Textbox( label="Delete Rule", placeholder="Enter the exact rule text to remove..." ) delete_rule_button = gr.Button("Delete Rule", variant="stop") refresh_rules_button.click( fn=fetch_admin_rules, inputs=[tenant_id_input], outputs=[rules_summary, rules_table] ) upload_rules_button.click( fn=add_rules_and_refresh, inputs=[tenant_id_input, rules_input], outputs=[rules_status, rules_summary, rules_table] ) delete_rule_button.click( fn=delete_rule_and_refresh, inputs=[tenant_id_input, delete_rule_input], outputs=[rules_status, rules_summary, rules_table] ) gr.Markdown( """ --- **Built with [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for the MCP Hackathon** """ ) if __name__ == "__main__": import os # For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1 # HF Spaces sets SPACE_ID environment variable server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1" demo.launch( server_name=server_name, server_port=7860, share=False )