# swiftops-backend/src/app/services/project_service.py
# Latest commit 8f5c70d (kamau1): fix: convert Decimal to float in invoice pricing rules for JSON serialization
"""
Project Service - Core business logic for project management
Handles project CRUD operations with proper authorization and validation
"""
import logging
from typing import Optional, Dict, Any, List, Tuple
from datetime import datetime, date
from decimal import Decimal
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, func, String
from fastapi import HTTPException, status, BackgroundTasks
from uuid import UUID
from app.models.project import Project, ProjectRegion, ProjectRole, ProjectSubcontractor
from app.models.project_team import ProjectTeam
from app.models.client import Client
from app.models.contractor import Contractor
from app.models.user import User
from app.schemas.project import (
ProjectCreate, ProjectUpdate, ProjectStatusUpdate,
ProjectRegionCreate, ProjectRegionUpdate,
ProjectRoleCreate, ProjectRoleUpdate,
ProjectTeamCreate, ProjectTeamUpdate,
ProjectSubcontractorCreate, ProjectSubcontractorUpdate,
ProjectSetup, ProjectSetupResponse
)
from app.schemas.filters import ProjectFilters
from app.services.base_filter_service import BaseFilterService
from app.services.notification_creator import NotificationCreator
from app.services.notification_delivery import NotificationDelivery
logger = logging.getLogger(__name__)
def _convert_decimals_to_float(obj):
"""
Recursively convert Decimal objects to float for JSON serialization
Args:
obj: Object to convert (dict, list, or primitive)
Returns:
Object with all Decimals converted to float
"""
if isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, dict):
return {k: _convert_decimals_to_float(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [_convert_decimals_to_float(item) for item in obj]
else:
return obj
class ProjectService(BaseFilterService):
"""Service for managing projects with authorization"""
# ============================================
# PROJECT CRUD
# ============================================
@staticmethod
def create_project(
    db: Session,
    data: ProjectCreate,
    current_user: User
) -> Project:
    """
    Create a new project with full validation

    Business Rules:
    - Only admins can create projects
    - Client must exist and be active
    - Contractor must exist and be active
    - Primary manager (if given) must exist and belong to the client or
      the contractor organization
    - No duplicate active projects (same client+contractor+title,
      compared case-insensitively)
    - NOTE(review): "client and contractor must differ" and date ordering
      are not checked in this method; presumably the ProjectCreate schema
      enforces them - confirm.

    Authorization:
    - platform_admin: Can create any project
    - client_admin: Can create if they own the client
    - contractor_admin: Can create if they own the contractor

    The project row and the primary manager's team membership are written
    in one transaction, so a failure cannot leave a committed project
    without its lead.

    Returns:
        The persisted Project. Status is 'draft' when a primary manager was
        assigned, otherwise 'planning'.

    Raises:
        HTTPException: 403 (not allowed), 404 (client/contractor/manager
            missing), 400 (inactive organization, foreign manager, invalid
            service_type), 409 (duplicate title).
    """
    # Authorization check
    if not ProjectService.can_user_create_project(current_user, data.client_id, data.contractor_id):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to create this project"
        )

    # Validate client exists and is active
    client = db.query(Client).filter(
        Client.id == data.client_id,
        Client.deleted_at.is_(None)
    ).first()
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Client with ID {data.client_id} not found"
        )
    if not client.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Client organization is not active"
        )

    # Validate contractor exists and is active
    contractor = db.query(Contractor).filter(
        Contractor.id == data.contractor_id,
        Contractor.deleted_at.is_(None)
    ).first()
    if not contractor:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Contractor with ID {data.contractor_id} not found"
        )
    if not contractor.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Contractor organization is not active"
        )

    # Validate primary manager if provided
    if data.primary_manager_id:
        manager = db.query(User).filter(
            User.id == data.primary_manager_id,
            User.deleted_at.is_(None)
        ).first()
        if not manager:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Primary manager with ID {data.primary_manager_id} not found"
            )
        # Manager should belong to client or contractor
        if manager.client_id != data.client_id and manager.contractor_id != data.contractor_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Primary manager must belong to either client or contractor organization"
            )

    # Check for duplicate project (same client+contractor+title, not deleted)
    existing = db.query(Project).filter(
        Project.client_id == data.client_id,
        Project.contractor_id == data.contractor_id,
        func.lower(Project.title) == data.title.lower(),
        Project.deleted_at.is_(None)
    ).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"A project with title '{data.title}' already exists for this client-contractor pair"
        )

    # Convert Pydantic models to dict for JSONB fields
    project_data = data.model_dump()

    # List-valued JSONB fields. A field that is present but None is left
    # alone (same guard update_project uses) instead of crashing the loop.
    for field_name in ('activation_requirements', 'photo_requirements'):
        if project_data.get(field_name) is not None:
            project_data[field_name] = [
                req.model_dump() if hasattr(req, 'model_dump') else req
                for req in project_data[field_name]
            ]

    # Dict-valued JSONB fields, with the same None guard.
    for field_name in ('budget', 'inventory_requirements'):
        if project_data.get(field_name) is not None:
            project_data[field_name] = {
                k: v.model_dump() if hasattr(v, 'model_dump') else v
                for k, v in project_data[field_name].items()
            }

    # Convert invoice_pricing_rules to dict and handle Decimal serialization
    pricing_rules = project_data.get('invoice_pricing_rules')
    if pricing_rules:
        if hasattr(pricing_rules, 'model_dump'):
            pricing_rules = pricing_rules.model_dump()
        project_data['invoice_pricing_rules'] = _convert_decimals_to_float(pricing_rules)

    # Create project
    try:
        new_project = Project(**project_data)

        # With a primary manager the project waits in 'draft' for setup;
        # without one it starts in 'planning' (legacy behavior).
        new_project.status = 'draft' if data.primary_manager_id else 'planning'
        db.add(new_project)

        # Flush rather than commit: this assigns new_project.id while
        # keeping the project and the team row in the same transaction.
        db.flush()

        # Auto-add primary manager to project team if specified
        if data.primary_manager_id:
            team_member = ProjectTeam(
                project_id=new_project.id,
                user_id=data.primary_manager_id,
                role='project_manager',
                is_lead=True,  # Primary manager is the lead by default
                is_assigned_slot=True,
                assigned_at=datetime.utcnow()
            )
            db.add(team_member)

        db.commit()
        db.refresh(new_project)

        if data.primary_manager_id:
            logger.info(f"Auto-added primary manager {data.primary_manager_id} to project team with is_lead=True")
        logger.info(f"Project created: {new_project.title} (ID: {new_project.id}, status: {new_project.status}) by user {current_user.id}")
        return new_project
    except Exception as e:
        db.rollback()
        # Translate the database's enum rejection into a client error
        if 'invalid input value for enum servicetype' in str(e).lower():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid service_type. Must be one of: ftth, fttb, fixed_wireless, dsl, adsl, cable, other"
            ) from e
        # Re-raise other exceptions
        raise
