|
|
""" |
|
|
Tasks API - Phase 5 |
|
|
|
|
|
CRUD operations for tasks with Dapr event publishing. |
|
|
All state changes are published to Kafka for microservices to consume. |
|
|
""" |
|
|
|
|
|
from datetime import datetime, timezone |
|
|
from typing import List, Optional |
|
|
from fastapi import APIRouter, HTTPException, status, Depends |
|
|
from sqlalchemy.orm import Session |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from src.models.base import get_db |
|
|
from src.models.task import Task as TaskModel |
|
|
from src.orchestrator.event_publisher import EventPublisher |
|
|
from src.utils.logging import get_logger |
|
|
|
|
|
logger = get_logger(__name__) |
|
|
|
|
|
router = APIRouter(prefix="/api/tasks", tags=["tasks"]) |
|
|
event_publisher = EventPublisher() |
|
|
|
|
|
|
|
|
|
|
|
class TaskCreate(BaseModel): |
|
|
title: str |
|
|
description: Optional[str] = None |
|
|
due_date: Optional[str] = None |
|
|
priority: Optional[str] = "medium" |
|
|
tags: Optional[List[str]] = [] |
|
|
reminder_config: Optional[dict] = None |
|
|
recurrence_rule: Optional[dict] = None |
|
|
|
|
|
|
|
|
class TaskUpdate(BaseModel): |
|
|
title: Optional[str] = None |
|
|
description: Optional[str] = None |
|
|
due_date: Optional[str] = None |
|
|
priority: Optional[str] = None |
|
|
tags: Optional[List[str]] = None |
|
|
status: Optional[str] = None |
|
|
reminder_config: Optional[dict] = None |
|
|
recurrence_rule: Optional[dict] = None |
|
|
|
|
|
|
|
|
class TaskResponse(BaseModel): |
|
|
id: str |
|
|
user_id: str |
|
|
title: str |
|
|
description: Optional[str] |
|
|
due_date: Optional[str] |
|
|
priority: str |
|
|
tags: List[str] |
|
|
status: str |
|
|
reminder_config: Optional[dict] |
|
|
recurrence_rule: Optional[dict] |
|
|
created_at: str |
|
|
updated_at: str |
|
|
|
|
|
|
|
|
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED) |
|
|
async def create_task( |
|
|
task_data: TaskCreate, |
|
|
user_id: str, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
Create a new task and publish task.created event. |
|
|
|
|
|
Args: |
|
|
task_data: Task creation data |
|
|
user_id: User ID from authentication |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
Created task |
|
|
""" |
|
|
try: |
|
|
|
|
|
task = TaskModel( |
|
|
title=task_data.title, |
|
|
description=task_data.description, |
|
|
due_date=task_data.due_date, |
|
|
priority=task_data.priority or "medium", |
|
|
tags=task_data.tags or [], |
|
|
reminder_config=task_data.reminder_config, |
|
|
recurrence_rule=task_data.recurrence_rule, |
|
|
user_id=user_id, |
|
|
status="active" |
|
|
) |
|
|
|
|
|
db.add(task) |
|
|
db.commit() |
|
|
db.refresh(task) |
|
|
|
|
|
logger.info( |
|
|
"task_created", |
|
|
task_id=str(task.id), |
|
|
user_id=user_id, |
|
|
title=task.title |
|
|
) |
|
|
|
|
|
|
|
|
await event_publisher.publish_task_event( |
|
|
"task.created", |
|
|
str(task.id), |
|
|
task.to_dict() |
|
|
) |
|
|
|
|
|
await event_publisher.publish_task_update( |
|
|
str(task.id), |
|
|
"created", |
|
|
task.to_dict() |
|
|
) |
|
|
|
|
|
await event_publisher.publish_audit_event( |
|
|
"Task", |
|
|
str(task.id), |
|
|
"CREATE", |
|
|
"user", |
|
|
user_id, |
|
|
new_values=task.to_dict() |
|
|
) |
|
|
|
|
|
return TaskResponse(**task.to_dict()) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("create_task_failed", error=str(e)) |
|
|
db.rollback() |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to create task: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
@router.get("/", response_model=List[TaskResponse]) |
|
|
async def list_tasks( |
|
|
user_id: str, |
|
|
status: Optional[str] = None, |
|
|
priority: Optional[str] = None, |
|
|
limit: int = 50, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
List tasks for a user with optional filters. |
|
|
|
|
|
Args: |
|
|
user_id: User ID from authentication |
|
|
status: Filter by status (active, completed, deleted) |
|
|
priority: Filter by priority (low, medium, high) |
|
|
limit: Maximum number of tasks to return |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
List of tasks |
|
|
""" |
|
|
try: |
|
|
query = db.query(TaskModel).filter(TaskModel.user_id == user_id) |
|
|
|
|
|
|
|
|
if status: |
|
|
query = query.filter(TaskModel.status == status) |
|
|
|
|
|
if priority: |
|
|
query = query.filter(TaskModel.priority == priority) |
|
|
|
|
|
|
|
|
if status != "deleted": |
|
|
query = query.filter(TaskModel.status != "deleted") |
|
|
|
|
|
|
|
|
query = query.order_by( |
|
|
TaskModel.due_date.asc().nulls_last(), |
|
|
TaskModel.created_at.desc() |
|
|
) |
|
|
|
|
|
tasks = query.limit(limit).all() |
|
|
|
|
|
logger.info( |
|
|
"tasks_listed", |
|
|
user_id=user_id, |
|
|
count=len(tasks), |
|
|
status=status, |
|
|
priority=priority |
|
|
) |
|
|
|
|
|
return [TaskResponse(**task.to_dict()) for task in tasks] |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("list_tasks_failed", error=str(e)) |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to list tasks: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
@router.get("/{task_id}", response_model=TaskResponse) |
|
|
async def get_task( |
|
|
task_id: str, |
|
|
user_id: str, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
Get a specific task by ID. |
|
|
|
|
|
Args: |
|
|
task_id: Task ID |
|
|
user_id: User ID from authentication |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
Task details |
|
|
""" |
|
|
task = db.query(TaskModel).filter( |
|
|
TaskModel.id == task_id, |
|
|
TaskModel.user_id == user_id |
|
|
).first() |
|
|
|
|
|
if not task: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_404_NOT_FOUND, |
|
|
detail=f"Task {task_id} not found" |
|
|
) |
|
|
|
|
|
return TaskResponse(**task.to_dict()) |
|
|
|
|
|
|
|
|
@router.patch("/{task_id}", response_model=TaskResponse) |
|
|
async def update_task( |
|
|
task_id: str, |
|
|
updates: TaskUpdate, |
|
|
user_id: str, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
Update a task and publish task.updated event. |
|
|
|
|
|
Args: |
|
|
task_id: Task ID |
|
|
updates: Fields to update |
|
|
user_id: User ID from authentication |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
Updated task |
|
|
""" |
|
|
task = db.query(TaskModel).filter( |
|
|
TaskModel.id == task_id, |
|
|
TaskModel.user_id == user_id |
|
|
).first() |
|
|
|
|
|
if not task: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_404_NOT_FOUND, |
|
|
detail=f"Task {task_id} not found" |
|
|
) |
|
|
|
|
|
try: |
|
|
|
|
|
old_values = task.to_dict() |
|
|
|
|
|
|
|
|
update_data = updates.dict(exclude_unset=True) |
|
|
for field, value in update_data.items(): |
|
|
setattr(task, field, value) |
|
|
|
|
|
db.commit() |
|
|
db.refresh(task) |
|
|
|
|
|
logger.info( |
|
|
"task_updated", |
|
|
task_id=str(task.id), |
|
|
user_id=user_id, |
|
|
updated_fields=list(update_data.keys()) |
|
|
) |
|
|
|
|
|
|
|
|
await event_publisher.publish_task_event( |
|
|
"task.updated", |
|
|
str(task.id), |
|
|
{ |
|
|
"old_values": old_values, |
|
|
"new_values": task.to_dict(), |
|
|
"updated_fields": list(update_data.keys()) |
|
|
} |
|
|
) |
|
|
|
|
|
await event_publisher.publish_task_update( |
|
|
str(task.id), |
|
|
"updated", |
|
|
task.to_dict() |
|
|
) |
|
|
|
|
|
await event_publisher.publish_audit_event( |
|
|
"Task", |
|
|
str(task.id), |
|
|
"UPDATE", |
|
|
"user", |
|
|
user_id, |
|
|
old_values=old_values, |
|
|
new_values=task.to_dict() |
|
|
) |
|
|
|
|
|
return TaskResponse(**task.to_dict()) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("update_task_failed", task_id=task_id, error=str(e)) |
|
|
db.rollback() |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to update task: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
@router.post("/{task_id}/complete", response_model=TaskResponse) |
|
|
async def complete_task( |
|
|
task_id: str, |
|
|
user_id: str, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
Mark a task as complete and publish task.completed event. |
|
|
|
|
|
This event triggers the recurring task service to generate next instance. |
|
|
|
|
|
Args: |
|
|
task_id: Task ID |
|
|
user_id: User ID from authentication |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
Completed task |
|
|
""" |
|
|
task = db.query(TaskModel).filter( |
|
|
TaskModel.id == task_id, |
|
|
TaskModel.user_id == user_id |
|
|
).first() |
|
|
|
|
|
if not task: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_404_NOT_FOUND, |
|
|
detail=f"Task {task_id} not found" |
|
|
) |
|
|
|
|
|
if task.status == "completed": |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail=f"Task {task_id} is already completed" |
|
|
) |
|
|
|
|
|
try: |
|
|
old_values = task.to_dict() |
|
|
|
|
|
|
|
|
task.status = "completed" |
|
|
task.completed_at = datetime.now(timezone.utc) |
|
|
|
|
|
db.commit() |
|
|
db.refresh(task) |
|
|
|
|
|
logger.info( |
|
|
"task_completed", |
|
|
task_id=str(task.id), |
|
|
user_id=user_id |
|
|
) |
|
|
|
|
|
|
|
|
await event_publisher.publish_task_event( |
|
|
"task.completed", |
|
|
str(task.id), |
|
|
{ |
|
|
"old_values": old_values, |
|
|
"new_values": task.to_dict(), |
|
|
"completed_at": task.completed_at.isoformat() |
|
|
} |
|
|
) |
|
|
|
|
|
await event_publisher.publish_task_update( |
|
|
str(task.id), |
|
|
"completed", |
|
|
task.to_dict() |
|
|
) |
|
|
|
|
|
await event_publisher.publish_audit_event( |
|
|
"Task", |
|
|
str(task.id), |
|
|
"COMPLETE", |
|
|
"user", |
|
|
user_id, |
|
|
old_values=old_values, |
|
|
new_values=task.to_dict() |
|
|
) |
|
|
|
|
|
return TaskResponse(**task.to_dict()) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("complete_task_failed", task_id=task_id, error=str(e)) |
|
|
db.rollback() |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to complete task: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
@router.delete("/{task_id}") |
|
|
async def delete_task( |
|
|
task_id: str, |
|
|
user_id: str, |
|
|
db: Session = Depends(get_db) |
|
|
): |
|
|
""" |
|
|
Soft delete a task and publish task.deleted event. |
|
|
|
|
|
Args: |
|
|
task_id: Task ID |
|
|
user_id: User ID from authentication |
|
|
db: Database session |
|
|
|
|
|
Returns: |
|
|
Deletion confirmation |
|
|
""" |
|
|
task = db.query(TaskModel).filter( |
|
|
TaskModel.id == task_id, |
|
|
TaskModel.user_id == user_id |
|
|
).first() |
|
|
|
|
|
if not task: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_404_NOT_FOUND, |
|
|
detail=f"Task {task_id} not found" |
|
|
) |
|
|
|
|
|
try: |
|
|
old_values = task.to_dict() |
|
|
|
|
|
|
|
|
task.status = "deleted" |
|
|
|
|
|
db.commit() |
|
|
|
|
|
logger.info( |
|
|
"task_deleted", |
|
|
task_id=str(task.id), |
|
|
user_id=user_id |
|
|
) |
|
|
|
|
|
|
|
|
await event_publisher.publish_task_event( |
|
|
"task.deleted", |
|
|
str(task.id), |
|
|
{ |
|
|
"old_values": old_values |
|
|
} |
|
|
) |
|
|
|
|
|
await event_publisher.publish_task_update( |
|
|
str(task.id), |
|
|
"deleted", |
|
|
task.to_dict() |
|
|
) |
|
|
|
|
|
await event_publisher.publish_audit_event( |
|
|
"Task", |
|
|
str(task.id), |
|
|
"DELETE", |
|
|
"user", |
|
|
user_id, |
|
|
old_values=old_values |
|
|
) |
|
|
|
|
|
return { |
|
|
"status": "deleted", |
|
|
"task_id": str(task.id), |
|
|
"message": "Task deleted successfully" |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error("delete_task_failed", task_id=task_id, error=str(e)) |
|
|
db.rollback() |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Failed to delete task: {str(e)}" |
|
|
) |
|
|
|