File size: 9,014 Bytes
a823b8a 4469050 a823b8a 4b2a02f a823b8a 4b2a02f a823b8a 4469050 a823b8a 4469050 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 4b2a02f db7fef9 08904dc db7fef9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
import asyncio
from fastapi import Request
from mcp.server.fastmcp import FastMCP
import os # 用于获取环境变量
from airs.base import Base # 导入 Base 类
from pydantic import BaseModel, Field # 用于定义数据模型
from typing import Optional
# 定义 Pydantic 模型
class TaskCreate(BaseModel):
title: str = Field(..., description="任务的标题(必填)")
description: Optional[str] = Field(None, description="任务的详细描述(可选)") # 添加描述
due_date: Optional[str] = None # 可以是 'YYYY-MM-DD' 格式的字符串
status: Optional[str] = None
priority: Optional[str] = None
parent_task_id: Optional[str] = None # UUID 字符串,可选
class TaskUpdate(BaseModel):
id: int = Field(..., description="任务的ID(必填)")
title: Optional[str] = Field(None, description="任务的标题(可选)")
description: Optional[str] = Field(None, description="任务的详细描述(可选)")
due_date: Optional[str] = None
status: Optional[str] = None
priority: Optional[str] = None
parent_task_id: Optional[str] = None
# 创建 MCP 应用实例
class Tasks(Base): # Tasks 类继承自 Base 类
def __init__(self):
super().__init__() # 初始化 Base 类
self.mcp = FastMCP(name="tasks", stateless_http=True)
self.register_endpoints()
# 登录测试 (暂时注释掉,排查超时问题)
# test_user = os.getenv("TEST_USER")
# test_passsword = os.getenv("TEST_PASSWORD")
# print("尝试登录...")
# login_result = self.sign_in_with_email(test_user, test_passsword)
# if login_result["success"]:
# # print("登录成功!用户信息:", login_result["user"])
# print("登录成功!")
# else:
# print("登录失败!错误信息:", login_result["error"])
def register_endpoints(self):
@self.mcp.resource("tasks://info")
def info() -> str:
"""Information about Tasks"""
return "This is Airs Tasks"
@self.mcp.tool()
def echo(message: str) -> str:
"""Echo a message in Tasks"""
return f"Tasks Echo: {message}"
@self.mcp.tool()
def app01_add_integers(num1: int, num2: int) -> int:
"""Add two integers in App01"""
print(self.test()) # 调用 Base 类的 test 方法
return num1 + num2
@self.mcp.tool()
def add_task(task: TaskCreate) -> dict:
"""
Add a new task to the database.
向数据库添加新任务
Args:
task (TaskCreate): 任务数据对象,包含以下属性:
- title (str): 任务标题
- description (str): 任务描述
- due_date (str): 截止日期(YYYY-MM-DD)
"""
try:
# 直接使用 Supabase 客户端插入数据
data_to_insert = {
'title': task.title,
'description': task.description,
'parent_task_id': task.parent_task_id,
'due_date': task.due_date,
'status': task.status,
'priority': task.priority
}
response = self.supabase.table('tasks').insert(data_to_insert).execute()
# 检查 Supabase 响应
if response.data:
return {"success": True, "task": response.data[0]}
else:
return {"success": False, "error": response.error.message if response.error else "Unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
# @self.mcp.tool()
# def get_tasks(
# status: Optional[str] = None,
# priority: Optional[str] = None
# ) -> dict:
# """
# 获取数据库中的任务列表(可选状态/优先级过滤)
# Get a list of tasks from the database, with optional filtering by status or priority.
# Args:
# status: 任务状态过滤值(可选)
# priority: 任务优先级过滤值(可选)
# Returns:
# {
# "success": True,
# "tasks": [...] # 成功时返回任务列表
# } or {
# "success": False,
# "error": "错误描述" # 失败时返回错误原因
# }
# """
# try:
# # 构造基础查询
# query = self.supabase.table('tasks').select('*')
# # 动态添加过滤条件
# if status:
# query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性
# if priority:
# query = query.eq('priority', priority.lower())
# print("正在执行 Supabase 查询...")
# response = query.execute()
# print("Supabase 查询执行完成。")
# # 明确处理空结果(非错误状态)
# if not response.data:
# print("未找到任务。")
# return {"success": True, "tasks": []}
# print(f"找到 {len(response.data)} 个任务。")
# return {"success": True, "tasks": response.data}
# except Exception as e:
# # 捕获具体异常类型
# error_msg = (
# f"Database query failed: {str(e)}. "
# f"Params: status={status}, priority={priority}"
# )
# return {"success": False, "error": error_msg}
@self.mcp.tool()
def update_task(task: TaskUpdate) -> dict:
"""
Update an existing task in the database.
更新数据库中的现有任务。
"""
try:
data_to_update = task.dict(exclude_unset=True, exclude={'id'})
response = self.supabase.table('tasks').update(data_to_update).eq('id', task.id).execute()
if response.data:
return {"success": True, "task": response.data[0]}
else:
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
@self.mcp.tool()
def delete_task(task_id: int) -> dict:
"""
Delete a task from the database by its ID.
根据任务ID从数据库中删除任务。
"""
try:
response = self.supabase.table('tasks').delete().eq('id', task_id).execute()
print(f"Supabase 删除任务结果: {response}")
if response.data:
return {"success": True, "message": f"Task with ID {task_id} deleted successfully."}
else:
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
@self.mcp.tool()
def get_tasks(
status: Optional[str] = None,
priority: Optional[str] = None
) -> dict:
"""
获取数据库中的任务列表(可选状态/优先级过滤)
"""
try:
return {"success": True, "tasks": []} # 测试返回固定数据
# 构造基础查询
query = self.supabase.table('tasks').select('*')
# 动态添加过滤条件
if status:
query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性
if priority:
query = query.eq('priority', priority.lower())
print("正在执行 Supabase 查询...")
response = query.execute()
print("Supabase 查询执行完成。")
# 明确处理空结果(非错误状态)
if not response.data:
print("未找到任务。")
return {"success": True, "tasks": []}
print(f"找到 {len(response.data)} 个任务。")
return {"success": True, "tasks": response.data}
except Exception as e:
# 捕获具体异常类型
error_msg = (
f"Database query failed: {str(e)}. "
f"Params: status={status}, priority={priority}"
)
return {"success": False, "error": error_msg}
|