@staticmethod
def can_user_create_project(user: User, client_id: UUID, contractor_id: UUID) -> bool:
"""
Check if user has permission to create project
Authorization Rules:
- platform_admin: YES (always)
- client_admin: YES if user.client_id == client_id
- contractor_admin: YES if user.contractor_id == contractor_id
- others: NO
"""
if user.role == 'platform_admin':
return True
if user.role == 'client_admin' and user.client_id == client_id:
return True
if user.role == 'contractor_admin' and user.contractor_id == contractor_id:
return True
return False
@staticmethod
def can_user_view_project(user: User, project: Project) -> bool:
"""Check if user can view this project"""
if user.role == 'platform_admin':
return True
if user.client_id == project.client_id:
return True
if user.contractor_id == project.contractor_id:
return True
# Check if user is on project team
if hasattr(project, 'team'):
team_user_ids = [member.user_id for member in project.team if member.deleted_at is None]
if user.id in team_user_ids:
return True
return False
@staticmethod
def can_user_edit_project(user: User, project: Project) -> bool:
"""Check if user can edit this project"""
if user.role == 'platform_admin':
return True
if user.role == 'client_admin' and user.client_id == project.client_id:
return True
if user.role == 'contractor_admin' and user.contractor_id == project.contractor_id:
return True
if user.role == 'project_manager' and project.primary_manager_id == user.id:
return True
return False
@staticmethod
def list_projects(
    db: Session,
    current_user: User,
    skip: int = 0,
    limit: int = 50,
    status: Optional[str] = None,
    client_id: Optional[UUID] = None,
    contractor_id: Optional[UUID] = None,
    service_type: Optional[str] = None,
    is_closed: Optional[bool] = None,
    search: Optional[str] = None
) -> Tuple[List[Project], int]:
    """
    List projects with authorization filtering

    Non-admin users only see projects where their client or contractor
    matches, where they are the primary manager, or where they hold a
    non-deleted project team membership (the same access paths that
    can_user_view_project grants).

    Args:
        db: Database session
        current_user: User the listing is scoped to
        skip: Number of rows to skip (offset)
        limit: Maximum number of rows to return
        status: Exact project status to match. Note that this parameter
            shadows the imported fastapi `status` module inside this method.
        client_id: Restrict to one client
        contractor_id: Restrict to one contractor
        service_type: Exact service type to match
        is_closed: Restrict to closed / open projects
        search: Case-insensitive substring matched against title,
            description and project_type

    Returns: (projects, total_count) - total_count is taken before
    offset/limit are applied; projects are ordered newest first.
    """
    # Base query with relationships
    query = db.query(Project).options(
        joinedload(Project.client),
        joinedload(Project.contractor),
        joinedload(Project.primary_manager)
    ).filter(Project.deleted_at.is_(None))

    # Authorization filtering
    if current_user.role != 'platform_admin':
        # Projects the user reaches through an active team membership
        team_project_ids = db.query(ProjectTeam.project_id).filter(
            ProjectTeam.user_id == current_user.id,
            ProjectTeam.deleted_at.is_(None)
        )
        # Include projects where:
        # 1. User's client/contractor matches
        # 2. User is primary manager (to see draft projects awaiting setup)
        # 3. User is on project team
        query = query.filter(
            or_(
                Project.client_id == current_user.client_id,
                Project.contractor_id == current_user.contractor_id,
                Project.primary_manager_id == current_user.id,
                Project.id.in_(team_project_ids)
            )
        )

    # Apply filters
    if status:
        query = query.filter(Project.status == status)
    if client_id:
        query = query.filter(Project.client_id == client_id)
    if contractor_id:
        query = query.filter(Project.contractor_id == contractor_id)
    if service_type:
        query = query.filter(Project.service_type == service_type)
    if is_closed is not None:
        query = query.filter(Project.is_closed == is_closed)
    if search:
        search_term = f"%{search}%"
        query = query.filter(
            or_(
                Project.title.ilike(search_term),
                Project.description.ilike(search_term),
                Project.project_type.cast(String).ilike(search_term)
            )
        )

    # Get total count
    total = query.count()

    # Get paginated results
    projects = query.order_by(Project.created_at.desc()).offset(skip).limit(limit).all()
    return projects, total
@staticmethod
def list_projects_with_filters(
    db: Session,
    filters: ProjectFilters,
    current_user: User
) -> Tuple[List[Project], int]:
    """
    List projects with comprehensive filtering support

    Enhanced filtering with:
    - Multi-value filters (status, project_type, service_type)
    - Date range filters
    - Boolean filters
    - Text search
    - Flexible sorting
    - Standardized pagination

    Non-admin users only see projects where their client or contractor
    matches, where they are the primary manager, or where they hold a
    non-deleted project team membership (the same access paths that
    can_user_view_project grants).

    Returns: (projects, total_count) as produced by apply_pagination.
    """
    # Base query with relationships
    query = db.query(Project).options(
        joinedload(Project.client),
        joinedload(Project.contractor),
        joinedload(Project.primary_manager)
    ).filter(Project.deleted_at.is_(None))

    # Authorization filtering
    if current_user.role != 'platform_admin':
        # Projects the user reaches through an active team membership
        team_project_ids = db.query(ProjectTeam.project_id).filter(
            ProjectTeam.user_id == current_user.id,
            ProjectTeam.deleted_at.is_(None)
        )
        query = query.filter(
            or_(
                Project.client_id == current_user.client_id,
                Project.contractor_id == current_user.contractor_id,
                Project.primary_manager_id == current_user.id,
                Project.id.in_(team_project_ids)
            )
        )

    # Apply UUID filters
    query = ProjectService.apply_uuid_filter(query, Project, 'client_id', filters.client_id)
    query = ProjectService.apply_uuid_filter(query, Project, 'contractor_id', filters.contractor_id)
    query = ProjectService.apply_uuid_filter(query, Project, 'primary_manager_id', filters.primary_manager_id)

    # Apply multi-value filters
    query = ProjectService.apply_multi_value_filter(query, Project, 'status', filters.status)
    query = ProjectService.apply_multi_value_filter(query, Project, 'project_type', filters.project_type)
    query = ProjectService.apply_multi_value_filter(query, Project, 'service_type', filters.service_type)

    # Apply boolean filters
    query = ProjectService.apply_boolean_filter(query, Project, 'is_closed', filters.is_closed)
    query = ProjectService.apply_boolean_filter(query, Project, 'is_billable', filters.is_billable)

    # Apply date filters
    query = ProjectService.apply_date_filter(
        query, Project, 'planned_start_date',
        from_date=filters.planned_start_from,
        to_date=filters.planned_start_to
    )
    query = ProjectService.apply_date_filter(
        query, Project, 'actual_start_date',
        from_date=filters.actual_start_from,
        to_date=filters.actual_start_to
    )
    query = ProjectService.apply_date_filter(
        query, Project, 'created_at',
        from_date=filters.from_date,
        to_date=filters.to_date
    )

    # Apply search
    search_fields = ['title', 'description']
    query = ProjectService.apply_search(query, Project, filters.search, search_fields)

    # Apply sorting
    query = ProjectService.apply_sorting(query, Project, filters.sort_by, filters.sort_order)

    # Apply pagination
    paginated_query, total = ProjectService.apply_pagination(query, filters.page, filters.page_size)

    # Execute query
    projects = paginated_query.all()
    logger.info(f"Listed {len(projects)} projects (total: {total}) for user {current_user.id}")
    return projects, total
@staticmethod
def get_project_by_id(db: Session, project_id: UUID, current_user: User) -> Project:
    """
    Fetch one non-deleted project (client, contractor and primary manager
    eagerly loaded) and verify the caller may view it.

    Raises:
        HTTPException: 404 when no such project exists, 403 when
            can_user_view_project denies access.
    """
    eager_loads = (
        joinedload(Project.client),
        joinedload(Project.contractor),
        joinedload(Project.primary_manager),
    )
    lookup = db.query(Project).options(*eager_loads).filter(
        Project.id == project_id,
        Project.deleted_at == None
    )
    project = lookup.first()

    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Existence is checked first, so a missing project is 404 rather than 403
    allowed = ProjectService.can_user_view_project(current_user, project)
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to view this project"
        )
    return project
