# notion / app.py
# Gemini
# fix: Pass Notion MCP server auth token as environment variable to subprocess
# 4c1c4ca
import asyncio
import httpx
from fastapi import FastAPI, Response, Request
from fastapi.responses import StreamingResponse
import subprocess
import os
import time
from collections import deque
# ASGI application object; the route and startup decorators in this file
# register their handlers on it.
app = FastAPI()
# Circuit breaker settings
# Number of recorded proxy failures inside the window that opens the breaker.
ERROR_THRESHOLD = 5
# Failures older than this many seconds are dropped before counting.
ERROR_WINDOW_SECONDS = 60
# Once opened, the proxy routes answer 503 for this long without contacting
# the upstream server.
CIRCUIT_BREAKER_OPEN_SECONDS = 300 # 5 minutes
# time.time() stamps of recent proxy failures, oldest first. Shared by both
# proxy routes and cleared whenever an upstream request completes.
error_timestamps = deque()
# Epoch time until which the breaker stays open; 0 means it has not been opened.
circuit_breaker_open_until = 0
# Strong references to the log-reader tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_mcp_log_tasks = set()


async def run_mcp_server():
    """Launch the Notion MCP server (Node.js) and mirror its output.

    Starts ``node dist/index.js --transport http --port 7861`` with
    ``/app/notion-mcp-server`` as the child's working directory, then
    schedules two background tasks that print the child's stdout and stderr
    line by line. Returns once those tasks are scheduled; it does not wait
    for the child to exit.

    The child receives a copy of this process's environment, which already
    contains ``AUTH_TOKEN`` whenever that variable is set.

    Raises:
        OSError: if the child cannot be started (for example ``node`` or the
            working directory does not exist).
    """
    server_dir = "/app/notion-mcp-server"
    argv = ["node", "dist/index.js", "--transport", "http", "--port", "7861"]
    command = " ".join(argv)
    env = os.environ.copy()

    print(f"Starting Notion MCP server with command: {command}")
    # Pass the directory via ``cwd`` instead of os.chdir(): chdir changes the
    # working directory of the whole process, and this coroutine yields to
    # other tasks while the child is being spawned. Running the argv directly
    # also avoids going through a shell for a fixed command.
    process = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=env,
        cwd=server_dir,
    )
    print(f"Notion MCP server started with PID: {process.pid}")

    async def read_stream(stream, name):
        """Print every line read from *stream* until EOF, tagged with *name*."""
        while True:
            line = await stream.readline()
            if not line:
                break
            # Undecodable bytes must not kill the reader: an undrained pipe
            # would eventually block the child on write.
            text = line.decode(errors="replace").strip()
            print(f"[MCP Server {name}]: {text}")

    # Continuously drain stdout and stderr from the MCP server process.
    for stream, name in ((process.stdout, "stdout"), (process.stderr, "stderr")):
        task = asyncio.create_task(read_stream(stream, name))
        _mcp_log_tasks.add(task)
        task.add_done_callback(_mcp_log_tasks.discard)
@app.api_route("/health", methods=["GET"])
async def proxy_health(request: Request):
    """Proxy ``/health`` to the MCP server on localhost:7861.

    While the circuit breaker is open, answers 503 without contacting the
    upstream. Otherwise the request's method, headers, query parameters and
    body are forwarded, and the upstream's status, headers and body are
    returned.

    Any completed upstream request (whatever its status code) clears the
    recorded failures. An ``httpx.RequestError`` records a failure; once
    ``ERROR_THRESHOLD`` failures fall within ``ERROR_WINDOW_SECONDS`` the
    breaker opens for ``CIRCUIT_BREAKER_OPEN_SECONDS`` and 503 is returned,
    otherwise 500 with the error text.
    """
    global circuit_breaker_open_until

    # Check if circuit breaker is open
    if time.time() < circuit_breaker_open_until:
        return Response(content="Service Unavailable (Circuit Breaker Open)", status_code=503)

    url = "http://localhost:7861/health"
    # The body is re-sent as one buffered payload, so the caller's framing
    # headers no longer apply; httpx sets Host and Content-Length itself.
    request_skip = ("host", "content-length", "transfer-encoding", "connection")
    headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in request_skip
    }
    # httpx hands back a fully read, already decoded body, so the upstream's
    # encoding and framing headers would describe a different payload than
    # the one returned here; the Response computes its own Content-Length.
    response_skip = (
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    )
    body = await request.body()

    async with httpx.AsyncClient() as client:
        try:
            response = await client.request(
                method=request.method,
                url=url,
                headers=headers,
                params=request.query_params,
                content=body,
            )
        except httpx.RequestError as e:
            print(f"HTTPX Request Error in proxy_health: {e}")
            now = time.time()
            error_timestamps.append(now)
            # Clean up old timestamps
            while error_timestamps and error_timestamps[0] < now - ERROR_WINDOW_SECONDS:
                error_timestamps.popleft()
            if len(error_timestamps) >= ERROR_THRESHOLD:
                circuit_breaker_open_until = now + CIRCUIT_BREAKER_OPEN_SECONDS
                print(f"Circuit breaker opened until {time.ctime(circuit_breaker_open_until)}")
                return Response(content="Service Unavailable (Circuit Breaker Open)", status_code=503)
            return Response(content=f"Proxy error: {e}", status_code=500)

    # If successful, clear old errors
    error_timestamps.clear()
    forwarded_headers = {
        key: value
        for key, value in response.headers.items()
        if key.lower() not in response_skip
    }
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
    )
