File size: 6,184 Bytes
6a3de9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""
Test script to verify MCP server functionality directly.
This script tests that the MCP server properly handles tool calls and executes database operations.
"""
import asyncio
import json
from datetime import datetime
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from database.session import get_async_session
from models.user import User
from models.task import Task, TaskCreate, TaskComplete
from services.task_service import TaskService


async def test_mcp_server_handlers():
    """
    Test the MCP server handlers directly to ensure they properly execute database operations.
    """
    print("Testing MCP Server Handlers...")

    # Get database session
    async with get_async_session() as session:
        # Create a test user with an integer ID to match the model
        import uuid
        unique_email = f"test_{uuid.uuid4()}@example.com"
        user = User(
            email=unique_email,
            name="Test User",
            created_at=datetime.utcnow()
        )
        session.add(user)
        await session.commit()
        await session.refresh(user)

        print(f"Created test user: {user.id}")

        # Test the add_task handler directly
        print("\n--- Testing add_task handler ---")

        # Simulate the arguments that would come from the MCP server
        add_arguments = {
            'user_id': user.id,
            'title': 'Test task from MCP',
            'description': 'This is a test task created via MCP server',
            'priority': 'high',
            'due_date': '2024-12-31'
        }

        # Import the MCP server handlers
        from ai.mcp.server import handle_add_task

        # Call the handler directly
        result = await handle_add_task(add_arguments)
        print(f"Add task result: {result}")

        # Verify task was added to database
        tasks = await session.exec(select(Task).where(Task.user_id == user.id))
        tasks_list = tasks.all()
        print(f"Tasks in database after add: {len(tasks_list)}")
        if tasks_list:
            latest_task = tasks_list[-1]
            print(f"Latest task: {latest_task.title} (Priority: {latest_task.priority})")

        # Test the list_tasks handler directly
        print("\n--- Testing list_tasks handler ---")
        list_arguments = {
            'user_id': user.id,
            'status': 'all'
        }

        from ai.mcp.server import handle_list_tasks
        result = await handle_list_tasks(list_arguments)
        print(f"List tasks result: {result}")

        # Test the complete_task handler directly
        if tasks_list:
            print("\n--- Testing complete_task handler ---")
            complete_arguments = {
                'user_id': user.id,
                'task_id': latest_task.id
            }

            from ai.mcp.server import handle_complete_task
            result = await handle_complete_task(complete_arguments)
            print(f"Complete task result: {result}")

            # Verify task was marked as completed
            await session.refresh(latest_task)
            print(f"Task {latest_task.id} completed status: {latest_task.completed}")

        # Test the update_task handler directly
        if tasks_list:
            print("\n--- Testing update_task handler ---")
            update_arguments = {
                'user_id': user.id,
                'task_id': latest_task.id,
                'title': 'Updated task title',
                'priority': 'low'
            }

            from ai.mcp.server import handle_update_task
            result = await handle_update_task(update_arguments)
            print(f"Update task result: {result}")

            # Verify task was updated
            await session.refresh(latest_task)
            print(f"Task {latest_task.id} updated title: {latest_task.title}, priority: {latest_task.priority}")

        # Final verification
        final_tasks = await session.exec(select(Task).where(Task.user_id == user.id))
        final_tasks_list = final_tasks.all()
        print(f"\nFinal task count: {len(final_tasks_list)}")

        for task in final_tasks_list:
            print(f"- Task: {task.title}, Priority: {task.priority}, Completed: {task.completed}, Due: {task.due_date}")

        print("\n--- Test Summary ---")
        print(f"Created {len(final_tasks_list)} tasks in database")
        completed_count = sum(1 for task in final_tasks_list if task.completed)
        print(f"Completed {completed_count} tasks")
        print("MCP server handlers properly executed database operations")

        # Clean up test data
        for task in final_tasks_list:
            await session.delete(task)
        await session.delete(user)
        await session.commit()

        print("Test data cleaned up")


async def test_mcp_server_tools():
    """
    Test that the MCP server properly lists tools.
    """
    print("\nTesting MCP Server Tool Listing...")

    # Import and test the tool listing
    from ai.mcp.server import list_todo_tools

    tools = await list_todo_tools()
    tool_names = [tool.name for tool in tools]

    print(f"MCP Server tools available: {tool_names}")

    # Verify all expected tools are present
    expected_tools = {"add_task", "list_tasks", "complete_task", "delete_task", "update_task"}
    actual_tools = set(tool_names)

    if expected_tools.issubset(actual_tools):
        print("All expected tools are registered in MCP server")
    else:
        missing = expected_tools - actual_tools
        print(f"Missing tools: {missing}")


async def main():
    """
    Main test function.
    """
    print("=" * 60)
    print("MCP SERVER FUNCTIONALITY TEST")
    print("=" * 60)

    try:
        await test_mcp_server_tools()
        await test_mcp_server_handlers()

        print("\n" + "=" * 60)
        print("ALL TESTS PASSED!")
        print("MCP server properly handles tool calls")
        print("Database operations are executed correctly")
        print("=" * 60)

    except Exception as e:
        print(f"\nTEST FAILED: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())