@staticmethod
def update_project(
    db: Session,
    project_id: UUID,
    data: ProjectUpdate,
    current_user: User
) -> Project:
    """
    Apply a partial update to a project.

    Only fields explicitly set on `data` are written. Nested JSONB payloads
    are normalised to plain dicts/lists, Decimals in invoice_pricing_rules
    become floats, a new primary manager is validated, and start/end date
    pairs are checked against the values already stored on the project.

    Raises:
        HTTPException: 404/403 from get_project_by_id, 403 when the caller
            may not edit, 404 when the new primary manager is missing,
            400 for a foreign manager or an end date before its start date.
    """
    project = ProjectService.get_project_by_id(db, project_id, current_user)

    # Viewing rights are not enough; editing has its own rule set
    if not ProjectService.can_user_edit_project(current_user, project):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to edit this project"
        )

    changes = data.model_dump(exclude_unset=True)

    # List-valued JSONB fields: dump any remaining Pydantic models
    for key in ('activation_requirements', 'photo_requirements'):
        if changes.get(key) is not None:
            changes[key] = [
                entry.model_dump() if hasattr(entry, 'model_dump') else entry
                for entry in changes[key]
            ]

    # Dict-valued JSONB fields: same treatment per value
    for key in ('budget', 'inventory_requirements'):
        if changes.get(key) is not None:
            changes[key] = {
                name: entry.model_dump() if hasattr(entry, 'model_dump') else entry
                for name, entry in changes[key].items()
            }

    # invoice_pricing_rules: dump if needed, then make Decimals JSON-safe
    pricing = changes.get('invoice_pricing_rules')
    if pricing:
        if hasattr(pricing, 'model_dump'):
            pricing = pricing.model_dump()
        changes['invoice_pricing_rules'] = _convert_decimals_to_float(pricing)

    # A new primary manager must exist and belong to one of the two organizations
    new_manager_id = changes.get('primary_manager_id')
    if new_manager_id:
        manager = db.query(User).filter(
            User.id == new_manager_id,
            User.deleted_at == None
        ).first()
        if not manager:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Primary manager not found"
            )
        if manager.client_id != project.client_id and manager.contractor_id != project.contractor_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Primary manager must belong to client or contractor"
            )

    # Date pairs: combine incoming values with stored ones before comparing
    date_rules = (
        ('planned_start_date', 'planned_end_date', "planned_end_date must be >= planned_start_date"),
        ('actual_start_date', 'actual_end_date', "actual_end_date must be >= actual_start_date"),
    )
    for start_key, end_key, message in date_rules:
        if start_key not in changes and end_key not in changes:
            continue
        start = changes.get(start_key, getattr(project, start_key))
        end = changes.get(end_key, getattr(project, end_key))
        if start and end and end < start:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=message
            )

    # Apply updates
    for attribute, new_value in changes.items():
        setattr(project, attribute, new_value)

    db.commit()
    db.refresh(project)
    logger.info(f"Project updated: {project.id} by user {current_user.id}")
    return project
@staticmethod
def update_project_status(
    db: Session,
    project_id: UUID,
    data: ProjectStatusUpdate,
    current_user: User
) -> Project:
    """
    Change a project's status.

    A completed project cannot leave 'completed' and a closed project
    cannot change status at all. Moving to 'active' or 'completed' stamps
    today's date on actual_start_date / actual_end_date when unset.

    Raises:
        HTTPException: 404/403 from get_project_by_id, 403 when the caller
            may not edit, 400 for a disallowed transition.
    """
    project = ProjectService.get_project_by_id(db, project_id, current_user)

    may_edit = ProjectService.can_user_edit_project(current_user, project)
    if not may_edit:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to change project status"
        )

    previous_status, requested_status = project.status, data.status

    # Transition rules: completed is terminal, closed is frozen
    if previous_status == 'completed' and requested_status != 'completed':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change status of completed project"
        )
    if project.is_closed:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change status of closed project"
        )

    project.status = requested_status

    # Stamp the matching date the first time the project is activated/completed
    if requested_status == 'active':
        if not project.actual_start_date:
            project.actual_start_date = date.today()
    elif requested_status == 'completed':
        if not project.actual_end_date:
            project.actual_end_date = date.today()

    db.commit()
    db.refresh(project)
    logger.info(f"Project status changed: {project.id} from {previous_status} to {requested_status} by user {current_user.id}")
    return project
@staticmethod
def close_project(
    db: Session,
    project_id: UUID,
    current_user: User,
    reason: Optional[str] = None
) -> Project:
    """
    Close a project (final action)

    Marks the project closed, records who closed it and when, moves it to
    'completed' unless it is already completed or cancelled, stamps
    actual_end_date when unset, and stores an optional closure reason in
    additional_metadata.

    Args:
        db: Database session
        project_id: Project to close
        current_user: User performing the close; must be allowed to edit
        reason: Optional free-text reason, stored under 'closure_reason'

    Raises:
        HTTPException: 404/403 from get_project_by_id, 403 when the caller
            may not edit, 400 when the project is already closed.
    """
    project = ProjectService.get_project_by_id(db, project_id, current_user)

    if not ProjectService.can_user_edit_project(current_user, project):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to close this project"
        )

    if project.is_closed:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Project is already closed"
        )

    # Close the project
    project.is_closed = True
    project.closed_at = datetime.utcnow()
    project.closed_by_user_id = current_user.id

    if project.status not in ['completed', 'cancelled']:
        project.status = 'completed'

    if not project.actual_end_date:
        project.actual_end_date = date.today()

    if reason:
        # Assign a fresh dict instead of mutating in place: unless the
        # column is wrapped in MutableDict, SQLAlchemy does not notice
        # in-place changes to a JSON value and the reason would be lost.
        metadata = dict(project.additional_metadata or {})
        metadata['closure_reason'] = reason
        project.additional_metadata = metadata

    db.commit()
    db.refresh(project)
    logger.info(f"Project closed: {project.id} by user {current_user.id}")
    return project
