Spaces:
Sleeping
Sleeping
| """Task service for business logic.""" | |
| from datetime import datetime | |
| from typing import Optional | |
| from sqlmodel import Session, select | |
| from src.models.task import Task | |
| from src.schemas.task import TaskCreate, TaskUpdate, TaskPatch | |
| class TaskService: | |
| """Service for task operations.""" | |
| def __init__(self, db: Session): | |
| """Initialize task service with database session.""" | |
| self.db = db | |
| def get_tasks( | |
| self, | |
| user_id: int, | |
| completed: Optional[bool] = None, | |
| sort: str = "created_at", | |
| order: str = "desc", | |
| limit: Optional[int] = None, | |
| offset: int = 0 | |
| ) -> list[Task]: | |
| """ | |
| Get tasks for a user with filtering and sorting. | |
| Args: | |
| user_id: ID of the user | |
| completed: Filter by completion status (None = all, True = completed, False = active) | |
| sort: Sort field ("created_at" or "updated_at") | |
| order: Sort order ("asc" or "desc") | |
| limit: Maximum number of tasks to return | |
| offset: Number of tasks to skip | |
| Returns: | |
| List of tasks matching the criteria | |
| """ | |
| statement = select(Task).where(Task.user_id == user_id) | |
| # Apply completion filter | |
| if completed is not None: | |
| statement = statement.where(Task.completed == completed) | |
| # Apply sorting | |
| sort_column = Task.created_at if sort == "created_at" else Task.updated_at | |
| if order == "asc": | |
| statement = statement.order_by(sort_column.asc()) | |
| else: | |
| statement = statement.order_by(sort_column.desc()) | |
| # Apply pagination | |
| if offset > 0: | |
| statement = statement.offset(offset) | |
| if limit is not None: | |
| statement = statement.limit(limit) | |
| tasks = self.db.exec(statement).all() | |
| return list(tasks) | |
| def create_task(self, user_id: int, task_data: TaskCreate) -> Task: | |
| """ | |
| Create a new task for a user. | |
| Args: | |
| user_id: ID of the user | |
| task_data: Task creation data | |
| Returns: | |
| Created task | |
| """ | |
| now = datetime.utcnow() | |
| task = Task( | |
| user_id=user_id, | |
| title=task_data.title, | |
| description=task_data.description, | |
| completed=False, | |
| created_at=now, | |
| updated_at=now | |
| ) | |
| self.db.add(task) | |
| self.db.commit() | |
| self.db.refresh(task) | |
| return task | |
| def get_task(self, task_id: int, user_id: int) -> Optional[Task]: | |
| """ | |
| Get a single task by ID for a specific user. | |
| Args: | |
| task_id: ID of the task | |
| user_id: ID of the user | |
| Returns: | |
| Task if found and belongs to user, None otherwise | |
| """ | |
| statement = select(Task).where(Task.id == task_id, Task.user_id == user_id) | |
| task = self.db.exec(statement).first() | |
| return task | |
| def update_task(self, task_id: int, user_id: int, task_data: TaskUpdate) -> Optional[Task]: | |
| """ | |
| Update a task (PUT - replaces all fields). | |
| Args: | |
| task_id: ID of the task | |
| user_id: ID of the user | |
| task_data: Task update data | |
| Returns: | |
| Updated task if found and belongs to user, None otherwise | |
| """ | |
| task = self.get_task(task_id, user_id) | |
| if not task: | |
| return None | |
| task.title = task_data.title | |
| task.description = task_data.description | |
| task.completed = task_data.completed | |
| task.updated_at = datetime.utcnow() | |
| self.db.add(task) | |
| self.db.commit() | |
| self.db.refresh(task) | |
| return task | |
| def patch_task(self, task_id: int, user_id: int, task_data: TaskPatch) -> Optional[Task]: | |
| """ | |
| Partially update a task (PATCH - updates only provided fields). | |
| Args: | |
| task_id: ID of the task | |
| user_id: ID of the user | |
| task_data: Task patch data | |
| Returns: | |
| Updated task if found and belongs to user, None otherwise | |
| """ | |
| task = self.get_task(task_id, user_id) | |
| if not task: | |
| return None | |
| # Update only provided fields | |
| if task_data.title is not None: | |
| task.title = task_data.title | |
| if task_data.description is not None: | |
| task.description = task_data.description | |
| if task_data.completed is not None: | |
| task.completed = task_data.completed | |
| task.updated_at = datetime.utcnow() | |
| self.db.add(task) | |
| self.db.commit() | |
| self.db.refresh(task) | |
| return task | |
| def delete_task(self, task_id: int, user_id: int) -> bool: | |
| """ | |
| Delete a task. | |
| Args: | |
| task_id: ID of the task | |
| user_id: ID of the user | |
| Returns: | |
| True if task was deleted, False if not found or doesn't belong to user | |
| """ | |
| task = self.get_task(task_id, user_id) | |
| if not task: | |
| return False | |
| self.db.delete(task) | |
| self.db.commit() | |
| return True | |