kamau1's picture
Fix runtime error: remove await from sync function, update AuditService calls to new API
5d3b68c
"""
Tasks API Endpoints - Infrastructure project task management
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request, BackgroundTasks
from sqlalchemy.orm import Session
from typing import Optional, List
from uuid import UUID
from datetime import date
import math
import logging
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.task import Task
from app.models.enums import TaskStatus, TicketPriority
from app.schemas.task import (
TaskCreate, TaskUpdate, TaskResponse, TaskListResponse,
TaskStatusUpdate, TaskStart, TaskComplete, TaskCancel
)
from app.schemas.filters import TaskFilters
from app.services.task_service import TaskService
from app.services.audit_service import AuditService
from app.core.permissions import require_permission
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/tasks", tags=["Tasks"])
def parse_task_filters(
    project_id: Optional[UUID] = Query(None),
    project_region_id: Optional[UUID] = Query(None),
    created_by_user_id: Optional[UUID] = Query(None),
    status: Optional[str] = Query(None),
    task_type: Optional[str] = Query(None),
    priority: Optional[str] = Query(None),
    scheduled_date: Optional[date] = Query(None),
    scheduled_date_from: Optional[date] = Query(None),
    scheduled_date_to: Optional[date] = Query(None),
    is_overdue: Optional[bool] = Query(None),
    has_location: Optional[bool] = Query(None),
    search: Optional[str] = Query(None),
    sort_by: Optional[str] = Query(None),
    sort_order: str = Query("desc"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    from_date: Optional[date] = Query(None),
    to_date: Optional[date] = Query(None),
) -> TaskFilters:
    """Bundle the task-list query parameters into a ``TaskFilters`` object.

    ``status``, ``task_type`` and ``priority`` are received as
    comma-separated strings and converted to lists of trimmed, non-empty
    tokens. A parameter that was not supplied stays ``None``; a supplied
    but blank string becomes an empty list. All other parameters are
    forwarded unchanged.
    """
    def split_csv(raw: Optional[str]) -> Optional[List[str]]:
        # Keep "not supplied" (None) distinguishable from "supplied but empty".
        if raw is None:
            return None
        trimmed = (piece.strip() for piece in raw.split(','))
        return [piece for piece in trimmed if piece]

    # Split the multi-value parameters up front, in the same order the
    # keyword arguments below consume them.
    status_values = split_csv(status)
    task_type_values = split_csv(task_type)
    priority_values = split_csv(priority)

    return TaskFilters(
        project_id=project_id,
        project_region_id=project_region_id,
        created_by_user_id=created_by_user_id,
        status=status_values,
        task_type=task_type_values,
        priority=priority_values,
        scheduled_date=scheduled_date,
        scheduled_date_from=scheduled_date_from,
        scheduled_date_to=scheduled_date_to,
        is_overdue=is_overdue,
        has_location=has_location,
        search=search,
        sort_by=sort_by,
        sort_order=sort_order,
        page=page,
        page_size=page_size,
        from_date=from_date,
        to_date=to_date,
    )
# ============================================
# TASK CRUD
# ============================================
@router.post("", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
@require_permission("manage_tasks")
async def create_task(
    data: TaskCreate,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Create a new task for any project
    **Use Cases:**
    - Infrastructure: Installation, maintenance, survey, testing
    - Logistics: Delivery, pickup, equipment distribution
    - Customer Service: Site surveys, customer visits, training
    - General: Any work requiring field agent assignment and expense tracking
    **Authorization:**
    - platform_admin: Can create for any project
    - project_manager: Can create for projects they manage
    - client_admin/contractor_admin: Can create for their organization's projects
    **Required Fields:**
    - task_title: Task name/title
    - project_id: Project this task belongs to (any project type)
    **Optional Fields:**
    - task_type: Type of work (installation, delivery, site_survey, pickup, etc.)
    - location: location_name, coordinates, address, maps_link
    - project_region_id: Geographic region for organization
    - priority: low, normal, high, urgent
    - scheduled_date: When task should be executed
    **Business Rules:**
    - Tasks can be created for any project type
    - If project_region_id provided, must belong to the project
    - Location coordinates must be provided together (lat + lon)
    - Tickets can be generated from tasks for field agent assignment
    **Workflow:**
    1. Create task → 2. Generate ticket from task → 3. Assign to field agent
    4. Agent completes work and logs expenses → 5. Manager approves expenses
    **Response includes:**
    - All task fields
    - project_title, region_name, created_by_name (nested)
    - Computed properties (is_completed, is_overdue, has_location, duration_days)
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        created = TaskService.create_task(db, data, current_user, background_tasks)

        # Audit entry describing the newly created task.
        audit_details = {
            "task_title": created.task_title,
            "project_id": str(created.project_id),
            "task_type": created.task_type,
            "status": created.status.value
        }
        AuditService.log_action(
            db=db,
            action="create_task",
            entity_type="task",
            description=f"Created task: {created.task_title}",
            user=current_user,
            entity_id=str(created.id),
            request=request,
            additional_metadata=audit_details
        )

        # Serialize the task, then attach the nested display names and the
        # computed properties read from the ORM object.
        payload = TaskResponse.model_validate(created)
        project = created.project
        payload.project_title = project.title if project else None
        region = created.project_region
        payload.region_name = region.region_name if region else None
        author = created.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(created, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error creating task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create task: {exc}"
        )
@router.get("", response_model=TaskListResponse)
@require_permission("view_tasks")
async def list_tasks(
    filters: TaskFilters = Depends(parse_task_filters),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    List tasks with pagination and filters
    **Authorization:**
    - platform_admin: Can see all tasks
    - Managers: Can see all tasks in projects they're involved with
    - Admins: Can see tasks in their organization's projects
    **Filters:**
    - project_id: Filter by specific project
    - project_region_id: Filter by region
    - status: Filter by task status (pending, assigned, in_progress, completed, cancelled, blocked)
    - task_type: Filter by task type (installation, maintenance, survey, testing)
    - priority: Filter by priority (low, normal, high, urgent)
    - scheduled_date_from/to: Date range filter
    - is_overdue: Show only overdue tasks (past scheduled date and not completed)
    - search: Search across title, description, location name, and address
    **Pagination:**
    - page: Current page (1-indexed)
    - page_size: Items per page (1-100, default 50)
    **Response includes:**
    - items: Array of tasks with nested data
    - total: Total count of matching tasks
    - page/page_size: Current pagination state
    - total_pages: Calculated total pages
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        def sole_value(values):
            # Temporary: only a single value is forwarded to the service.
            # A filter carrying zero or several values is passed as None,
            # i.e. that filter is not applied at all.
            return values[0] if values and len(values) == 1 else None

        def to_response(task):
            # Serialize one task and attach nested names + computed properties.
            entry = TaskResponse.model_validate(task)
            project = task.project
            entry.project_title = project.title if project else None
            region = task.project_region
            entry.region_name = region.region_name if region else None
            author = task.created_by
            entry.created_by_name = author.name if author else None
            for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
                setattr(entry, computed, getattr(task, computed))
            return entry

        # Pages are 1-indexed, so page 1 starts at offset 0.
        offset = (filters.page - 1) * filters.page_size
        tasks, total = TaskService.list_tasks(
            db, current_user, offset, filters.page_size,
            filters.project_id, filters.project_region_id,
            sole_value(filters.status),
            sole_value(filters.task_type),
            sole_value(filters.priority),
            filters.scheduled_date_from, filters.scheduled_date_to, filters.is_overdue, filters.search
        )

        items = [to_response(task) for task in tasks]

        if total > 0:
            page_count = math.ceil(total / filters.page_size)
        else:
            page_count = 0

        return TaskListResponse(
            items=items,
            total=total,
            page=filters.page,
            page_size=filters.page_size,
            total_pages=page_count
        )
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error listing tasks: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to list tasks: {exc}"
        )
@router.get("/stats", response_model=dict)
@require_permission("view_tasks")
async def get_task_stats(
    project_id: Optional[UUID] = Query(None, description="Filter by project"),
    project_region_id: Optional[UUID] = Query(None, description="Filter by region"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get task statistics and analytics
    **Authorization:**
    - platform_admin: All projects
    - project_manager: Their projects only
    - sales_manager: Their organization's projects
    - field_agent: Tasks assigned to them
    - sales_agent: Tasks related to their sales
    **Filters:**
    - `project_id`: Stats for specific project
    - `project_region_id`: Stats for specific region
    **Returns:**
    - Counts by status (pending, assigned, in_progress, completed, cancelled, blocked)
    - Counts by priority (urgent, high, normal, low)
    - Time-based metrics (overdue, scheduled today/this week)
    - Performance metrics (avg completion time, completion rate)
    - Task type breakdown
    **Example Response:**
    ```json
    {
    "total_tasks": 150,
    "pending_tasks": 20,
    "in_progress_tasks": 30,
    "completed_tasks": 80,
    "overdue_tasks": 5,
    "avg_completion_time_hours": 48.5,
    "completion_rate": 53.33,
    "by_task_type": {
    "installation": 50,
    "maintenance": 30,
    "survey": 20
    }
    }
    ```
    """
    # IMPORTANT: do not bind a local variable named ``status`` anywhere in
    # this function. Doing so makes ``status`` function-local and hides the
    # ``fastapi.status`` module that the error handler below relies on, so
    # the handler itself would crash instead of returning a 500.
    try:
        from collections import Counter
        from datetime import timedelta
        from app.models.enums import AppRole
        from app.models.project import Project

        def label(value):
            # Enum members are reported by their ``.value``; anything else
            # falls back to its string form.
            return value.value if hasattr(value, 'value') else str(value)

        # Base query: soft-deleted tasks are never counted.
        query = db.query(Task).filter(Task.deleted_at.is_(None))

        # Authorization filter (same pattern as tickets).
        # NOTE(review): roles not listed in this chain (e.g. the field_agent /
        # sales_* roles mentioned in the docstring) currently get NO
        # restriction and therefore see statistics for every task. Confirm
        # whether that is intended or whether a restrictive default is needed.
        role = current_user.role
        if role != AppRole.PLATFORM_ADMIN.value:
            if role == AppRole.PROJECT_MANAGER.value:
                query = query.join(Project).filter(Project.primary_manager_id == current_user.id)
            elif role in [AppRole.DISPATCHER.value, AppRole.CONTRACTOR_ADMIN.value]:
                query = query.join(Project).filter(Project.contractor_id == current_user.contractor_id)
            elif role == AppRole.CLIENT_ADMIN.value:
                query = query.join(Project).filter(Project.client_id == current_user.client_id)

        # Optional narrowing by project and region.
        if project_id:
            query = query.filter(Task.project_id == project_id)
        if project_region_id:
            query = query.filter(Task.project_region_id == project_region_id)

        # All metrics below are computed in Python over the loaded rows.
        # With zero rows every counter is 0, the average is None, the rate is
        # 0.0 and the type breakdown is {}, so no special empty case is needed.
        tasks = query.all()
        total_tasks = len(tasks)

        status_counts = Counter(label(t.status) for t in tasks)
        priority_counts = Counter(label(t.priority) for t in tasks)
        # Tasks without a task_type are left out of the breakdown.
        type_counts = Counter(t.task_type for t in tasks if t.task_type)

        # Time-based metrics. "This week" is today through today + 7 days,
        # both ends inclusive.
        today = date.today()
        week_end = today + timedelta(days=7)
        overdue_count = sum(1 for t in tasks if t.is_overdue)
        scheduled_today = sum(1 for t in tasks if t.scheduled_date == today)
        scheduled_this_week = sum(
            1 for t in tasks if t.scheduled_date and today <= t.scheduled_date <= week_end
        )

        # Performance metrics. Only completed tasks that have both timestamps
        # contribute to the average duration (in hours).
        completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED]
        completion_hours = [
            (t.completed_at - t.started_at).total_seconds() / 3600
            for t in completed_tasks
            if t.started_at and t.completed_at
        ]
        avg_completion_time = (
            sum(completion_hours) / len(completion_hours) if completion_hours else None
        )
        completion_rate = (len(completed_tasks) / total_tasks * 100) if total_tasks > 0 else 0.0

        return {
            "total_tasks": total_tasks,
            "pending_tasks": status_counts['pending'],
            "assigned_tasks": status_counts['assigned'],
            "in_progress_tasks": status_counts['in_progress'],
            "completed_tasks": status_counts['completed'],
            "cancelled_tasks": status_counts['cancelled'],
            "blocked_tasks": status_counts['blocked'],
            "urgent_tasks": priority_counts['urgent'],
            "high_priority_tasks": priority_counts['high'],
            "normal_priority_tasks": priority_counts['normal'],
            "low_priority_tasks": priority_counts['low'],
            "overdue_tasks": overdue_count,
            "scheduled_today": scheduled_today,
            "scheduled_this_week": scheduled_this_week,
            # ``is not None`` so that a genuine 0.0-hour average is reported
            # as 0.0 rather than being mistaken for "no data".
            "avg_completion_time_hours": (
                round(avg_completion_time, 2) if avg_completion_time is not None else None
            ),
            "completion_rate": round(completion_rate, 2),
            "by_task_type": dict(type_counts)
        }
    except HTTPException:
        # Errors raised deliberately keep their status code.
        raise
    except Exception as e:
        logger.error(f"Error getting task stats: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get task stats: {str(e)}"
        )
