swiftops-backend / src /app /services /task_service.py
kamau1's picture
feat: unified sync notification system, realtime timesheets and payroll, simplified project role compensation
95005e1
"""
Task Service - Core business logic for task management (infrastructure projects)
Handles task CRUD operations with project type validation and authorization
"""
import logging
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime, date
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, func
from fastapi import HTTPException, status, BackgroundTasks
from uuid import UUID
from app.models.task import Task
from app.models.project import Project, ProjectRegion
from app.models.user import User
from app.models.enums import AppRole, TaskStatus, TicketPriority
from app.schemas.task import TaskCreate, TaskUpdate, TaskStatusUpdate, TaskStart, TaskComplete, TaskCancel
from app.services.notification_creator import NotificationCreator
from app.services.notification_delivery import NotificationDelivery
logger = logging.getLogger(__name__)
class TaskService:
"""Service for managing tasks with authorization and project type validation"""
# ============================================
# AUTHORIZATION HELPERS
# ============================================
@staticmethod
def can_user_create_task(user: User, project: Project) -> bool:
"""Check if user can create a task for this project"""
if user.role == AppRole.PLATFORM_ADMIN:
return True
# Project managers can create tasks
if user.role == AppRole.PROJECT_MANAGER:
# Check if user is the primary manager or on the project team
if project.primary_manager_id == user.id:
return True
# Client and contractor admins can create tasks for their projects
if user.role == AppRole.CLIENT_ADMIN and user.client_id == project.client_id:
return True
if user.role == AppRole.CONTRACTOR_ADMIN and user.contractor_id == project.contractor_id:
return True
return False
@staticmethod
def can_user_view_task(user: User, task: Task) -> bool:
"""Check if user can view this task"""
if user.role == AppRole.PLATFORM_ADMIN:
return True
# Project managers can view tasks
if user.role == AppRole.PROJECT_MANAGER:
return True
# Users can view tasks from their organization's projects
if user.client_id and task.project.client_id == user.client_id:
return True
if user.contractor_id and task.project.contractor_id == user.contractor_id:
return True
return False
@staticmethod
def can_user_edit_task(user: User, task: Task) -> bool:
"""Check if user can edit this task"""
if user.role == AppRole.PLATFORM_ADMIN:
return True
# Project managers can edit tasks
if user.role == AppRole.PROJECT_MANAGER:
if task.project.primary_manager_id == user.id:
return True
# Admins can edit tasks for their projects
if user.role == AppRole.CLIENT_ADMIN and user.client_id == task.project.client_id:
return True
if user.role == AppRole.CONTRACTOR_ADMIN and user.contractor_id == task.project.contractor_id:
return True
return False
# ============================================
# TASK CRUD
# ============================================
@staticmethod
def create_task(
db: Session,
data: TaskCreate,
current_user: User,
background_tasks: Optional[BackgroundTasks] = None
) -> Task:
"""
Create a new task for an infrastructure project
Business Rules:
- Project must exist and be active
- Project should be of infrastructure type (warning if not, but allow)
- User must have permission to create tasks for this project
- If project_region_id provided, must exist and belong to this project
- Location coordinates must be provided together
Authorization:
- platform_admin: Can create for any project
- project_manager: Can create for projects they manage
- client_admin/contractor_admin: Can create for their organization's projects
"""
# Validate project exists
project = db.query(Project).filter(
Project.id == data.project_id,
Project.deleted_at == None
).first()
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project with ID {data.project_id} not found"
)
# Authorization check
if not TaskService.can_user_create_task(current_user, project):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to create tasks for this project"
)
# Log task creation with context
logger.info(
f"Creating {data.task_type or 'general'} task for project {project.id} "
f"({project.title}). Task: {data.task_title}"
)
# Validate project_region if provided
if data.project_region_id:
region = db.query(ProjectRegion).filter(
ProjectRegion.id == data.project_region_id,
ProjectRegion.project_id == data.project_id,
ProjectRegion.deleted_at == None
).first()
if not region:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project region with ID {data.project_region_id} not found or doesn't belong to this project"
)
# Create task
task_data = data.model_dump()
task_data['created_by_user_id'] = current_user.id
task = Task(**task_data)
db.add(task)
db.commit()
db.refresh(task)
logger.info(f"Task created: {task.id} - {task.task_title} for project {project.id} by user {current_user.id}")
# Notify PMs/managers about new task (Tier 1 - Synchronous)
try:
notification_ids = NotificationCreator.notify_project_team(
db=db,
project_id=task.project_id,
title=f"📋 New Task Created",
message=f"Task '{task.task_title}' was created by {current_user.full_name}.\n\nType: {task.task_type or 'General'}\nPriority: {task.priority.value if task.priority else 'Normal'}",
source_type="task",
source_id=task.id,
notification_type="task_created",
channel="in_app",
roles=["project_manager", "dispatcher"],
metadata={
"task_id": str(task.id),
"task_title": task.task_title,
"task_type": task.task_type,
"priority": task.priority.value if task.priority else None,
"created_by": current_user.full_name,
"action_url": f"/tasks/{task.id}"
}
)
db.commit()
# Queue delivery (Tier 2 - Asynchronous)
if background_tasks and notification_ids:
NotificationDelivery.queue_bulk_delivery(
background_tasks=background_tasks,
notification_ids=notification_ids
)
except Exception as e:
logger.error(f"Failed to send task creation notification: {str(e)}")
return task
@staticmethod
def list_tasks(
db: Session,
current_user: User,
skip: int = 0,
limit: int = 50,
project_id: Optional[UUID] = None,
project_region_id: Optional[UUID] = None,
status: Optional[TaskStatus] = None,
task_type: Optional[str] = None,
priority: Optional[TicketPriority] = None,
scheduled_date_from: Optional[date] = None,
scheduled_date_to: Optional[date] = None,
is_overdue: Optional[bool] = None,
search: Optional[str] = None
) -> Tuple[List[Task], int]:
"""
List tasks with authorization filtering
Filters:
- project_id: Filter by specific project
- project_region_id: Filter by region
- status: Filter by task status
- task_type: Filter by task type
- priority: Filter by priority
- scheduled_date_from/to: Date range filter
- is_overdue: Filter overdue tasks
- search: Search by title, description, or location
Authorization:
- platform_admin: Can see all tasks
- Managers: Can see all tasks in projects they're involved with
- Admins: Can see tasks in their organization's projects
"""
query = db.query(Task).filter(Task.deleted_at == None)
# Authorization filtering
if current_user.role == AppRole.PLATFORM_ADMIN:
# Platform admins see all
pass
elif current_user.role in [AppRole.PROJECT_MANAGER, AppRole.DISPATCHER]:
# Managers see all (could be refined with project_team check)
pass
else:
# Filter by organization
if current_user.client_id:
query = query.join(Project).filter(Project.client_id == current_user.client_id)
elif current_user.contractor_id:
query = query.join(Project).filter(Project.contractor_id == current_user.contractor_id)
# Apply filters
if project_id:
query = query.filter(Task.project_id == project_id)
if project_region_id:
query = query.filter(Task.project_region_id == project_region_id)
if status:
query = query.filter(Task.status == status)
if task_type:
query = query.filter(Task.task_type.ilike(f"%{task_type}%"))
if priority:
query = query.filter(Task.priority == priority)
if scheduled_date_from:
query = query.filter(Task.scheduled_date >= scheduled_date_from)
if scheduled_date_to:
query = query.filter(Task.scheduled_date <= scheduled_date_to)
if is_overdue is not None:
if is_overdue:
# Overdue: scheduled date in past and not completed
query = query.filter(
Task.scheduled_date < date.today(),
Task.status.notin_([TaskStatus.COMPLETED, TaskStatus.CANCELLED])
)
else:
# Not overdue
query = query.filter(
or_(
Task.scheduled_date >= date.today(),
Task.scheduled_date == None,
Task.status.in_([TaskStatus.COMPLETED, TaskStatus.CANCELLED])
)
)
if search:
search_filter = or_(
Task.task_title.ilike(f"%{search}%"),
Task.task_description.ilike(f"%{search}%"),
Task.location_name.ilike(f"%{search}%"),
Task.task_address_line1.ilike(f"%{search}%")
)
query = query.filter(search_filter)
# Get total count
total = query.count()
# Get paginated results with relationships
tasks = query.options(
joinedload(Task.project),
joinedload(Task.project_region),
joinedload(Task.created_by)
).order_by(Task.scheduled_date.asc().nullslast(), Task.created_at.desc()).offset(skip).limit(limit).all()
return tasks, total
@staticmethod
def get_task_by_id(
db: Session,
task_id: UUID,
current_user: User
) -> Task:
"""Get a single task by ID with authorization check"""
task = db.query(Task).options(
joinedload(Task.project),
joinedload(Task.project_region),
joinedload(Task.created_by)
).filter(
Task.id == task_id,
Task.deleted_at == None
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with ID {task_id} not found"
)
# Authorization check
if not TaskService.can_user_view_task(current_user, task):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to view this task"
)
return task
@staticmethod
def update_task(
db: Session,
task_id: UUID,
data: TaskUpdate,
current_user: User
) -> Task:
"""Update an existing task"""
# Get existing task
task = db.query(Task).filter(
Task.id == task_id,
Task.deleted_at == None
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with ID {task_id} not found"
)
# Authorization check
if not TaskService.can_user_edit_task(current_user, task):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to edit this task"
)
# Validate project_region if being updated
if data.project_region_id:
region = db.query(ProjectRegion).filter(
ProjectRegion.id == data.project_region_id,
ProjectRegion.project_id == task.project_id,
ProjectRegion.deleted_at == None
).first()
if not region:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project region with ID {data.project_region_id} not found or doesn't belong to this project"
)
# Validate timeline if both dates being updated
if data.started_at and data.completed_at:
if data.completed_at < data.started_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="completed_at must be after started_at"
)
# Update fields
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
task.updated_at = datetime.utcnow()
db.commit()
db.refresh(task)
logger.info(f"Task updated: {task.id} by user {current_user.id}")
return task
@staticmethod
def update_task_status(
db: Session,
task_id: UUID,
data: TaskStatusUpdate,
current_user: User
) -> Task:
"""Update task status with validation"""
task = TaskService.get_task_by_id(db, task_id, current_user)
if not TaskService.can_user_edit_task(current_user, task):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to update this task status"
)
old_status = task.status
new_status = data.status
# Validate status transitions
if old_status == TaskStatus.COMPLETED and new_status != TaskStatus.COMPLETED:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot change status of completed task. Create a new task if work needs to be re-done."
)
if old_status == TaskStatus.CANCELLED and new_status != TaskStatus.CANCELLED:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot reopen cancelled task. Create a new task instead."
)
# Auto-set timestamps based on status
if new_status == TaskStatus.IN_PROGRESS and not task.started_at:
task.started_at = datetime.utcnow()
if new_status == TaskStatus.COMPLETED and not task.completed_at:
task.completed_at = datetime.utcnow()
task.status = new_status
if data.reason:
current_notes = task.notes or ""
task.notes = f"{current_notes}\nStatus change ({old_status.value}{new_status.value}): {data.reason}".strip()
task.updated_at = datetime.utcnow()
db.commit()
db.refresh(task)
logger.info(f"Task status updated: {task.id} from {old_status.value} to {new_status.value}")
return task
@staticmethod
def start_task(
db: Session,
task_id: UUID,
data: TaskStart,
current_user: User
) -> Task:
"""Start a task (convenience method)"""
task = TaskService.get_task_by_id(db, task_id, current_user)
if not task.can_start():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Task cannot be started from current status: {task.status.value}"
)
task.status = TaskStatus.IN_PROGRESS
task.started_at = data.started_at or datetime.utcnow()
if data.notes:
current_notes = task.notes or ""
task.notes = f"{current_notes}\nStarted: {data.notes}".strip()
task.updated_at = datetime.utcnow()
db.commit()
db.refresh(task)
logger.info(f"Task started: {task.id}")
return task
@staticmethod
def complete_task(
db: Session,
task_id: UUID,
data: TaskComplete,
current_user: User,
background_tasks: Optional[BackgroundTasks] = None
) -> Task:
"""Complete a task"""
task = TaskService.get_task_by_id(db, task_id, current_user)
if not task.can_complete():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Task cannot be completed from current status: {task.status.value}"
)
task.status = TaskStatus.COMPLETED
task.completed_at = data.completed_at or datetime.utcnow()
if not task.started_at:
task.started_at = task.completed_at
if data.completion_notes:
current_notes = task.notes or ""
task.notes = f"{current_notes}\nCompleted: {data.completion_notes}".strip()
task.updated_at = datetime.utcnow()
db.commit()
db.refresh(task)
logger.info(f"Task completed: {task.id}")
# Notify PMs/managers about task completion (Tier 1 - Synchronous)
try:
notification_ids = NotificationCreator.notify_project_team(
db=db,
project_id=task.project_id,
title=f"✅ Task Completed",
message=f"Task '{task.task_title}' was completed by {current_user.full_name}.",
source_type="task",
source_id=task.id,
notification_type="task_completed",
channel="in_app",
roles=["project_manager", "dispatcher"],
metadata={
"task_id": str(task.id),
"task_title": task.task_title,
"completed_by": current_user.full_name,
"completed_at": str(task.completed_at),
"completion_notes": data.completion_notes,
"action_url": f"/tasks/{task.id}"
}
)
db.commit()
# Queue delivery (Tier 2 - Asynchronous)
if background_tasks and notification_ids:
NotificationDelivery.queue_bulk_delivery(
background_tasks=background_tasks,
notification_ids=notification_ids
)
except Exception as e:
logger.error(f"Failed to send task completion notification: {str(e)}")
return task
@staticmethod
def cancel_task(
db: Session,
task_id: UUID,
data: TaskCancel,
current_user: User,
background_tasks: Optional[BackgroundTasks] = None
) -> Task:
"""Cancel a task"""
task = TaskService.get_task_by_id(db, task_id, current_user)
if not task.can_cancel():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Task cannot be cancelled from current status: {task.status.value}"
)
task.status = TaskStatus.CANCELLED
current_notes = task.notes or ""
task.notes = f"{current_notes}\nCancelled: {data.cancellation_reason}".strip()
task.updated_at = datetime.utcnow()
db.commit()
db.refresh(task)
logger.info(f"Task cancelled: {task.id}")
# Notify PMs/managers about task cancellation (Tier 1 - Synchronous)
try:
notification_ids = NotificationCreator.notify_project_team(
db=db,
project_id=task.project_id,
title=f"❌ Task Cancelled",
message=f"Task '{task.task_title}' was cancelled by {current_user.full_name}.\n\nReason: {data.cancellation_reason}",
source_type="task",
source_id=task.id,
notification_type="task_cancelled",
channel="in_app",
roles=["project_manager", "dispatcher"],
metadata={
"task_id": str(task.id),
"task_title": task.task_title,
"cancelled_by": current_user.full_name,
"cancellation_reason": data.cancellation_reason,
"action_url": f"/tasks/{task.id}"
}
)
db.commit()
# Queue delivery (Tier 2 - Asynchronous)
if background_tasks and notification_ids:
NotificationDelivery.queue_bulk_delivery(
background_tasks=background_tasks,
notification_ids=notification_ids
)
except Exception as e:
logger.error(f"Failed to send task cancellation notification: {str(e)}")
return task
@staticmethod
def delete_task(
db: Session,
task_id: UUID,
current_user: User
) -> None:
"""
Soft delete a task (platform_admin and project_manager only)
"""
if current_user.role not in [AppRole.PLATFORM_ADMIN, AppRole.PROJECT_MANAGER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only platform administrators and project managers can delete tasks"
)
task = db.query(Task).filter(
Task.id == task_id,
Task.deleted_at == None
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task with ID {task_id} not found"
)
# Soft delete
task.deleted_at = datetime.utcnow()
db.commit()
logger.info(f"Task deleted: {task_id} by user {current_user.id}")