# Path: mcps/apps/app02.py
# Repository page header (kept for reference): geqintan's picture / "update" / 36a9cae
import asyncio
from fastapi import Request
from mcp.server.fastmcp import FastMCP
# Create the MCP application instance
class App02:
    """Own a FastMCP server named "app02" and register its endpoints.

    Constructing an instance creates the server with ``stateless_http=True``
    and immediately registers one resource and two tools on it. The server
    is available afterwards as the ``mcp`` attribute.
    """

    def __init__(self):
        """Create the FastMCP server, then register every endpoint on it."""
        self.mcp = FastMCP(name="app02", stateless_http=True)
        self.register_endpoints()

    def register_endpoints(self):
        """Attach the ``app02://status`` resource and the App02 tools to ``self.mcp``."""
        server = self.mcp

        # NOTE(review): the nested function names, parameter names, annotations
        # and docstrings are kept verbatim because the decorators receive the
        # function object itself; presumably FastMCP derives the endpoint
        # metadata from them -- confirm against the SDK before renaming.

        @server.resource("app02://status")
        def app02_status() -> str:
            """Status of App02"""
            return "App02 is running"

        @server.tool()
        def app02_process(data: str) -> str:
            """Process data in App02"""
            return f"App02 Processing: {data}"

        @server.tool()
        def app02_multiply_integers(num1: int, num2: int) -> int:
            """Multiply two integers in App02"""
            product = num1 * num2
            return product
# async def sse_endpoint(self, request: Request):
# print("Entering App02 SSE endpoint")
# try:
# # The SSE logic for app02 can be implemented here
# yield {"event": "message", "data": "Hello from App02"}
# await asyncio.sleep(2) # Example: send one message every 2 seconds
# yield {"event": "message", "data": "Another message from App02"}
# except Exception as e:
# print(f"Error in App02 SSE endpoint: {e}")
# # Optionally yield an error event to the client
# # yield {"event": "error", "data": str(e)}
# print("Exiting App02 SSE endpoint")