import asyncio from fastapi import Request from mcp.server.fastmcp import FastMCP import os # 用于获取环境变量 from airs.base import Base # 导入 Base 类 from pydantic import BaseModel, Field # 用于定义数据模型 from typing import Optional # 定义 Pydantic 模型 class TaskCreate(BaseModel): title: str = Field(..., description="任务的标题(必填)") description: Optional[str] = Field(None, description="任务的详细描述(可选)") # 添加描述 due_date: Optional[str] = None # 可以是 'YYYY-MM-DD' 格式的字符串 status: Optional[str] = None priority: Optional[str] = None parent_task_id: Optional[str] = None # UUID 字符串,可选 class TaskUpdate(BaseModel): id: int = Field(..., description="任务的ID(必填)") title: Optional[str] = Field(None, description="任务的标题(可选)") description: Optional[str] = Field(None, description="任务的详细描述(可选)") due_date: Optional[str] = None status: Optional[str] = None priority: Optional[str] = None parent_task_id: Optional[str] = None # 创建 MCP 应用实例 class Tasks(Base): # Tasks 类继承自 Base 类 def __init__(self): super().__init__() # 初始化 Base 类 self.mcp = FastMCP(name="tasks", stateless_http=True) self.register_endpoints() # 登录测试 (暂时注释掉,排查超时问题) # test_user = os.getenv("TEST_USER") # test_passsword = os.getenv("TEST_PASSWORD") # print("尝试登录...") # login_result = self.sign_in_with_email(test_user, test_passsword) # if login_result["success"]: # # print("登录成功!用户信息:", login_result["user"]) # print("登录成功!") # else: # print("登录失败!错误信息:", login_result["error"]) def register_endpoints(self): @self.mcp.resource("tasks://info") def info() -> str: """Information about Tasks""" return "This is Airs Tasks" @self.mcp.tool() def echo(message: str) -> str: """Echo a message in Tasks""" return f"Tasks Echo: {message}" @self.mcp.tool() def app01_add_integers(num1: int, num2: int) -> int: """Add two integers in App01""" print(self.test()) # 调用 Base 类的 test 方法 return num1 + num2 @self.mcp.tool() def add_task(task: TaskCreate) -> dict: """ Add a new task to the database. 向数据库添加新任务 Args: task (TaskCreate): 任务数据对象,包含以下属性: - title (str): 任务标题 - description (str): 任务描述 - due_date (str): 截止日期(YYYY-MM-DD) """ try: # 直接使用 Supabase 客户端插入数据 data_to_insert = { 'title': task.title, 'description': task.description, 'parent_task_id': task.parent_task_id, 'due_date': task.due_date, 'status': task.status, 'priority': task.priority } response = self.supabase.table('tasks').insert(data_to_insert).execute() # 检查 Supabase 响应 if response.data: return {"success": True, "task": response.data[0]} else: return {"success": False, "error": response.error.message if response.error else "Unknown error"} except Exception as e: return {"success": False, "error": str(e)} # @self.mcp.tool() # def get_tasks( # status: Optional[str] = None, # priority: Optional[str] = None # ) -> dict: # """ # 获取数据库中的任务列表(可选状态/优先级过滤) # Get a list of tasks from the database, with optional filtering by status or priority. # Args: # status: 任务状态过滤值(可选) # priority: 任务优先级过滤值(可选) # Returns: # { # "success": True, # "tasks": [...] # 成功时返回任务列表 # } or { # "success": False, # "error": "错误描述" # 失败时返回错误原因 # } # """ # try: # # 构造基础查询 # query = self.supabase.table('tasks').select('*') # # 动态添加过滤条件 # if status: # query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性 # if priority: # query = query.eq('priority', priority.lower()) # print("正在执行 Supabase 查询...") # response = query.execute() # print("Supabase 查询执行完成。") # # 明确处理空结果(非错误状态) # if not response.data: # print("未找到任务。") # return {"success": True, "tasks": []} # print(f"找到 {len(response.data)} 个任务。") # return {"success": True, "tasks": response.data} # except Exception as e: # # 捕获具体异常类型 # error_msg = ( # f"Database query failed: {str(e)}. " # f"Params: status={status}, priority={priority}" # ) # return {"success": False, "error": error_msg} @self.mcp.tool() def update_task(task: TaskUpdate) -> dict: """ Update an existing task in the database. 更新数据库中的现有任务。 """ try: data_to_update = task.dict(exclude_unset=True, exclude={'id'}) response = self.supabase.table('tasks').update(data_to_update).eq('id', task.id).execute() if response.data: return {"success": True, "task": response.data[0]} else: return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"} except Exception as e: return {"success": False, "error": str(e)} @self.mcp.tool() def delete_task(task_id: int) -> dict: """ Delete a task from the database by its ID. 根据任务ID从数据库中删除任务。 """ try: response = self.supabase.table('tasks').delete().eq('id', task_id).execute() print(f"Supabase 删除任务结果: {response}") if response.data: return {"success": True, "message": f"Task with ID {task_id} deleted successfully."} else: return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"} except Exception as e: return {"success": False, "error": str(e)} @self.mcp.tool() def get_tasks( status: Optional[str] = None, priority: Optional[str] = None ) -> dict: """ 获取数据库中的任务列表(可选状态/优先级过滤) """ try: return {"success": True, "tasks": []} # 测试返回固定数据 # 构造基础查询 query = self.supabase.table('tasks').select('*') # 动态添加过滤条件 if status: query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性 if priority: query = query.eq('priority', priority.lower()) print("正在执行 Supabase 查询...") response = query.execute() print("Supabase 查询执行完成。") # 明确处理空结果(非错误状态) if not response.data: print("未找到任务。") return {"success": True, "tasks": []} print(f"找到 {len(response.data)} 个任务。") return {"success": True, "tasks": response.data} except Exception as e: # 捕获具体异常类型 error_msg = ( f"Database query failed: {str(e)}. " f"Params: status={status}, priority={priority}" ) return {"success": False, "error": error_msg}