mcps / apps /airs_tasks.py
geqintan's picture
update
08904dc
import asyncio
from fastapi import Request
from mcp.server.fastmcp import FastMCP
import os # 用于获取环境变量
from airs.base import Base # 导入 Base 类
from pydantic import BaseModel, Field # 用于定义数据模型
from typing import Optional
# 定义 Pydantic 模型
class TaskCreate(BaseModel):
title: str = Field(..., description="任务的标题(必填)")
description: Optional[str] = Field(None, description="任务的详细描述(可选)") # 添加描述
due_date: Optional[str] = None # 可以是 'YYYY-MM-DD' 格式的字符串
status: Optional[str] = None
priority: Optional[str] = None
parent_task_id: Optional[str] = None # UUID 字符串,可选
class TaskUpdate(BaseModel):
id: int = Field(..., description="任务的ID(必填)")
title: Optional[str] = Field(None, description="任务的标题(可选)")
description: Optional[str] = Field(None, description="任务的详细描述(可选)")
due_date: Optional[str] = None
status: Optional[str] = None
priority: Optional[str] = None
parent_task_id: Optional[str] = None
# 创建 MCP 应用实例
class Tasks(Base): # Tasks 类继承自 Base 类
def __init__(self):
super().__init__() # 初始化 Base 类
self.mcp = FastMCP(name="tasks", stateless_http=True)
self.register_endpoints()
# 登录测试 (暂时注释掉,排查超时问题)
# test_user = os.getenv("TEST_USER")
# test_passsword = os.getenv("TEST_PASSWORD")
# print("尝试登录...")
# login_result = self.sign_in_with_email(test_user, test_passsword)
# if login_result["success"]:
# # print("登录成功!用户信息:", login_result["user"])
# print("登录成功!")
# else:
# print("登录失败!错误信息:", login_result["error"])
def register_endpoints(self):
@self.mcp.resource("tasks://info")
def info() -> str:
"""Information about Tasks"""
return "This is Airs Tasks"
@self.mcp.tool()
def echo(message: str) -> str:
"""Echo a message in Tasks"""
return f"Tasks Echo: {message}"
@self.mcp.tool()
def app01_add_integers(num1: int, num2: int) -> int:
"""Add two integers in App01"""
print(self.test()) # 调用 Base 类的 test 方法
return num1 + num2
@self.mcp.tool()
def add_task(task: TaskCreate) -> dict:
"""
Add a new task to the database.
向数据库添加新任务
Args:
task (TaskCreate): 任务数据对象,包含以下属性:
- title (str): 任务标题
- description (str): 任务描述
- due_date (str): 截止日期(YYYY-MM-DD)
"""
try:
# 直接使用 Supabase 客户端插入数据
data_to_insert = {
'title': task.title,
'description': task.description,
'parent_task_id': task.parent_task_id,
'due_date': task.due_date,
'status': task.status,
'priority': task.priority
}
response = self.supabase.table('tasks').insert(data_to_insert).execute()
# 检查 Supabase 响应
if response.data:
return {"success": True, "task": response.data[0]}
else:
return {"success": False, "error": response.error.message if response.error else "Unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
# @self.mcp.tool()
# def get_tasks(
# status: Optional[str] = None,
# priority: Optional[str] = None
# ) -> dict:
# """
# 获取数据库中的任务列表(可选状态/优先级过滤)
# Get a list of tasks from the database, with optional filtering by status or priority.
# Args:
# status: 任务状态过滤值(可选)
# priority: 任务优先级过滤值(可选)
# Returns:
# {
# "success": True,
# "tasks": [...] # 成功时返回任务列表
# } or {
# "success": False,
# "error": "错误描述" # 失败时返回错误原因
# }
# """
# try:
# # 构造基础查询
# query = self.supabase.table('tasks').select('*')
# # 动态添加过滤条件
# if status:
# query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性
# if priority:
# query = query.eq('priority', priority.lower())
# print("正在执行 Supabase 查询...")
# response = query.execute()
# print("Supabase 查询执行完成。")
# # 明确处理空结果(非错误状态)
# if not response.data:
# print("未找到任务。")
# return {"success": True, "tasks": []}
# print(f"找到 {len(response.data)} 个任务。")
# return {"success": True, "tasks": response.data}
# except Exception as e:
# # 捕获具体异常类型
# error_msg = (
# f"Database query failed: {str(e)}. "
# f"Params: status={status}, priority={priority}"
# )
# return {"success": False, "error": error_msg}
@self.mcp.tool()
def update_task(task: TaskUpdate) -> dict:
"""
Update an existing task in the database.
更新数据库中的现有任务。
"""
try:
data_to_update = task.dict(exclude_unset=True, exclude={'id'})
response = self.supabase.table('tasks').update(data_to_update).eq('id', task.id).execute()
if response.data:
return {"success": True, "task": response.data[0]}
else:
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
@self.mcp.tool()
def delete_task(task_id: int) -> dict:
"""
Delete a task from the database by its ID.
根据任务ID从数据库中删除任务。
"""
try:
response = self.supabase.table('tasks').delete().eq('id', task_id).execute()
print(f"Supabase 删除任务结果: {response}")
if response.data:
return {"success": True, "message": f"Task with ID {task_id} deleted successfully."}
else:
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"}
except Exception as e:
return {"success": False, "error": str(e)}
@self.mcp.tool()
def get_tasks(
status: Optional[str] = None,
priority: Optional[str] = None
) -> dict:
"""
获取数据库中的任务列表(可选状态/优先级过滤)
"""
try:
return {"success": True, "tasks": []} # 测试返回固定数据
# 构造基础查询
query = self.supabase.table('tasks').select('*')
# 动态添加过滤条件
if status:
query = query.eq('status', status.lower()) # 添加状态小写转换保证一致性
if priority:
query = query.eq('priority', priority.lower())
print("正在执行 Supabase 查询...")
response = query.execute()
print("Supabase 查询执行完成。")
# 明确处理空结果(非错误状态)
if not response.data:
print("未找到任务。")
return {"success": True, "tasks": []}
print(f"找到 {len(response.data)} 个任务。")
return {"success": True, "tasks": response.data}
except Exception as e:
# 捕获具体异常类型
error_msg = (
f"Database query failed: {str(e)}. "
f"Params: status={status}, priority={priority}"
)
return {"success": False, "error": error_msg}