| import asyncio |
| import httpx |
| from fastapi import FastAPI, Response, Request |
| from fastapi.responses import StreamingResponse |
| import subprocess |
| import os |
| import time |
| from collections import deque |
|
|
# ASGI application that fronts the Notion MCP server.
app = FastAPI()

# --- Circuit-breaker tuning -------------------------------------------------
# Number of proxy failures inside the window that trips the breaker.
ERROR_THRESHOLD = 5
# Failures older than this many seconds no longer count toward the threshold.
ERROR_WINDOW_SECONDS = 60
# Once tripped, proxied requests are rejected for this many seconds.
CIRCUIT_BREAKER_OPEN_SECONDS = 300

# --- Circuit-breaker state (module-level, shared by the proxy handlers) -----
# time.time() stamps of recent proxy failures, oldest first.
error_timestamps = deque()
# Epoch time until which the breaker stays open; any value in the past
# (initially 0) means the breaker is closed.
circuit_breaker_open_until = 0
|
|
# Strong references to the log-relay tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_mcp_log_tasks = set()


async def run_mcp_server():
    """Start the Notion MCP server as a child process and relay its output.

    Spawns ``node dist/index.js --transport http --port 7861`` with
    ``/app/notion-mcp-server`` as the child's working directory, then
    schedules two background tasks that print the child's stdout and stderr
    line by line.  Returns as soon as the child has been spawned; it does
    not wait for the child to exit.

    The child inherits this process's environment, so ``AUTH_TOKEN`` reaches
    it whenever it is set here.

    Raises:
        OSError: if the child process cannot be spawned (for example when
            the working directory does not exist).
    """
    command = "node dist/index.js --transport http --port 7861"

    print(f"Starting Notion MCP server with command: {command}")
    process = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        # Set the working directory for the child only.  Calling os.chdir()
        # here would change it for the whole process while other coroutines
        # run during the await, and would not be undone if the spawn failed.
        cwd="/app/notion-mcp-server",
    )
    print(f"Notion MCP server started with PID: {process.pid}")

    async def relay_output(stream, name):
        """Print every line read from *stream* until it reaches EOF."""
        while True:
            line = await stream.readline()
            if not line:
                break
            # errors="replace": a stray non-UTF-8 byte must not kill the
            # reader, otherwise the pipe fills up and the child blocks.
            text = line.decode(errors="replace").strip()
            print(f"[MCP Server {name}]: {text}")

    for stream, name in ((process.stdout, "stdout"), (process.stderr, "stderr")):
        task = asyncio.create_task(relay_output(stream, name))
        _mcp_log_tasks.add(task)
        task.add_done_callback(_mcp_log_tasks.discard)
|
|
# Base URL of the local Notion MCP server that requests are forwarded to.
_MCP_SERVER_BASE_URL = "http://localhost:7861"

# Headers that only describe one hop of the connection and must not be
# relayed by an intermediary.
_HOP_BY_HOP_HEADERS = frozenset(
    {
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    }
)

# Not forwarded upstream: httpx sets Host and Content-Length itself, and by
# sending its own Accept-Encoding it only receives encodings it can decode.
_SKIPPED_REQUEST_HEADERS = _HOP_BY_HOP_HEADERS | {
    "host",
    "content-length",
    "accept-encoding",
}

# Not relayed downstream: the body handed back is already decoded by httpx
# and is re-framed by the ASGI server, so the upstream encoding/length
# headers would no longer describe it.
_SKIPPED_RESPONSE_HEADERS = _HOP_BY_HOP_HEADERS | {
    "content-encoding",
    "content-length",
}


def _circuit_open_response() -> Response:
    """Build the 503 reply sent while the circuit breaker is open."""
    return Response(
        content="Service Unavailable (Circuit Breaker Open)",
        status_code=503,
    )