@app.api_route("/mcp", methods=["GET", "POST", "DELETE"])
async def proxy_mcp(request: Request):
    """Proxy ``/mcp`` (GET, POST, DELETE) to the MCP server on localhost:7861.

    While the circuit breaker is open, answers 503 without contacting the
    upstream. Otherwise the request's method, headers, query parameters and
    body are forwarded, and the upstream's status, headers and body are
    returned.

    Any completed upstream request (whatever its status code) clears the
    recorded failures. An ``httpx.RequestError`` records a failure; once
    ``ERROR_THRESHOLD`` failures fall within ``ERROR_WINDOW_SECONDS`` the
    breaker opens for ``CIRCUIT_BREAKER_OPEN_SECONDS`` and 503 is returned,
    otherwise 500 with the error text.
    """
    global circuit_breaker_open_until

    # Check if circuit breaker is open
    if time.time() < circuit_breaker_open_until:
        return Response(content="Service Unavailable (Circuit Breaker Open)", status_code=503)

    url = "http://localhost:7861/mcp"
    # The body is re-sent as one buffered payload, so the caller's framing
    # headers no longer apply; httpx sets Host and Content-Length itself.
    request_skip = ("host", "content-length", "transfer-encoding", "connection")
    headers = {
        key: value
        for key, value in request.headers.items()
        if key.lower() not in request_skip
    }
    # httpx hands back a fully read, already decoded body, so the upstream's
    # encoding and framing headers would describe a different payload than
    # the one returned here; the Response computes its own Content-Length.
    response_skip = (
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    )
    body = await request.body()

    # NOTE(review): the upstream reply is read in full before anything is
    # returned, and httpx's default timeouts apply. If the upstream streams
    # long-lived responses on this route (TODO confirm), this handler would
    # need to stream instead of buffering.
    async with httpx.AsyncClient() as client:
        try:
            response = await client.request(
                method=request.method,
                url=url,
                headers=headers,
                params=request.query_params,
                content=body,
            )
        except httpx.RequestError as e:
            print(f"HTTPX Request Error in proxy_mcp: {e}")
            now = time.time()
            error_timestamps.append(now)
            # Clean up old timestamps
            while error_timestamps and error_timestamps[0] < now - ERROR_WINDOW_SECONDS:
                error_timestamps.popleft()
            if len(error_timestamps) >= ERROR_THRESHOLD:
                circuit_breaker_open_until = now + CIRCUIT_BREAKER_OPEN_SECONDS
                print(f"Circuit breaker opened until {time.ctime(circuit_breaker_open_until)}")
                return Response(content="Service Unavailable (Circuit Breaker Open)", status_code=503)
            return Response(content=f"Proxy error: {e}", status_code=500)

    # If successful, clear old errors
    error_timestamps.clear()
    forwarded_headers = {
        key: value
        for key, value in response.headers.items()
        if key.lower() not in response_skip
    }
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
    )
@app.on_event("startup")
async def startup_event():
    """Start the Notion MCP server in the background when the app starts.

    The launcher runs as its own task so application startup is not held up
    by it.
    """
    # Keep a strong reference on the application state: the event loop only
    # holds weak references to tasks, so an unreferenced task may be
    # garbage-collected before it completes.
    app.state.mcp_server_task = asyncio.create_task(run_mcp_server())
@app.get("/")
async def root():
    """Return a fixed JSON message indicating that this app is running."""
    payload = {"message": "Notion MCP Keyed is running"}
    return payload