Spaces:
Sleeping
Sleeping
| """ | |
| Profile Management Endpoints | |
| Handles user profile CRUD with hierarchical permissions | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Request, Query | |
| from sqlalchemy.orm import Session | |
| from typing import Dict, Any, List, Optional | |
| from uuid import UUID | |
| from app.api.deps import get_db, get_current_active_user | |
| from app.models.user import User | |
| from app.models.user_financial_account import UserFinancialAccount | |
| from app.models.user_document_link import UserDocumentLink | |
| from app.models.user_asset_assignment import UserAssetAssignment | |
| from app.schemas.profile import ( | |
| BasicProfileUpdate, BasicProfileResponse, | |
| HealthInfoUpdate, HealthInfoResponse, | |
| PPESizesUpdate, PPESizesResponse, | |
| LocationUpdate, LocationResponse, | |
| CompleteProfileResponse, ProfileCompletionStatus, | |
| ProfilePermissions, ProfileValidationResult, | |
| BulkProfileUpdate | |
| ) | |
| from fastapi import Query | |
| from app.services.profile_service import ProfileService | |
| from app.services.audit_service import AuditService | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/profile", tags=["Profile Management"]) | |
| # ============================================ | |
| # SELF PROFILE ENDPOINTS (Current User) | |
| # ============================================ | |
| async def get_my_profile( | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get current user's complete profile | |
| Returns all profile sections with completion status | |
| """ | |
| # Get completion status | |
| completion_status = ProfileService.calculate_profile_completion(current_user, db) | |
| # Get related data counts | |
| financial_accounts_count = db.query(UserFinancialAccount).filter( | |
| UserFinancialAccount.user_id == current_user.id, | |
| UserFinancialAccount.is_active == True, | |
| UserFinancialAccount.deleted_at == None | |
| ).count() | |
| documents_count = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == current_user.id | |
| ).count() | |
| asset_assignments_count = db.query(UserAssetAssignment).filter( | |
| UserAssetAssignment.user_id == current_user.id, | |
| UserAssetAssignment.is_active == True, | |
| UserAssetAssignment.deleted_at == None | |
| ).count() | |
| # Handle empty JSONB fields properly with all field defaults | |
| health_info_dict = current_user.health_info if current_user.health_info else {} | |
| health_response = HealthInfoResponse( | |
| blood_type=health_info_dict.get('blood_type'), | |
| allergies=health_info_dict.get('allergies'), | |
| chronic_conditions=health_info_dict.get('chronic_conditions'), | |
| medications=health_info_dict.get('medications'), | |
| last_medical_check=health_info_dict.get('last_medical_check'), | |
| medical_notes=health_info_dict.get('medical_notes') | |
| ) | |
| ppe_sizes_dict = current_user.ppe_sizes if current_user.ppe_sizes else {} | |
| ppe_response = PPESizesResponse( | |
| height=ppe_sizes_dict.get('height'), | |
| weight=ppe_sizes_dict.get('weight'), | |
| waist=ppe_sizes_dict.get('waist'), | |
| shoe_size=ppe_sizes_dict.get('shoe_size'), | |
| helmet_size=ppe_sizes_dict.get('helmet_size'), | |
| shirt_size=ppe_sizes_dict.get('shirt_size'), | |
| pants_size=ppe_sizes_dict.get('pants_size'), | |
| glove_size=ppe_sizes_dict.get('glove_size'), | |
| vest_size=ppe_sizes_dict.get('vest_size') | |
| ) | |
| return CompleteProfileResponse( | |
| basic_info=BasicProfileResponse.from_orm(current_user), | |
| health_info=health_response, | |
| ppe_sizes=ppe_response, | |
| location=LocationResponse( | |
| current_location_name=current_user.current_location_name, | |
| current_country=current_user.current_country, | |
| current_region=current_user.current_region, | |
| current_city=current_user.current_city, | |
| current_address_line1=current_user.current_address_line1, | |
| current_address_line2=current_user.current_address_line2, | |
| current_maps_link=current_user.current_maps_link, | |
| current_latitude=current_user.current_latitude, | |
| current_longitude=current_user.current_longitude, | |
| current_location_updated_at=current_user.current_location_updated_at | |
| ), | |
| completion_status=completion_status, | |
| financial_accounts_count=financial_accounts_count, | |
| documents_count=documents_count, | |
| asset_assignments_count=asset_assignments_count | |
| ) | |
| async def get_my_profile_completion( | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get profile completion status""" | |
| return ProfileService.calculate_profile_completion(current_user, db) | |
| async def validate_my_profile( | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Validate profile completeness""" | |
| return ProfileService.validate_profile(current_user, db) | |
| async def update_my_basic_profile( | |
| data: BasicProfileUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update current user's basic profile""" | |
| updated_user = ProfileService.update_basic_profile(current_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(current_user.id), | |
| description=f"User updated their basic profile", | |
| user=current_user, | |
| request=request, | |
| changes={'old': {}, 'new': data.dict(exclude_unset=True)} | |
| ) | |
| return BasicProfileResponse.from_orm(updated_user) | |
| async def update_my_health_info( | |
| data: HealthInfoUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update current user's health information""" | |
| health_info = ProfileService.update_health_info(current_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(current_user.id), | |
| description=f"User updated their health information", | |
| user=current_user, | |
| request=request | |
| ) | |
| return HealthInfoResponse(**health_info) | |
| async def update_my_ppe_sizes( | |
| data: PPESizesUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update current user's PPE sizes""" | |
| ppe_sizes = ProfileService.update_ppe_sizes(current_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(current_user.id), | |
| description=f"User updated their PPE sizes", | |
| user=current_user, | |
| request=request | |
| ) | |
| # Ensure all fields exist with None defaults | |
| ppe_response = { | |
| 'height': ppe_sizes.get('height'), | |
| 'weight': ppe_sizes.get('weight'), | |
| 'waist': ppe_sizes.get('waist'), | |
| 'shoe_size': ppe_sizes.get('shoe_size'), | |
| 'helmet_size': ppe_sizes.get('helmet_size'), | |
| 'shirt_size': ppe_sizes.get('shirt_size'), | |
| 'pants_size': ppe_sizes.get('pants_size'), | |
| 'glove_size': ppe_sizes.get('glove_size'), | |
| 'vest_size': ppe_sizes.get('vest_size') | |
| } | |
| return PPESizesResponse(**ppe_response) | |
| async def update_my_location( | |
| data: LocationUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update current user's location""" | |
| updated_user = ProfileService.update_location(current_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(current_user.id), | |
| description=f"User updated their location", | |
| user=current_user, | |
| request=request | |
| ) | |
| return LocationResponse( | |
| current_location_name=updated_user.current_location_name, | |
| current_country=updated_user.current_country, | |
| current_region=updated_user.current_region, | |
| current_city=updated_user.current_city, | |
| current_address_line1=updated_user.current_address_line1, | |
| current_address_line2=updated_user.current_address_line2, | |
| current_maps_link=updated_user.current_maps_link, | |
| current_latitude=updated_user.current_latitude, | |
| current_longitude=updated_user.current_longitude, | |
| current_location_updated_at=updated_user.current_location_updated_at | |
| ) | |
| async def bulk_update_my_profile( | |
| data: BulkProfileUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Update multiple profile sections at once | |
| Useful for profile setup wizard | |
| """ | |
| # Update each section if provided | |
| if data.basic_info: | |
| ProfileService.update_basic_profile(current_user, data.basic_info, current_user, db) | |
| if data.health_info: | |
| ProfileService.update_health_info(current_user, data.health_info, current_user, db) | |
| if data.ppe_sizes: | |
| ProfileService.update_ppe_sizes(current_user, data.ppe_sizes, current_user, db) | |
| if data.location: | |
| ProfileService.update_location(current_user, data.location, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(current_user.id), | |
| description=f"User performed bulk profile update", | |
| user=current_user, | |
| request=request | |
| ) | |
| # Return complete profile | |
| db.refresh(current_user) | |
| return await get_my_profile(current_user, db) | |
| # ============================================ | |
| # MANAGER ENDPOINTS (Edit Other Users) | |
| # ============================================ | |
| async def get_user_profile( | |
| user_id: UUID, | |
| include_documents: bool = Query(default=True, description="Include full documents list"), | |
| include_financial: bool = Query(default=True, description="Include financial accounts"), | |
| include_assets: bool = Query(default=True, description="Include asset assignments"), | |
| include_projects: bool = Query(default=True, description="Include project memberships"), | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get another user's complete profile with all related data | |
| Returns everything in one call: | |
| - Basic info with profile photo | |
| - Health info, PPE sizes, location | |
| - Documents list | |
| - Financial accounts | |
| - Asset assignments | |
| - Project memberships | |
| Requires permission to view the user | |
| """ | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| # Check permissions | |
| if not ProfileService.check_edit_permission(current_user, target_user, 'view', db): | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You don't have permission to view this user's profile" | |
| ) | |
| # Get completion status | |
| completion_status = ProfileService.calculate_profile_completion(target_user, db) | |
| # Get related data counts | |
| financial_accounts_count = db.query(UserFinancialAccount).filter( | |
| UserFinancialAccount.user_id == target_user.id, | |
| UserFinancialAccount.is_active == True, | |
| UserFinancialAccount.deleted_at == None | |
| ).count() | |
| documents_count = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == target_user.id | |
| ).count() | |
| asset_assignments_count = db.query(UserAssetAssignment).filter( | |
| UserAssetAssignment.user_id == target_user.id, | |
| UserAssetAssignment.is_active == True, | |
| UserAssetAssignment.deleted_at == None | |
| ).count() | |
| # Handle empty JSONB fields properly with all field defaults | |
| health_info_dict = target_user.health_info if target_user.health_info else {} | |
| health_response = HealthInfoResponse( | |
| blood_type=health_info_dict.get('blood_type'), | |
| allergies=health_info_dict.get('allergies'), | |
| chronic_conditions=health_info_dict.get('chronic_conditions'), | |
| medications=health_info_dict.get('medications'), | |
| last_medical_check=health_info_dict.get('last_medical_check'), | |
| medical_notes=health_info_dict.get('medical_notes') | |
| ) | |
| ppe_sizes_dict = target_user.ppe_sizes if target_user.ppe_sizes else {} | |
| ppe_response = PPESizesResponse( | |
| height=ppe_sizes_dict.get('height'), | |
| weight=ppe_sizes_dict.get('weight'), | |
| waist=ppe_sizes_dict.get('waist'), | |
| shoe_size=ppe_sizes_dict.get('shoe_size'), | |
| helmet_size=ppe_sizes_dict.get('helmet_size'), | |
| shirt_size=ppe_sizes_dict.get('shirt_size'), | |
| pants_size=ppe_sizes_dict.get('pants_size'), | |
| glove_size=ppe_sizes_dict.get('glove_size'), | |
| vest_size=ppe_sizes_dict.get('vest_size') | |
| ) | |
| # Get profile photo URL | |
| from app.models.document import Document | |
| profile_photo_url = None | |
| doc_link = db.query(UserDocumentLink).filter( | |
| UserDocumentLink.user_id == target_user.id, | |
| UserDocumentLink.document_link_type == 'profile_photo' | |
| ).first() | |
| if doc_link: | |
| document = db.query(Document).filter( | |
| Document.id == doc_link.document_id, | |
| Document.deleted_at == None | |
| ).first() | |
| if document: | |
| profile_photo_url = document.file_url | |
| # Generate fresh signed URL for Supabase files | |
| if document.storage_provider == 'supabase' and document.file_url.startswith('supabase://'): | |
| from app.integrations.supabase import SupabaseStorageService | |
| bucket = document.additional_metadata.get('bucket') | |
| path = document.additional_metadata.get('path') | |
| if bucket and path: | |
| profile_photo_url = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| # Build basic info with profile photo | |
| basic_info = BasicProfileResponse.from_orm(target_user) | |
| basic_info.profile_photo_url = profile_photo_url | |
| # Get full documents list if requested | |
| documents_list = None | |
| if include_documents: | |
| from app.models.document import Document | |
| from app.integrations.supabase import SupabaseStorageService | |
| docs = db.query(Document).filter( | |
| Document.entity_type == "user", | |
| Document.entity_id == target_user.id, | |
| Document.deleted_at == None | |
| ).order_by(Document.created_at.desc()).all() | |
| documents_list = [] | |
| for doc in docs: | |
| doc_dict = { | |
| "id": str(doc.id), | |
| "document_type": doc.document_type, | |
| "document_category": doc.document_category, | |
| "file_name": doc.file_name, | |
| "file_url": doc.file_url, | |
| "file_size": doc.file_size, | |
| "file_type": doc.file_type, | |
| "storage_provider": doc.storage_provider, | |
| "version": doc.version, | |
| "is_latest_version": doc.is_latest_version, | |
| "description": doc.description, | |
| "tags": doc.tags or [], | |
| "created_at": doc.created_at if isinstance(doc.created_at, str) else doc.created_at.isoformat() if doc.created_at else None | |
| } | |
| # Generate fresh signed URL for Supabase | |
| if doc.storage_provider == 'supabase' and doc.file_url.startswith('supabase://'): | |
| bucket = doc.additional_metadata.get('bucket') | |
| path = doc.additional_metadata.get('path') | |
| if bucket and path: | |
| doc_dict["file_url"] = SupabaseStorageService.get_signed_url(bucket, path, 3600) | |
| documents_list.append(doc_dict) | |
| # Get financial accounts if requested | |
| financial_list = None | |
| if include_financial: | |
| accounts = db.query(UserFinancialAccount).filter( | |
| UserFinancialAccount.user_id == target_user.id, | |
| UserFinancialAccount.deleted_at == None | |
| ).all() | |
| financial_list = [{ | |
| "id": str(acc.id), | |
| "account_name": acc.account_name, | |
| "payout_method": acc.payout_method, | |
| "mobile_money_provider": acc.mobile_money_provider, | |
| "mobile_money_phone": acc.mobile_money_phone, | |
| "bank_name": acc.bank_name, | |
| "bank_account_number": acc.bank_account_number, | |
| "is_primary": acc.is_primary, | |
| "is_active": acc.is_active, | |
| "is_verified": acc.is_verified | |
| } for acc in accounts] | |
| # Get asset assignments if requested | |
| assets_list = None | |
| if include_assets: | |
| assignments = db.query(UserAssetAssignment).filter( | |
| UserAssetAssignment.user_id == target_user.id, | |
| UserAssetAssignment.deleted_at == None | |
| ).all() | |
| assets_list = [{ | |
| "id": str(asn.id), | |
| "asset_type": asn.asset_type, | |
| "asset_name": asn.asset_name, | |
| "asset_description": asn.asset_description, | |
| "serial_number": asn.serial_number, | |
| "asset_value": float(asn.asset_value) if asn.asset_value else None, | |
| "assigned_at": asn.assigned_at, | |
| "is_active": asn.is_active, | |
| "condition_on_assign": asn.condition_on_assign | |
| } for asn in assignments] | |
| # Get project memberships if requested | |
| projects_list = None | |
| if include_projects: | |
| from app.models.project import Project | |
| from app.models.project_team import ProjectTeam | |
| memberships = db.query(ProjectTeam).join(Project).filter( | |
| ProjectTeam.user_id == target_user.id, | |
| ProjectTeam.deleted_at == None, | |
| ProjectTeam.removed_at == None, | |
| Project.deleted_at == None | |
| ).all() | |
| projects_list = [] | |
| for mem in memberships: | |
| project_data = { | |
| "id": str(mem.project_id), | |
| "title": mem.project.title if mem.project else None, | |
| "role": mem.role, | |
| "is_lead": mem.is_lead, | |
| "is_assigned_slot": mem.is_assigned_slot, | |
| "assigned_at": mem.assigned_at.isoformat() if mem.assigned_at else None, | |
| # Project role details (with compensation) | |
| "project_role_id": str(mem.project_role_id) if mem.project_role_id else None, | |
| "project_role_name": mem.project_role.role_name if mem.project_role else None, | |
| # Regional assignment | |
| "project_region_id": str(mem.project_region_id) if mem.project_region_id else None, | |
| "region_name": mem.region.region_name if mem.region else None, | |
| "can_work_project_wide": mem.project_region_id is None, | |
| # Subcontractor info | |
| "project_subcontractor_id": str(mem.project_subcontractor_id) if mem.project_subcontractor_id else None, | |
| "is_main_contractor": mem.project_subcontractor_id is None, | |
| } | |
| projects_list.append(project_data) | |
| return CompleteProfileResponse( | |
| basic_info=basic_info, | |
| health_info=health_response, | |
| ppe_sizes=ppe_response, | |
| location=LocationResponse( | |
| current_location_name=target_user.current_location_name, | |
| current_country=target_user.current_country, | |
| current_region=target_user.current_region, | |
| current_city=target_user.current_city, | |
| current_address_line1=target_user.current_address_line1, | |
| current_address_line2=target_user.current_address_line2, | |
| current_maps_link=target_user.current_maps_link, | |
| current_latitude=target_user.current_latitude, | |
| current_longitude=target_user.current_longitude, | |
| current_location_updated_at=target_user.current_location_updated_at | |
| ), | |
| completion_status=completion_status, | |
| financial_accounts_count=financial_accounts_count, | |
| documents_count=documents_count, | |
| asset_assignments_count=asset_assignments_count, | |
| documents=documents_list, | |
| financial_accounts=financial_list, | |
| asset_assignments=assets_list, | |
| projects=projects_list | |
| ) | |
| async def get_user_profile_permissions( | |
| user_id: UUID, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get what sections current user can edit for target user | |
| Useful for UI to show/hide edit buttons | |
| """ | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| return ProfileService.get_profile_permissions(current_user, target_user, db) | |
| async def update_user_basic_profile( | |
| user_id: UUID, | |
| data: BasicProfileUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update another user's basic profile (requires permission)""" | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| updated_user = ProfileService.update_basic_profile(target_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(target_user.id), | |
| description=f"Manager updated user's basic profile: {target_user.email}", | |
| user=current_user, | |
| request=request, | |
| changes={'old': {}, 'new': data.dict(exclude_unset=True)} | |
| ) | |
| return BasicProfileResponse.from_orm(updated_user) | |
| async def update_user_health_info( | |
| user_id: UUID, | |
| data: HealthInfoUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update another user's health information (requires permission)""" | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| health_info = ProfileService.update_health_info(target_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(target_user.id), | |
| description=f"Manager updated user's health information: {target_user.email}", | |
| user=current_user, | |
| request=request | |
| ) | |
| return HealthInfoResponse(**health_info) | |
| async def update_user_ppe_sizes( | |
| user_id: UUID, | |
| data: PPESizesUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update another user's PPE sizes (requires permission)""" | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| ppe_sizes = ProfileService.update_ppe_sizes(target_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(target_user.id), | |
| description=f"Manager updated user's PPE sizes: {target_user.email}", | |
| user=current_user, | |
| request=request | |
| ) | |
| # Ensure all fields exist with None defaults | |
| ppe_response = { | |
| 'height': ppe_sizes.get('height'), | |
| 'weight': ppe_sizes.get('weight'), | |
| 'waist': ppe_sizes.get('waist'), | |
| 'shoe_size': ppe_sizes.get('shoe_size'), | |
| 'helmet_size': ppe_sizes.get('helmet_size'), | |
| 'shirt_size': ppe_sizes.get('shirt_size'), | |
| 'pants_size': ppe_sizes.get('pants_size'), | |
| 'glove_size': ppe_sizes.get('glove_size'), | |
| 'vest_size': ppe_sizes.get('vest_size') | |
| } | |
| return PPESizesResponse(**ppe_response) | |
| async def update_user_location( | |
| user_id: UUID, | |
| data: LocationUpdate, | |
| request: Request, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update another user's location (requires permission)""" | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| updated_user = ProfileService.update_location(target_user, data, current_user, db) | |
| # Audit log | |
| AuditService.log_action( | |
| db=db, | |
| action='update', | |
| entity_type='user', | |
| entity_id=str(target_user.id), | |
| description=f"Manager updated user's location: {target_user.email}", | |
| user=current_user, | |
| request=request | |
| ) | |
| return LocationResponse( | |
| current_location_name=updated_user.current_location_name, | |
| current_country=updated_user.current_country, | |
| current_region=updated_user.current_region, | |
| current_city=updated_user.current_city, | |
| current_address_line1=updated_user.current_address_line1, | |
| current_address_line2=updated_user.current_address_line2, | |
| current_maps_link=updated_user.current_maps_link, | |
| current_latitude=updated_user.current_latitude, | |
| current_longitude=updated_user.current_longitude, | |
| current_location_updated_at=updated_user.current_location_updated_at | |
| ) | |
| async def validate_user_profile( | |
| user_id: UUID, | |
| current_user: User = Depends(get_current_active_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Validate another user's profile (requires permission)""" | |
| # Get target user | |
| target_user = db.query(User).filter( | |
| User.id == user_id, | |
| User.deleted_at == None | |
| ).first() | |
| if not target_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| # Check permissions | |
| if not ProfileService.check_edit_permission(current_user, target_user, 'view', db): | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You don't have permission to view this user's profile" | |
| ) | |
| return ProfileService.validate_profile(target_user, db) | |