@staticmethod
def complete_project_setup(
    db: Session,
    project_id: UUID,
    setup_data: 'ProjectSetup',
    current_user: User,
    background_tasks: Optional[BackgroundTasks] = None
) -> Dict[str, Any]:
    """
    Complete project setup: Add regions, roles, subcontractors, and team members.
    Transitions project from 'draft' to 'planning'.

    Everything is written in one transaction: either the whole setup is
    committed or nothing is. In-app notifications for newly added team
    members are created inside a SAVEPOINT (so a notification failure
    cannot abort the setup) and are queued for delivery only after the
    final commit succeeds.

    Authorization:
    - Only the primary_manager can complete setup
    - Or platform_admin

    Args:
        db: Database session
        project_id: Project being set up; must be in 'draft' status
        setup_data: Regions, roles, subcontractors, team and optional notes
        current_user: User performing the setup
        background_tasks: When given, used to queue notification delivery

    Returns:
        Dict with project_id, the new status, per-entity counters and a
        human-readable message.

    Raises:
        HTTPException: 404 (project, subcontractor or user missing),
            403 (not primary manager / platform admin), 400 (project not
            in draft), 500 (any other failure; the transaction is rolled
            back).
    """
    # Get project
    project = db.query(Project).filter(
        Project.id == project_id,
        Project.deleted_at.is_(None)
    ).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Authorization: Only primary manager or platform admin
    if current_user.role != 'platform_admin' and project.primary_manager_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the primary manager or platform admin can complete project setup"
        )

    # Validate project is in draft status
    if project.status != 'draft':
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Project must be in 'draft' status for setup. Current status: {project.status}"
        )

    regions_created = 0
    roles_created = 0
    subcontractors_added = 0
    team_members_added = 0
    # Notifications created during setup; delivered only once setup is committed
    pending_notification_ids = []

    try:
        # 1. Create regions
        for region_data in setup_data.regions:
            db.add(ProjectRegion(
                project_id=project_id,
                **region_data.model_dump()
            ))
            regions_created += 1
        if regions_created > 0:
            db.flush()  # Get region IDs for team assignments

        # 2. Create project roles (for casual workers)
        for role_data in setup_data.roles:
            db.add(ProjectRole(
                project_id=project_id,
                **role_data.model_dump()
            ))
            roles_created += 1
        if roles_created > 0:
            db.flush()

        # 3. Add subcontractors
        for subcontractor_data in setup_data.subcontractors:
            # Validate subcontractor exists
            subcontractor = db.query(Contractor).filter(
                Contractor.id == subcontractor_data.subcontractor_id,
                Contractor.deleted_at.is_(None)
            ).first()
            if not subcontractor:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Subcontractor {subcontractor_data.subcontractor_id} not found"
                )
            db.add(ProjectSubcontractor(
                project_id=project_id,
                **subcontractor_data.model_dump()
            ))
            subcontractors_added += 1
        if subcontractors_added > 0:
            db.flush()

        # 4. Add team members
        for team_data in setup_data.team:
            # Validate user exists
            user = db.query(User).filter(
                User.id == team_data.user_id,
                User.deleted_at.is_(None)
            ).first()
            if not user:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"User {team_data.user_id} not found"
                )

            # Check if user already in team (e.g. the primary manager added at creation)
            existing = db.query(ProjectTeam).filter(
                ProjectTeam.project_id == project_id,
                ProjectTeam.user_id == team_data.user_id,
                ProjectTeam.deleted_at.is_(None)
            ).first()
            if existing:
                # Update existing team member
                existing.role = team_data.role
                existing.is_lead = team_data.is_lead
                existing.project_region_id = team_data.project_region_id
                logger.info(f"Updated existing team member {team_data.user_id}")
                continue

            # Add new team member
            db.add(ProjectTeam(
                project_id=project_id,
                user_id=team_data.user_id,
                role=team_data.role,
                is_lead=team_data.is_lead,
                project_region_id=team_data.project_region_id,
                is_assigned_slot=True,
                assigned_at=datetime.utcnow()
            ))
            team_members_added += 1

            # Notify user about being added to project (best effort).
            # The SAVEPOINT keeps a failed notification from poisoning the
            # setup transaction, and avoids committing a half-finished setup.
            # NOTE(review): the rest of this service reads `project.title`;
            # confirm the Project model really exposes `project_name`.
            try:
                with db.begin_nested():
                    notification = NotificationCreator.create(
                        db=db,
                        user_id=team_data.user_id,
                        title="🎯 Added to Project",
                        message=f"You have been added to project '{project.project_name}' by {current_user.full_name}.\n\nYour role: {team_data.role}",
                        source_type="project",
                        source_id=project_id,
                        notification_type="project_invitation",
                        channel="in_app",
                        project_id=project_id,
                        metadata={
                            "project_id": str(project_id),
                            "project_name": project.project_name,
                            "invited_by": current_user.full_name,
                            "role": team_data.role,
                            "action_url": f"/projects/{project_id}"
                        }
                    )
                pending_notification_ids.append(notification.id)
            except Exception as e:
                logger.error(f"Failed to send project invitation notification: {str(e)}")

        # 5. Update project status to planning
        project.status = 'planning'

        # 6. Add setup notes to metadata if provided
        if setup_data.setup_notes:
            # Reassign a fresh dict: in-place mutation of a JSON value is not
            # detected by SQLAlchemy unless the column uses MutableDict.
            metadata = dict(project.additional_metadata or {})
            metadata['setup_notes'] = setup_data.setup_notes
            metadata['setup_completed_at'] = datetime.utcnow().isoformat()
            metadata['setup_completed_by'] = str(current_user.id)
            project.additional_metadata = metadata

        db.commit()

        # Queue delivery (asynchronous) only now that the rows are committed
        if background_tasks:
            for notification_id in pending_notification_ids:
                try:
                    NotificationDelivery.queue_delivery(
                        background_tasks=background_tasks,
                        notification_id=notification_id
                    )
                except Exception as e:
                    logger.error(f"Failed to send project invitation notification: {str(e)}")

        logger.info(
            f"Project setup completed for {project_id}: "
            f"{regions_created} regions, {roles_created} roles, "
            f"{subcontractors_added} subcontractors, {team_members_added} team members"
        )
        return {
            "project_id": project_id,
            "status": project.status,
            "regions_created": regions_created,
            "roles_created": roles_created,
            "subcontractors_added": subcontractors_added,
            "team_members_added": team_members_added,
            "message": "Project setup completed successfully. Project is now in 'planning' status."
        }
    except HTTPException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Error completing project setup: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to complete project setup: {str(e)}"
        ) from e
@staticmethod
def delete_project(
    db: Session,
    project_id: UUID,
    current_user: User
) -> None:
    """
    Soft delete a project by stamping deleted_at. Platform admins only.

    Raises:
        HTTPException: 403 for any non platform_admin caller, 404 when the
            project does not exist or is already deleted.
    """
    is_platform_admin = current_user.role == 'platform_admin'
    if not is_platform_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only platform administrators can delete projects"
        )

    live_projects = db.query(Project).filter(
        Project.id == project_id,
        Project.deleted_at == None
    )
    project = live_projects.first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # The row is kept; only the deletion timestamp is set
    project.deleted_at = datetime.utcnow()
    db.commit()
    logger.info(f"Project deleted: {project.id} by user {current_user.id}")
