Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from sqlalchemy.orm import Session | |
| from typing import List | |
| from uuid import UUID | |
| from datetime import datetime | |
| from src.schemas.task import TaskResponse, TaskListResponse, TaskCreateRequest, TaskPatchRequest, TaskUpdateRequest | |
| from src.models.task import Task | |
| from src.models.user import User | |
| from src.utils.deps import get_db, get_current_user | |
| from src.utils.security import verify_token | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| router = APIRouter(tags=["Tasks"]) | |
| security = HTTPBearer() | |
| def get_user_tasks( | |
| user_id: UUID, | |
| db: Session = Depends(get_db), | |
| current_user: User = Depends(get_current_user), | |
| ): | |
| if user_id != current_user.id: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| tasks = ( | |
| db.query(Task) | |
| .filter(Task.user_id == current_user.id) | |
| .order_by(Task.created_at.desc()) | |
| .all() | |
| ) | |
| return TaskListResponse( | |
| tasks=tasks, | |
| total=len(tasks), | |
| ) | |
| def create_task( | |
| user_id: UUID, | |
| task_data: TaskCreateRequest, | |
| db: Session = Depends(get_db), | |
| current_user: User = Depends(get_current_user), | |
| ): | |
| if user_id != current_user.id: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| task = Task( | |
| title=task_data.title, | |
| description=task_data.description, | |
| completed=task_data.completed, | |
| user_id=current_user.id, | |
| ) | |
| db.add(task) | |
| db.commit() | |
| db.refresh(task) | |
| return task | |
| def update_task_partial( | |
| user_id: UUID, | |
| task_id: int, | |
| task_update: TaskPatchRequest, | |
| db: Session = Depends(get_db), | |
| current_user: User = Depends(get_current_user), | |
| ): | |
| if user_id != current_user.id: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| task = ( | |
| db.query(Task) | |
| .filter(Task.id == task_id, Task.user_id == current_user.id) | |
| .first() | |
| ) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| if task_update.completed is not None: | |
| task.completed = task_update.completed | |
| db.commit() | |
| db.refresh(task) | |
| return task | |
| def update_task_full( | |
| user_id: UUID, | |
| task_id: int, | |
| task_update: TaskUpdateRequest, | |
| db: Session = Depends(get_db), | |
| current_user: User = Depends(get_current_user), | |
| ): | |
| if user_id != current_user.id: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| task = ( | |
| db.query(Task) | |
| .filter(Task.id == task_id, Task.user_id == current_user.id) | |
| .first() | |
| ) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| task.title = task_update.title | |
| task.description = task_update.description | |
| task.completed = task_update.completed | |
| db.commit() | |
| db.refresh(task) | |
| return task | |
| def delete_task( | |
| user_id: UUID, | |
| task_id: int, | |
| db: Session = Depends(get_db), | |
| current_user: User = Depends(get_current_user), | |
| ): | |
| if user_id != current_user.id: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| task = ( | |
| db.query(Task) | |
| .filter(Task.id == task_id, Task.user_id == current_user.id) | |
| .first() | |
| ) | |
| if not task: | |
| raise HTTPException(status_code=404, detail="Task not found") | |
| db.delete(task) | |
| db.commit() | |