Spaces:
Sleeping
Sleeping
from __future__ import annotations

import inspect
import logging
import os
from contextlib import asynccontextmanager
from typing import Awaitable, Callable, Dict, Optional, Union

import uvicorn
from fastapi import FastAPI, Query

from backend.mcp_server.admin.rules import admin_add_rule, admin_delete_rule, admin_get_rules
from backend.mcp_server.admin.violations import log_violation as admin_log_violation
from backend.mcp_server.rag.delete import rag_delete
from backend.mcp_server.rag.ingest import rag_ingest
from backend.mcp_server.rag.list import rag_list
from backend.mcp_server.rag.search import rag_search
from backend.mcp_server.web.search import web_search
# Signature shared by every tool handler: it receives the request payload and
# returns the response dict, either directly or through an awaitable.
# ``Union`` is used rather than ``X | Y`` because this alias is evaluated at
# runtime, where ``from __future__ import annotations`` has no effect; the
# ``|`` operator on ``typing`` generics raises TypeError before Python 3.10.
ToolHandler = Callable[[Dict], Union[Awaitable[Dict], Dict]]

logger = logging.getLogger("integrachat.mcp.server")

# Attach the stream handler only when the logger has none yet, so importing
# this module more than once does not duplicate every log line.
if not logger.handlers:
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "[%(asctime)s] %(levelname)s %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# NOTE(review): the original indentation was lost, so it is unclear whether
# this call belonged inside the ``if`` above — confirm against the real file.
logger.setLevel(logging.INFO)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for startup and shutdown events.

    On startup, logs every route of ``app`` that exposes both ``path`` and
    ``methods``. The logging is best-effort: any failure is downgraded to a
    warning so it can never prevent the server from starting. Nothing is
    done on shutdown yet.

    Args:
        app: The FastAPI application whose routes are logged.

    Yields:
        None; control returns to the server while the application runs.
    """
    # Startup
    try:
        routes = sorted(
            # ``methods`` is a set and may be None on some route types; sort it
            # so the log line is deterministic and treat None as "no methods".
            f"{', '.join(sorted(route.methods or ()))} {route.path}"
            for route in app.routes
            if hasattr(route, "path") and hasattr(route, "methods")
        )
        logger.info("Registered routes: %s", ", ".join(routes))
    except Exception as e:
        # Deliberately broad: route logging is diagnostic only.
        logger.warning("Could not log routes during startup: %s", e)

    yield
    # Shutdown (if needed in the future)
# The ASGI application object; ``lifespan`` is handed to FastAPI as the
# startup/shutdown hook.
app = FastAPI(
    title="IntegraChat MCP",
    version="1.0.0",
    lifespan=lifespan,
)
def _register_tool(tool_name: str, handler: ToolHandler) -> None:
    """
    Register the given tool handler under both a namespaced route
    (/rag/search) and a root route (/search) so the server works
    whether clients point to /rag or directly to the namespace port.

    Both routes accept POST with a JSON object body, which is passed to
    ``handler`` unchanged; the handler's result is returned as the response.

    Args:
        tool_name: Dotted name of the form ``"<namespace>.<action>"``; it is
            split on the first ``"."`` only.
        handler: Callable that takes the payload dict and returns the response
            dict, either directly or as an awaitable.

    Raises:
        ValueError: If ``tool_name`` contains no ``"."``.
    """
    namespace, action = tool_name.split(".", 1)
    namespaced_path = f"/{namespace}/{action}"
    root_path = f"/{action}"

    async def _dispatch(payload: Dict) -> Dict:
        # ToolHandler allows plain (synchronous) handlers as well as
        # coroutines, so only await when the result is actually awaitable.
        result = handler(payload)
        if inspect.isawaitable(result):
            result = await result
        return result

    @app.post(namespaced_path)
    async def namespaced_endpoint(payload: Dict) -> Dict:
        return await _dispatch(payload)

    # NOTE(review): tools that share an action name (e.g. "rag.search" and
    # "web.search") produce the same root path; presumably the route
    # registered first serves it — confirm this is the intended behaviour.
    @app.post(root_path)
    async def root_endpoint(payload: Dict) -> Dict:
        return await _dispatch(payload)
# Add GET endpoint support for /rag/list (register BEFORE POST to avoid conflicts)
@app.get("/rag/list")
async def rag_list_get(
    tenant_id: str = Query(..., description="Tenant ID"),
    limit: Optional[int] = Query(1000, description="Maximum number of documents to return"),
    offset: Optional[int] = Query(0, description="Number of documents to skip"),
) -> Dict:
    """GET endpoint for listing RAG documents.

    Packs the query parameters into a payload dict with the keys
    ``tenant_id``, ``limit`` and ``offset`` and forwards it to ``rag_list``.

    Args:
        tenant_id: Required tenant identifier.
        limit: Maximum number of documents to return (default 1000).
        offset: Number of documents to skip (default 0).

    Returns:
        Whatever ``rag_list`` returns for the payload.
    """
    logger.info("GET /rag/list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset)
    payload = {
        "tenant_id": tenant_id,
        "limit": limit,
        "offset": offset,
    }
    return await rag_list(payload)  # type: ignore[arg-type]
@app.get("/list")
async def rag_list_get_root(
    tenant_id: str = Query(..., description="Tenant ID"),
    limit: Optional[int] = Query(1000, description="Maximum number of documents to return"),
    offset: Optional[int] = Query(0, description="Number of documents to skip"),
) -> Dict:
    """GET endpoint for listing RAG documents (root path).

    Same behaviour as the ``/rag/list`` variant, exposed without the
    namespace prefix: the query parameters are packed into a payload dict
    (``tenant_id``, ``limit``, ``offset``) and forwarded to ``rag_list``.

    Args:
        tenant_id: Required tenant identifier.
        limit: Maximum number of documents to return (default 1000).
        offset: Number of documents to skip (default 0).

    Returns:
        Whatever ``rag_list`` returns for the payload.
    """
    logger.info("GET /list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset)
    payload = {
        "tenant_id": tenant_id,
        "limit": limit,
        "offset": offset,
    }
    return await rag_list(payload)  # type: ignore[arg-type]
# Add DELETE endpoint support for /rag/delete/{document_id}
@app.delete("/rag/delete/{document_id}")
async def rag_delete_document(
    document_id: int,
    tenant_id: str = Query(..., description="Tenant ID"),
) -> Dict:
    """DELETE endpoint for deleting a specific document.

    Forwards ``{"tenant_id": ..., "document_id": ...}`` to ``rag_delete`` and
    logs both the call and its result.

    Args:
        document_id: Identifier of the document, taken from the URL path.
        tenant_id: Required tenant identifier, taken from the query string.

    Returns:
        Whatever ``rag_delete`` returns for the payload.

    Raises:
        Exception: Any error raised by ``rag_delete`` is logged with its
            traceback and re-raised unchanged.
    """
    try:
        logger.info("DELETE /rag/delete/%s called with tenant_id=%s", document_id, tenant_id)
        payload = {
            "tenant_id": tenant_id,
            "document_id": document_id,
        }
        result = await rag_delete(payload)  # type: ignore[arg-type]
        logger.info("DELETE /rag/delete/%s result: %s", document_id, result)
        return result
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Error in DELETE /rag/delete/%s: %s", document_id, e)
        raise
@app.delete("/delete/{document_id}")
async def rag_delete_document_root(
    document_id: int,
    tenant_id: str = Query(..., description="Tenant ID"),
) -> Dict:
    """DELETE endpoint for deleting a specific document (root path).

    Forwards ``{"tenant_id": ..., "document_id": ...}`` to ``rag_delete``.

    Args:
        document_id: Identifier of the document, taken from the URL path.
        tenant_id: Required tenant identifier, taken from the query string.

    Returns:
        Whatever ``rag_delete`` returns for the payload.

    Raises:
        Exception: Any error raised by ``rag_delete`` is logged with its
            traceback and re-raised unchanged.
    """
    # Failures are logged before propagating, matching the other DELETE
    # endpoints in this module.
    try:
        logger.info("DELETE /delete/%s called with tenant_id=%s", document_id, tenant_id)
        payload = {
            "tenant_id": tenant_id,
            "document_id": document_id,
        }
        return await rag_delete(payload)  # type: ignore[arg-type]
    except Exception as e:
        logger.exception("Error in DELETE /delete/%s: %s", document_id, e)
        raise
# Add DELETE endpoint support for /rag/delete-all
@app.delete("/rag/delete-all")
async def rag_delete_all(
    tenant_id: str = Query(..., description="Tenant ID"),
) -> Dict:
    """DELETE endpoint for deleting all documents.

    Forwards ``{"tenant_id": ..., "delete_all": True}`` to ``rag_delete``.

    Args:
        tenant_id: Required tenant identifier, taken from the query string.

    Returns:
        Whatever ``rag_delete`` returns for the payload.

    Raises:
        Exception: Any error raised by ``rag_delete`` is logged with its
            traceback and re-raised unchanged.
    """
    try:
        logger.info("DELETE /rag/delete-all called with tenant_id=%s", tenant_id)
        payload = {
            "tenant_id": tenant_id,
            "delete_all": True,
        }
        return await rag_delete(payload)  # type: ignore[arg-type]
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Error in DELETE /rag/delete-all: %s", e)
        raise
@app.delete("/delete-all")
async def rag_delete_all_root(
    tenant_id: str = Query(..., description="Tenant ID"),
) -> Dict:
    """DELETE endpoint for deleting all documents (root path).

    Forwards ``{"tenant_id": ..., "delete_all": True}`` to ``rag_delete``.

    Args:
        tenant_id: Required tenant identifier, taken from the query string.

    Returns:
        Whatever ``rag_delete`` returns for the payload.

    Raises:
        Exception: Any error raised by ``rag_delete`` is logged with its
            traceback and re-raised unchanged.
    """
    try:
        logger.info("DELETE /delete-all called with tenant_id=%s", tenant_id)
        payload = {
            "tenant_id": tenant_id,
            "delete_all": True,
        }
        return await rag_delete(payload)  # type: ignore[arg-type]
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Error in DELETE /delete-all: %s", e)
        raise
# Register every tool handler, in this exact order, through _register_tool.
# Each entry is (dotted tool name, handler).
for _tool_name, _tool_handler in (
    ("rag.search", rag_search),
    ("rag.ingest", rag_ingest),
    ("rag.delete", rag_delete),
    ("rag.list", rag_list),
    ("web.search", web_search),
    ("admin.getRules", admin_get_rules),
    ("admin.addRule", admin_add_rule),
    ("admin.deleteRule", admin_delete_rule),
    ("admin.logViolation", admin_log_violation),
):
    _register_tool(_tool_name, _tool_handler)
# NOTE(review): the path "/health" is inferred from the function name; the
# visible source never registers this handler — confirm the intended route.
@app.get("/health")
async def health() -> Dict[str, str]:
    """Liveness probe: return a fixed status payload identifying the service."""
    return {"status": "ok", "service": "mcp"}
def main():
    """Run the MCP HTTP server with uvicorn.

    The bind address comes from the ``MCP_HOST`` environment variable
    (default ``"0.0.0.0"``) and the port from ``MCP_PORT`` (default
    ``"8001"``, converted with ``int``).
    """
    bind_host = os.getenv("MCP_HOST", "0.0.0.0")
    raw_port = os.getenv("MCP_PORT", "8001")
    bind_port = int(raw_port)

    logger.info("Starting IntegraChat MCP HTTP server on %s:%s", bind_host, bind_port)
    # The app is passed as an import string rather than as the object itself.
    uvicorn.run("backend.mcp_server.server:app", host=bind_host, port=bind_port)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()