# ============================================
# PROJECT REGIONS MANAGEMENT
# ============================================
@staticmethod
def create_region(
db: Session,
project_id: UUID,
data: ProjectRegionCreate,
current_user: User
) -> ProjectRegion:
"""
Create a new project region/hub (idempotent)

Authorization:
- Delegated to ProjectService._get_project_with_auth (not visible in this
  part of the file). The original note states: platform_admin can create
  for any project, project_manager for their own projects.

Idempotency:
- If a non-deleted region with the same region_name OR region_code exists
  in the project, it is updated from `data` and returned instead of a new
  row being created
- This allows frontend to safely re-save without errors

New regions:
- For country "Kenya" with both region (county) and city (sub-county) set,
  the pair is validated against app.data.regions
- A second region for the same county/sub-county in the project is rejected
- Created with is_active=True and empty additional_metadata

Returns:
The updated existing region or the newly created one (refreshed).

Raises:
HTTPException: 404 when manager_id does not match a non-deleted user,
400 for an unknown Kenyan county or sub-county, 409 for a duplicate
county/sub-county region. Whatever _get_project_with_auth raises also
propagates.
"""
# Get and validate project
# The return value is not used below; the call is made for its checks.
project = ProjectService._get_project_with_auth(db, project_id, current_user)
# Check for existing region by name OR code (idempotency check)
# Name is tried first; code is only consulted when the name did not match.
existing = None
if data.region_name:
existing = db.query(ProjectRegion).filter(
ProjectRegion.project_id == project_id,
ProjectRegion.region_name == data.region_name,
ProjectRegion.deleted_at == None
).first()
if not existing and data.region_code:
existing = db.query(ProjectRegion).filter(
ProjectRegion.project_id == project_id,
ProjectRegion.region_code == data.region_code,
ProjectRegion.deleted_at == None
).first()
# If exists, update instead of creating (idempotent behavior)
if existing:
logger.info(f"Region '{data.region_name}' already exists, updating instead")
# Validate manager if provided
if data.manager_id:
manager = db.query(User).filter(
User.id == data.manager_id,
User.deleted_at == None
).first()
if not manager:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Manager user {data.manager_id} not found"
)
# Convert hub_contact_persons to dict for JSONB storage
# None means "not supplied": the stored contacts are then left untouched.
hub_contacts = None
if data.hub_contact_persons is not None:
hub_contacts = [
contact.model_dump() if hasattr(contact, 'model_dump') else contact
for contact in data.hub_contact_persons
]
# Update all fields from data
# Every listed field is overwritten, including with None when `data` omits it.
existing.region_name = data.region_name
existing.region_code = data.region_code
existing.description = data.description
existing.country = data.country
existing.region = data.region
existing.city = data.city
existing.address_line1 = data.address_line1
existing.address_line2 = data.address_line2
existing.maps_link = data.maps_link
existing.latitude = data.latitude
existing.longitude = data.longitude
existing.manager_id = data.manager_id
existing.notes = data.notes
if hub_contacts is not None:
existing.hub_contact_persons = hub_contacts
db.commit()
db.refresh(existing)
logger.info(f"Updated existing region '{existing.region_name}' for project {project_id}")
return existing
# New region - validate location data
# Validate that county (region) and sub-county (city) are valid for Kenya
if data.country == "Kenya" and data.region and data.city:
# Imported lazily, only when Kenyan location validation is needed
from app.data.regions import get_sub_counties_by_county
valid_sub_counties = get_sub_counties_by_county(data.region, "Kenya")
valid_sub_county_names = [sc["sub_county"] for sc in valid_sub_counties]
# An empty result is treated as "unknown county"
if not valid_sub_counties:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid county '{data.region}' for Kenya. Use /api/v1/locations/kenya/counties to get valid counties."
)
if data.city not in valid_sub_county_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid sub-county '{data.city}' for county '{data.region}'. Use /api/v1/locations/kenya/sub-counties?county={data.region} to get valid sub-counties."
)
# Check for duplicate region in same county/sub-county
duplicate = db.query(ProjectRegion).filter(
ProjectRegion.project_id == project_id,
ProjectRegion.region == data.region,
ProjectRegion.city == data.city,
ProjectRegion.deleted_at == None
).first()
if duplicate:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A region already exists for {data.city}, {data.region} in this project. Region name: '{duplicate.region_name}'"
)
# Validate manager if provided
if data.manager_id:
manager = db.query(User).filter(
User.id == data.manager_id,
User.deleted_at == None
).first()
if not manager:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Manager user {data.manager_id} not found"
)
# Convert hub_contact_persons to dict for JSONB storage
# Unlike the update path, a new region always stores a list (empty by default).
hub_contacts = []
if data.hub_contact_persons:
hub_contacts = [
contact.model_dump() if hasattr(contact, 'model_dump') else contact
for contact in data.hub_contact_persons
]
# Create region
region = ProjectRegion(
project_id=project_id,
region_name=data.region_name,
region_code=data.region_code,
description=data.description,
country=data.country,
region=data.region,
city=data.city,
address_line1=data.address_line1,
address_line2=data.address_line2,
maps_link=data.maps_link,
latitude=data.latitude,
longitude=data.longitude,
manager_id=data.manager_id,
notes=data.notes,
is_active=True, # Default to active for new regions
hub_contact_persons=hub_contacts,
additional_metadata={}
)
db.add(region)
db.commit()
db.refresh(region)
logger.info(f"Created region '{region.region_name}' for project {project_id}")
return region
@staticmethod
def get_regions(
    db: Session,
    project_id: UUID,
    current_user: User,
    is_active: Optional[bool] = None
) -> List[ProjectRegion]:
    """
    Return the non-deleted regions of a project, ordered by region name.

    Project access is checked first through _get_project_with_auth. When
    `is_active` is given, only regions with that active flag are returned.
    """
    # Validate project access (result itself is not needed)
    ProjectService._get_project_with_auth(db, project_id, current_user)

    criteria = [
        ProjectRegion.project_id == project_id,
        ProjectRegion.deleted_at == None,
    ]
    if is_active is not None:
        criteria.append(ProjectRegion.is_active == is_active)

    regions = db.query(ProjectRegion).filter(*criteria)
    return regions.order_by(ProjectRegion.region_name).all()
@staticmethod
def update_region(
    db: Session,
    region_id: UUID,
    data: ProjectRegionUpdate,
    current_user: User
) -> ProjectRegion:
    """
    Apply a partial update to a project region.

    Only fields explicitly set on ``data`` are written.

    Raises:
        HTTPException 404: region (or the requested manager) does not exist
        HTTPException 400: another region in the project already has the new name
    """
    region = (
        db.query(ProjectRegion)
        .filter(
            ProjectRegion.id == region_id,
            ProjectRegion.deleted_at.is_(None),
        )
        .first()
    )
    if not region:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Region not found"
        )

    # Caller must have access to the owning project
    ProjectService._get_project_with_auth(db, region.project_id, current_user)

    # A rename must not collide with a sibling region in the same project
    renaming = data.region_name and data.region_name != region.region_name
    if renaming:
        name_clash = (
            db.query(ProjectRegion)
            .filter(
                ProjectRegion.project_id == region.project_id,
                ProjectRegion.region_name == data.region_name,
                ProjectRegion.id != region_id,
                ProjectRegion.deleted_at.is_(None),
            )
            .first()
        )
        if name_clash:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Region '{data.region_name}' already exists in this project"
            )

    # A supplied manager must be an existing, non-deleted user
    if data.manager_id:
        manager = (
            db.query(User)
            .filter(User.id == data.manager_id, User.deleted_at.is_(None))
            .first()
        )
        if not manager:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Manager user {data.manager_id} not found"
            )

    changes = data.model_dump(exclude_unset=True)

    # Contacts are stored as JSONB, so make sure each entry is a plain dict
    contacts = changes.get('hub_contact_persons')
    if contacts is not None:
        changes['hub_contact_persons'] = [
            entry.model_dump() if hasattr(entry, 'model_dump') else entry
            for entry in contacts
        ]

    for attr, new_value in changes.items():
        setattr(region, attr, new_value)

    db.commit()
    db.refresh(region)
    logger.info(f"Updated region {region_id}")
    return region
@staticmethod
def delete_region(
    db: Session,
    region_id: UUID,
    current_user: User
) -> None:
    """
    Soft delete a project region by stamping ``deleted_at``.

    Raises:
        HTTPException 404: region does not exist or is already deleted
        HTTPException 400: non-deleted team members still reference the region
    """
    region = (
        db.query(ProjectRegion)
        .filter(
            ProjectRegion.id == region_id,
            ProjectRegion.deleted_at.is_(None),
        )
        .first()
    )
    if not region:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Region not found"
        )

    # Caller must have access to the owning project
    ProjectService._get_project_with_auth(db, region.project_id, current_user)

    # Refuse to remove a region that team members are still attached to
    assigned = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_region_id == region_id,
            ProjectTeam.deleted_at.is_(None),
        )
        .count()
    )
    if assigned > 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot delete region: {assigned} team members are assigned to it"
        )

    region.deleted_at = datetime.utcnow()
    db.commit()
    logger.info(f"Deleted region {region_id}")
