from __future__ import annotations import logging import os from contextlib import asynccontextmanager from typing import Awaitable, Callable, Dict, Optional from fastapi import FastAPI, Query import uvicorn from backend.mcp_server.admin.rules import admin_add_rule, admin_delete_rule, admin_get_rules from backend.mcp_server.admin.violations import log_violation as admin_log_violation from backend.mcp_server.rag.delete import rag_delete from backend.mcp_server.rag.ingest import rag_ingest from backend.mcp_server.rag.list import rag_list from backend.mcp_server.rag.search import rag_search from backend.mcp_server.web.search import web_search ToolHandler = Callable[[Dict], Awaitable[Dict] | Dict] logger = logging.getLogger("integrachat.mcp.server") if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter( "[%(asctime)s] %(levelname)s %(name)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) print("formatter", formatter) handler.setFormatter(formatter) logger.addHandler(handler) print("logger", logger) logger.setLevel(logging.INFO) @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events.""" # Startup try: routes = [] for route in app.routes: if hasattr(route, "path") and hasattr(route, "methods"): routes.append(f"{', '.join(route.methods)} {route.path}") logger.info("Registered routes: %s", ", ".join(sorted(routes))) except Exception as e: logger.warning("Could not log routes during startup: %s", e) yield # Shutdown (if needed in the future) app = FastAPI(title="IntegraChat MCP", version="1.0.0", lifespan=lifespan) def _register_tool(tool_name: str, handler: ToolHandler) -> None: """ Register the given tool handler under both a namespaced route (/rag/search) and an optional root route (/search) so the server works whether clients point to /rag or directly to the namespace port. 
""" namespace, action = tool_name.split(".", 1) namespaced_path = f"/{namespace}/{action}" root_path = f"/{action}" @app.post(namespaced_path) async def namespaced_endpoint(payload: Dict) -> Dict: return await handler(payload) # type: ignore[arg-type] @app.post(root_path) async def root_endpoint(payload: Dict) -> Dict: return await handler(payload) # type: ignore[arg-type] # Add GET endpoint support for /rag/list (register BEFORE POST to avoid conflicts) @app.get("/rag/list") async def rag_list_get( tenant_id: str = Query(..., description="Tenant ID"), limit: Optional[int] = Query(1000, description="Maximum number of documents to return"), offset: Optional[int] = Query(0, description="Number of documents to skip") ) -> Dict: """GET endpoint for listing RAG documents.""" logger.info("GET /rag/list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset) payload = { "tenant_id": tenant_id, "limit": limit, "offset": offset } result = await rag_list(payload) # type: ignore[arg-type] return result @app.get("/list") async def rag_list_get_root( tenant_id: str = Query(..., description="Tenant ID"), limit: Optional[int] = Query(1000, description="Maximum number of documents to return"), offset: Optional[int] = Query(0, description="Number of documents to skip") ) -> Dict: """GET endpoint for listing RAG documents (root path).""" logger.info("GET /list called with tenant_id=%s, limit=%s, offset=%s", tenant_id, limit, offset) payload = { "tenant_id": tenant_id, "limit": limit, "offset": offset } result = await rag_list(payload) # type: ignore[arg-type] return result # Add DELETE endpoint support for /rag/delete/{document_id} @app.delete("/rag/delete/{document_id}") async def rag_delete_document( document_id: int, tenant_id: str = Query(..., description="Tenant ID") ) -> Dict: """DELETE endpoint for deleting a specific document.""" try: logger.info("DELETE /rag/delete/%s called with tenant_id=%s", document_id, tenant_id) payload = { "tenant_id": tenant_id, 
"document_id": document_id } result = await rag_delete(payload) # type: ignore[arg-type] logger.info("DELETE /rag/delete/%s result: %s", document_id, result) return result except Exception as e: logger.error("Error in DELETE /rag/delete/%s: %s", document_id, e, exc_info=True) raise @app.delete("/delete/{document_id}") async def rag_delete_document_root( document_id: int, tenant_id: str = Query(..., description="Tenant ID") ) -> Dict: """DELETE endpoint for deleting a specific document (root path).""" logger.info("DELETE /delete/%s called with tenant_id=%s", document_id, tenant_id) payload = { "tenant_id": tenant_id, "document_id": document_id } result = await rag_delete(payload) # type: ignore[arg-type] return result # Add DELETE endpoint support for /rag/delete-all @app.delete("/rag/delete-all") async def rag_delete_all( tenant_id: str = Query(..., description="Tenant ID") ) -> Dict: """DELETE endpoint for deleting all documents.""" try: logger.info("DELETE /rag/delete-all called with tenant_id=%s", tenant_id) payload = { "tenant_id": tenant_id, "delete_all": True } result = await rag_delete(payload) # type: ignore[arg-type] return result except Exception as e: logger.error("Error in DELETE /rag/delete-all: %s", e, exc_info=True) raise @app.delete("/delete-all") async def rag_delete_all_root( tenant_id: str = Query(..., description="Tenant ID") ) -> Dict: """DELETE endpoint for deleting all documents (root path).""" try: logger.info("DELETE /delete-all called with tenant_id=%s", tenant_id) payload = { "tenant_id": tenant_id, "delete_all": True } result = await rag_delete(payload) # type: ignore[arg-type] return result except Exception as e: logger.error("Error in DELETE /delete-all: %s", e, exc_info=True) raise _register_tool("rag.search", rag_search) _register_tool("rag.ingest", rag_ingest) _register_tool("rag.delete", rag_delete) _register_tool("rag.list", rag_list) _register_tool("web.search", web_search) _register_tool("admin.getRules", admin_get_rules) 
_register_tool("admin.addRule", admin_add_rule)
_register_tool("admin.deleteRule", admin_delete_rule)
_register_tool("admin.logViolation", admin_log_violation)


@app.get("/health")
async def health() -> Dict[str, str]:
    """Health-check endpoint returning a fixed status payload."""
    return {"status": "ok", "service": "mcp"}


def main() -> None:
    """Start the MCP HTTP server with uvicorn.

    The bind address comes from the ``MCP_HOST`` environment variable
    (default ``"0.0.0.0"``) and the port from ``MCP_PORT`` (default ``8001``;
    an empty value also falls back to the default).

    Raises:
        ValueError: If ``MCP_PORT`` is not an integer in the range 0-65535.
    """
    host = os.getenv("MCP_HOST", "0.0.0.0")
    # ``or`` rather than a getenv default so that an exported-but-empty
    # variable falls back to 8001 instead of failing in int("").
    raw_port = os.getenv("MCP_PORT") or "8001"
    try:
        port = int(raw_port)
    except ValueError as exc:
        raise ValueError(f"MCP_PORT must be an integer, got {raw_port!r}") from exc
    if not 0 <= port <= 65535:
        raise ValueError(f"MCP_PORT must be between 0 and 65535, got {port}")

    logger.info("Starting IntegraChat MCP HTTP server on %s:%s", host, port)
    uvicorn.run("backend.mcp_server.server:app", host=host, port=port)


if __name__ == "__main__":
    main()