Spaces:
Sleeping
Sleeping
File size: 5,157 Bytes
7ffe51d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
"""Task service for business logic."""
from datetime import datetime
from typing import Optional
from sqlmodel import Session, select
from src.models.task import Task
from src.schemas.task import TaskCreate, TaskUpdate, TaskPatch
class TaskService:
"""Service for task operations."""
def __init__(self, db: Session):
"""Initialize task service with database session."""
self.db = db
def get_tasks(
self,
user_id: int,
completed: Optional[bool] = None,
sort: str = "created_at",
order: str = "desc",
limit: Optional[int] = None,
offset: int = 0
) -> list[Task]:
"""
Get tasks for a user with filtering and sorting.
Args:
user_id: ID of the user
completed: Filter by completion status (None = all, True = completed, False = active)
sort: Sort field ("created_at" or "updated_at")
order: Sort order ("asc" or "desc")
limit: Maximum number of tasks to return
offset: Number of tasks to skip
Returns:
List of tasks matching the criteria
"""
statement = select(Task).where(Task.user_id == user_id)
# Apply completion filter
if completed is not None:
statement = statement.where(Task.completed == completed)
# Apply sorting
sort_column = Task.created_at if sort == "created_at" else Task.updated_at
if order == "asc":
statement = statement.order_by(sort_column.asc())
else:
statement = statement.order_by(sort_column.desc())
# Apply pagination
if offset > 0:
statement = statement.offset(offset)
if limit is not None:
statement = statement.limit(limit)
tasks = self.db.exec(statement).all()
return list(tasks)
def create_task(self, user_id: int, task_data: TaskCreate) -> Task:
"""
Create a new task for a user.
Args:
user_id: ID of the user
task_data: Task creation data
Returns:
Created task
"""
now = datetime.utcnow()
task = Task(
user_id=user_id,
title=task_data.title,
description=task_data.description,
completed=False,
created_at=now,
updated_at=now
)
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def get_task(self, task_id: int, user_id: int) -> Optional[Task]:
"""
Get a single task by ID for a specific user.
Args:
task_id: ID of the task
user_id: ID of the user
Returns:
Task if found and belongs to user, None otherwise
"""
statement = select(Task).where(Task.id == task_id, Task.user_id == user_id)
task = self.db.exec(statement).first()
return task
def update_task(self, task_id: int, user_id: int, task_data: TaskUpdate) -> Optional[Task]:
"""
Update a task (PUT - replaces all fields).
Args:
task_id: ID of the task
user_id: ID of the user
task_data: Task update data
Returns:
Updated task if found and belongs to user, None otherwise
"""
task = self.get_task(task_id, user_id)
if not task:
return None
task.title = task_data.title
task.description = task_data.description
task.completed = task_data.completed
task.updated_at = datetime.utcnow()
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def patch_task(self, task_id: int, user_id: int, task_data: TaskPatch) -> Optional[Task]:
"""
Partially update a task (PATCH - updates only provided fields).
Args:
task_id: ID of the task
user_id: ID of the user
task_data: Task patch data
Returns:
Updated task if found and belongs to user, None otherwise
"""
task = self.get_task(task_id, user_id)
if not task:
return None
# Update only provided fields
if task_data.title is not None:
task.title = task_data.title
if task_data.description is not None:
task.description = task_data.description
if task_data.completed is not None:
task.completed = task_data.completed
task.updated_at = datetime.utcnow()
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def delete_task(self, task_id: int, user_id: int) -> bool:
"""
Delete a task.
Args:
task_id: ID of the task
user_id: ID of the user
Returns:
True if task was deleted, False if not found or doesn't belong to user
"""
task = self.get_task(task_id, user_id)
if not task:
return False
self.db.delete(task)
self.db.commit()
return True
|