def _record_upstream_failure(error: Exception) -> Response:
    """Register one proxy failure and build the matching error reply.

    Appends the current time to ``error_timestamps``, drops entries older
    than ``ERROR_WINDOW_SECONDS`` and, once ``ERROR_THRESHOLD`` failures
    remain, opens the breaker for ``CIRCUIT_BREAKER_OPEN_SECONDS``.

    Returns:
        A 503 response if this failure opened the breaker, else a 500
        response carrying the error text.
    """
    global circuit_breaker_open_until

    now = time.time()
    error_timestamps.append(now)
    while error_timestamps and error_timestamps[0] < now - ERROR_WINDOW_SECONDS:
        error_timestamps.popleft()

    if len(error_timestamps) >= ERROR_THRESHOLD:
        circuit_breaker_open_until = now + CIRCUIT_BREAKER_OPEN_SECONDS
        print(f"Circuit breaker opened until {time.ctime(circuit_breaker_open_until)}")
        return _circuit_open_response()

    return Response(content=f"Proxy error: {error}", status_code=500)


async def _forward_to_mcp_server(request: Request, path: str, log_name: str) -> Response:
    """Forward *request* to the MCP server at *path* and relay the reply.

    Args:
        request: Incoming request; its method, query string, body and
            end-to-end headers are forwarded.
        path: Path on the MCP server, e.g. ``"/mcp"``.
        log_name: Handler name used in the failure log line.

    Returns:
        The upstream reply (status, end-to-end headers, body), or a 503/500
        response when the breaker is open or the upstream call fails.

    NOTE(review): ``client.request`` reads the whole upstream body before
    returning and runs with httpx's default timeouts, so a long-lived
    streaming reply cannot be relayed incrementally.  Confirm whether the
    MCP server streams on this path; if it does, this needs
    ``StreamingResponse`` and a longer read timeout.
    """
    if time.time() < circuit_breaker_open_until:
        return _circuit_open_response()

    outbound_headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in _SKIPPED_REQUEST_HEADERS
    }
    body = await request.body()

    try:
        async with httpx.AsyncClient() as client:
            upstream = await client.request(
                method=request.method,
                url=f"{_MCP_SERVER_BASE_URL}{path}",
                headers=outbound_headers,
                params=request.query_params,
                content=body,
            )
    except httpx.RequestError as e:
        print(f"HTTPX Request Error in {log_name}: {e}")
        return _record_upstream_failure(e)

    # Any completed exchange, whatever its status code, resets the failures.
    error_timestamps.clear()
    relayed_headers = {
        key: value
        for key, value in upstream.headers.items()
        if key.lower() not in _SKIPPED_RESPONSE_HEADERS
    }
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        headers=relayed_headers,
    )


@app.api_route("/health", methods=["GET"])
async def proxy_health(request: Request):
    """Proxy ``GET /health`` to the MCP server, guarded by the circuit breaker."""
    return await _forward_to_mcp_server(request, "/health", "proxy_health")


@app.api_route("/mcp", methods=["GET", "POST", "DELETE"])
async def proxy_mcp(request: Request):
    """Proxy ``/mcp`` requests to the MCP server, guarded by the circuit breaker."""
    return await _forward_to_mcp_server(request, "/mcp", "proxy_mcp")
|
|
def _log_mcp_startup_failure(task):
    """Print the exception, if any, that ended the MCP launch task."""
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        print(f"Notion MCP server failed to start: {error!r}")


@app.on_event("startup")
async def startup_event():
    """Launch the Notion MCP server in the background when the app starts.

    The launch runs as a separate task so application startup is not held
    up by it.  The task is stored on ``app.state`` because the event loop
    only keeps weak references to tasks, and a done-callback prints any
    exception so a failed launch is not silently lost.
    """
    launch_task = asyncio.create_task(run_mcp_server())
    app.state.mcp_server_task = launch_task
    launch_task.add_done_callback(_log_mcp_startup_failure)
|
|
@app.get("/")
async def root():
    """Return a static JSON payload confirming the proxy app is up."""
    return dict(message="Notion MCP Keyed is running")