# ============================================
# PROJECT ROLES MANAGEMENT
# ============================================
@staticmethod
def create_project_role(
    db: Session,
    project_id: UUID,
    data: ProjectRoleCreate,
    current_user: User
) -> ProjectRole:
    """
    Create a new project role with compensation structure (idempotent)

    Authorization:
    - Delegated to ProjectService._get_project_with_auth

    Idempotency:
    - If a non-deleted role with the same role_name exists in the project,
      it is updated in place and returned instead of creating a duplicate
    - This allows frontend to safely re-save without errors

    Args:
        db: Database session
        project_id: Project the role belongs to
        data: Role definition (name, description, compensation fields)
        current_user: Caller; must pass the project access check

    Returns:
        The created or updated ProjectRole

    Raises:
        HTTPException: from the access check or from compensation validation
    """
    # Access check only; the project object itself is not needed here
    ProjectService._get_project_with_auth(db, project_id, current_user)

    # Check for existing role by name (idempotency check)
    existing = db.query(ProjectRole).filter(
        ProjectRole.project_id == project_id,
        ProjectRole.role_name == data.role_name,
        ProjectRole.deleted_at.is_(None)
    ).first()

    # Validate compensation type and required fields
    ProjectService._validate_compensation_structure(data)

    if existing:
        # Update existing role (idempotent behavior)
        logger.info(f"Role '{data.role_name}' already exists, updating instead")

        # Reset all compensation fields, then write the submitted values.
        # The fields written here are the same four that the validator checks,
        # the clear helper resets and the create branch below populates, so a
        # re-save yields the same row a fresh create would.
        ProjectService._clear_irrelevant_compensation_fields(existing, data.compensation_type)
        existing.description = data.description
        existing.compensation_type = data.compensation_type
        existing.base_rate = data.base_rate
        existing.rate_period = data.rate_period
        existing.per_unit_rate = data.per_unit_rate
        existing.commission_percentage = data.commission_percentage

        db.commit()
        db.refresh(existing)
        logger.info(f"Updated existing role '{existing.role_name}' for project {project_id}")
        return existing

    # Create role
    role = ProjectRole(
        project_id=project_id,
        role_name=data.role_name,
        description=data.description,
        compensation_type=data.compensation_type,
        # Compensation fields
        base_rate=data.base_rate,
        rate_period=data.rate_period,
        per_unit_rate=data.per_unit_rate,
        commission_percentage=data.commission_percentage,
        is_active=True  # Default to active for new roles
    )
    db.add(role)
    db.commit()
    db.refresh(role)
    logger.info(f"Created role '{role.role_name}' for project {project_id}")
    return role
@staticmethod
def _clear_irrelevant_compensation_fields(role: ProjectRole, compensation_type: str):
    """
    Blank every compensation field on ``role``.

    All four fields are reset to None regardless of ``compensation_type``
    (the argument is accepted but not consulted). The caller is expected to
    assign the values relevant to the chosen type afterwards, so that no
    stale value from a previous compensation type survives.
    """
    for column in ('base_rate', 'rate_period', 'per_unit_rate', 'commission_percentage'):
        setattr(role, column, None)
@staticmethod
def _validate_compensation_structure(data):
    """
    Check that the fields required by ``data.compensation_type`` are present.

    Handled types: FIXED_RATE, PER_UNIT, COMMISSION, FIXED_PLUS_COMMISSION.
    Any other type passes without checks. Amount fields must be truthy and
    greater than zero; ``rate_period`` must be truthy.

    Raises:
        HTTPException 400: on the first missing or non-positive field
    """
    def bad_request(detail):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=detail
        )

    def need_positive(field, detail):
        amount = getattr(data, field)
        if not amount or amount <= 0:
            bad_request(detail)

    def need_present(field, detail):
        if not getattr(data, field):
            bad_request(detail)

    kind = data.compensation_type

    if kind == 'FIXED_RATE':
        need_positive(
            'base_rate',
            "base_rate is required for FIXED_RATE compensation (e.g., 1000 KES/day, 25 USD/hour)"
        )
        need_present(
            'rate_period',
            "rate_period is required for FIXED_RATE compensation (HOUR, DAY, WEEK, or MONTH)"
        )
    elif kind == 'PER_UNIT':
        need_positive(
            'per_unit_rate',
            "per_unit_rate is required for PER_UNIT compensation (e.g., 500 KES/ticket)"
        )
    elif kind == 'COMMISSION':
        need_positive(
            'commission_percentage',
            "commission_percentage is required for COMMISSION compensation (0-100)"
        )
    elif kind == 'FIXED_PLUS_COMMISSION':
        need_positive(
            'base_rate',
            "base_rate is required for FIXED_PLUS_COMMISSION compensation"
        )
        need_present(
            'rate_period',
            "rate_period is required for FIXED_PLUS_COMMISSION compensation"
        )
        need_positive(
            'commission_percentage',
            "commission_percentage is required for FIXED_PLUS_COMMISSION compensation"
        )
@staticmethod
def get_project_roles(
    db: Session,
    project_id: UUID,
    current_user: User,
    is_active: Optional[bool] = None
) -> List[ProjectRole]:
    """
    List the non-deleted roles of a project, ordered by role name.

    Args:
        db: Database session
        project_id: Project whose roles are listed
        current_user: Caller; must pass the project access check
        is_active: When not None, only roles with this is_active value

    Returns:
        Matching ProjectRole rows
    """
    # Access check first; raises if the caller may not see this project
    ProjectService._get_project_with_auth(db, project_id, current_user)

    conditions = [
        ProjectRole.project_id == project_id,
        ProjectRole.deleted_at.is_(None),
    ]
    if is_active is not None:
        conditions.append(ProjectRole.is_active == is_active)

    return (
        db.query(ProjectRole)
        .filter(*conditions)
        .order_by(ProjectRole.role_name)
        .all()
    )
@staticmethod
def update_project_role(
    db: Session,
    role_id: UUID,
    data: ProjectRoleUpdate,
    current_user: User
) -> ProjectRole:
    """
    Update a project role

    Only fields explicitly set on ``data`` are written. When the
    compensation type changes, all compensation fields are reset first so
    values belonging to the previous type do not survive; the request must
    then carry the fields the new type requires.

    Validation runs before the role object is touched, so a rejected
    request leaves the session's copy of the role unmodified.

    Raises:
        HTTPException 404: role does not exist or is deleted
        HTTPException 400: duplicate role name, or invalid compensation data
    """
    role = db.query(ProjectRole).filter(
        ProjectRole.id == role_id,
        ProjectRole.deleted_at.is_(None)
    ).first()
    if not role:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Role not found"
        )

    # Validate project access
    ProjectService._get_project_with_auth(db, role.project_id, current_user)

    # Check for duplicate name if changing
    if data.role_name and data.role_name != role.role_name:
        existing = db.query(ProjectRole).filter(
            ProjectRole.project_id == role.project_id,
            ProjectRole.role_name == data.role_name,
            ProjectRole.id != role_id,
            ProjectRole.deleted_at.is_(None)
        ).first()
        if existing:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Role '{data.role_name}' already exists in this project"
            )

    new_type = data.compensation_type
    type_changing = bool(new_type) and new_type != role.compensation_type

    # Validate compensation whenever a type is supplied
    if new_type:
        from types import SimpleNamespace

        def effective(field):
            # Value the role will hold after the update: the submitted one,
            # else the stored one. On a type change the stored values are
            # about to be cleared, so they must not satisfy the validator.
            supplied = getattr(data, field)
            if supplied is not None:
                return supplied
            return None if type_changing else getattr(role, field)

        ProjectService._validate_compensation_structure(SimpleNamespace(
            compensation_type=new_type,
            base_rate=effective('base_rate'),
            rate_period=effective('rate_period'),
            per_unit_rate=effective('per_unit_rate'),
            commission_percentage=effective('commission_percentage')
        ))

    # Only mutate the role once the request is known to be valid
    if type_changing:
        ProjectService._clear_irrelevant_compensation_fields(role, new_type)

    # Update fields
    update_data = data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(role, field, value)

    db.commit()
    db.refresh(role)
    logger.info(f"Updated role {role_id}")
    return role
