import asyncio from fastapi import Request from mcp.server.fastmcp import FastMCP # 创建 MCP 应用实例 class App01: def __init__(self): self.mcp = FastMCP(name="app01", stateless_http=True) self.register_endpoints() def register_endpoints(self): @self.mcp.resource("app01://info") def app01_info() -> str: """Information about App01""" return "This is App01" @self.mcp.tool() def app01_echo(message: str) -> str: """Echo a message in App01""" return f"App01 Echo: {message}" @self.mcp.tool() def app01_add_integers(num1: int, num2: int) -> int: """Add two integers in App01""" return num1 + num2 # async def sse_endpoint(self, request: Request): # print("Entering App01 SSE endpoint") # try: # # 这里可以实现 app01 的 SSE 逻辑 # yield {"event": "message", "data": "Hello from App01"} # await asyncio.sleep(1) # 示例:每秒发送一条消息 # yield {"event": "message", "data": "Another message from App01"} # except Exception as e: # print(f"Error in App01 SSE endpoint: {e}") # # Optionally yield an error event to the client # # yield {"event": "error", "data": str(e)} # print("Exiting App01 SSE endpoint")