taskflow-api / src /services /task_service.py
suhail
Initial deployment
7ffe51d
"""Task service for business logic."""
from datetime import datetime
from typing import Optional
from sqlmodel import Session, select
from src.models.task import Task
from src.schemas.task import TaskCreate, TaskUpdate, TaskPatch
class TaskService:
"""Service for task operations."""
def __init__(self, db: Session):
"""Initialize task service with database session."""
self.db = db
def get_tasks(
self,
user_id: int,
completed: Optional[bool] = None,
sort: str = "created_at",
order: str = "desc",
limit: Optional[int] = None,
offset: int = 0
) -> list[Task]:
"""
Get tasks for a user with filtering and sorting.
Args:
user_id: ID of the user
completed: Filter by completion status (None = all, True = completed, False = active)
sort: Sort field ("created_at" or "updated_at")
order: Sort order ("asc" or "desc")
limit: Maximum number of tasks to return
offset: Number of tasks to skip
Returns:
List of tasks matching the criteria
"""
statement = select(Task).where(Task.user_id == user_id)
# Apply completion filter
if completed is not None:
statement = statement.where(Task.completed == completed)
# Apply sorting
sort_column = Task.created_at if sort == "created_at" else Task.updated_at
if order == "asc":
statement = statement.order_by(sort_column.asc())
else:
statement = statement.order_by(sort_column.desc())
# Apply pagination
if offset > 0:
statement = statement.offset(offset)
if limit is not None:
statement = statement.limit(limit)
tasks = self.db.exec(statement).all()
return list(tasks)
def create_task(self, user_id: int, task_data: TaskCreate) -> Task:
"""
Create a new task for a user.
Args:
user_id: ID of the user
task_data: Task creation data
Returns:
Created task
"""
now = datetime.utcnow()
task = Task(
user_id=user_id,
title=task_data.title,
description=task_data.description,
completed=False,
created_at=now,
updated_at=now
)
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def get_task(self, task_id: int, user_id: int) -> Optional[Task]:
"""
Get a single task by ID for a specific user.
Args:
task_id: ID of the task
user_id: ID of the user
Returns:
Task if found and belongs to user, None otherwise
"""
statement = select(Task).where(Task.id == task_id, Task.user_id == user_id)
task = self.db.exec(statement).first()
return task
def update_task(self, task_id: int, user_id: int, task_data: TaskUpdate) -> Optional[Task]:
"""
Update a task (PUT - replaces all fields).
Args:
task_id: ID of the task
user_id: ID of the user
task_data: Task update data
Returns:
Updated task if found and belongs to user, None otherwise
"""
task = self.get_task(task_id, user_id)
if not task:
return None
task.title = task_data.title
task.description = task_data.description
task.completed = task_data.completed
task.updated_at = datetime.utcnow()
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def patch_task(self, task_id: int, user_id: int, task_data: TaskPatch) -> Optional[Task]:
"""
Partially update a task (PATCH - updates only provided fields).
Args:
task_id: ID of the task
user_id: ID of the user
task_data: Task patch data
Returns:
Updated task if found and belongs to user, None otherwise
"""
task = self.get_task(task_id, user_id)
if not task:
return None
# Update only provided fields
if task_data.title is not None:
task.title = task_data.title
if task_data.description is not None:
task.description = task_data.description
if task_data.completed is not None:
task.completed = task_data.completed
task.updated_at = datetime.utcnow()
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def delete_task(self, task_id: int, user_id: int) -> bool:
"""
Delete a task.
Args:
task_id: ID of the task
user_id: ID of the user
Returns:
True if task was deleted, False if not found or doesn't belong to user
"""
task = self.get_task(task_id, user_id)
if not task:
return False
self.db.delete(task)
self.db.commit()
return True