@router.get("/{task_id}", response_model=TaskResponse)
@require_permission("view_tasks")
async def get_task(
    task_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get a single task by ID
    **Authorization:**
    - platform_admin: Can view any task
    - Managers: Can view tasks in projects they're involved with
    - Admins: Can view tasks in their organization's projects
    **Returns:**
    - Complete task details with nested data
    - Computed properties (is_completed, is_overdue, duration_days)
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        found = TaskService.get_task_by_id(db, task_id, current_user)

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(found)
        project = found.project
        payload.project_title = project.title if project else None
        region = found.project_region
        payload.region_name = region.region_name if region else None
        author = found.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(found, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error getting task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to get task: {exc}"
        )
@router.put("/{task_id}", response_model=TaskResponse)
@require_permission("manage_tasks")
async def update_task(
    task_id: UUID,
    data: TaskUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update an existing task
    **Authorization:**
    - platform_admin: Can update any task
    - project_manager: Can update tasks in projects they manage
    - Admins: Can update tasks in their organization's projects
    **Validation:**
    - project_region_id validated if changing
    - Location coordinates must be provided together
    - Timeline validation (completed_at >= started_at)
    **All fields are optional** - only provided fields will be updated
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        # Snapshot the audited fields BEFORE updating, so the "old" side of
        # the audit record holds the pre-update values.
        existing = TaskService.get_task_by_id(db, task_id, current_user)
        previous_date = existing.scheduled_date
        before = {
            "task_title": existing.task_title,
            "status": existing.status.value,
            "scheduled_date": str(previous_date) if previous_date else None
        }

        updated = TaskService.update_task(db, task_id, data, current_user)

        # The "new" side records only the fields the client actually sent.
        change_set = {
            "old": before,
            "new": data.model_dump(exclude_unset=True)
        }
        AuditService.log_action(
            db=db,
            action="update_task",
            entity_type="task",
            description=f"Updated task {updated.id}",
            user=current_user,
            entity_id=str(updated.id),
            request=request,
            changes=change_set
        )

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(updated)
        project = updated.project
        payload.project_title = project.title if project else None
        region = updated.project_region
        payload.region_name = region.region_name if region else None
        author = updated.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(updated, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error updating task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update task: {exc}"
        )
@router.patch("/{task_id}/status", response_model=TaskResponse)
@require_permission("manage_tasks")
async def update_task_status(
    task_id: UUID,
    data: TaskStatusUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update task status with validation
    **Business Rules:**
    - Cannot revert from completed status
    - Cannot reopen cancelled tasks
    - Auto-sets started_at when moving to in_progress
    - Auto-sets completed_at when moving to completed
    **Status Flow:**
    - pending → assigned → in_progress → completed
    - Can cancel from any status except completed
    - Can block at any time
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        # Remember the status the task had before the transition.
        previous_status = TaskService.get_task_by_id(db, task_id, current_user).status

        updated = TaskService.update_task_status(db, task_id, data, current_user)

        # Audit entry recording the transition and the supplied reason.
        transition = {
            "old": {"status": previous_status.value},
            "new": {"status": updated.status.value, "reason": data.reason}
        }
        AuditService.log_action(
            db=db,
            action="update_task_status",
            entity_type="task",
            description=f"Updated task status from {previous_status.value} to {updated.status.value}",
            user=current_user,
            entity_id=str(updated.id),
            request=request,
            changes=transition
        )

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(updated)
        project = updated.project
        payload.project_title = project.title if project else None
        region = updated.project_region
        payload.region_name = region.region_name if region else None
        author = updated.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(updated, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error updating task status: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update task status: {exc}"
        )
@router.post("/{task_id}/start", response_model=TaskResponse)
@require_permission("manage_tasks")
async def start_task(
    task_id: UUID,
    data: TaskStart,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Start a task (convenience endpoint)
    Changes status to in_progress and sets started_at timestamp.
    Can only start from pending or assigned status.
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        started = TaskService.start_task(db, task_id, data, current_user)

        # Audit entry carrying the start timestamp reported by the service.
        AuditService.log_action(
            db=db,
            action="start_task",
            entity_type="task",
            description=f"Started task {started.id}",
            user=current_user,
            entity_id=str(started.id),
            request=request,
            additional_metadata={"started_at": str(started.started_at)}
        )

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(started)
        project = started.project
        payload.project_title = project.title if project else None
        region = started.project_region
        payload.region_name = region.region_name if region else None
        author = started.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(started, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error starting task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to start task: {exc}"
        )
@router.post("/{task_id}/complete", response_model=TaskResponse)
@require_permission("manage_tasks")
async def complete_task(
    task_id: UUID,
    data: TaskComplete,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Complete a task
    Changes status to completed and sets completed_at timestamp.
    Can only complete from assigned or in_progress status.
    Auto-sets started_at if not already set.
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        finished = TaskService.complete_task(db, task_id, data, current_user, background_tasks)

        # Audit entry carrying the completion timestamp and the client's notes.
        completion_details = {
            "completed_at": str(finished.completed_at),
            "notes": data.completion_notes
        }
        AuditService.log_action(
            db=db,
            action="complete_task",
            entity_type="task",
            description=f"Completed task {finished.id}",
            user=current_user,
            entity_id=str(finished.id),
            request=request,
            additional_metadata=completion_details
        )

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(finished)
        project = finished.project
        payload.project_title = project.title if project else None
        region = finished.project_region
        payload.region_name = region.region_name if region else None
        author = finished.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(finished, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error completing task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to complete task: {exc}"
        )
@router.post("/{task_id}/cancel", response_model=TaskResponse)
@require_permission("manage_tasks")
async def cancel_task(
    task_id: UUID,
    data: TaskCancel,
    request: Request,
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Cancel a task
    Changes status to cancelled.
    Cannot cancel already completed or cancelled tasks.
    Requires cancellation reason.
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    try:
        cancelled = TaskService.cancel_task(db, task_id, data, current_user, background_tasks)

        # Audit entry carrying the cancellation reason supplied by the client.
        AuditService.log_action(
            db=db,
            action="cancel_task",
            entity_type="task",
            description=f"Cancelled task {cancelled.id}",
            user=current_user,
            entity_id=str(cancelled.id),
            request=request,
            additional_metadata={"reason": data.cancellation_reason}
        )

        # Serialize, then attach nested display names and computed properties.
        payload = TaskResponse.model_validate(cancelled)
        project = cancelled.project
        payload.project_title = project.title if project else None
        region = cancelled.project_region
        payload.region_name = region.region_name if region else None
        author = cancelled.created_by
        payload.created_by_name = author.name if author else None
        for computed in ("is_completed", "is_overdue", "has_location", "duration_days"):
            setattr(payload, computed, getattr(cancelled, computed))
        return payload
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as exc:
        logger.error(f"Error cancelling task: {exc}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cancel task: {exc}"
        )
@router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_permission("manage_tasks")
async def delete_task(
    task_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Soft delete a task (platform_admin and project_manager only)
    **Authorization:**
    - platform_admin and project_manager only
    **Action:**
    - Performs soft delete (sets deleted_at timestamp)
    - Task remains in database but filtered from queries
    **WARNING:** This action cannot be undone via API
    """
    # The docstring above is kept verbatim: it is published as this
    # endpoint's description in the generated API docs.
    # NOTE(review): the only check visible here is the "manage_tasks"
    # permission; the role restriction described above is presumably
    # enforced inside TaskService.delete_task - confirm.
    try:
        # Snapshot everything the audit entry needs BEFORE deleting, so the
        # audit write never has to read attributes from an ORM instance that
        # the delete may have expired or detached.
        task = TaskService.get_task_by_id(db, task_id, current_user)
        task_title = task.task_title
        project_id = str(task.project_id)

        TaskService.delete_task(db, task_id, current_user)

        AuditService.log_action(
            db=db,
            action="delete_task",
            entity_type="task",
            description=f"Deleted task: {task_title}",
            user=current_user,
            entity_id=str(task_id),
            request=request,
            additional_metadata={
                "task_title": task_title,
                "project_id": project_id
            }
        )
        # Falls through returning None: the route answers 204 with no body.
    except HTTPException:
        # Errors raised deliberately by the service keep their status code.
        raise
    except Exception as e:
        logger.error(f"Error deleting task: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete task: {str(e)}"
        )