AlexTransformer's picture
Create server.py
bfdead1 verified
import os
import asyncio
import sys
import json
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
from fastapi.responses import JSONResponse
app = FastAPI()
# 获取环境变量(在 Space Settings 中设置的那些)
# 注意:我们直接从环境读取,不需要在这里硬编码 Token
ENV_VARS = os.environ.copy()
# 设置 MCP 进程的运行命令
# 对应原本的: uvx --from paddleocr-mcp paddleocr_mcp
MCP_COMMAND = ["python", "-m", "paddleocr_mcp"]
# 存储活跃的进程 (简单的会话管理)
# 注意:这是一个简化的实现,适合单人或小团队使用
active_processes = {}
async def run_mcp_session(request: Request):
"""
为每个连接启动一个独立的 paddleocr-mcp 子进程
并将其输出流式传输回客户端 (SSE)
"""
process = await asyncio.create_subprocess_exec(
*MCP_COMMAND,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=sys.stderr, # 将错误日志打印到控制台
env=ENV_VARS
)
# 使用 client_id 或简单的标记来存储进程,以便 POST 请求能找到它
# 在这个简单版本中,我们假设主要通过 SSE 接收输出,
# 实际的 MCP SSE 协议通常需要 session_id,这里为了简化演示通用性:
# 我们将最近的一个进程存为全局,这在单用户场景下是可行的。
# ⚠️ 生产环境建议使用更复杂的 Session 管理。
global active_processes
active_processes["default"] = process
async def event_generator():
try:
while True:
if await request.is_disconnected():
break
# 读取子进程的一行输出
line = await process.stdout.readline()
if not line:
break
# 解码并发送 SSE 事件
data = line.decode().strip()
if data:
yield {
"event": "message",
"data": data
}
except Exception as e:
print(f"Error in event stream: {e}")
finally:
# 清理进程
if process.returncode is None:
process.terminate()
await process.wait()
return EventSourceResponse(event_generator())
@app.get("/sse")
async def handle_sse(request: Request):
return await run_mcp_session(request)
@app.post("/messages")
async def handle_messages(request: Request):
"""
接收 Claude 发来的指令 (JSON-RPC),写入子进程的 stdin
"""
if "default" not in active_processes or active_processes["default"].returncode is not None:
return JSONResponse(status_code=400, content={"error": "No active MCP session"})
process = active_processes["default"]
try:
body = await request.json()
# 将 JSON 转为字符串并写入子进程,必须加换行符
message_str = json.dumps(body) + "\n"
process.stdin.write(message_str.encode())
await process.stdin.drain()
return JSONResponse(content={"status": "accepted"})
except Exception as e:
return JSONResponse(status_code=500, content={"error": str(e)})
@app.get("/")
def health_check():
return {"status": "paddleocr-mcp-server is running"}