Taskflow / src /routers /tasks.py
NirmaQureshi's picture
code
e650b33
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from uuid import UUID
from datetime import datetime
from src.schemas.task import TaskResponse, TaskListResponse, TaskCreateRequest, TaskPatchRequest, TaskUpdateRequest
from src.models.task import Task
from src.models.user import User
from src.utils.deps import get_db, get_current_user
from src.utils.security import verify_token
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
router = APIRouter(tags=["Tasks"])
security = HTTPBearer()
@router.get("/{user_id}/tasks", response_model=TaskListResponse)
def get_user_tasks(
user_id: UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id != current_user.id:
raise HTTPException(status_code=404, detail="Task not found")
tasks = (
db.query(Task)
.filter(Task.user_id == current_user.id)
.order_by(Task.created_at.desc())
.all()
)
return TaskListResponse(
tasks=tasks,
total=len(tasks),
)
@router.post("/{user_id}/tasks", response_model=TaskResponse, status_code=201)
def create_task(
user_id: UUID,
task_data: TaskCreateRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id != current_user.id:
raise HTTPException(status_code=404, detail="User not found")
task = Task(
title=task_data.title,
description=task_data.description,
completed=task_data.completed,
user_id=current_user.id,
)
db.add(task)
db.commit()
db.refresh(task)
return task
@router.patch("/{user_id}/tasks/{task_id}", response_model=TaskResponse)
def update_task_partial(
user_id: UUID,
task_id: int,
task_update: TaskPatchRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id != current_user.id:
raise HTTPException(status_code=404, detail="Task not found")
task = (
db.query(Task)
.filter(Task.id == task_id, Task.user_id == current_user.id)
.first()
)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if task_update.completed is not None:
task.completed = task_update.completed
db.commit()
db.refresh(task)
return task
@router.put("/{user_id}/tasks/{task_id}", response_model=TaskResponse)
def update_task_full(
user_id: UUID,
task_id: int,
task_update: TaskUpdateRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id != current_user.id:
raise HTTPException(status_code=404, detail="Task not found")
task = (
db.query(Task)
.filter(Task.id == task_id, Task.user_id == current_user.id)
.first()
)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
task.title = task_update.title
task.description = task_update.description
task.completed = task_update.completed
db.commit()
db.refresh(task)
return task
@router.delete("/{user_id}/tasks/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_task(
user_id: UUID,
task_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id != current_user.id:
raise HTTPException(status_code=404, detail="Task not found")
task = (
db.query(Task)
.filter(Task.id == task_id, Task.user_id == current_user.id)
.first()
)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
db.delete(task)
db.commit()