swiftops-backend / src /app /api /v1 /profile.py
kamau1's picture
Enhance profile endpoint with full project team details including roles, regions, and subcontractor info
3ad3c84
"""
Profile Management Endpoints
Handles user profile CRUD with hierarchical permissions
"""
from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from sqlalchemy.orm import Session
from typing import Dict, Any, List, Optional
from uuid import UUID
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.models.user_financial_account import UserFinancialAccount
from app.models.user_document_link import UserDocumentLink
from app.models.user_asset_assignment import UserAssetAssignment
from app.schemas.profile import (
BasicProfileUpdate, BasicProfileResponse,
HealthInfoUpdate, HealthInfoResponse,
PPESizesUpdate, PPESizesResponse,
LocationUpdate, LocationResponse,
CompleteProfileResponse, ProfileCompletionStatus,
ProfilePermissions, ProfileValidationResult,
BulkProfileUpdate
)
from fastapi import Query
from app.services.profile_service import ProfileService
from app.services.audit_service import AuditService
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/profile", tags=["Profile Management"])
# ============================================
# SELF PROFILE ENDPOINTS (Current User)
# ============================================
@router.get("/me", response_model=CompleteProfileResponse)
async def get_my_profile(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return the authenticated user's complete profile.

    The response bundles basic info, health info, PPE sizes and current
    location with the profile completion status and the number of related
    financial accounts, document links and asset assignments.
    """
    me = current_user

    # Completion status is computed by the profile service.
    completion_status = ProfileService.calculate_profile_completion(me, db)

    # Related-record counts (only active, non-deleted accounts/assignments).
    active_accounts = db.query(UserFinancialAccount).filter(
        UserFinancialAccount.user_id == me.id,
        UserFinancialAccount.is_active == True,  # noqa: E712 - SQL expression
        UserFinancialAccount.deleted_at.is_(None),
    )
    financial_accounts_count = active_accounts.count()

    linked_documents = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == me.id
    )
    documents_count = linked_documents.count()

    active_assignments = db.query(UserAssetAssignment).filter(
        UserAssetAssignment.user_id == me.id,
        UserAssetAssignment.is_active == True,  # noqa: E712 - SQL expression
        UserAssetAssignment.deleted_at.is_(None),
    )
    asset_assignments_count = active_assignments.count()

    # The stored health/PPE mappings may be empty or missing; every expected
    # key is passed explicitly so absent entries come through as None.
    stored_health = me.health_info or {}
    health_keys = (
        'blood_type',
        'allergies',
        'chronic_conditions',
        'medications',
        'last_medical_check',
        'medical_notes',
    )
    health_response = HealthInfoResponse(
        **{key: stored_health.get(key) for key in health_keys}
    )

    stored_ppe = me.ppe_sizes or {}
    ppe_keys = (
        'height',
        'weight',
        'waist',
        'shoe_size',
        'helmet_size',
        'shirt_size',
        'pants_size',
        'glove_size',
        'vest_size',
    )
    ppe_response = PPESizesResponse(
        **{key: stored_ppe.get(key) for key in ppe_keys}
    )

    # Location fields are copied one-to-one from the user row.
    location_fields = (
        'current_location_name',
        'current_country',
        'current_region',
        'current_city',
        'current_address_line1',
        'current_address_line2',
        'current_maps_link',
        'current_latitude',
        'current_longitude',
        'current_location_updated_at',
    )
    location_response = LocationResponse(
        **{field: getattr(me, field) for field in location_fields}
    )

    return CompleteProfileResponse(
        basic_info=BasicProfileResponse.from_orm(me),
        health_info=health_response,
        ppe_sizes=ppe_response,
        location=location_response,
        completion_status=completion_status,
        financial_accounts_count=financial_accounts_count,
        documents_count=documents_count,
        asset_assignments_count=asset_assignments_count
    )
@router.get("/me/completion", response_model=ProfileCompletionStatus)
async def get_my_profile_completion(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return the authenticated user's profile completion status.

    Delegates to ProfileService.calculate_profile_completion.
    """
    completion = ProfileService.calculate_profile_completion(current_user, db)
    return completion
@router.get("/me/validation", response_model=ProfileValidationResult)
async def validate_my_profile(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return the validation result for the authenticated user's profile.

    Delegates to ProfileService.validate_profile.
    """
    validation = ProfileService.validate_profile(current_user, db)
    return validation
@router.put("/me/basic", response_model=BasicProfileResponse)
async def update_my_basic_profile(
    data: BasicProfileUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update the authenticated user's basic profile and record an audit entry.

    The caller is both the target and the acting user. The audit entry stores
    only the fields explicitly sent in the request as the "new" values; the
    previous values are not captured here.
    """
    updated_user = ProfileService.update_basic_profile(
        current_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(current_user.id),
        'description': "User updated their basic profile",
        'user': current_user,
        'request': request,
        'changes': {'old': {}, 'new': data.dict(exclude_unset=True)},
    }
    AuditService.log_action(**audit_entry)

    return BasicProfileResponse.from_orm(updated_user)
@router.put("/me/health", response_model=HealthInfoResponse)
async def update_my_health_info(
    data: HealthInfoUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update the authenticated user's health information and audit the action.

    The mapping returned by ProfileService.update_health_info is expanded
    directly into the response model.
    """
    stored_health = ProfileService.update_health_info(
        current_user, data, current_user, db
    )

    # Audit log (no field-level changes are recorded for health data here)
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(current_user.id),
        'description': "User updated their health information",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    return HealthInfoResponse(**stored_health)
@router.put("/me/ppe", response_model=PPESizesResponse)
async def update_my_ppe_sizes(
    data: PPESizesUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update the authenticated user's PPE sizes and audit the action.

    Every known size field is included in the response; sizes missing from
    the stored mapping are returned as None.
    """
    stored_sizes = ProfileService.update_ppe_sizes(
        current_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(current_user.id),
        'description': "User updated their PPE sizes",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    # Ensure all fields exist with None defaults
    size_fields = (
        'height',
        'weight',
        'waist',
        'shoe_size',
        'helmet_size',
        'shirt_size',
        'pants_size',
        'glove_size',
        'vest_size',
    )
    return PPESizesResponse(
        **{field: stored_sizes.get(field) for field in size_fields}
    )
@router.put("/me/location", response_model=LocationResponse)
async def update_my_location(
    data: LocationUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update the authenticated user's current location and audit the action.

    The response mirrors the location columns of the user object returned by
    ProfileService.update_location.
    """
    updated_user = ProfileService.update_location(
        current_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(current_user.id),
        'description': "User updated their location",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    location_fields = (
        'current_location_name',
        'current_country',
        'current_region',
        'current_city',
        'current_address_line1',
        'current_address_line2',
        'current_maps_link',
        'current_latitude',
        'current_longitude',
        'current_location_updated_at',
    )
    return LocationResponse(
        **{field: getattr(updated_user, field) for field in location_fields}
    )
@router.put("/me/bulk", response_model=CompleteProfileResponse)
async def bulk_update_my_profile(
    data: BulkProfileUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Apply several profile section updates in a single request.

    Each section (basic info, health info, PPE sizes, location) is updated
    only when it is present in the payload. One audit entry is written for
    the whole operation, then the refreshed complete profile is returned.
    Useful for a profile setup wizard.
    """
    # Pair every optional payload section with the service call that applies
    # it; the sections are processed in this fixed order.
    section_updates = (
        (data.basic_info, ProfileService.update_basic_profile),
        (data.health_info, ProfileService.update_health_info),
        (data.ppe_sizes, ProfileService.update_ppe_sizes),
        (data.location, ProfileService.update_location),
    )
    for section_payload, apply_update in section_updates:
        if section_payload:
            apply_update(current_user, section_payload, current_user, db)

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(current_user.id),
        'description': "User performed bulk profile update",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    # Reload the user from the session, then reuse the /me handler to build
    # the response.
    db.refresh(current_user)
    complete_profile = await get_my_profile(current_user, db)
    return complete_profile
# ============================================
# MANAGER ENDPOINTS (Edit Other Users)
# ============================================
def _resolve_document_url(document):
    """
    Return the URL to expose for a stored document.

    For documents whose storage provider is 'supabase' and whose file_url is
    a 'supabase://' reference, a fresh signed URL (valid for 3600 seconds) is
    generated from the 'bucket' and 'path' entries of additional_metadata.
    In every other case the stored file_url is returned unchanged.
    """
    file_url = document.file_url
    is_supabase_ref = (
        document.storage_provider == 'supabase'
        and isinstance(file_url, str)  # file_url may be missing
        and file_url.startswith('supabase://')
    )
    if not is_supabase_ref:
        return file_url

    # additional_metadata may be None; treat that like an empty mapping
    # instead of failing the whole request.
    metadata = document.additional_metadata or {}
    bucket = metadata.get('bucket')
    path = metadata.get('path')
    if not (bucket and path):
        return file_url

    from app.integrations.supabase import SupabaseStorageService
    return SupabaseStorageService.get_signed_url(bucket, path, 3600)


@router.get("/{user_id}", response_model=CompleteProfileResponse)
async def get_user_profile(
    user_id: UUID,
    include_documents: bool = Query(default=True, description="Include full documents list"),
    include_financial: bool = Query(default=True, description="Include financial accounts"),
    include_assets: bool = Query(default=True, description="Include asset assignments"),
    include_projects: bool = Query(default=True, description="Include project memberships"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get another user's complete profile with all related data

    Returns everything in one call:
    - Basic info with profile photo
    - Health info, PPE sizes, location
    - Documents list (when include_documents is true)
    - Financial accounts (when include_financial is true)
    - Asset assignments (when include_assets is true)
    - Project memberships (when include_projects is true)

    Sections whose include_* flag is false are returned as None.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
        HTTPException 403: ProfileService.check_edit_permission denies the
            'view' action for the current user on the target user.
    """
    from app.models.document import Document

    # Get target user
    target_user = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None)
    ).first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Check permissions
    if not ProfileService.check_edit_permission(current_user, target_user, 'view', db):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to view this user's profile"
        )

    # Get completion status
    completion_status = ProfileService.calculate_profile_completion(target_user, db)

    # Get related data counts
    financial_accounts_count = db.query(UserFinancialAccount).filter(
        UserFinancialAccount.user_id == target_user.id,
        UserFinancialAccount.is_active == True,  # noqa: E712 - SQL expression
        UserFinancialAccount.deleted_at.is_(None)
    ).count()
    documents_count = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == target_user.id
    ).count()
    asset_assignments_count = db.query(UserAssetAssignment).filter(
        UserAssetAssignment.user_id == target_user.id,
        UserAssetAssignment.is_active == True,  # noqa: E712 - SQL expression
        UserAssetAssignment.deleted_at.is_(None)
    ).count()

    # Handle empty JSONB fields properly with all field defaults
    health_info_dict = target_user.health_info or {}
    health_response = HealthInfoResponse(
        blood_type=health_info_dict.get('blood_type'),
        allergies=health_info_dict.get('allergies'),
        chronic_conditions=health_info_dict.get('chronic_conditions'),
        medications=health_info_dict.get('medications'),
        last_medical_check=health_info_dict.get('last_medical_check'),
        medical_notes=health_info_dict.get('medical_notes')
    )
    ppe_sizes_dict = target_user.ppe_sizes or {}
    ppe_response = PPESizesResponse(
        height=ppe_sizes_dict.get('height'),
        weight=ppe_sizes_dict.get('weight'),
        waist=ppe_sizes_dict.get('waist'),
        shoe_size=ppe_sizes_dict.get('shoe_size'),
        helmet_size=ppe_sizes_dict.get('helmet_size'),
        shirt_size=ppe_sizes_dict.get('shirt_size'),
        pants_size=ppe_sizes_dict.get('pants_size'),
        glove_size=ppe_sizes_dict.get('glove_size'),
        vest_size=ppe_sizes_dict.get('vest_size')
    )

    # Get profile photo URL: follow the 'profile_photo' link to its document,
    # skipping documents that have been soft-deleted.
    profile_photo_url = None
    doc_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == target_user.id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if doc_link:
        document = db.query(Document).filter(
            Document.id == doc_link.document_id,
            Document.deleted_at.is_(None)
        ).first()
        if document:
            profile_photo_url = _resolve_document_url(document)

    # Build basic info with profile photo
    basic_info = BasicProfileResponse.from_orm(target_user)
    basic_info.profile_photo_url = profile_photo_url

    # Get full documents list if requested (newest first)
    documents_list = None
    if include_documents:
        docs = db.query(Document).filter(
            Document.entity_type == "user",
            Document.entity_id == target_user.id,
            Document.deleted_at.is_(None)
        ).order_by(Document.created_at.desc()).all()
        documents_list = []
        for doc in docs:
            # created_at is passed through when already a string, otherwise
            # serialized to ISO 8601; a missing value stays None.
            created_at = doc.created_at
            if created_at and not isinstance(created_at, str):
                created_at = created_at.isoformat()
            elif not created_at:
                created_at = None
            documents_list.append({
                "id": str(doc.id),
                "document_type": doc.document_type,
                "document_category": doc.document_category,
                "file_name": doc.file_name,
                # Fresh signed URL for Supabase-backed files
                "file_url": _resolve_document_url(doc),
                "file_size": doc.file_size,
                "file_type": doc.file_type,
                "storage_provider": doc.storage_provider,
                "version": doc.version,
                "is_latest_version": doc.is_latest_version,
                "description": doc.description,
                "tags": doc.tags or [],
                "created_at": created_at
            })

    # Get financial accounts if requested (inactive ones included, deleted
    # ones excluded)
    financial_list = None
    if include_financial:
        accounts = db.query(UserFinancialAccount).filter(
            UserFinancialAccount.user_id == target_user.id,
            UserFinancialAccount.deleted_at.is_(None)
        ).all()
        financial_list = [{
            "id": str(acc.id),
            "account_name": acc.account_name,
            "payout_method": acc.payout_method,
            "mobile_money_provider": acc.mobile_money_provider,
            "mobile_money_phone": acc.mobile_money_phone,
            "bank_name": acc.bank_name,
            "bank_account_number": acc.bank_account_number,
            "is_primary": acc.is_primary,
            "is_active": acc.is_active,
            "is_verified": acc.is_verified
        } for acc in accounts]

    # Get asset assignments if requested
    assets_list = None
    if include_assets:
        assignments = db.query(UserAssetAssignment).filter(
            UserAssetAssignment.user_id == target_user.id,
            UserAssetAssignment.deleted_at.is_(None)
        ).all()
        assets_list = [{
            "id": str(asn.id),
            "asset_type": asn.asset_type,
            "asset_name": asn.asset_name,
            "asset_description": asn.asset_description,
            "serial_number": asn.serial_number,
            # Compare against None so a zero value is reported as 0.0
            # rather than being dropped.
            "asset_value": float(asn.asset_value) if asn.asset_value is not None else None,
            "assigned_at": asn.assigned_at,
            "is_active": asn.is_active,
            "condition_on_assign": asn.condition_on_assign
        } for asn in assignments]

    # Get project memberships if requested (current memberships of
    # non-deleted projects only)
    projects_list = None
    if include_projects:
        from app.models.project import Project
        from app.models.project_team import ProjectTeam
        memberships = db.query(ProjectTeam).join(Project).filter(
            ProjectTeam.user_id == target_user.id,
            ProjectTeam.deleted_at.is_(None),
            ProjectTeam.removed_at.is_(None),
            Project.deleted_at.is_(None)
        ).all()
        projects_list = [{
            "id": str(mem.project_id),
            "title": mem.project.title if mem.project else None,
            "role": mem.role,
            "is_lead": mem.is_lead,
            "is_assigned_slot": mem.is_assigned_slot,
            "assigned_at": mem.assigned_at.isoformat() if mem.assigned_at else None,
            # Project role details
            "project_role_id": str(mem.project_role_id) if mem.project_role_id else None,
            "project_role_name": mem.project_role.role_name if mem.project_role else None,
            # Regional assignment: no region means the member is flagged as
            # able to work project-wide
            "project_region_id": str(mem.project_region_id) if mem.project_region_id else None,
            "region_name": mem.region.region_name if mem.region else None,
            "can_work_project_wide": mem.project_region_id is None,
            # Subcontractor info: no subcontractor means main contractor
            "project_subcontractor_id": str(mem.project_subcontractor_id) if mem.project_subcontractor_id else None,
            "is_main_contractor": mem.project_subcontractor_id is None,
        } for mem in memberships]

    return CompleteProfileResponse(
        basic_info=basic_info,
        health_info=health_response,
        ppe_sizes=ppe_response,
        location=LocationResponse(
            current_location_name=target_user.current_location_name,
            current_country=target_user.current_country,
            current_region=target_user.current_region,
            current_city=target_user.current_city,
            current_address_line1=target_user.current_address_line1,
            current_address_line2=target_user.current_address_line2,
            current_maps_link=target_user.current_maps_link,
            current_latitude=target_user.current_latitude,
            current_longitude=target_user.current_longitude,
            current_location_updated_at=target_user.current_location_updated_at
        ),
        completion_status=completion_status,
        financial_accounts_count=financial_accounts_count,
        documents_count=documents_count,
        asset_assignments_count=asset_assignments_count,
        documents=documents_list,
        financial_accounts=financial_list,
        asset_assignments=assets_list,
        projects=projects_list
    )
@router.get("/{user_id}/permissions", response_model=ProfilePermissions)
async def get_user_profile_permissions(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return the permissions the current user has on a target user's profile.

    The result comes from ProfileService.get_profile_permissions and is meant
    for the UI to decide which edit controls to show or hide.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    permissions = ProfileService.get_profile_permissions(current_user, target_user, db)
    return permissions
@router.put("/{user_id}/basic", response_model=BasicProfileResponse)
async def update_user_basic_profile(
    user_id: UUID,
    data: BasicProfileUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update another user's basic profile and record an audit entry.

    NOTE(review): this handler performs no explicit permission check; the
    acting user is passed to ProfileService.update_basic_profile, which
    presumably enforces it - confirm.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    updated_user = ProfileService.update_basic_profile(
        target_user, data, current_user, db
    )

    # Audit log: only the fields explicitly sent are stored as "new" values
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(target_user.id),
        'description': f"Manager updated user's basic profile: {target_user.email}",
        'user': current_user,
        'request': request,
        'changes': {'old': {}, 'new': data.dict(exclude_unset=True)},
    }
    AuditService.log_action(**audit_entry)

    return BasicProfileResponse.from_orm(updated_user)
@router.put("/{user_id}/health", response_model=HealthInfoResponse)
async def update_user_health_info(
    user_id: UUID,
    data: HealthInfoUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update another user's health information and record an audit entry.

    NOTE(review): this handler performs no explicit permission check; the
    acting user is passed to ProfileService.update_health_info, which
    presumably enforces it - confirm.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    stored_health = ProfileService.update_health_info(
        target_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(target_user.id),
        'description': f"Manager updated user's health information: {target_user.email}",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    return HealthInfoResponse(**stored_health)
@router.put("/{user_id}/ppe", response_model=PPESizesResponse)
async def update_user_ppe_sizes(
    user_id: UUID,
    data: PPESizesUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update another user's PPE sizes and record an audit entry.

    Every known size field is included in the response; sizes missing from
    the stored mapping are returned as None.

    NOTE(review): this handler performs no explicit permission check; the
    acting user is passed to ProfileService.update_ppe_sizes, which
    presumably enforces it - confirm.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    stored_sizes = ProfileService.update_ppe_sizes(
        target_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(target_user.id),
        'description': f"Manager updated user's PPE sizes: {target_user.email}",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    # Ensure all fields exist with None defaults
    size_fields = (
        'height',
        'weight',
        'waist',
        'shoe_size',
        'helmet_size',
        'shirt_size',
        'pants_size',
        'glove_size',
        'vest_size',
    )
    return PPESizesResponse(
        **{field: stored_sizes.get(field) for field in size_fields}
    )
@router.put("/{user_id}/location", response_model=LocationResponse)
async def update_user_location(
    user_id: UUID,
    data: LocationUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update another user's current location and record an audit entry.

    The response mirrors the location columns of the user object returned by
    ProfileService.update_location.

    NOTE(review): this handler performs no explicit permission check; the
    acting user is passed to ProfileService.update_location, which
    presumably enforces it - confirm.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    updated_user = ProfileService.update_location(
        target_user, data, current_user, db
    )

    # Audit log
    audit_entry = {
        'db': db,
        'action': 'update',
        'entity_type': 'user',
        'entity_id': str(target_user.id),
        'description': f"Manager updated user's location: {target_user.email}",
        'user': current_user,
        'request': request,
    }
    AuditService.log_action(**audit_entry)

    location_fields = (
        'current_location_name',
        'current_country',
        'current_region',
        'current_city',
        'current_address_line1',
        'current_address_line2',
        'current_maps_link',
        'current_latitude',
        'current_longitude',
        'current_location_updated_at',
    )
    return LocationResponse(
        **{field: getattr(updated_user, field) for field in location_fields}
    )
@router.get("/{user_id}/validation", response_model=ProfileValidationResult)
async def validate_user_profile(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Return the validation result for another user's profile.

    Raises:
        HTTPException 404: no non-deleted user exists with this id.
        HTTPException 403: ProfileService.check_edit_permission denies the
            'view' action for the current user on the target user.
    """
    # Look up the target user, ignoring soft-deleted rows
    user_lookup = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None),
    )
    target_user = user_lookup.first()
    if not target_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Check permissions
    may_view = ProfileService.check_edit_permission(current_user, target_user, 'view', db)
    if not may_view:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to view this user's profile"
        )

    validation = ProfileService.validate_profile(target_user, db)
    return validation