@staticmethod
def delete_project_role(
    db: Session,
    role_id: UUID,
    current_user: User
) -> None:
    """
    Soft delete a project role by stamping ``deleted_at``.

    Raises:
        HTTPException 404: role does not exist or is already deleted
        HTTPException 400: non-deleted team members still use the role
    """
    role = (
        db.query(ProjectRole)
        .filter(
            ProjectRole.id == role_id,
            ProjectRole.deleted_at.is_(None),
        )
        .first()
    )
    if not role:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Role not found"
        )

    # Caller must have access to the owning project
    ProjectService._get_project_with_auth(db, role.project_id, current_user)

    # Refuse to remove a role that team members still hold
    holders = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_role_id == role_id,
            ProjectTeam.deleted_at.is_(None),
        )
        .count()
    )
    if holders > 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot delete role: {holders} team members are using it"
        )

    role.deleted_at = datetime.utcnow()
    db.commit()
    logger.info(f"Deleted role {role_id}")
# ============================================
# PROJECT SUBCONTRACTORS MANAGEMENT
# ============================================
@staticmethod
def create_subcontractor(
    db: Session,
    project_id: UUID,
    data: ProjectSubcontractorCreate,
    current_user: User
) -> ProjectSubcontractor:
    """
    Add a subcontractor to a project (idempotent)

    Authorization:
    - Delegated to ProjectService._get_project_with_auth

    Idempotency:
    - If same subcontractor + region combination exists, updates it
      (and marks it active) instead of creating a duplicate
    - This allows frontend to safely re-save without errors

    Returns:
        The created or updated ProjectSubcontractor

    Raises:
        HTTPException 404: subcontractor or region not found
        HTTPException 400: subcontractor inactive, or it is the project's
            main contractor
    """
    # Get and validate project
    project = ProjectService._get_project_with_auth(db, project_id, current_user)

    # Validate subcontractor exists
    subcontractor = db.query(Contractor).filter(
        Contractor.id == data.subcontractor_id,
        Contractor.deleted_at.is_(None)
    ).first()
    if not subcontractor:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Subcontractor {data.subcontractor_id} not found"
        )
    if not subcontractor.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Subcontractor organization is not active"
        )

    # Check if subcontractor is the main contractor.
    # Ids are compared as strings, matching the other id comparisons in this
    # service, so the guard still fires if one side is a UUID and the other a str.
    if str(data.subcontractor_id) == str(project.contractor_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot add main contractor as subcontractor"
        )

    # Validate region if provided (must belong to this project)
    if data.project_region_id:
        region = db.query(ProjectRegion).filter(
            ProjectRegion.id == data.project_region_id,
            ProjectRegion.project_id == project_id,
            ProjectRegion.deleted_at.is_(None)
        ).first()
        if not region:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Region {data.project_region_id} not found in this project"
            )

    # Check for existing (same subcontractor + region combination) - idempotency
    existing = db.query(ProjectSubcontractor).filter(
        ProjectSubcontractor.project_id == project_id,
        ProjectSubcontractor.subcontractor_id == data.subcontractor_id,
        ProjectSubcontractor.project_region_id == data.project_region_id,
        ProjectSubcontractor.deleted_at.is_(None)
    ).first()

    if existing:
        # Update existing relationship (idempotent behavior)
        logger.info(f"Subcontractor {data.subcontractor_id} already assigned, updating instead")
        existing.scope_description = data.scope_description
        existing.contract_start_date = data.contract_start_date
        existing.contract_end_date = data.contract_end_date
        existing.contract_value = data.contract_value
        existing.currency = data.currency or 'KES'
        existing.notes = data.notes
        existing.is_active = True
        db.commit()
        db.refresh(existing)
        logger.info(f"Updated existing subcontractor relationship for project {project_id}")
        return existing

    # Create new subcontractor relationship
    project_subcon = ProjectSubcontractor(
        project_id=project_id,
        subcontractor_id=data.subcontractor_id,
        scope_description=data.scope_description,
        project_region_id=data.project_region_id,
        contract_start_date=data.contract_start_date,
        contract_end_date=data.contract_end_date,
        contract_value=data.contract_value,
        currency=data.currency or 'KES',
        notes=data.notes,
        is_active=True,
        activated_at=datetime.utcnow()
    )
    db.add(project_subcon)
    db.commit()
    db.refresh(project_subcon)
    logger.info(f"Added subcontractor {data.subcontractor_id} to project {project_id}")
    return project_subcon
@staticmethod
def get_subcontractors(
    db: Session,
    project_id: UUID,
    current_user: User,
    is_active: Optional[bool] = None
) -> List[ProjectSubcontractor]:
    """
    List the non-deleted subcontractor assignments of a project.

    The related subcontractor and region are eager-loaded, and results are
    ordered by ``activated_at`` descending.

    Args:
        db: Database session
        project_id: Project whose subcontractors are listed
        current_user: Caller; must pass the project access check
        is_active: When not None, only assignments with this is_active value
    """
    # Access check first; raises if the caller may not see this project
    ProjectService._get_project_with_auth(db, project_id, current_user)

    conditions = [
        ProjectSubcontractor.project_id == project_id,
        ProjectSubcontractor.deleted_at.is_(None),
    ]
    if is_active is not None:
        conditions.append(ProjectSubcontractor.is_active == is_active)

    return (
        db.query(ProjectSubcontractor)
        .options(
            joinedload(ProjectSubcontractor.subcontractor),
            joinedload(ProjectSubcontractor.region),
        )
        .filter(*conditions)
        .order_by(ProjectSubcontractor.activated_at.desc())
        .all()
    )
@staticmethod
def update_subcontractor(
    db: Session,
    subcontractor_id: UUID,
    data: ProjectSubcontractorUpdate,
    current_user: User
) -> ProjectSubcontractor:
    """
    Apply a partial update to a project-subcontractor assignment.

    ``subcontractor_id`` is the id of the ProjectSubcontractor row. Only
    fields explicitly set on ``data`` are written. ``deactivation_reason``
    is recorded only when an active assignment is switched to inactive,
    together with the deactivation time and the acting user.

    Raises:
        HTTPException 404: assignment or target region not found
    """
    assignment = (
        db.query(ProjectSubcontractor)
        .filter(
            ProjectSubcontractor.id == subcontractor_id,
            ProjectSubcontractor.deleted_at.is_(None),
        )
        .first()
    )
    if not assignment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Subcontractor relationship not found"
        )

    # Caller must have access to the owning project
    ProjectService._get_project_with_auth(db, assignment.project_id, current_user)

    # A new region must exist within the same project
    moving_region = (
        data.project_region_id
        and data.project_region_id != assignment.project_region_id
    )
    if moving_region:
        target_region = (
            db.query(ProjectRegion)
            .filter(
                ProjectRegion.id == data.project_region_id,
                ProjectRegion.project_id == assignment.project_id,
                ProjectRegion.deleted_at.is_(None),
            )
            .first()
        )
        if not target_region:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Region {data.project_region_id} not found in this project"
            )

    # Active -> inactive transition: capture who, when and why
    if data.is_active is False and assignment.is_active:
        assignment.deactivated_at = datetime.utcnow()
        assignment.deactivated_by_user_id = current_user.id
        assignment.deactivation_reason = data.deactivation_reason

    changes = data.model_dump(exclude_unset=True)
    # The reason is written only by the deactivation branch above
    changes.pop('deactivation_reason', None)
    for attr, new_value in changes.items():
        setattr(assignment, attr, new_value)

    db.commit()
    db.refresh(assignment)
    logger.info(f"Updated subcontractor relationship {subcontractor_id}")
    return assignment
