Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from sqlmodel import Session, select | |
| from typing import List | |
| from datetime import datetime | |
| from ..api.dependencies import get_session, CurrentUserDep | |
| from ..models.task import Task, TaskCreate, TaskUpdate, TaskRead | |
| router = APIRouter(prefix="/api/tasks", tags=["tasks"]) | |
| def list_tasks( | |
| current_user: CurrentUserDep, | |
| session: Session = Depends(get_session) | |
| ): | |
| statement = select(Task).where(Task.user_id == current_user.id).order_by(Task.completed.asc(), Task.created_at.desc()) | |
| tasks = session.execute(statement).scalars().all() | |
| return tasks | |
| def create_task( | |
| current_user: CurrentUserDep, | |
| task_in: TaskCreate, | |
| session: Session = Depends(get_session) | |
| ): | |
| db_task = Task.model_validate(task_in, update={"user_id": current_user.id}) | |
| session.add(db_task) | |
| session.commit() | |
| session.refresh(db_task) | |
| return db_task | |
| def read_task( | |
| current_user: CurrentUserDep, | |
| task_id: int, | |
| session: Session = Depends(get_session) | |
| ): | |
| task = session.get(Task, task_id) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| if task.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="Not enough permissions") | |
| return task | |
| def update_task( | |
| current_user: CurrentUserDep, | |
| task_id: int, | |
| task_in: TaskUpdate, | |
| session: Session = Depends(get_session) | |
| ): | |
| task = session.get(Task, task_id) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| if task.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="Not enough permissions") | |
| update_data = task_in.model_dump(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| setattr(task, key, value) | |
| task.updated_at = datetime.utcnow() | |
| session.add(task) | |
| session.commit() | |
| session.refresh(task) | |
| return task | |
| def toggle_task( | |
| current_user: CurrentUserDep, | |
| task_id: int, | |
| session: Session = Depends(get_session) | |
| ): | |
| task = session.get(Task, task_id) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| if task.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="Not enough permissions") | |
| task.completed = not task.completed | |
| task.completed_at = datetime.utcnow() if task.completed else None | |
| task.updated_at = datetime.utcnow() | |
| session.add(task) | |
| session.commit() | |
| session.refresh(task) | |
| return task | |
| def delete_task( | |
| current_user: CurrentUserDep, | |
| task_id: int, | |
| session: Session = Depends(get_session) | |
| ): | |
| task = session.get(Task, task_id) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| if task.user_id != current_user.id: | |
| raise HTTPException(status_code=403, detail="Not enough permissions") | |
| session.delete(task) | |
| session.commit() | |
| return {"message": "Task deleted successfully"} | |