Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| import os | |
| from pathlib import Path | |
| BACKEND_BASE_URL = os.getenv("BACKEND_BASE_URL", "http://localhost:8000") | |
| def chat_with_agent(message, tenant_id, history): | |
| """ | |
| Send a message to the backend MCP agent and return the response. | |
| Args: | |
| message: User's message text | |
| tenant_id: Tenant ID for multi-tenant isolation | |
| history: Chat history (Gradio messages format) | |
| Returns: | |
| Updated chat history with agent response | |
| """ | |
| if not message or not message.strip(): | |
| return history | |
| if not tenant_id or not tenant_id.strip(): | |
| error_msg = "Please enter a Tenant ID before sending a message." | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| return history | |
| # Backend API endpoint | |
| backend_url = f"{BACKEND_BASE_URL}/agent/message" | |
| # Prepare request payload (matching backend API format) | |
| payload = { | |
| "tenant_id": tenant_id.strip(), | |
| "message": message, | |
| "user_id": None, | |
| "conversation_history": [], | |
| "temperature": 0.0 | |
| } | |
| # Prepare headers | |
| headers = { | |
| "Content-Type": "application/json" | |
| } | |
| try: | |
| # Send POST request to backend | |
| # Increased timeout to 120 seconds for complex agent operations | |
| # (RAG search, web search, LLM calls can take time) | |
| response = requests.post( | |
| backend_url, | |
| json=payload, | |
| headers=headers, | |
| timeout=120 | |
| ) | |
| # Check if request was successful | |
| if response.status_code == 200: | |
| response_data = response.json() | |
| # Backend returns response in "text" field | |
| agent_response = response_data.get("text", "No response received from agent.") | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": agent_response}) | |
| else: | |
| error_msg = f"Error {response.status_code}: {response.text}" | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| except requests.exceptions.ConnectionError: | |
| error_msg = "β Connection Error: Could not connect to backend. Please ensure the FastAPI server is running at http://localhost:8000" | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| except requests.exceptions.Timeout: | |
| error_msg = "β±οΈ Request Timeout: The backend took longer than 2 minutes to respond. This may happen if:\n- The LLM is processing a complex query\n- Multiple tools (RAG, Web Search) are being used\n- The backend is under heavy load\n\nPlease try again with a simpler query, or check if the backend services (Ollama, MCP servers) are running properly." | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| except requests.exceptions.RequestException as e: | |
| error_msg = f"β Request Error: {str(e)}" | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| except Exception as e: | |
| error_msg = f"β Unexpected Error: {str(e)}" | |
| history.append({"role": "user", "content": message}) | |
| history.append({"role": "assistant", "content": error_msg}) | |
| return history | |
| def ingest_document( | |
| tenant_id: str, | |
| source_type: str, | |
| content: str, | |
| document_url: str, | |
| filename: str, | |
| doc_id: str, | |
| metadata_json: str | |
| ): | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required to ingest documents." | |
| tenant_id = tenant_id.strip() | |
| payload_content = content or "" | |
| if source_type == "url" and document_url: | |
| payload_content = document_url.strip() | |
| metadata = {} | |
| if filename: | |
| metadata["filename"] = filename.strip() | |
| if document_url: | |
| metadata["url"] = document_url.strip() | |
| if doc_id: | |
| metadata["doc_id"] = doc_id.strip() | |
| if metadata_json: | |
| try: | |
| extra_metadata = json.loads(metadata_json) | |
| if isinstance(extra_metadata, dict): | |
| metadata.update(extra_metadata) | |
| else: | |
| return "β Metadata JSON must represent an object (key/value pairs)." | |
| except json.JSONDecodeError as exc: | |
| return f"β Invalid metadata JSON: {exc}" | |
| payload = { | |
| "action": "ingest_document", | |
| "tenant_id": tenant_id, | |
| "source_type": source_type, | |
| "content": payload_content, | |
| "metadata": metadata | |
| } | |
| try: | |
| response = requests.post( | |
| f"{BACKEND_BASE_URL}/rag/ingest-document", | |
| json=payload, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=60 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return f"β Document ingested successfully.\n\n{data.get('message', '')}" | |
| return f"β Ingestion failed ({response.status_code}): {response.text}" | |
| except requests.exceptions.ConnectionError: | |
| return "β Could not reach the backend. Make sure the FastAPI server is running." | |
| except requests.exceptions.Timeout: | |
| return "β±οΈ The ingestion request timed out. Please try again." | |
| except Exception as exc: | |
| return f"β Unexpected error during ingestion: {exc}" | |
| def ingest_file(tenant_id: str, file_obj): | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required to ingest files." | |
| if file_obj is None: | |
| return "β Please select a file to upload." | |
| tenant_id = tenant_id.strip() | |
| try: | |
| file_path = Path(file_obj.name) | |
| with open(file_path, "rb") as f: | |
| file_bytes = f.read() | |
| files = { | |
| "file": (file_path.name, file_bytes, "application/octet-stream") | |
| } | |
| response = requests.post( | |
| f"{BACKEND_BASE_URL}/rag/ingest-file", | |
| files=files, | |
| headers={"x-tenant-id": tenant_id}, | |
| timeout=120 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return f"β File ingested successfully.\n\n{data.get('message', '')}" | |
| return f"β File ingestion failed ({response.status_code}): {response.text}" | |
| except FileNotFoundError: | |
| return "β Could not read the uploaded file." | |
| except requests.exceptions.ConnectionError: | |
| return "β Could not reach the backend. Make sure the FastAPI server is running." | |
| except requests.exceptions.Timeout: | |
| return "β±οΈ File ingestion timed out. Please try again." | |
| except Exception as exc: | |
| return f"β Unexpected error during file ingestion: {exc}" | |
| def _format_rules_table(rules: list[str]) -> list[list]: | |
| return [[idx + 1, rule] for idx, rule in enumerate(rules)] | |
| def fetch_admin_rules(tenant_id: str) -> tuple[str, list[list]]: | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required.", [] | |
| tenant_id = tenant_id.strip() | |
| try: | |
| response = requests.get( | |
| f"{BACKEND_BASE_URL}/admin/rules", | |
| headers={"x-tenant-id": tenant_id}, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| rules = response.json().get("rules", []) | |
| if not rules: | |
| return "β No admin rules have been configured yet.", [] | |
| summary = f"### Current Rules ({len(rules)})" | |
| return summary, _format_rules_table(rules) | |
| return f"β Error {response.status_code}: {response.text}", [] | |
| except requests.exceptions.ConnectionError: | |
| return "β Could not reach backend. Ensure the FastAPI server is running.", [] | |
| except requests.exceptions.Timeout: | |
| return "β±οΈ Request timed out. Please try again.", [] | |
| except Exception as exc: | |
| return f"β Unexpected error: {exc}", [] | |
| def add_admin_rules(tenant_id: str, rules_text: str) -> str: | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required." | |
| if not rules_text or not rules_text.strip(): | |
| return "β Provide at least one rule to upload." | |
| tenant_id = tenant_id.strip() | |
| rules = [rule.strip() for rule in rules_text.splitlines() if rule.strip()] | |
| if not rules: | |
| return "β No valid rules detected." | |
| added = [] | |
| errors = [] | |
| for rule in rules: | |
| try: | |
| resp = requests.post( | |
| f"{BACKEND_BASE_URL}/admin/rules", | |
| params={"rule": rule}, | |
| headers={"x-tenant-id": tenant_id}, | |
| timeout=15 | |
| ) | |
| if resp.status_code == 200: | |
| added.append(rule) | |
| else: | |
| errors.append(f"{rule} -> {resp.status_code}: {resp.text}") | |
| except Exception as exc: | |
| errors.append(f"{rule} -> {exc}") | |
| summary = [] | |
| if added: | |
| summary.append(f"β Added {len(added)} rule(s):\n" + "\n".join([f"- {r}" for r in added])) | |
| if errors: | |
| summary.append("β οΈ Errors:\n" + "\n".join(errors)) | |
| return "\n\n".join(summary) if summary else "No rules were added." | |
| def delete_admin_rule(tenant_id: str, rule: str) -> str: | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required." | |
| if not rule or not rule.strip(): | |
| return "β Provide the exact rule text to delete." | |
| tenant_id = tenant_id.strip() | |
| rule = rule.strip() | |
| try: | |
| resp = requests.delete( | |
| f"{BACKEND_BASE_URL}/admin/rules/{rule}", | |
| headers={"x-tenant-id": tenant_id}, | |
| timeout=15 | |
| ) | |
| if resp.status_code == 200: | |
| return f"ποΈ Deleted rule: {rule}" | |
| return f"β Error {resp.status_code}: {resp.text}" | |
| except requests.exceptions.ConnectionError: | |
| return "β Could not reach backend. Ensure the FastAPI server is running." | |
| except requests.exceptions.Timeout: | |
| return "β±οΈ Delete request timed out. Please try again." | |
| except Exception as exc: | |
| return f"β Unexpected error: {exc}" | |
| def add_rules_and_refresh(tenant_id: str, rules_text: str): | |
| status = add_admin_rules(tenant_id, rules_text) | |
| summary, rows = fetch_admin_rules(tenant_id) | |
| return status, summary, rows | |
| def delete_rule_and_refresh(tenant_id: str, rule: str): | |
| status = delete_admin_rule(tenant_id, rule) | |
| summary, rows = fetch_admin_rules(tenant_id) | |
| return status, summary, rows | |
| def fetch_admin_analytics(tenant_id: str) -> str: | |
| if not tenant_id or not tenant_id.strip(): | |
| return "β Tenant ID is required to view analytics." | |
| tenant_id = tenant_id.strip() | |
| headers = {"x-tenant-id": tenant_id} | |
| sections = [] | |
| endpoints = [ | |
| ("Overview", "/analytics/overview"), | |
| ("Tool Usage", "/analytics/tool-usage"), | |
| ("Red Flags", "/analytics/redflags"), | |
| ("Activity", "/analytics/activity"), | |
| ] | |
| for label, path in endpoints: | |
| try: | |
| resp = requests.get( | |
| f"{BACKEND_BASE_URL}{path}", | |
| headers=headers, | |
| timeout=30 | |
| ) | |
| if resp.status_code == 200: | |
| data = resp.json() | |
| pretty = json.dumps(data, indent=2) | |
| sections.append(f"### {label}\n```json\n{pretty}\n```") | |
| else: | |
| sections.append(f"### {label}\nβ Error {resp.status_code}: {resp.text}") | |
| except requests.exceptions.ConnectionError: | |
| sections.append(f"### {label}\nβ Could not reach backend. Is the FastAPI server running?") | |
| except requests.exceptions.Timeout: | |
| sections.append(f"### {label}\nβ±οΈ Request timed out. Please try again.") | |
| except Exception as exc: | |
| sections.append(f"### {label}\nβ Unexpected error: {exc}") | |
| return "\n\n".join(sections) if sections else "No analytics available." | |
| # Create Gradio interface | |
| with gr.Blocks(title="IntegraChat β MCP Autonomous Agent", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown( | |
| """ | |
| # π€ IntegraChat β MCP Autonomous Agent | |
| **Enterprise-grade AI with autonomous agents, secure multi-tenant RAG, real-time web search, and governance.** | |
| Enter your Tenant ID to chat with the MCP-powered agent or ingest documents into the enterprise knowledge base. | |
| """ | |
| ) | |
| tenant_id_input = gr.Textbox( | |
| label="Tenant ID", | |
| placeholder="Enter your tenant ID (e.g., tenant123)", | |
| value="", | |
| interactive=True | |
| ) | |
| with gr.Tabs(): | |
| with gr.Tab("Chat"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| chatbot = gr.Chatbot( | |
| label="Chat with Agent", | |
| height=500, | |
| show_label=True, | |
| container=True, | |
| type="messages" | |
| ) | |
| with gr.Row(): | |
| message_input = gr.Textbox( | |
| label="Message", | |
| placeholder="Type your message here...", | |
| scale=4, | |
| show_label=False, | |
| container=False | |
| ) | |
| send_button = gr.Button("Send", variant="primary", scale=1) | |
| with gr.Column(scale=1): | |
| gr.Markdown( | |
| """ | |
| ### π Chat Instructions | |
| 1. Enter your **Tenant ID** above | |
| 2. Ask a question or give a task to the agent | |
| 3. The MCP agent will automatically select tools (RAG, Web, etc.) | |
| ### βοΈ Backend Configuration | |
| The agent connects to the FastAPI backend at `http://localhost:8000/agent/message` | |
| """ | |
| ) | |
| # Event handlers for chat tab | |
| def send_message(message, tenant_id, history): | |
| updated_history = chat_with_agent(message, tenant_id, history) | |
| return updated_history, "" # Clear message input after sending | |
| send_button.click( | |
| fn=send_message, | |
| inputs=[message_input, tenant_id_input, chatbot], | |
| outputs=[chatbot, message_input] | |
| ) | |
| message_input.submit( | |
| fn=send_message, | |
| inputs=[message_input, tenant_id_input, chatbot], | |
| outputs=[chatbot, message_input] | |
| ) | |
| with gr.Tab("Document Ingestion"): | |
| gr.Markdown( | |
| """ | |
| ### π Knowledge Base Ingestion | |
| Ingest documents so the MCP agent can reference tenant-private knowledge. | |
| - **Raw text / URLs:** Use the fields below. | |
| - **Files (PDF, DOCX, TXT, MD):** Use the file upload section. | |
| """ | |
| ) | |
| ingestion_mode = gr.Radio( | |
| ["Raw Text", "URL", "File Upload"], | |
| value="Raw Text", | |
| label="Select Ingestion Mode" | |
| ) | |
| with gr.Row(): | |
| doc_filename = gr.Textbox(label="Filename (optional)") | |
| doc_id = gr.Textbox(label="Document ID (optional)") | |
| document_url = gr.Textbox( | |
| label="Document URL (for URL ingestion)", | |
| placeholder="https://example.com/policy", | |
| visible=False | |
| ) | |
| doc_content = gr.Textbox( | |
| label="Content / Notes", | |
| placeholder="Paste the document text here...", | |
| lines=8, | |
| visible=True | |
| ) | |
| metadata_json = gr.Textbox( | |
| label="Additional Metadata (JSON)", | |
| placeholder='{"department": "HR", "tags": ["policy", "benefits"]}' | |
| ) | |
| ingest_doc_button = gr.Button("Ingest Text / URL Document", variant="primary") | |
| document_status = gr.Markdown("") | |
| def handle_ingest_document( | |
| tenant_id, | |
| mode, | |
| content, | |
| doc_url, | |
| filename, | |
| doc_id_value, | |
| metadata | |
| ): | |
| source_type = "raw_text" if mode == "Raw Text" else "url" | |
| return ingest_document( | |
| tenant_id=tenant_id, | |
| source_type=source_type, | |
| content=content, | |
| document_url=doc_url, | |
| filename=filename, | |
| doc_id=doc_id_value, | |
| metadata_json=metadata | |
| ) | |
| ingest_doc_button.click( | |
| fn=handle_ingest_document, | |
| inputs=[ | |
| tenant_id_input, | |
| ingestion_mode, | |
| doc_content, | |
| document_url, | |
| doc_filename, | |
| doc_id, | |
| metadata_json | |
| ], | |
| outputs=document_status | |
| ) | |
| file_section = gr.Markdown("#### π File Upload (PDF, DOCX, TXT, Markdown)", visible=False) | |
| file_upload = gr.File( | |
| label="Upload File", | |
| file_types=[".pdf", ".docx", ".txt", ".md", ".markdown"], | |
| visible=False | |
| ) | |
| ingest_file_button = gr.Button("Upload & Ingest File", visible=False) | |
| def handle_file_ingestion(tenant_id, file_obj): | |
| return ingest_file(tenant_id, file_obj) | |
| ingest_file_button.click( | |
| fn=handle_file_ingestion, | |
| inputs=[tenant_id_input, file_upload], | |
| outputs=document_status | |
| ) | |
| def toggle_source_fields(mode): | |
| show_text = mode == "Raw Text" | |
| show_url = mode == "URL" | |
| show_file = mode == "File Upload" | |
| return ( | |
| gr.update(visible=show_text), | |
| gr.update(visible=show_url), | |
| gr.update(visible=not show_file), | |
| gr.update(visible=not show_file), | |
| gr.update(visible=not show_file), | |
| gr.update(visible=show_file), | |
| gr.update(visible=show_file), | |
| gr.update(visible=show_file), | |
| ) | |
| ingestion_mode.change( | |
| fn=toggle_source_fields, | |
| inputs=[ingestion_mode], | |
| outputs=[ | |
| doc_content, | |
| document_url, | |
| doc_filename, | |
| doc_id, | |
| ingest_doc_button, | |
| file_section, | |
| file_upload, | |
| ingest_file_button, | |
| ] | |
| ) | |
| with gr.Tab("Admin Analytics"): | |
| gr.Markdown( | |
| """ | |
| ### π Admin Analytics | |
| Review tenant-level analytics generated by the IntegraChat backend. | |
| - **Overview:** Total queries, active users, red-flag count. | |
| - **Tool Usage:** How often RAG, Web, and Admin tools are invoked. | |
| - **Red Flags:** Recent governance events for this tenant. | |
| - **Activity:** Summary of tenant activity metrics. | |
| """ | |
| ) | |
| analytics_refresh = gr.Button("Fetch Analytics Snapshot", variant="primary") | |
| analytics_output = gr.Markdown("π Click the button to load analytics for the current tenant.") | |
| analytics_refresh.click( | |
| fn=fetch_admin_analytics, | |
| inputs=[tenant_id_input], | |
| outputs=analytics_output | |
| ) | |
| with gr.Tab("Admin Rules & Compliance"): | |
| gr.Markdown( | |
| """ | |
| ### π‘οΈ Admin Rules & Regulations | |
| Upload or manage tenant-specific governance rules (red-flag patterns, compliance policies, etc.). | |
| - Enter one rule per line to upload multiple at once. | |
| - Use the delete box to remove an exact rule. | |
| - Refresh anytime to view the latest rule set. | |
| """ | |
| ) | |
| rules_summary = gr.Markdown("π Click **Refresh Rules** to see existing entries.") | |
| rules_table = gr.Dataframe( | |
| headers=["#", "Rule"], | |
| datatype=["number", "str"], | |
| interactive=False, | |
| value=[] | |
| ) | |
| rules_status = gr.Markdown("") | |
| with gr.Row(): | |
| refresh_rules_button = gr.Button("Refresh Rules", variant="secondary") | |
| gr.Markdown("") | |
| rules_input = gr.Textbox( | |
| label="Rules / Regulations", | |
| placeholder="Enter one rule per line...", | |
| lines=6 | |
| ) | |
| upload_rules_button = gr.Button("Upload / Append Rules", variant="primary") | |
| delete_rule_input = gr.Textbox( | |
| label="Delete Rule", | |
| placeholder="Enter the exact rule text to remove..." | |
| ) | |
| delete_rule_button = gr.Button("Delete Rule", variant="stop") | |
| refresh_rules_button.click( | |
| fn=fetch_admin_rules, | |
| inputs=[tenant_id_input], | |
| outputs=[rules_summary, rules_table] | |
| ) | |
| upload_rules_button.click( | |
| fn=add_rules_and_refresh, | |
| inputs=[tenant_id_input, rules_input], | |
| outputs=[rules_status, rules_summary, rules_table] | |
| ) | |
| delete_rule_button.click( | |
| fn=delete_rule_and_refresh, | |
| inputs=[tenant_id_input, delete_rule_input], | |
| outputs=[rules_status, rules_summary, rules_table] | |
| ) | |
| gr.Markdown( | |
| """ | |
| --- | |
| **Built with [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) for the MCP Hackathon** | |
| """ | |
| ) | |
| if __name__ == "__main__": | |
| import os | |
| # For Hugging Face Spaces, bind to 0.0.0.0; for local dev, use 127.0.0.1 | |
| # HF Spaces sets SPACE_ID environment variable | |
| server_name = "0.0.0.0" if os.getenv("SPACE_ID") else "127.0.0.1" | |
| demo.launch( | |
| server_name=server_name, | |
| server_port=7860, | |
| share=False | |
| ) | |