import os import asyncio import sys import json from fastapi import FastAPI, Request from sse_starlette.sse import EventSourceResponse from fastapi.responses import JSONResponse app = FastAPI() # 获取环境变量(在 Space Settings 中设置的那些) # 注意:我们直接从环境读取,不需要在这里硬编码 Token ENV_VARS = os.environ.copy() # 设置 MCP 进程的运行命令 # 对应原本的: uvx --from paddleocr-mcp paddleocr_mcp MCP_COMMAND = ["python", "-m", "paddleocr_mcp"] # 存储活跃的进程 (简单的会话管理) # 注意:这是一个简化的实现,适合单人或小团队使用 active_processes = {} async def run_mcp_session(request: Request): """ 为每个连接启动一个独立的 paddleocr-mcp 子进程 并将其输出流式传输回客户端 (SSE) """ process = await asyncio.create_subprocess_exec( *MCP_COMMAND, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=sys.stderr, # 将错误日志打印到控制台 env=ENV_VARS ) # 使用 client_id 或简单的标记来存储进程,以便 POST 请求能找到它 # 在这个简单版本中,我们假设主要通过 SSE 接收输出, # 实际的 MCP SSE 协议通常需要 session_id,这里为了简化演示通用性: # 我们将最近的一个进程存为全局,这在单用户场景下是可行的。 # ⚠️ 生产环境建议使用更复杂的 Session 管理。 global active_processes active_processes["default"] = process async def event_generator(): try: while True: if await request.is_disconnected(): break # 读取子进程的一行输出 line = await process.stdout.readline() if not line: break # 解码并发送 SSE 事件 data = line.decode().strip() if data: yield { "event": "message", "data": data } except Exception as e: print(f"Error in event stream: {e}") finally: # 清理进程 if process.returncode is None: process.terminate() await process.wait() return EventSourceResponse(event_generator()) @app.get("/sse") async def handle_sse(request: Request): return await run_mcp_session(request) @app.post("/messages") async def handle_messages(request: Request): """ 接收 Claude 发来的指令 (JSON-RPC),写入子进程的 stdin """ if "default" not in active_processes or active_processes["default"].returncode is not None: return JSONResponse(status_code=400, content={"error": "No active MCP session"}) process = active_processes["default"] try: body = await request.json() # 将 JSON 转为字符串并写入子进程,必须加换行符 message_str = json.dumps(body) + "\n" process.stdin.write(message_str.encode()) await process.stdin.drain() return JSONResponse(content={"status": "accepted"}) except Exception as e: return JSONResponse(status_code=500, content={"error": str(e)}) @app.get("/") def health_check(): return {"status": "paddleocr-mcp-server is running"}