Spaces:
Sleeping
Sleeping
"""
Task Service - Core business logic for task management (infrastructure projects).
Handles task CRUD operations with project type validation and authorization.
"""
| import logging | |
| from typing import Optional, Dict, Any, List, Tuple | |
| from datetime import datetime, date | |
| from sqlalchemy.orm import Session, joinedload | |
| from sqlalchemy import or_, and_, func | |
| from fastapi import HTTPException, status, BackgroundTasks | |
| from uuid import UUID | |
| from app.models.task import Task | |
| from app.models.project import Project, ProjectRegion | |
| from app.models.user import User | |
| from app.models.enums import AppRole, TaskStatus, TicketPriority | |
| from app.schemas.task import TaskCreate, TaskUpdate, TaskStatusUpdate, TaskStart, TaskComplete, TaskCancel | |
| from app.services.notification_creator import NotificationCreator | |
| from app.services.notification_delivery import NotificationDelivery | |
| logger = logging.getLogger(__name__) | |
class TaskService:
    """Service for managing tasks with authorization and project type validation.

    All methods are static: none of them takes ``self`` and internal calls go
    through ``TaskService.<method>(...)``. The ``@staticmethod`` decorators make
    instance-level calls behave the same as class-level calls.
    """

    # ============================================
    # AUTHORIZATION HELPERS
    # ============================================

    @staticmethod
    def can_user_create_task(user: User, project: Project) -> bool:
        """Return True if ``user`` may create a task in ``project``.

        Allowed: platform admins; a project manager whose id equals the
        project's ``primary_manager_id``; a client admin of the project's
        client; a contractor admin of the project's contractor.
        """
        if user.role == AppRole.PLATFORM_ADMIN:
            return True

        # Only the primary manager is checked; project-team membership is not
        # consulted anywhere in this method.
        if user.role == AppRole.PROJECT_MANAGER and project.primary_manager_id == user.id:
            return True

        # Client and contractor admins can create tasks for their own projects
        if user.role == AppRole.CLIENT_ADMIN and user.client_id == project.client_id:
            return True
        if user.role == AppRole.CONTRACTOR_ADMIN and user.contractor_id == project.contractor_id:
            return True

        return False

    @staticmethod
    def can_user_view_task(user: User, task: Task) -> bool:
        """Return True if ``user`` may view ``task``.

        Allowed: platform admins; any project manager; any user whose
        ``client_id`` or ``contractor_id`` matches the task's project.
        """
        if user.role in (AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER):
            return True

        # Users can view tasks from their organization's projects
        if user.client_id and task.project.client_id == user.client_id:
            return True
        if user.contractor_id and task.project.contractor_id == user.contractor_id:
            return True

        return False

    @staticmethod
    def can_user_edit_task(user: User, task: Task) -> bool:
        """Return True if ``user`` may edit ``task``.

        Allowed: platform admins; the primary manager of the task's project;
        a client admin / contractor admin of the task's project organization.
        """
        if user.role == AppRole.PLATFORM_ADMIN:
            return True

        if user.role == AppRole.PROJECT_MANAGER and task.project.primary_manager_id == user.id:
            return True

        # Admins can edit tasks for their projects
        if user.role == AppRole.CLIENT_ADMIN and user.client_id == task.project.client_id:
            return True
        if user.role == AppRole.CONTRACTOR_ADMIN and user.contractor_id == task.project.contractor_id:
            return True

        return False

    # ============================================
    # INTERNAL HELPERS
    # ============================================

    @staticmethod
    def _get_task_or_404(db: Session, task_id: UUID) -> Task:
        """Fetch a non-deleted task by id or raise HTTP 404."""
        task = db.query(Task).filter(
            Task.id == task_id,
            Task.deleted_at.is_(None)
        ).first()
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with ID {task_id} not found"
            )
        return task

    @staticmethod
    def _require_project_region(db: Session, project_region_id: UUID, project_id: UUID) -> None:
        """Raise HTTP 404 unless a non-deleted region with this id belongs to the project."""
        region = db.query(ProjectRegion).filter(
            ProjectRegion.id == project_region_id,
            ProjectRegion.project_id == project_id,
            ProjectRegion.deleted_at.is_(None)
        ).first()
        if not region:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project region with ID {project_region_id} not found or doesn't belong to this project"
            )

    @staticmethod
    def _append_note(task: Task, entry: str) -> None:
        """Append ``entry`` on a new line of ``task.notes`` (missing notes count as empty)."""
        task.notes = f"{task.notes or ''}\n{entry}".strip()

    @staticmethod
    def _send_team_notification(
        db: Session,
        background_tasks: Optional[BackgroundTasks],
        task: Task,
        *,
        title: str,
        message: str,
        notification_type: str,
        metadata: Dict[str, Any]
    ) -> None:
        """Create in-app notifications for the task's project team and queue delivery.

        Does not catch anything: callers wrap this in their own best-effort
        ``try`` block so a notification problem never fails the task operation.
        """
        # Tier 1 - Synchronous: create and commit the notification records
        notification_ids = NotificationCreator.notify_project_team(
            db=db,
            project_id=task.project_id,
            title=title,
            message=message,
            source_type="task",
            source_id=task.id,
            notification_type=notification_type,
            channel="in_app",
            roles=["project_manager", "dispatcher"],
            metadata=metadata
        )
        db.commit()

        # Tier 2 - Asynchronous: only possible when the caller supplied BackgroundTasks
        if background_tasks and notification_ids:
            NotificationDelivery.queue_bulk_delivery(
                background_tasks=background_tasks,
                notification_ids=notification_ids
            )

    @staticmethod
    def _recover_from_notification_failure(db: Session, event: str, exc: Exception) -> None:
        """Log a notification failure and reset the session so the caller can keep using it.

        The task change itself was committed before the notification attempt, so
        rolling back here only discards the half-finished notification work.
        Must be called from inside an ``except`` block (uses ``logger.exception``).
        """
        logger.exception("Failed to send task %s notification: %s", event, exc)
        try:
            db.rollback()
        except Exception:
            # Still best-effort: never let cleanup mask the successful task operation.
            logger.exception("Rollback after failed task %s notification also failed", event)

    # ============================================
    # TASK CRUD
    # ============================================

    @staticmethod
    def create_task(
        db: Session,
        data: TaskCreate,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """
        Create a new task for an infrastructure project

        Checks performed here:
        - Project must exist and not be soft-deleted
        - User must have permission to create tasks for this project
        - If project_region_id is provided, it must exist and belong to this project

        NOTE(review): project-type and coordinate-pair validation are not done in
        this method; presumably the TaskCreate schema covers them - confirm.

        Authorization:
        - platform_admin: Can create for any project
        - project_manager: Can create for projects they are primary manager of
        - client_admin/contractor_admin: Can create for their organization's projects

        Raises:
            HTTPException 404: project or project region not found
            HTTPException 403: user may not create tasks for the project

        Notification of the project team is best-effort: failures are logged and
        the created task is still returned.
        """
        # Validate project exists
        project = db.query(Project).filter(
            Project.id == data.project_id,
            Project.deleted_at.is_(None)
        ).first()
        if not project:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Project with ID {data.project_id} not found"
            )

        # Authorization check
        if not TaskService.can_user_create_task(current_user, project):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to create tasks for this project"
            )

        # Log task creation with context
        logger.info(
            f"Creating {data.task_type or 'general'} task for project {project.id} "
            f"({project.title}). Task: {data.task_title}"
        )

        # Validate project_region if provided
        if data.project_region_id:
            TaskService._require_project_region(db, data.project_region_id, data.project_id)

        # Create task
        task_data = data.model_dump()
        task_data['created_by_user_id'] = current_user.id
        task = Task(**task_data)
        db.add(task)
        db.commit()
        db.refresh(task)
        logger.info(f"Task created: {task.id} - {task.task_title} for project {project.id} by user {current_user.id}")

        # Notify PMs/managers about new task. Message/metadata are built inside the
        # try so that a formatting problem is also treated as best-effort.
        try:
            TaskService._send_team_notification(
                db,
                background_tasks,
                task,
                title="📋 New Task Created",
                message=(
                    f"Task '{task.task_title}' was created by {current_user.full_name}.\n\n"
                    f"Type: {task.task_type or 'General'}\n"
                    f"Priority: {task.priority.value if task.priority else 'Normal'}"
                ),
                notification_type="task_created",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "task_type": task.task_type,
                    "priority": task.priority.value if task.priority else None,
                    "created_by": current_user.full_name,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as exc:
            TaskService._recover_from_notification_failure(db, "creation", exc)

        return task

    @staticmethod
    def list_tasks(
        db: Session,
        current_user: User,
        skip: int = 0,
        limit: int = 50,
        project_id: Optional[UUID] = None,
        project_region_id: Optional[UUID] = None,
        status: Optional[TaskStatus] = None,
        task_type: Optional[str] = None,
        priority: Optional[TicketPriority] = None,
        scheduled_date_from: Optional[date] = None,
        scheduled_date_to: Optional[date] = None,
        is_overdue: Optional[bool] = None,
        search: Optional[str] = None
    ) -> Tuple[List[Task], int]:
        """
        List tasks with authorization filtering

        Filters:
        - project_id: Filter by specific project
        - project_region_id: Filter by region
        - status: Filter by task status
        - task_type: Case-insensitive substring match on task type
        - priority: Filter by priority
        - scheduled_date_from/to: Inclusive date range filter
        - is_overdue: True -> scheduled before today and not completed/cancelled;
          False -> everything else; None -> no filter
        - search: Case-insensitive substring match on title, description,
          location name or address line 1

        Authorization:
        - platform_admin, project_manager, dispatcher: see all tasks
        - Users with a client_id / contractor_id: see tasks of that organization's projects
        - Anyone else: sees nothing (consistent with can_user_view_task)

        Returns:
            (tasks, total): the requested page ordered by scheduled_date ascending
            (NULLs last) then created_at descending, and the unpaginated match count.

        Note: the ``status`` parameter shadows the imported ``fastapi.status``
        inside this method, so HTTP status constants must not be used here.
        """
        query = db.query(Task).filter(Task.deleted_at.is_(None))

        # Authorization filtering
        if current_user.role in (AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER, AppRole.DISPATCHER):
            # Admins and managers see all (could be refined with project_team check)
            pass
        elif current_user.client_id:
            query = query.join(Project).filter(Project.client_id == current_user.client_id)
        elif current_user.contractor_id:
            query = query.join(Project).filter(Project.contractor_id == current_user.contractor_id)
        else:
            # Fail closed: without a privileged role or an organization there is
            # nothing to scope the query by, and an unfiltered query would expose
            # every task in the system.
            logger.warning(
                "User %s has no role or organization granting task access; returning no tasks",
                current_user.id
            )
            return [], 0

        # Apply filters
        if project_id:
            query = query.filter(Task.project_id == project_id)
        if project_region_id:
            query = query.filter(Task.project_region_id == project_region_id)
        if status:
            query = query.filter(Task.status == status)
        if task_type:
            query = query.filter(Task.task_type.ilike(f"%{task_type}%"))
        if priority:
            query = query.filter(Task.priority == priority)
        if scheduled_date_from:
            query = query.filter(Task.scheduled_date >= scheduled_date_from)
        if scheduled_date_to:
            query = query.filter(Task.scheduled_date <= scheduled_date_to)

        if is_overdue is not None:
            closed_statuses = [TaskStatus.COMPLETED, TaskStatus.CANCELLED]
            today = date.today()
            if is_overdue:
                # Overdue: scheduled date in past and not completed
                query = query.filter(
                    Task.scheduled_date < today,
                    Task.status.notin_(closed_statuses)
                )
            else:
                # Not overdue: future/today, unscheduled, or already closed
                query = query.filter(
                    or_(
                        Task.scheduled_date >= today,
                        Task.scheduled_date.is_(None),
                        Task.status.in_(closed_statuses)
                    )
                )

        if search:
            pattern = f"%{search}%"
            query = query.filter(
                or_(
                    Task.task_title.ilike(pattern),
                    Task.task_description.ilike(pattern),
                    Task.location_name.ilike(pattern),
                    Task.task_address_line1.ilike(pattern)
                )
            )

        # Get total count (before pagination)
        total = query.count()

        # Get paginated results with relationships
        tasks = query.options(
            joinedload(Task.project),
            joinedload(Task.project_region),
            joinedload(Task.created_by)
        ).order_by(
            Task.scheduled_date.asc().nullslast(),
            Task.created_at.desc()
        ).offset(skip).limit(limit).all()

        return tasks, total

    @staticmethod
    def get_task_by_id(
        db: Session,
        task_id: UUID,
        current_user: User
    ) -> Task:
        """Get a single non-deleted task by ID with a view-permission check.

        The project, project region and creator relationships are eager-loaded.

        Raises:
            HTTPException 404: task not found (or soft-deleted)
            HTTPException 403: user may not view the task
        """
        task = db.query(Task).options(
            joinedload(Task.project),
            joinedload(Task.project_region),
            joinedload(Task.created_by)
        ).filter(
            Task.id == task_id,
            Task.deleted_at.is_(None)
        ).first()
        if not task:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Task with ID {task_id} not found"
            )

        # Authorization check
        if not TaskService.can_user_view_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to view this task"
            )

        return task

    @staticmethod
    def update_task(
        db: Session,
        task_id: UUID,
        data: TaskUpdate,
        current_user: User
    ) -> Task:
        """Update an existing task with the fields explicitly set in ``data``.

        Raises:
            HTTPException 404: task or new project region not found
            HTTPException 403: user may not edit the task
            HTTPException 400: both dates supplied and completed_at < started_at
        """
        # Get existing task
        task = TaskService._get_task_or_404(db, task_id)

        # Authorization check
        if not TaskService.can_user_edit_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to edit this task"
            )

        # Validate project_region if being updated
        if data.project_region_id:
            TaskService._require_project_region(db, data.project_region_id, task.project_id)

        # Validate timeline. Only checked when BOTH dates are in the payload; a
        # single supplied date is not compared against the stored counterpart.
        if data.started_at and data.completed_at and data.completed_at < data.started_at:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="completed_at must be after started_at"
            )

        # Update fields (exclude_unset keeps untouched attributes as they are)
        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(task, field, value)
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"Task updated: {task.id} by user {current_user.id}")
        return task

    @staticmethod
    def update_task_status(
        db: Session,
        task_id: UUID,
        data: TaskStatusUpdate,
        current_user: User
    ) -> Task:
        """Update task status with transition validation.

        Completed and cancelled tasks are terminal: their status cannot change.
        ``started_at`` / ``completed_at`` are filled in automatically the first
        time the task enters IN_PROGRESS / COMPLETED. A supplied ``reason`` is
        appended to the task notes.

        Raises:
            HTTPException 404/403: from the lookup (view check) or the edit check
            HTTPException 400: attempted transition out of a terminal status
        """
        task = TaskService.get_task_by_id(db, task_id, current_user)
        if not TaskService.can_user_edit_task(current_user, task):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have permission to update this task status"
            )

        old_status = task.status
        new_status = data.status

        # Validate status transitions
        if old_status == TaskStatus.COMPLETED and new_status != TaskStatus.COMPLETED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot change status of completed task. Create a new task if work needs to be re-done."
            )
        if old_status == TaskStatus.CANCELLED and new_status != TaskStatus.CANCELLED:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot reopen cancelled task. Create a new task instead."
            )

        # Auto-set timestamps based on status
        if new_status == TaskStatus.IN_PROGRESS and not task.started_at:
            task.started_at = datetime.utcnow()
        if new_status == TaskStatus.COMPLETED and not task.completed_at:
            task.completed_at = datetime.utcnow()

        task.status = new_status
        if data.reason:
            TaskService._append_note(
                task,
                f"Status change ({old_status.value} → {new_status.value}): {data.reason}"
            )
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"Task status updated: {task.id} from {old_status.value} to {new_status.value}")
        return task

    @staticmethod
    def start_task(
        db: Session,
        task_id: UUID,
        data: TaskStart,
        current_user: User
    ) -> Task:
        """Start a task (convenience method): set IN_PROGRESS and ``started_at``.

        NOTE(review): only the view check from get_task_by_id is applied here, not
        can_user_edit_task - confirm this is the intended permission level.

        Raises:
            HTTPException 404/403: from the task lookup
            HTTPException 400: ``task.can_start()`` is false for the current status
        """
        task = TaskService.get_task_by_id(db, task_id, current_user)
        if not task.can_start():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be started from current status: {task.status.value}"
            )

        task.status = TaskStatus.IN_PROGRESS
        # Caller-supplied start time wins; otherwise stamp "now"
        task.started_at = data.started_at or datetime.utcnow()
        if data.notes:
            TaskService._append_note(task, f"Started: {data.notes}")
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"Task started: {task.id}")
        return task

    @staticmethod
    def complete_task(
        db: Session,
        task_id: UUID,
        data: TaskComplete,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """Complete a task and notify the project team (best-effort).

        NOTE(review): only the view check from get_task_by_id is applied here, not
        can_user_edit_task - confirm this is the intended permission level.

        Raises:
            HTTPException 404/403: from the task lookup
            HTTPException 400: ``task.can_complete()`` is false for the current status
        """
        task = TaskService.get_task_by_id(db, task_id, current_user)
        if not task.can_complete():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be completed from current status: {task.status.value}"
            )

        task.status = TaskStatus.COMPLETED
        task.completed_at = data.completed_at or datetime.utcnow()
        if not task.started_at:
            # Task was never explicitly started: treat it as started at completion time
            task.started_at = task.completed_at
        if data.completion_notes:
            TaskService._append_note(task, f"Completed: {data.completion_notes}")
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"Task completed: {task.id}")

        # Notify PMs/managers about task completion (failures are logged, not raised)
        try:
            TaskService._send_team_notification(
                db,
                background_tasks,
                task,
                title="✅ Task Completed",
                message=f"Task '{task.task_title}' was completed by {current_user.full_name}.",
                notification_type="task_completed",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "completed_by": current_user.full_name,
                    "completed_at": str(task.completed_at),
                    "completion_notes": data.completion_notes,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as exc:
            TaskService._recover_from_notification_failure(db, "completion", exc)

        return task

    @staticmethod
    def cancel_task(
        db: Session,
        task_id: UUID,
        data: TaskCancel,
        current_user: User,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> Task:
        """Cancel a task, record the reason in its notes and notify the project team (best-effort).

        NOTE(review): only the view check from get_task_by_id is applied here, not
        can_user_edit_task - confirm this is the intended permission level.

        Raises:
            HTTPException 404/403: from the task lookup
            HTTPException 400: ``task.can_cancel()`` is false for the current status
        """
        task = TaskService.get_task_by_id(db, task_id, current_user)
        if not task.can_cancel():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Task cannot be cancelled from current status: {task.status.value}"
            )

        task.status = TaskStatus.CANCELLED
        TaskService._append_note(task, f"Cancelled: {data.cancellation_reason}")
        task.updated_at = datetime.utcnow()

        db.commit()
        db.refresh(task)
        logger.info(f"Task cancelled: {task.id}")

        # Notify PMs/managers about task cancellation (failures are logged, not raised)
        try:
            TaskService._send_team_notification(
                db,
                background_tasks,
                task,
                title="❌ Task Cancelled",
                message=(
                    f"Task '{task.task_title}' was cancelled by {current_user.full_name}.\n\n"
                    f"Reason: {data.cancellation_reason}"
                ),
                notification_type="task_cancelled",
                metadata={
                    "task_id": str(task.id),
                    "task_title": task.task_title,
                    "cancelled_by": current_user.full_name,
                    "cancellation_reason": data.cancellation_reason,
                    "action_url": f"/tasks/{task.id}"
                }
            )
        except Exception as exc:
            TaskService._recover_from_notification_failure(db, "cancellation", exc)

        return task

    @staticmethod
    def delete_task(
        db: Session,
        task_id: UUID,
        current_user: User
    ) -> None:
        """
        Soft delete a task (platform_admin and project_manager only)

        Sets ``deleted_at``; the row is kept.

        NOTE(review): the check is role-only - any project manager may delete any
        task, whereas can_user_edit_task limits managers to projects they are
        primary manager of. Confirm whether that difference is intended.

        Raises:
            HTTPException 403: user is neither platform admin nor project manager
            HTTPException 404: task not found (or already soft-deleted)
        """
        if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only platform administrators and project managers can delete tasks"
            )

        task = TaskService._get_task_or_404(db, task_id)

        # Soft delete
        task.deleted_at = datetime.utcnow()
        db.commit()
        logger.info(f"Task deleted: {task_id} by user {current_user.id}")