todoappapi / tasks.py
GrowWithTalha's picture
feat: sync backend changes from SDDRI-Hackathon-2
84c328d
"""Task CRUD API endpoints with JWT authentication.
[Task]: T053-T059, T043, T065-T067
[From]: specs/001-user-auth/tasks.md (User Story 3), specs/007-intermediate-todo-features/tasks.md (User Story 4)
Implements all task management operations with JWT-based authentication:
- Create task with validation
- List tasks with filtering (status, priority, tags, due_date) [T043]
- Get task by ID
- Update task with validation
- Delete task
- Toggle completion status
- Search tasks (User Story 3)
- List tags
All endpoints require valid JWT token. user_id is extracted from JWT claims.
"""
import uuid
from datetime import datetime, timedelta
from typing import Annotated, List, Optional
from zoneinfo import ZoneInfo
from fastapi import APIRouter, HTTPException, Query
from sqlmodel import Session, select
from pydantic import BaseModel
from sqlalchemy import func, and_, any_
from core.deps import SessionDep, CurrentUserDep
from models.task import Task, TaskCreate, TaskUpdate, TaskRead
# Create API router (user_id removed - now from JWT)
router = APIRouter(prefix="/api/tasks", tags=["tasks"])
# Response models
class TaskListResponse(BaseModel):
"""Response model for task list with pagination."""
tasks: list[TaskRead]
total: int
offset: int
limit: int
class TagInfo(BaseModel):
"""Tag information with usage count."""
name: str
count: int
class TagsListResponse(BaseModel):
"""Response model for tags list."""
tags: list[TagInfo]
class TaskSearchResponse(BaseModel):
"""Response model for task search results."""
tasks: list[TaskRead]
total: int
page: int
limit: int
query: str
# Routes - IMPORTANT: Static routes MUST come before dynamic path parameters
# This ensures /tags and /search are matched before /{task_id}
@router.post("", response_model=TaskRead, status_code=201)
def create_task(
task: TaskCreate,
session: SessionDep,
user_id: CurrentUserDep
):
"""Create a new task for the authenticated user."""
# Convert priority string to PriorityLevel enum (handles both upper/lowercase input)
priority_enum = PriorityLevel(task.priority.upper()) if isinstance(task.priority, str) else task.priority
db_task = Task(
user_id=user_id,
title=task.title,
description=task.description,
priority=priority_enum,
tags=task.tags,
due_date=task.due_date,
completed=task.completed,
reminder_offset=task.reminder_offset, # [T043] Add reminder_offset support
reminder_sent=False # Initialize reminder_sent to False
)
session.add(db_task)
session.commit()
session.refresh(db_task)
return db_task
@router.get("", response_model=TaskListResponse)
def list_tasks(
session: SessionDep,
user_id: CurrentUserDep,
offset: int = 0,
limit: Annotated[int, Query(le=100)] = 50,
completed: bool | None = None,
priority: str | None = None,
tags: Annotated[List[str] | None, Query()] = None,
due_date: str | None = None,
due_before: str | None = None, # [T028] Add due_before filter
due_after: str | None = None, # [T028] Add due_after filter
timezone: str = "UTC",
sort_by: str | None = None,
sort_order: str = "asc",
):
"""List all tasks for the authenticated user with pagination and filtering."""
count_statement = select(func.count(Task.id)).where(Task.user_id == user_id)
statement = select(Task).where(Task.user_id == user_id)
if completed is not None:
count_statement = count_statement.where(Task.completed == completed)
statement = statement.where(Task.completed == completed)
if priority is not None:
count_statement = count_statement.where(Task.priority == priority)
statement = statement.where(Task.priority == priority)
if tags and len(tags) > 0:
for tag in tags:
# Use PostgreSQL ANY operator: tag = ANY(tags)
count_statement = count_statement.where(tag == any_(Task.tags))
statement = statement.where(tag == any_(Task.tags))
# [T028] Add due_before and due_after filters
if due_before:
try:
due_before_dt = datetime.fromisoformat(due_before)
count_statement = count_statement.where(Task.due_date <= due_before_dt)
statement = statement.where(Task.due_date <= due_before_dt)
except ValueError:
pass # Invalid date format, ignore filter
if due_after:
try:
due_after_dt = datetime.fromisoformat(due_after)
count_statement = count_statement.where(Task.due_date >= due_after_dt)
statement = statement.where(Task.due_date >= due_after_dt)
except ValueError:
pass # Invalid date format, ignore filter
if due_date:
try:
user_tz = ZoneInfo(timezone)
now_utc = datetime.now(ZoneInfo("UTC"))
now_user = now_utc.astimezone(user_tz)
today_start = now_user.replace(hour=0, minute=0, second=0, microsecond=0)
today_end = today_start + timedelta(days=1)
if due_date == "overdue":
today_start_utc = today_start.astimezone(ZoneInfo("UTC"))
count_statement = count_statement.where(
and_(Task.due_date < today_start_utc, Task.completed == False)
)
statement = statement.where(
and_(Task.due_date < today_start_utc, Task.completed == False)
)
elif due_date == "today":
today_start_utc = today_start.astimezone(ZoneInfo("UTC"))
today_end_utc = today_end.astimezone(ZoneInfo("UTC"))
count_statement = count_statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < today_end_utc)
)
statement = statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < today_end_utc)
)
elif due_date == "week":
week_end_utc = (today_start + timedelta(days=7)).astimezone(ZoneInfo("UTC"))
today_start_utc = today_start.astimezone(ZoneInfo("UTC"))
count_statement = count_statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < week_end_utc)
)
statement = statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < week_end_utc)
)
elif due_date == "month":
month_end_utc = (today_start + timedelta(days=30)).astimezone(ZoneInfo("UTC"))
today_start_utc = today_start.astimezone(ZoneInfo("UTC"))
count_statement = count_statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < month_end_utc)
)
statement = statement.where(
and_(Task.due_date >= today_start_utc, Task.due_date < month_end_utc)
)
except Exception:
pass
total = session.exec(count_statement).one()
if sort_by == "due_date":
if sort_order == "asc":
statement = statement.order_by(Task.due_date.asc().nulls_last())
else:
statement = statement.order_by(Task.due_date.desc().nulls_last())
elif sort_by == "priority":
from sqlalchemy import case
priority_case = case(
*[(Task.priority == k, i) for i, k in enumerate(["high", "medium", "low"])],
else_=3
)
if sort_order == "asc":
statement = statement.order_by(priority_case.asc())
else:
statement = statement.order_by(priority_case.desc())
elif sort_by == "title":
if sort_order == "asc":
statement = statement.order_by(Task.title.asc())
else:
statement = statement.order_by(Task.title.desc())
else:
if sort_order == "asc":
statement = statement.order_by(Task.created_at.asc())
else:
statement = statement.order_by(Task.created_at.desc())
statement = statement.offset(offset).limit(limit)
tasks = session.exec(statement).all()
return TaskListResponse(
tasks=[TaskRead.model_validate(task) for task in tasks],
total=total,
offset=offset,
limit=limit
)
@router.get("/tags", response_model=TagsListResponse)
def list_tags(
session: SessionDep,
user_id: CurrentUserDep
):
"""Get all unique tags for the authenticated user with usage counts."""
from sqlalchemy import text
query = text("""
SELECT unnest(tags) as tag, COUNT(*) as count
FROM tasks
WHERE user_id = :user_id
AND tags != '{}'
GROUP BY tag
ORDER BY count DESC, tag ASC
""")
result = session.exec(query.params(user_id=str(user_id)))
tags = [TagInfo(name=row[0], count=row[1]) for row in result]
return TagsListResponse(tags=tags)
@router.get("/search", response_model=TaskSearchResponse)
def search_tasks(
session: SessionDep,
user_id: CurrentUserDep,
q: Annotated[str, Query(min_length=1, max_length=200)] = "",
page: int = 1,
limit: Annotated[int, Query(le=100)] = 20,
):
"""Search tasks by keyword in title and description."""
if not q:
raise HTTPException(status_code=400, detail="Search query parameter 'q' is required")
search_pattern = f"%{q}%"
count_statement = select(func.count(Task.id)).where(
(Task.user_id == user_id) &
(Task.title.ilike(search_pattern) | Task.description.ilike(search_pattern))
)
total = session.exec(count_statement).one()
offset = (page - 1) * limit
statement = select(Task).where(
(Task.user_id == user_id) &
(Task.title.ilike(search_pattern) | Task.description.ilike(search_pattern))
)
statement = statement.offset(offset).limit(limit)
statement = statement.order_by(Task.created_at.desc())
tasks = session.exec(statement).all()
return TaskSearchResponse(
tasks=[TaskRead.model_validate(task) for task in tasks],
total=total,
page=page,
limit=limit,
query=q
)
@router.get("/{task_id}", response_model=TaskRead)
def get_task(
task_id: uuid.UUID,
session: SessionDep,
user_id: CurrentUserDep
):
"""Get a specific task by ID."""
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
return task
@router.put("/{task_id}", response_model=TaskRead)
def update_task(
task_id: uuid.UUID,
task_update: TaskUpdate,
session: SessionDep,
user_id: CurrentUserDep
):
"""Update an existing task."""
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
task_data = task_update.model_dump(exclude_unset=True)
for key, value in task_data.items():
# Convert priority string to PriorityLevel enum
if key == "priority" and isinstance(value, str):
value = PriorityLevel(value.upper())
setattr(task, key, value)
task.updated_at = datetime.utcnow()
session.add(task)
session.commit()
session.refresh(task)
return task
@router.delete("/{task_id}")
def delete_task(
task_id: uuid.UUID,
session: SessionDep,
user_id: CurrentUserDep
):
"""Delete a task."""
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
session.delete(task)
session.commit()
return {"ok": True}
@router.patch("/{task_id}/complete", response_model=TaskRead)
def toggle_complete(
task_id: uuid.UUID,
session: SessionDep,
user_id: CurrentUserDep
):
"""Toggle task completion status and create next instance for recurring tasks.
[Task]: T062-T065
[From]: specs/008-advanced-features/tasks.md (User Story 3)
When completing a recurring task:
- T063: Checks if recurrence limit (count/end_date) is reached
- T064: Handles count limit
- T065: Handles end_date limit
- Creates next instance if limits not reached
"""
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
is_completing = not task.completed
task.completed = is_completing
task.updated_at = datetime.utcnow()
session.add(task)
session.commit()
# [T062] Create next instance for recurring tasks when completing
if is_completing and task.recurrence and task.due_date:
from services.recurrence_service import RecurrenceService
recurrence_service = RecurrenceService()
# Parse recurrence rule
recurrence_dict = task.recurrence if isinstance(task.recurrence, dict) else {"frequency": task.recurrence}
if not isinstance(recurrence_dict, dict):
recurrence_dict = {"frequency": str(task.recurrence)}
# [T063] Check if we should create the next instance
should_create_next = True
current_count = 0
# Count existing instances (tasks with same parent_task_id)
if task.parent_task_id:
# This is already an instance, count siblings
count_statement = select(func.count(Task.id)).where(
Task.parent_task_id == task.parent_task_id
)
current_count = session.exec(count_statement).one() + 1 # +1 for parent
else:
# This is the parent task, count its instances
count_statement = select(func.count(Task.id)).where(
Task.parent_task_id == task_id
)
current_count = session.exec(count_statement).one() + 1 # +1 for this task
# [T064] Handle count limit
max_count = recurrence_dict.get("count")
if max_count is not None:
if current_count >= max_count:
should_create_next = False
# [T065] Handle end_date limit
end_date_str = recurrence_dict.get("end_date")
if end_date_str and should_create_next:
try:
if isinstance(end_date_str, str):
end_date = datetime.fromisoformat(end_date_str.replace('Z', '+00:00'))
else:
end_date = end_date_str
# Calculate next occurrence date
base_date = datetime.fromisoformat(task.due_date.replace('Z', '+00:00')) if isinstance(task.due_date, str) else task.due_date
next_due_date = recurrence_service.calculate_next_occurrence(base_date, recurrence_dict)
if next_due_date and next_due_date > end_date:
should_create_next = False
except Exception:
pass # Invalid date format, skip check
# Create next instance if limits not reached
if should_create_next:
base_date = datetime.fromisoformat(task.due_date.replace('Z', '+00:00')) if isinstance(task.due_date, str) else task.due_date
next_due_date = recurrence_service.calculate_next_occurrence(base_date, recurrence_dict)
if next_due_date:
# Create next instance
next_task = Task(
user_id=user_id,
title=task.title,
description=task.description,
priority=task.priority,
tags=task.tags,
due_date=next_due_date.isoformat(),
completed=False,
reminder_offset=task.reminder_offset,
reminder_sent=False,
recurrence=task.recurrence,
parent_task_id=task.parent_task_id if task.parent_task_id else task.id,
)
session.add(next_task)
session.commit()
session.refresh(task)
return task
@router.patch("/{task_id}/tags")
def update_task_tags(
task_id: uuid.UUID,
session: SessionDep,
user_id: CurrentUserDep,
tags_add: Optional[List[str]] = None,
tags_remove: Optional[List[str]] = None,
):
"""Add or remove tags from a task."""
from services.nlp_service import normalize_tag_name
if tags_add is None and tags_remove is None:
raise HTTPException(
status_code=400,
detail="Either 'tags_add' or 'tags_remove' must be provided"
)
if not tags_add and not tags_remove:
raise HTTPException(
status_code=400,
detail="Either 'tags_add' or 'tags_remove' must contain at least one tag"
)
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
current_tags = set(task.tags or [])
if tags_add:
normalized_add = [normalize_tag_name(tag) for tag in tags_add]
current_tags.update(normalized_add)
if tags_remove:
normalized_remove = [normalize_tag_name(tag).lower() for tag in tags_remove]
current_tags = {
tag for tag in current_tags
if tag.lower() not in normalized_remove
}
task.tags = sorted(list(current_tags))
task.updated_at = datetime.utcnow()
session.add(task)
session.commit()
session.refresh(task)
return task
@router.patch("/{task_id}/reminder", response_model=TaskRead)
def update_reminder(
task_id: uuid.UUID,
session: SessionDep,
user_id: CurrentUserDep,
reminder_offset: int | None = None,
reset_sent: bool = False
):
"""Update reminder settings for a task.
[Task]: T045
[From]: specs/008-advanced-features/tasks.md (User Story 2)
Allows updating the reminder_offset and optionally resetting the reminder_sent flag.
"""
task = session.get(Task, task_id)
if not task or task.user_id != user_id:
raise HTTPException(status_code=404, detail="Task not found")
# Update reminder_offset if provided
if reminder_offset is not None:
task.reminder_offset = reminder_offset
# Reset reminder_sent flag if requested (e.g., when changing due date)
if reset_sent:
task.reminder_sent = False
task.updated_at = datetime.utcnow()
session.add(task)
session.commit()
session.refresh(task)
return task