"""HTTP front-end that launches the Notion MCP server and proxies requests to it.

On startup the app spawns the Node.js MCP server as a child process and then
forwards ``/health`` and ``/mcp`` to it on localhost. A simple circuit breaker,
shared by both routes, short-circuits requests with a 503 after repeated
connection-level failures.
"""

import asyncio
import os
import subprocess
import time
from collections import deque

import httpx
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

# Circuit breaker settings
ERROR_THRESHOLD = 5
ERROR_WINDOW_SECONDS = 60
CIRCUIT_BREAKER_OPEN_SECONDS = 300  # 5 minutes

# Timestamps (time.time()) of recent upstream request failures, oldest first.
error_timestamps = deque()
# Wall-clock time until which the breaker rejects requests; 0 means closed.
circuit_breaker_open_until = 0

# Where the Node.js project lives and how it is launched.
MCP_SERVER_DIR = "/app/notion-mcp-server"
MCP_SERVER_ARGV = ("node", "dist/index.js", "--transport", "http", "--port", "7861")
MCP_SERVER_BASE_URL = "http://localhost:7861"

# Response headers that must not be copied from the upstream reply:
# hop-by-hop headers describe the upstream connection only, and
# content-encoding/content-length describe the body as it was on the wire,
# whereas the body we return has already been decoded by httpx.
_DROPPED_RESPONSE_HEADERS = frozenset(
    {
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "trailers",
        "transfer-encoding",
        "upgrade",
        "content-encoding",
        "content-length",
    }
)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-flight.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule *coro* as a task and hold a reference until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


async def _log_stream(stream, name):
    """Print every line read from *stream*, prefixed with *name*, until EOF."""
    while True:
        line = await stream.readline()
        if not line:
            break
        # errors="replace": a stray non-UTF-8 byte must not kill this reader,
        # otherwise the pipe stops being drained.
        text = line.decode(errors="replace").strip()
        print(f"[MCP Server {name}]: {text}")


async def run_mcp_server():
    """Start the Notion MCP server as a child process and mirror its output.

    The child runs with ``MCP_SERVER_DIR`` as its working directory and
    inherits this process's environment (including ``AUTH_TOKEN`` when set).
    The working directory of this process itself is left untouched. Two
    background tasks relay the child's stdout and stderr to this process's
    stdout. A failure to launch is reported and otherwise ignored.
    """
    command = " ".join(MCP_SERVER_ARGV)
    print(f"Starting Notion MCP server with command: {command}")

    try:
        process = await asyncio.create_subprocess_exec(
            *MCP_SERVER_ARGV,
            cwd=MCP_SERVER_DIR,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=os.environ.copy(),
        )
    except OSError as exc:
        # Missing `node` binary or project directory: report it here rather
        # than leaving an unretrieved exception on the startup task.
        print(f"Failed to start Notion MCP server: {exc}")
        return

    print(f"Notion MCP server started with PID: {process.pid}")

    # Continuously read stdout and stderr from the MCP server process
    _spawn_background(_log_stream(process.stdout, "stdout"))
    _spawn_background(_log_stream(process.stderr, "stderr"))


def _circuit_open_response():
    """Build the 503 reply returned while the circuit breaker is open."""
    return Response(
        content="Service Unavailable (Circuit Breaker Open)",
        status_code=503,
    )


def _record_failure():
    """Record one upstream failure; return True if the breaker is now open."""
    global circuit_breaker_open_until

    now = time.time()
    error_timestamps.append(now)

    # Clean up old timestamps
    cutoff = now - ERROR_WINDOW_SECONDS
    while error_timestamps and error_timestamps[0] < cutoff:
        error_timestamps.popleft()

    if len(error_timestamps) < ERROR_THRESHOLD:
        return False

    circuit_breaker_open_until = now + CIRCUIT_BREAKER_OPEN_SECONDS
    print(f"Circuit breaker opened until {time.ctime(circuit_breaker_open_until)}")
    return True


async def _proxy_request(request, path, handler_name):
    """Forward *request* to the MCP server at *path* and relay the reply.

    Args:
        request: Incoming FastAPI request; its method, headers (minus
            ``Host``), query parameters and body are forwarded.
        path: Upstream path, e.g. ``"/mcp"``.
        handler_name: Name of the calling route, used only in log output.

    Returns:
        The upstream status and body with safe headers on success; a 503
        while the circuit breaker is open (or when this failure opens it);
        a 500 carrying the error text for any other ``httpx.RequestError``.
    """
    # Check if circuit breaker is open
    if time.time() < circuit_breaker_open_until:
        return _circuit_open_response()

    forwarded_headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() != "host"
    }
    body = await request.body()

    try:
        async with httpx.AsyncClient() as client:
            upstream = await client.request(
                method=request.method,
                url=f"{MCP_SERVER_BASE_URL}{path}",
                headers=forwarded_headers,
                params=request.query_params,
                content=body,
            )
    except httpx.RequestError as exc:
        print(f"HTTPX Request Error in {handler_name}: {exc}")
        if _record_failure():
            return _circuit_open_response()
        return Response(content=f"Proxy error: {exc}", status_code=500)

    # If successful, clear old errors
    error_timestamps.clear()

    reply_headers = {
        key: value
        for key, value in upstream.headers.items()
        if key.lower() not in _DROPPED_RESPONSE_HEADERS
    }
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        headers=reply_headers,
    )


@app.api_route("/health", methods=["GET"])
async def proxy_health(request: Request):
    """Proxy ``GET /health`` to the MCP server, guarded by the circuit breaker."""
    return await _proxy_request(request, "/health", "proxy_health")


@app.api_route("/mcp", methods=["GET", "POST", "DELETE"])
async def proxy_mcp(request: Request):
    """Proxy ``/mcp`` requests to the MCP server, guarded by the circuit breaker."""
    return await _proxy_request(request, "/mcp", "proxy_mcp")


@app.on_event("startup")
async def startup_event():
    """Launch the MCP server in the background when the app starts."""
    _spawn_background(run_mcp_server())


@app.get("/")
async def root():
    """Return a static liveness message for the proxy itself."""
    return {"message": "Notion MCP Keyed is running"}