mcps / apps /app01.py
geqintan's picture
update
36a9cae
import asyncio
from fastapi import Request
from mcp.server.fastmcp import FastMCP
# Create the MCP application instance
class App01:
    """MCP application "app01": one info resource plus echo and add tools.

    Constructing an instance creates a ``FastMCP`` server named ``"app01"``
    with ``stateless_http=True``, stores it on ``self.mcp``, and immediately
    registers every endpoint on it.
    """

    def __init__(self) -> None:
        """Create the FastMCP server and register its endpoints."""
        self.mcp = FastMCP(name="app01", stateless_http=True)
        self.register_endpoints()

    def register_endpoints(self) -> None:
        """Attach the resource and tool handlers to ``self.mcp``.

        Registration order is: the ``app01://info`` resource, then the
        ``app01_echo`` tool, then the ``app01_add_integers`` tool.
        """
        server = self.mcp

        # NOTE(review): handler names, parameter names and docstrings are
        # kept exactly as written; presumably FastMCP derives the published
        # tool name/schema/description from them -- confirm before renaming.

        @server.resource("app01://info")
        def app01_info() -> str:
            """Information about App01"""
            return "This is App01"

        @server.tool()
        def app01_echo(message: str) -> str:
            """Echo a message in App01"""
            echoed = f"App01 Echo: {message}"
            return echoed

        @server.tool()
        def app01_add_integers(num1: int, num2: int) -> int:
            """Add two integers in App01"""
            total = num1 + num2
            return total
# async def sse_endpoint(self, request: Request):
# print("Entering App01 SSE endpoint")
# try:
# # The SSE logic for app01 can be implemented here
# yield {"event": "message", "data": "Hello from App01"}
# await asyncio.sleep(1) # Example: send one message per second
# yield {"event": "message", "data": "Another message from App01"}
# except Exception as e:
# print(f"Error in App01 SSE endpoint: {e}")
# # Optionally yield an error event to the client
# # yield {"event": "error", "data": str(e)}
# print("Exiting App01 SSE endpoint")