File size: 3,367 Bytes
04a921d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Task service for CRUD operations."""

from typing import Optional
from uuid import UUID

from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select

from src.models.task import Task
from src.schemas.task import TaskCreate, TaskListResponse, TaskResponse, TaskUpdate


class TaskService:
    """Service for task CRUD operations."""

    def __init__(self, session: AsyncSession, user_id: UUID) -> None:
        self.session = session
        self.user_id = user_id

    async def create(self, data: TaskCreate) -> Task:
        """Create a new task."""
        task = Task(
            user_id=self.user_id,
            title=data.title,
            description=data.description,
        )
        self.session.add(task)
        await self.session.flush()
        await self.session.refresh(task)
        return task

    async def list(
        self,
        page: int = 1,
        page_size: int = 50,
        is_completed: Optional[bool] = None,
    ) -> TaskListResponse:
        """List all tasks for the current user."""
        # Build query
        query = select(Task).where(Task.user_id == self.user_id)

        if is_completed is not None:
            query = query.where(Task.is_completed == is_completed)

        # Get total count
        count_query = select(func.count()).select_from(
            query.subquery()
        )
        total_result = await self.session.execute(count_query)
        total = total_result.scalar() or 0

        # Get paginated results
        query = query.order_by(Task.created_at.desc())
        query = query.offset((page - 1) * page_size).limit(page_size)

        result = await self.session.execute(query)
        tasks = result.scalars().all()

        return TaskListResponse(
            tasks=[TaskResponse.model_validate(t) for t in tasks],
            total=total,
            page=page,
            page_size=page_size,
        )

    async def get(self, task_id: UUID) -> Optional[Task]:
        """Get a task by ID for the current user."""
        statement = select(Task).where(
            Task.id == task_id,
            Task.user_id == self.user_id,
        )
        result = await self.session.execute(statement)
        return result.scalar_one_or_none()

    async def update(self, task_id: UUID, data: TaskUpdate) -> Optional[Task]:
        """Update a task."""
        task = await self.get(task_id)
        if not task:
            return None

        update_data = data.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(task, key, value)

        task.update_timestamp()
        await self.session.flush()
        await self.session.refresh(task)
        return task

    async def delete(self, task_id: UUID) -> bool:
        """Delete a task."""
        task = await self.get(task_id)
        if not task:
            return False

        await self.session.delete(task)
        await self.session.flush()
        return True

    async def toggle(self, task_id: UUID) -> Optional[Task]:
        """Toggle task completion status."""
        task = await self.get(task_id)
        if not task:
            return None

        task.is_completed = not task.is_completed
        task.update_timestamp()
        await self.session.flush()
        await self.session.refresh(task)
        return task