|
|
import asyncio |
|
|
from fastapi import Request |
|
|
from mcp.server.fastmcp import FastMCP |
|
|
import os |
|
|
from airs.base import Base |
|
|
from pydantic import BaseModel, Field |
|
|
from typing import Optional |
|
|
|
|
|
|
|
|
class TaskCreate(BaseModel): |
|
|
title: str = Field(..., description="任务的标题(必填)") |
|
|
description: Optional[str] = Field(None, description="任务的详细描述(可选)") |
|
|
due_date: Optional[str] = None |
|
|
status: Optional[str] = None |
|
|
priority: Optional[str] = None |
|
|
parent_task_id: Optional[str] = None |
|
|
|
|
|
class TaskUpdate(BaseModel): |
|
|
id: int = Field(..., description="任务的ID(必填)") |
|
|
title: Optional[str] = Field(None, description="任务的标题(可选)") |
|
|
description: Optional[str] = Field(None, description="任务的详细描述(可选)") |
|
|
due_date: Optional[str] = None |
|
|
status: Optional[str] = None |
|
|
priority: Optional[str] = None |
|
|
parent_task_id: Optional[str] = None |
|
|
|
|
|
|
|
|
class Tasks(Base): |
|
|
def __init__(self): |
|
|
super().__init__() |
|
|
self.mcp = FastMCP(name="tasks", stateless_http=True) |
|
|
self.register_endpoints() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def register_endpoints(self): |
|
|
@self.mcp.resource("tasks://info") |
|
|
def info() -> str: |
|
|
"""Information about Tasks""" |
|
|
return "This is Airs Tasks" |
|
|
|
|
|
@self.mcp.tool() |
|
|
def echo(message: str) -> str: |
|
|
"""Echo a message in Tasks""" |
|
|
return f"Tasks Echo: {message}" |
|
|
|
|
|
@self.mcp.tool() |
|
|
def app01_add_integers(num1: int, num2: int) -> int: |
|
|
"""Add two integers in App01""" |
|
|
print(self.test()) |
|
|
return num1 + num2 |
|
|
|
|
|
@self.mcp.tool() |
|
|
def add_task(task: TaskCreate) -> dict: |
|
|
""" |
|
|
Add a new task to the database. |
|
|
向数据库添加新任务 |
|
|
|
|
|
Args: |
|
|
task (TaskCreate): 任务数据对象,包含以下属性: |
|
|
- title (str): 任务标题 |
|
|
- description (str): 任务描述 |
|
|
- due_date (str): 截止日期(YYYY-MM-DD) |
|
|
""" |
|
|
try: |
|
|
|
|
|
data_to_insert = { |
|
|
'title': task.title, |
|
|
'description': task.description, |
|
|
'parent_task_id': task.parent_task_id, |
|
|
'due_date': task.due_date, |
|
|
'status': task.status, |
|
|
'priority': task.priority |
|
|
} |
|
|
response = self.supabase.table('tasks').insert(data_to_insert).execute() |
|
|
|
|
|
|
|
|
if response.data: |
|
|
return {"success": True, "task": response.data[0]} |
|
|
else: |
|
|
return {"success": False, "error": response.error.message if response.error else "Unknown error"} |
|
|
except Exception as e: |
|
|
return {"success": False, "error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@self.mcp.tool() |
|
|
def update_task(task: TaskUpdate) -> dict: |
|
|
""" |
|
|
Update an existing task in the database. |
|
|
更新数据库中的现有任务。 |
|
|
""" |
|
|
try: |
|
|
data_to_update = task.dict(exclude_unset=True, exclude={'id'}) |
|
|
response = self.supabase.table('tasks').update(data_to_update).eq('id', task.id).execute() |
|
|
|
|
|
if response.data: |
|
|
return {"success": True, "task": response.data[0]} |
|
|
else: |
|
|
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"} |
|
|
except Exception as e: |
|
|
return {"success": False, "error": str(e)} |
|
|
|
|
|
@self.mcp.tool() |
|
|
def delete_task(task_id: int) -> dict: |
|
|
""" |
|
|
Delete a task from the database by its ID. |
|
|
根据任务ID从数据库中删除任务。 |
|
|
""" |
|
|
try: |
|
|
response = self.supabase.table('tasks').delete().eq('id', task_id).execute() |
|
|
print(f"Supabase 删除任务结果: {response}") |
|
|
|
|
|
if response.data: |
|
|
return {"success": True, "message": f"Task with ID {task_id} deleted successfully."} |
|
|
else: |
|
|
return {"success": False, "error": response.error.message if response.error else "Task not found or unknown error"} |
|
|
except Exception as e: |
|
|
return {"success": False, "error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@self.mcp.tool() |
|
|
def get_tasks( |
|
|
status: Optional[str] = None, |
|
|
priority: Optional[str] = None |
|
|
) -> dict: |
|
|
""" |
|
|
获取数据库中的任务列表(可选状态/优先级过滤) |
|
|
""" |
|
|
try: |
|
|
return {"success": True, "tasks": []} |
|
|
|
|
|
query = self.supabase.table('tasks').select('*') |
|
|
|
|
|
|
|
|
if status: |
|
|
query = query.eq('status', status.lower()) |
|
|
if priority: |
|
|
query = query.eq('priority', priority.lower()) |
|
|
|
|
|
print("正在执行 Supabase 查询...") |
|
|
response = query.execute() |
|
|
print("Supabase 查询执行完成。") |
|
|
|
|
|
|
|
|
if not response.data: |
|
|
print("未找到任务。") |
|
|
return {"success": True, "tasks": []} |
|
|
|
|
|
print(f"找到 {len(response.data)} 个任务。") |
|
|
|
|
|
return {"success": True, "tasks": response.data} |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
error_msg = ( |
|
|
f"Database query failed: {str(e)}. " |
|
|
f"Params: status={status}, priority={priority}" |
|
|
) |
|
|
return {"success": False, "error": error_msg} |
|
|
|