"""Task service for business logic."""

from datetime import datetime, timezone
from typing import Optional

from sqlmodel import Session, select

from src.models.task import Task
from src.schemas.task import TaskCreate, TaskUpdate, TaskPatch


class TaskService:
    """Service for task operations.

    All reads and writes go through the database session supplied at
    construction time, and every lookup is filtered by ``user_id`` so a
    caller can only see or modify tasks whose ``user_id`` matches.
    """

    def __init__(self, db: Session):
        """Initialize task service with database session.

        Args:
            db: Session used for every query and commit issued by this service
        """
        self.db = db

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Produces the same value shape as the deprecated ``datetime.utcnow()``
        (UTC wall-clock time with ``tzinfo`` stripped), so stored timestamps
        keep their existing naive-UTC form.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _commit(self) -> None:
        """Commit the session, rolling back and re-raising on failure.

        The rollback resets the session's failed transaction so the same
        session object remains usable by the caller after an error.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def _save(self, task: Task) -> Task:
        """Add ``task`` to the session, commit, and reload it from the database."""
        self.db.add(task)
        self._commit()
        self.db.refresh(task)
        return task

    def get_tasks(
        self,
        user_id: int,
        completed: Optional[bool] = None,
        sort: str = "created_at",
        order: str = "desc",
        limit: Optional[int] = None,
        offset: int = 0
    ) -> list[Task]:
        """
        Get tasks for a user with filtering and sorting.

        Args:
            user_id: ID of the user
            completed: Filter by completion status (None = all, True = completed, False = active)
            sort: Sort field; "created_at" sorts by creation time, any other
                value sorts by "updated_at"
            order: Sort order; "asc" sorts ascending, any other value sorts
                descending
            limit: Maximum number of tasks to return (None = no limit)
            offset: Number of tasks to skip; values <= 0 skip nothing

        Returns:
            List of tasks matching the criteria
        """
        statement = select(Task).where(Task.user_id == user_id)

        if completed is not None:
            statement = statement.where(Task.completed == completed)

        sort_column = Task.created_at if sort == "created_at" else Task.updated_at
        # Task.id is a secondary key so rows sharing a timestamp keep a fixed
        # relative order; without it offset/limit pages could overlap or skip.
        if order == "asc":
            statement = statement.order_by(sort_column.asc(), Task.id.asc())
        else:
            statement = statement.order_by(sort_column.desc(), Task.id.desc())

        if offset > 0:
            statement = statement.offset(offset)
        if limit is not None:
            statement = statement.limit(limit)

        return list(self.db.exec(statement).all())

    def create_task(self, user_id: int, task_data: TaskCreate) -> Task:
        """
        Create a new task for a user.

        The task starts as not completed, and ``created_at`` and
        ``updated_at`` are set to the same timestamp.

        Args:
            user_id: ID of the user
            task_data: Task creation data (``title`` and ``description`` are read)

        Returns:
            Created task, refreshed from the database after commit
        """
        now = self._utcnow()
        task = Task(
            user_id=user_id,
            title=task_data.title,
            description=task_data.description,
            completed=False,
            created_at=now,
            updated_at=now
        )
        return self._save(task)

    def get_task(self, task_id: int, user_id: int) -> Optional[Task]:
        """
        Get a single task by ID for a specific user.

        Args:
            task_id: ID of the task
            user_id: ID of the user

        Returns:
            Task if found and belongs to user, None otherwise
        """
        statement = select(Task).where(Task.id == task_id, Task.user_id == user_id)
        return self.db.exec(statement).first()

    def update_task(self, task_id: int, user_id: int, task_data: TaskUpdate) -> Optional[Task]:
        """
        Update a task (PUT - replaces all fields).

        ``title``, ``description`` and ``completed`` are overwritten
        unconditionally with the values from ``task_data``, and
        ``updated_at`` is set to the current time.

        Args:
            task_id: ID of the task
            user_id: ID of the user
            task_data: Task update data

        Returns:
            Updated task if found and belongs to user, None otherwise
        """
        task = self.get_task(task_id, user_id)
        if not task:
            return None

        task.title = task_data.title
        task.description = task_data.description
        task.completed = task_data.completed
        task.updated_at = self._utcnow()

        return self._save(task)

    def patch_task(self, task_id: int, user_id: int, task_data: TaskPatch) -> Optional[Task]:
        """
        Partially update a task (PATCH - updates only provided fields).

        A field is applied only when its value in ``task_data`` is not None,
        so this method cannot reset a field to None. ``updated_at`` is set to
        the current time even when no field changes.

        Args:
            task_id: ID of the task
            user_id: ID of the user
            task_data: Task patch data

        Returns:
            Updated task if found and belongs to user, None otherwise
        """
        task = self.get_task(task_id, user_id)
        if not task:
            return None

        if task_data.title is not None:
            task.title = task_data.title
        if task_data.description is not None:
            task.description = task_data.description
        if task_data.completed is not None:
            task.completed = task_data.completed

        task.updated_at = self._utcnow()

        return self._save(task)

    def delete_task(self, task_id: int, user_id: int) -> bool:
        """
        Delete a task.

        Args:
            task_id: ID of the task
            user_id: ID of the user

        Returns:
            True if task was deleted, False if not found or doesn't belong to user
        """
        task = self.get_task(task_id, user_id)
        if not task:
            return False

        self.db.delete(task)
        self._commit()
        return True