"""
Task Service - Core business logic for task management (infrastructure projects)
Handles task CRUD operations with project type validation and authorization
"""

import logging
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime, date
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, func
from fastapi import HTTPException, status, BackgroundTasks
from uuid import UUID

from app.models.task import Task
from app.models.project import Project, ProjectRegion
from app.models.user import User
from app.models.enums import AppRole, TaskStatus, TicketPriority
from app.schemas.task import TaskCreate, TaskUpdate, TaskStatusUpdate, TaskStart, TaskComplete, TaskCancel
from app.services.notification_creator import NotificationCreator
from app.services.notification_delivery import NotificationDelivery

logger = logging.getLogger(__name__)


class TaskService:
    """Service for managing tasks with authorization and project type validation.

    All methods are static and operate on a caller-supplied SQLAlchemy session.
    Failures that callers are expected to surface are raised as
    ``fastapi.HTTPException`` (404 for missing rows, 403 for authorization,
    400 for invalid state transitions or payloads).
    """

    # ============================================
    # AUTHORIZATION HELPERS
    # ============================================

    @staticmethod
    def can_user_create_task(user: User, project: Project) -> bool:
        """Check if user can create a task for this project.

        Returns True for platform admins, for a project manager who is the
        project's primary manager, and for client/contractor admins whose
        organization owns the project. Everyone else gets False.
        """
        if user.role == AppRole.PLATFORM_ADMIN:
            return True

        # Project managers can create tasks only for projects they primarily
        # manage (no project-team membership check is performed here).
        if user.role == AppRole.PROJECT_MANAGER and project.primary_manager_id == user.id:
            return True

        # Client and contractor admins can create tasks for their own
        # organization's projects. The not-None guard prevents an admin with
        # no organization from matching a project with no organization.
        if (
            user.role == AppRole.CLIENT_ADMIN
            and user.client_id is not None
            and user.client_id == project.client_id
        ):
            return True

        if (
            user.role == AppRole.CONTRACTOR_ADMIN
            and user.contractor_id is not None
            and user.contractor_id == project.contractor_id
        ):
            return True

        return False

    @staticmethod
    def can_user_view_task(user: User, task: Task) -> bool:
        """Check if user can view this task.

        Platform admins and project managers can view every task; any other
        user can view tasks whose project belongs to their client or
        contractor organization.
        """
        if user.role in (AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER):
            return True

        # Users can view tasks from their organization's projects
        if user.client_id and task.project.client_id == user.client_id:
            return True

        if user.contractor_id and task.project.contractor_id == user.contractor_id:
            return True

        return False

    @staticmethod
    def can_user_edit_task(user: User, task: Task) -> bool:
        """Check if user can edit this task.

        Same rules as task creation, evaluated against the task's project:
        platform admin, the project's primary manager, or an admin of the
        owning client/contractor organization.
        """
        project = task.project

        if user.role == AppRole.PLATFORM_ADMIN:
            return True

        # Project managers can edit tasks only on projects they primarily manage
        if user.role == AppRole.PROJECT_MANAGER and project.primary_manager_id == user.id:
            return True

        # Admins can edit tasks for their own organization's projects; the
        # not-None guard avoids a None == None match.
        if (
            user.role == AppRole.CLIENT_ADMIN
            and user.client_id is not None
            and user.client_id == project.client_id
        ):
            return True

        if (
            user.role == AppRole.CONTRACTOR_ADMIN
            and user.contractor_id is not None
            and user.contractor_id == project.contractor_id
        ):
            return True

        return False

    # ============================================
    # INTERNAL HELPERS
    # ============================================

    @staticmethod
    def _ensure_region_belongs_to_project(db: Session, region_id: UUID, project_id: UUID) -> None:
        """Raise 404 unless a non-deleted region with this ID exists on the project."""
        region = db.query(ProjectRegion).filter(
            ProjectRegion.id == region_id,
            ProjectRegion.project_id == project_id,
            ProjectRegion.deleted_at.is_(None)
        ).first()

        if not region:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project region with ID {region_id} not found or doesn't belong to this project"
            )

    @staticmethod
    def _append_note(task: Task, entry: str) -> None:
        """Append one line to the task's free-text notes, trimming outer whitespace."""
        existing_notes = task.notes or ""
        task.notes = f"{existing_notes}\n{entry}".strip()

    @staticmethod
    def _notify_project_team(
        db: Session,
        task: Task,
        background_tasks: Optional[BackgroundTasks],
        *,
        title: str,
        message: str,
        notification_type: str,
        metadata: Dict[str, Any]
    ) -> None:
        """Create in-app notifications for the project team and queue their delivery.

        Tier 1 (synchronous): notification rows are created and committed.
        Tier 2 (asynchronous): delivery is queued only when a BackgroundTasks
        instance was supplied and at least one notification was created.

        Errors are NOT handled here; callers treat notification as
        best-effort and wrap this call in their own try/except.
        """
        notification_ids = NotificationCreator.notify_project_team(
            db=db,
            project_id=task.project_id,
            title=title,
            message=message,
            source_type="task",
            source_id=task.id,
            notification_type=notification_type,
            channel="in_app",
            roles=["project_manager", "dispatcher"],
            metadata=metadata
        )
        db.commit()

        if background_tasks and notification_ids:
            NotificationDelivery.queue_bulk_delivery(
                background_tasks=background_tasks,
                notification_ids=notification_ids
            )

    # ============================================
    # TASK CRUD
    # ============================================

    @staticmethod
    def create_task(
        db: Session,
        data: TaskCreate,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """
        Create a new task for an infrastructure project

        Business Rules enforced here:
        - Project must exist and not be soft-deleted
        - User must have permission to create tasks for this project
        - If project_region_id provided, it must exist, not be soft-deleted,
          and belong to this project

        Other payload rules (e.g. paired location coordinates) are not checked
        in this method - presumably enforced by the TaskCreate schema; verify.

        Authorization:
        - platform_admin: Can create for any project
        - project_manager: Can create for projects they primarily manage
        - client_admin/contractor_admin: Can create for their organization's projects

        Args:
            db: Active database session; committed by this method.
            data: Validated creation payload.
            current_user: User performing the action.
            background_tasks: Optional FastAPI background task queue used for
                asynchronous notification delivery.

        Returns:
            The persisted, refreshed Task.

        Raises:
            HTTPException: 404 if the project or region is not found,
                403 if the user may not create tasks for the project.
        """
        # Validate project exists
        project = db.query(Project).filter(
            Project.id == data.project_id,
            Project.deleted_at.is_(None)
        ).first()

        if not project:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with ID {data.project_id} not found"
            )

        # Authorization check
        if not TaskService.can_user_create_task(current_user, project):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to create tasks for this project"
            )

        # Log task creation with context
        logger.info(
            f"Creating {data.task_type or 'general'} task for project {project.id} "
            f"({project.title}). Task: {data.task_title}"
        )

        # Validate project_region if provided
        if data.project_region_id:
            TaskService._ensure_region_belongs_to_project(
                db, data.project_region_id, data.project_id
            )

        # Create task
        task_data = data.model_dump()
        task_data['created_by_user_id'] = current_user.id

        task = Task(**task_data)
        db.add(task)
        db.commit()
        db.refresh(task)

        logger.info(f"Task created: {task.id} - {task.task_title} for project {project.id} by user {current_user.id}")

        # Notify PMs/managers about new task. Best-effort: the task is already
        # committed, so a notification failure must not fail the request.
        try:
            TaskService._notify_project_team(
                db,
                task,
                background_tasks,
                title="📋 New Task Created",
                message=(
                    f"Task '{task.task_title}' was created by {current_user.full_name}."
                    f"\n\nType: {task.task_type or 'General'}"
                    f"\nPriority: {task.priority.value if task.priority else 'Normal'}"
                ),
                notification_type="task_created",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "task_type": task.task_type,
                    "priority": task.priority.value if task.priority else None,
                    "created_by": current_user.full_name,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as e:
            # Reset the session so the returned task stays usable even when
            # the failure happened inside a flush/commit.
            db.rollback()
            logger.exception(f"Failed to send task creation notification: {str(e)}")

        return task

    @staticmethod
    def list_tasks(
        db: Session,
        current_user: User,
        skip: int = 0,
        limit: int = 50,
        project_id: Optional[UUID] = None,
        project_region_id: Optional[UUID] = None,
        status: Optional[TaskStatus] = None,
        task_type: Optional[str] = None,
        priority: Optional[TicketPriority] = None,
        scheduled_date_from: Optional[date] = None,
        scheduled_date_to: Optional[date] = None,
        is_overdue: Optional[bool] = None,
        search: Optional[str] = None
    ) -> Tuple[List[Task], int]:
        """
        List tasks with authorization filtering

        Filters:
        - project_id: Filter by specific project
        - project_region_id: Filter by region
        - status: Filter by task status
        - task_type: Case-insensitive substring match on task type
        - priority: Filter by priority
        - scheduled_date_from/to: Inclusive date range filter
        - is_overdue: True -> scheduled before today and neither completed nor
          cancelled; False -> the complement (including unscheduled tasks)
        - search: Case-insensitive substring match on title, description,
          location name, or address line 1

        Authorization:
        - platform_admin: Can see all tasks
        - project_manager/dispatcher: Currently see all tasks
        - Everyone else: Only tasks in their client's or contractor's
          projects; users with no organization see nothing

        Returns:
            (tasks, total) where total is the count before pagination and
            tasks is the page selected by skip/limit, ordered by scheduled
            date ascending (NULLs last) then creation time descending.
        """
        # NOTE: the `status` parameter shadows fastapi's `status` module inside
        # this method; do not reference HTTP status codes here.
        query = db.query(Task).filter(Task.deleted_at.is_(None))

        # Authorization filtering
        # NOTE(review): dispatchers see all tasks here, but can_user_view_task
        # does not grant them role-based access - confirm which is intended.
        sees_everything = current_user.role in (
            AppRole.PLATFORM_ADMIN,
            AppRole.PROJECT_MANAGER,
            AppRole.DISPATCHER,
        )
        if not sees_everything:
            # Filter by organization
            if current_user.client_id:
                query = query.join(Project).filter(Project.client_id == current_user.client_id)
            elif current_user.contractor_id:
                query = query.join(Project).filter(Project.contractor_id == current_user.contractor_id)
            else:
                # Fail closed: a user with no organization must not fall
                # through to the unfiltered query and see every task.
                return [], 0

        # Apply filters
        if project_id:
            query = query.filter(Task.project_id == project_id)

        if project_region_id:
            query = query.filter(Task.project_region_id == project_region_id)

        if status:
            query = query.filter(Task.status == status)

        if task_type:
            query = query.filter(Task.task_type.ilike(f"%{task_type}%"))

        if priority:
            query = query.filter(Task.priority == priority)

        if scheduled_date_from:
            query = query.filter(Task.scheduled_date >= scheduled_date_from)

        if scheduled_date_to:
            query = query.filter(Task.scheduled_date <= scheduled_date_to)

        if is_overdue is not None:
            today = date.today()
            closed_statuses = [TaskStatus.COMPLETED, TaskStatus.CANCELLED]
            if is_overdue:
                # Overdue: scheduled date in past and not completed
                query = query.filter(
                    Task.scheduled_date < today,
                    Task.status.notin_(closed_statuses)
                )
            else:
                # Not overdue
                query = query.filter(
                    or_(
                        Task.scheduled_date >= today,
                        Task.scheduled_date.is_(None),
                        Task.status.in_(closed_statuses)
                    )
                )

        if search:
            pattern = f"%{search}%"
            query = query.filter(
                or_(
                    Task.task_title.ilike(pattern),
                    Task.task_description.ilike(pattern),
                    Task.location_name.ilike(pattern),
                    Task.task_address_line1.ilike(pattern)
                )
            )

        # Get total count
        total = query.count()

        # Get paginated results with relationships
        tasks = query.options(
            joinedload(Task.project),
            joinedload(Task.project_region),
            joinedload(Task.created_by)
        ).order_by(
            Task.scheduled_date.asc().nullslast(),
            Task.created_at.desc()
        ).offset(skip).limit(limit).all()

        return tasks, total

    @staticmethod
    def get_task_by_id(
        db: Session,
        task_id: UUID,
        current_user: User
    ) -> Task:
        """Get a single task by ID with authorization check.

        Eager-loads the task's project, project region and creator.

        Raises:
            HTTPException: 404 if no non-deleted task has this ID,
                403 if the user may not view it.
        """
        task = db.query(Task).options(
            joinedload(Task.project),
            joinedload(Task.project_region),
            joinedload(Task.created_by)
        ).filter(
            Task.id == task_id,
            Task.deleted_at.is_(None)
        ).first()

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with ID {task_id} not found"
            )

        # Authorization check
        if not TaskService.can_user_view_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to view this task"
            )

        return task

    @staticmethod
    def update_task(
        db: Session,
        task_id: UUID,
        data: TaskUpdate,
        current_user: User
    ) -> Task:
        """Update an existing task.

        Only fields explicitly set on the payload are written. A new
        project_region_id must belong to the task's project. When both
        started_at and completed_at are supplied, completed_at may not
        precede started_at (values already stored on the task are not
        cross-checked).

        Raises:
            HTTPException: 404 if the task or region is not found,
                403 if the user may not edit the task,
                400 if the supplied timeline is inconsistent.
        """
        # Get existing task
        task = db.query(Task).filter(
            Task.id == task_id,
            Task.deleted_at.is_(None)
        ).first()

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with ID {task_id} not found"
            )

        # Authorization check
        if not TaskService.can_user_edit_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to edit this task"
            )

        # Validate project_region if being updated
        if data.project_region_id:
            TaskService._ensure_region_belongs_to_project(
                db, data.project_region_id, task.project_id
            )

        # Validate timeline if both dates being updated
        if data.started_at and data.completed_at and data.completed_at < data.started_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="completed_at must be after started_at"
            )

        # Update fields (exclude_unset keeps omitted fields untouched)
        for field, value in data.model_dump(exclude_unset=True).items():
            setattr(task, field, value)

        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)

        logger.info(f"Task updated: {task.id} by user {current_user.id}")

        return task

    @staticmethod
    def update_task_status(
        db: Session,
        task_id: UUID,
        data: TaskStatusUpdate,
        current_user: User
    ) -> Task:
        """Update task status with validation.

        Completed and cancelled tasks are terminal: any attempt to move them
        to a different status is rejected. Entering IN_PROGRESS or COMPLETED
        stamps started_at / completed_at if they are not already set. An
        optional reason is appended to the task notes.

        Raises:
            HTTPException: 404/403 from the lookup, 403 if the user may not
                edit the task, 400 for a forbidden transition.
        """
        task = TaskService.get_task_by_id(db, task_id, current_user)

        if not TaskService.can_user_edit_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to update this task status"
            )

        old_status = task.status
        new_status = data.status

        # Validate status transitions
        if old_status == TaskStatus.COMPLETED and new_status != TaskStatus.COMPLETED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot change status of completed task. Create a new task if work needs to be re-done."
            )

        if old_status == TaskStatus.CANCELLED and new_status != TaskStatus.CANCELLED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot reopen cancelled task. Create a new task instead."
            )

        # Auto-set timestamps based on status
        if new_status == TaskStatus.IN_PROGRESS and not task.started_at:
            task.started_at = datetime.utcnow()

        if new_status == TaskStatus.COMPLETED and not task.completed_at:
            task.completed_at = datetime.utcnow()

        task.status = new_status

        if data.reason:
            TaskService._append_note(
                task,
                f"Status change ({old_status.value} → {new_status.value}): {data.reason}"
            )

        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)

        logger.info(f"Task status updated: {task.id} from {old_status.value} to {new_status.value}")

        return task

    @staticmethod
    def start_task(
        db: Session,
        task_id: UUID,
        data: TaskStart,
        current_user: User
    ) -> Task:
        """Start a task (convenience method).

        Moves the task to IN_PROGRESS and sets started_at to the supplied
        value or the current UTC time. Optional notes are appended.

        Raises:
            HTTPException: 404/403 from the lookup, 400 if the task's
                can_start() check fails.
        """
        # NOTE(review): only view permission is enforced here, whereas
        # update_task_status also requires edit permission - confirm intended.
        task = TaskService.get_task_by_id(db, task_id, current_user)

        if not task.can_start():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be started from current status: {task.status.value}"
            )

        task.status = TaskStatus.IN_PROGRESS
        task.started_at = data.started_at or datetime.utcnow()

        if data.notes:
            TaskService._append_note(task, f"Started: {data.notes}")

        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)

        logger.info(f"Task started: {task.id}")

        return task

    @staticmethod
    def complete_task(
        db: Session,
        task_id: UUID,
        data: TaskComplete,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """Complete a task.

        Moves the task to COMPLETED, sets completed_at to the supplied value
        or the current UTC time, and backfills started_at with completed_at
        when the task was never started. The project team is then notified
        on a best-effort basis.

        Raises:
            HTTPException: 404/403 from the lookup, 400 if the task's
                can_complete() check fails.
        """
        # NOTE(review): only view permission is enforced here, whereas
        # update_task_status also requires edit permission - confirm intended.
        task = TaskService.get_task_by_id(db, task_id, current_user)

        if not task.can_complete():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be completed from current status: {task.status.value}"
            )

        task.status = TaskStatus.COMPLETED
        task.completed_at = data.completed_at or datetime.utcnow()

        if not task.started_at:
            task.started_at = task.completed_at

        if data.completion_notes:
            TaskService._append_note(task, f"Completed: {data.completion_notes}")

        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)

        logger.info(f"Task completed: {task.id}")

        # Notify PMs/managers about task completion. Best-effort: the status
        # change is already committed.
        try:
            TaskService._notify_project_team(
                db,
                task,
                background_tasks,
                title="✅ Task Completed",
                message=f"Task '{task.task_title}' was completed by {current_user.full_name}.",
                notification_type="task_completed",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "completed_by": current_user.full_name,
                    "completed_at": str(task.completed_at),
                    "completion_notes": data.completion_notes,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as e:
            # Reset the session so the returned task stays usable even when
            # the failure happened inside a flush/commit.
            db.rollback()
            logger.exception(f"Failed to send task completion notification: {str(e)}")

        return task

    @staticmethod
    def cancel_task(
        db: Session,
        task_id: UUID,
        data: TaskCancel,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """Cancel a task.

        Moves the task to CANCELLED and records the cancellation reason in
        the notes. The project team is then notified on a best-effort basis.

        Raises:
            HTTPException: 404/403 from the lookup, 400 if the task's
                can_cancel() check fails.
        """
        # NOTE(review): only view permission is enforced here, whereas
        # update_task_status also requires edit permission - confirm intended.
        task = TaskService.get_task_by_id(db, task_id, current_user)

        if not task.can_cancel():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be cancelled from current status: {task.status.value}"
            )

        task.status = TaskStatus.CANCELLED
        TaskService._append_note(task, f"Cancelled: {data.cancellation_reason}")
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)

        logger.info(f"Task cancelled: {task.id}")

        # Notify PMs/managers about task cancellation. Best-effort: the
        # status change is already committed.
        try:
            TaskService._notify_project_team(
                db,
                task,
                background_tasks,
                title="❌ Task Cancelled",
                message=(
                    f"Task '{task.task_title}' was cancelled by {current_user.full_name}."
                    f"\n\nReason: {data.cancellation_reason}"
                ),
                notification_type="task_cancelled",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "cancelled_by": current_user.full_name,
                    "cancellation_reason": data.cancellation_reason,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as e:
            # Reset the session so the returned task stays usable even when
            # the failure happened inside a flush/commit.
            db.rollback()
            logger.exception(f"Failed to send task cancellation notification: {str(e)}")

        return task

    @staticmethod
    def delete_task(
        db: Session,
        task_id: UUID,
        current_user: User
    ) -> None:
        """
        Soft delete a task (platform_admin and project_manager only)

        Sets deleted_at to the current UTC time; the row is kept.

        Raises:
            HTTPException: 403 if the user has neither role,
                404 if no non-deleted task has this ID.
        """
        # NOTE(review): any project manager may delete any task here, while
        # can_user_edit_task limits managers to projects they primarily
        # manage - confirm intended.
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only platform administrators and project managers can delete tasks"
            )

        task = db.query(Task).filter(
            Task.id == task_id,
            Task.deleted_at.is_(None)
        ).first()

        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with ID {task_id} not found"
            )

        # Soft delete
        task.deleted_at = datetime.utcnow()
        db.commit()

        logger.info(f"Task deleted: {task_id} by user {current_user.id}")