@staticmethod
def delete_subcontractor(
    db: Session,
    subcontractor_id: UUID,
    current_user: User
) -> None:
    """
    Soft delete a project-subcontractor assignment by stamping ``deleted_at``.

    ``subcontractor_id`` is the id of the ProjectSubcontractor row.

    Raises:
        HTTPException 404: assignment does not exist or is already deleted
        HTTPException 400: non-deleted team members still reference it
    """
    assignment = (
        db.query(ProjectSubcontractor)
        .filter(
            ProjectSubcontractor.id == subcontractor_id,
            ProjectSubcontractor.deleted_at.is_(None),
        )
        .first()
    )
    if not assignment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Subcontractor relationship not found"
        )

    # Caller must have access to the owning project
    ProjectService._get_project_with_auth(db, assignment.project_id, current_user)

    # Refuse to remove an assignment that team members are still attached to
    attached = (
        db.query(ProjectTeam)
        .filter(
            ProjectTeam.project_subcontractor_id == subcontractor_id,
            ProjectTeam.deleted_at.is_(None),
        )
        .count()
    )
    if attached > 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Cannot delete subcontractor: {attached} team members are assigned to it"
        )

    assignment.deleted_at = datetime.utcnow()
    db.commit()
    logger.info(f"Deleted subcontractor relationship {subcontractor_id}")
# ============================================
# PROJECT FINALIZATION
# ============================================
@staticmethod
def finalize_project_setup(
    db: Session,
    project_id: UUID,
    setup_notes: Optional[str],
    current_user: User
) -> Dict[str, Any]:
    """
    Finalize project setup and move the project from draft/planning → active

    Validation:
    - Project must be in 'draft' or 'planning' status
    - At least 1 non-deleted team member must exist
    - Only the project's primary manager (role project_manager) or a
      platform_admin can finalize

    Side effects:
    - Sets project.status to 'active'
    - Records setup_finalized_at, setup_finalized_by and (if given)
      setup_notes in project.additional_metadata

    Returns:
        Dict with project_id, the resulting status, a configuration summary
        (entity counts and which JSONB requirement sections are non-empty)
        and a human-readable message

    Raises:
        HTTPException 404: project not found
        HTTPException 403: caller is not allowed to finalize
        HTTPException 400: wrong status, or no team members
    """
    # Get project
    project = db.query(Project).filter(
        Project.id == project_id,
        Project.deleted_at.is_(None)
    ).first()
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    # Authorization check
    is_admin = current_user.role == 'platform_admin'
    is_owning_pm = (
        current_user.role == 'project_manager'
        and str(project.primary_manager_id) == str(current_user.id)
    )
    if not (is_admin or is_owning_pm):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the project manager or platform admin can finalize setup"
        )

    # Check project status - only allow finalizing draft/planning projects
    if project.status not in ['draft', 'planning']:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Project is in '{project.status}' status. Only draft or planning projects can be finalized."
        )

    def live_count(model):
        # Number of non-deleted rows of `model` belonging to this project
        return db.query(model).filter(
            model.project_id == project_id,
            model.deleted_at.is_(None)
        ).count()

    # Validate at least 1 team member exists
    team_count = live_count(ProjectTeam)
    if team_count == 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot finalize setup: Project must have at least 1 team member"
        )

    # Get counts for summary
    regions_count = live_count(ProjectRegion)
    roles_count = live_count(ProjectRole)
    subcontractors_count = live_count(ProjectSubcontractor)

    # Check JSONB configuration: a section counts as defined when non-empty
    has_budget = bool(project.budget)
    has_activation_reqs = bool(project.activation_requirements)
    has_photo_reqs = bool(project.photo_requirements)
    has_inventory_reqs = bool(project.inventory_requirements)

    # Update project status: draft/planning → active
    old_status = project.status
    project.status = 'active'

    # Add/update setup metadata. A fresh dict is assigned rather than mutating
    # the stored one in place: in-place changes to a JSON column value are not
    # detected by the ORM unless mutation tracking is configured, so they
    # could be silently dropped from the UPDATE.
    metadata = dict(project.additional_metadata or {})
    metadata['setup_finalized_at'] = datetime.utcnow().isoformat()
    metadata['setup_finalized_by'] = str(current_user.id)
    if setup_notes:
        metadata['setup_notes'] = setup_notes
    project.additional_metadata = metadata

    db.commit()
    logger.info(f"Project {project_id} finalized: {old_status} → active")

    final_status = project.status
    return {
        "project_id": project_id,
        "status": final_status,
        "summary": {
            "regions_count": regions_count,
            "roles_count": roles_count,
            "subcontractors_count": subcontractors_count,
            "team_members_count": team_count,
            "budget_defined": has_budget,
            "activation_requirements_defined": has_activation_reqs,
            "photo_requirements_defined": has_photo_reqs,
            "inventory_requirements_defined": has_inventory_reqs
        },
        # Message reports the status actually set, not a hard-coded name
        "message": f"Project setup finalized successfully. Project is now in '{final_status}' status."
    }
# ============================================
# HELPER METHODS
# ============================================
@staticmethod
def _get_project_with_auth(
    db: Session,
    project_id: UUID,
    current_user: User
) -> Project:
    """
    Load a non-deleted project and verify the caller may access it.

    Access rules by ``current_user.role``:
    - platform_admin: always allowed
    - project_manager: must be the project's primary manager
    - client_admin: project must belong to the user's client
    - contractor_admin / dispatcher: project must belong to the user's contractor
    - field_agent / sales_agent: must have a team membership on the project
      that is neither deleted nor removed
    - any other role: denied

    Raises:
        HTTPException 404: project not found
        HTTPException 403: caller lacks access
    """
    project = (
        db.query(Project)
        .filter(Project.id == project_id, Project.deleted_at.is_(None))
        .first()
    )
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )

    role = current_user.role
    if role == 'platform_admin':
        return project

    # Ids are compared as strings so differing id representations still match
    if role == 'project_manager':
        permitted = str(project.primary_manager_id) == str(current_user.id)
    elif role == 'client_admin':
        permitted = str(project.client_id) == str(current_user.client_id)
    elif role in ('contractor_admin', 'dispatcher'):
        permitted = str(project.contractor_id) == str(current_user.contractor_id)
    elif role in ('field_agent', 'sales_agent'):
        membership = (
            db.query(ProjectTeam)
            .filter(
                ProjectTeam.project_id == project_id,
                ProjectTeam.user_id == current_user.id,
                ProjectTeam.deleted_at.is_(None),
                ProjectTeam.removed_at.is_(None),
            )
            .first()
        )
        permitted = bool(membership)
    else:
        permitted = False

    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to access this project"
        )
    return project