""" User Management Endpoints Complete user CRUD operations with admin capabilities """ from fastapi import APIRouter, Depends, HTTPException, status, Query, Request from sqlalchemy.orm import Session from typing import Optional, List from uuid import UUID import math from app.api.deps import get_db, get_current_active_user from app.models.user import User from app.schemas.user import ( UserResponse, UserUpdateAdmin, UserStatusUpdate, UserRoleUpdate, UserOrganizationUpdate, UserListResponse, BulkUserUpdate, BulkUserStatusUpdate, BulkUserRoleUpdate, BulkOperationResult ) from app.schemas.filters import UserFilters from app.services.user_service import UserService from app.services.audit_service import AuditService from app.core.permissions import require_permission, require_role, AppRole import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/users", tags=["Users"]) def parse_user_filters( client_id: Optional[UUID] = Query(None), contractor_id: Optional[UUID] = Query(None), role: Optional[str] = Query(None), status: Optional[str] = Query(None), is_active: Optional[bool] = Query(None), email: Optional[str] = Query(None), phone: Optional[str] = Query(None), search: Optional[str] = Query(None), sort_by: Optional[str] = Query(None), sort_order: str = Query("desc"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=100), from_date: Optional[str] = Query(None), to_date: Optional[str] = Query(None), ) -> UserFilters: """Parse and convert query parameters to UserFilters""" def parse_csv(value: Optional[str]) -> Optional[List[str]]: if value is None: return None return [item.strip() for item in value.split(',') if item.strip()] return UserFilters( client_id=client_id, contractor_id=contractor_id, role=parse_csv(role), status=parse_csv(status), is_active=is_active, email=email, phone=phone, search=search, sort_by=sort_by, sort_order=sort_order, page=page, page_size=page_size, from_date=None, # Not used in users to_date=None, # Not used in users ) @router.get("", response_model=UserListResponse) @require_permission("view_users") async def list_users( filters: UserFilters = Depends(parse_user_filters), # Legacy support skip: int = Query(None, ge=0, description="DEPRECATED: Use page instead"), limit: int = Query(None, ge=1, le=100, description="DEPRECATED: Use page_size instead"), current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ List users with advanced filtering, sorting, and pagination **Authorization:** - Platform admins can see all users - Org admins can see their organization's users - Other users can see users in their organization **Sorting:** - Supported fields: created_at, updated_at, name, email, role, status - Order: asc or desc """ # Handle backward compatibility if skip is not None and filters.page == 1: filters.page = (skip // filters.page_size) + 1 if limit is not None: filters.page_size = limit skip_calc = (filters.page - 1) * filters.page_size users, total = UserService.search_users( db=db, current_user=current_user, email=filters.email, phone=filters.phone, role=filters.role[0] if filters.role and len(filters.role) == 1 else None, # Temporary: only single value status=filters.status[0] if filters.status and len(filters.status) == 1 else None, # Temporary: only single value is_active=filters.is_active, client_id=filters.client_id, contractor_id=filters.contractor_id, skip=skip_calc, limit=filters.page_size, sort_by=filters.sort_by or 'created_at', sort_order=filters.sort_order ) # Calculate pagination info total_pages = math.ceil(total / filters.page_size) if filters.page_size > 0 else 0 return UserListResponse( users=users, total=total, page=filters.page, page_size=filters.page_size, total_pages=total_pages ) @router.get("/search", response_model=List[UserResponse]) async def search_users( email: Optional[str] = Query(None, description="Search by email"), phone: Optional[str] = Query(None, description="Search by phone"), role: Optional[str] = Query(None, description="Filter by role"), limit: int = Query(10, ge=1, le=100), current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Simple user search (legacy endpoint - use GET /users for full features) Requires authentication. Platform admins can search all users. Other users can only search within their organization. """ users, _ = UserService.search_users( db=db, current_user=current_user, email=email, phone=phone, role=role, skip=0, limit=limit ) return users @router.get("/{user_id}", response_model=UserResponse) async def get_user( user_id: UUID, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Get user by ID Requires authentication and appropriate permissions. """ user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Permission check if current_user.role == 'platform_admin': # Platform admins can view anyone return user # Users can view their own profile if current_user.id == user.id: return user # Org admins can view their org's users if current_user.role in ['client_admin', 'contractor_admin']: if current_user.client_id and current_user.client_id == user.client_id: return user if current_user.contractor_id and current_user.contractor_id == user.contractor_id: return user # Project managers, sales managers, and dispatchers can view users in their organization if current_user.role in ['project_manager', 'sales_manager', 'dispatcher']: # Check if they share the same organization (client or contractor) if current_user.client_id and current_user.client_id == user.client_id: return user if current_user.contractor_id and current_user.contractor_id == user.contractor_id: return user # Also check if they share any projects (for cross-org collaboration) from app.models.project_team import ProjectTeam shared_projects = db.query(ProjectTeam.project_id).filter( ProjectTeam.user_id == current_user.id, ProjectTeam.deleted_at == None, ProjectTeam.removed_at == None ).intersect( db.query(ProjectTeam.project_id).filter( ProjectTeam.user_id == user.id, ProjectTeam.deleted_at == None, ProjectTeam.removed_at == None ) ).count() if shared_projects > 0: return user raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You don't have permission to view this user" ) # ============================================ # USER UPDATE ENDPOINTS # ============================================ @router.put("/{user_id}", response_model=UserResponse) @require_permission("manage_org_users") async def update_user( user_id: UUID, data: UserUpdateAdmin, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Update user (admin operation) **Authorization:** - Platform admin - can update any user - Client admin - can update their client's users (except role/org changes) - Contractor admin - can update their contractor's users (except role/org changes) **Updatable Fields:** - Basic info: name, email, phone, display_name, emergency contacts - Role (platform admin only) - Status (admins only) - Organization (platform admin only) - is_active flag """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Update user updated_user, changes = UserService.update_user(user, data, current_user, db) # Audit log if changes['old']: # Only log if there were actual changes AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"Admin updated user: {user.email}", user=current_user, request=request, changes=changes ) return updated_user @router.post("/{user_id}/status", response_model=UserResponse) @require_permission("manage_org_users") async def change_user_status( user_id: UUID, data: UserStatusUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Change user status **Statuses:** - invited: User has been invited but not accepted - pending_setup: User accepted invitation but profile incomplete - active: User is active and can use the system - suspended: User is suspended and cannot login **Authorization:** - Platform admin - can change any user's status - Org admins - can change their org's users' status """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) old_status = user.status updated_user = UserService.change_user_status(user, data, current_user, db) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"User status changed from {old_status} to {data.status}", user=current_user, request=request, additional_metadata={'reason': data.reason, 'old_status': old_status, 'new_status': data.status} ) return updated_user @router.post("/{user_id}/role", response_model=UserResponse) @require_permission("manage_org_users") async def change_user_role( user_id: UUID, data: UserRoleUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Change user role **Roles:** - platform_admin: Full system access - client_admin: Admin for a client organization - contractor_admin: Admin for a contractor organization - sales_manager, project_manager, dispatcher: Management roles - field_agent, sales_agent: Operational roles **Authorization:** - Platform admin - can assign any role - Org admins - can assign roles within their org (except platform_admin) """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) old_role = user.role updated_user = UserService.change_user_role(user, data, current_user, db) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"User role changed from {old_role} to {data.role}", user=current_user, request=request, additional_metadata={'reason': data.reason, 'old_role': old_role, 'new_role': data.role} ) return updated_user @router.post("/{user_id}/organization", response_model=UserResponse) @require_role(AppRole.PLATFORM_ADMIN) # Only platform admins can move users between orgs async def change_user_organization( user_id: UUID, data: UserOrganizationUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Change user organization assignment **Authorization:** - Platform admin only **Rules:** - User can be assigned to either client OR contractor (not both) - Role must be compatible with organization type """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) old_client = user.client_id old_contractor = user.contractor_id updated_user, cleanup_summary = UserService.change_user_organization(user, data, current_user, db) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"User organization changed (removed from {cleanup_summary['projects_removed']} projects, returned {cleanup_summary['assets_returned']} assets)", user=current_user, request=request, additional_metadata={ 'reason': data.reason, 'old_client_id': str(old_client) if old_client else None, 'old_contractor_id': str(old_contractor) if old_contractor else None, 'new_client_id': str(data.client_id) if data.client_id else None, 'new_contractor_id': str(data.contractor_id) if data.contractor_id else None, 'cleanup_summary': cleanup_summary } ) return updated_user # ============================================ # USER ACTIVATION/DEACTIVATION # ============================================ @router.post("/{user_id}/deactivate", response_model=UserResponse) @require_permission("manage_org_users") async def deactivate_user( user_id: UUID, reason: Optional[str] = Query(None, description="Reason for deactivation"), request: Request = None, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Deactivate user (set is_active=False, status=suspended) Deactivated users cannot login but their data is preserved. **Authorization:** - Platform admin - can deactivate any user - Org admins - can deactivate their org's users """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) updated_user = UserService.deactivate_user(user, reason, current_user, db) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"User deactivated: {user.email}", user=current_user, request=request, additional_metadata={'reason': reason} ) return updated_user @router.post("/{user_id}/activate", response_model=UserResponse) @require_permission("manage_org_users") async def activate_user( user_id: UUID, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Activate user (set is_active=True, status=active) Reactivates a previously deactivated user. **Authorization:** - Platform admin - can activate any user - Org admins - can activate their org's users """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) updated_user = UserService.activate_user(user, current_user, db) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"User activated: {user.email}", user=current_user, request=request ) return updated_user # ============================================ # USER DELETION # ============================================ @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) @require_role(AppRole.PLATFORM_ADMIN) # Only platform admins can delete users async def delete_user( user_id: UUID, reason: Optional[str] = Query(None, description="Reason for deletion"), request: Request = None, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Soft delete user (set deleted_at timestamp) **IMPORTANT:** This is a soft delete. User data is preserved but marked as deleted. User cannot login and won't appear in normal queries. **Authorization:** - Platform admin only **Restrictions:** - Cannot delete yourself - Cannot delete users with active assignments (future enhancement) """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Soft delete UserService.soft_delete_user(user, reason, current_user, db) # Audit log AuditService.log_action( db=db, action='delete', entity_type='user', entity_id=str(user.id), description=f"User deleted: {user.email}", user=current_user, request=request, additional_metadata={'reason': reason} ) return None # ============================================ # BULK USER OPERATIONS # ============================================ @router.post("/bulk/update", response_model=BulkOperationResult) async def bulk_update_users( data: BulkUserUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Bulk update multiple users **Authorization:** - Platform admin - can update any users - Org admins - can update their org's users **Note:** Updates are applied individually. Some may succeed while others fail. Check the response for detailed results. """ from app.schemas.user import BulkOperationResult results = UserService.bulk_update_users( user_ids=data.user_ids, updates=data.updates.dict(exclude_unset=True), current_user=current_user, db=db ) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', description=f"Bulk update of {results['total']} users ({results['successful']} successful, {results['failed']} failed)", user=current_user, request=request, additional_metadata={ 'reason': data.reason, 'results': results } ) return BulkOperationResult(**results) @router.post("/bulk/status", response_model=BulkOperationResult) async def bulk_change_status( data: BulkUserStatusUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Bulk change user status **Authorization:** - Platform admin - can change any users' status - Org admins - can change their org's users' status """ from app.schemas.user import BulkOperationResult results = UserService.bulk_change_status( user_ids=data.user_ids, status=data.status, reason=data.reason, current_user=current_user, db=db ) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', description=f"Bulk status change to '{data.status}' for {results['total']} users ({results['successful']} successful)", user=current_user, request=request, additional_metadata={ 'status': data.status, 'reason': data.reason, 'results': results } ) return BulkOperationResult(**results) @router.post("/bulk/role", response_model=BulkOperationResult) async def bulk_change_role( data: BulkUserRoleUpdate, request: Request, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Bulk change user role **Authorization:** - Platform admin - can change any users' role - Org admins - can change their org's users' role (except platform_admin) """ from app.schemas.user import BulkOperationResult results = UserService.bulk_change_role( user_ids=data.user_ids, role=data.role, reason=data.reason, current_user=current_user, db=db ) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', description=f"Bulk role change to '{data.role}' for {results['total']} users ({results['successful']} successful)", user=current_user, request=request, additional_metadata={ 'role': data.role, 'reason': data.reason, 'results': results } ) return BulkOperationResult(**results) # ============================================ # ADMIN PASSWORD RESET # ============================================ from pydantic import BaseModel from app.core.supabase_client import supabase_admin class AdminPasswordResetRequest(BaseModel): new_password: str @router.post("/{user_id}/reset-password", status_code=status.HTTP_200_OK) @require_permission("reset_user_password") async def admin_reset_user_password( user_id: UUID, data: AdminPasswordResetRequest, request: Request = None, current_user: User = Depends(get_current_active_user), db: Session = Depends(get_db) ): """ Admin resets user's password (Supabase Admin API) **Authorization:** - Platform admin (all users) - Org admins (users in their organization only) **Use Cases:** - User forgot password and verified via OTP - Password recovery after identity verification - Emergency access restoration """ # Get target user user = db.query(User).filter( User.id == user_id, User.deleted_at == None ).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Org admins can only reset passwords for users in their org if current_user.role in ['client_admin', 'contractor_admin']: if current_user.client_id and user.client_id != current_user.client_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You can only reset passwords for users in your organization" ) if current_user.contractor_id and user.contractor_id != current_user.contractor_id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You can only reset passwords for users in your organization" ) try: # Use Supabase Admin API to update user password response = supabase_admin.auth.admin.update_user_by_id( str(user_id), {"password": data.new_password} ) # Audit log AuditService.log_action( db=db, action='update', entity_type='user', entity_id=str(user.id), description=f"Admin reset password for user: {user.email}", user=current_user, request=request, additional_metadata={ 'target_user_email': user.email, 'admin_email': current_user.email } ) logger.info(f"Admin {current_user.email} reset password for user {user.email}") return { "message": f"Password reset successfully for {user.email}", "user_id": str(user.id), "user_email": user.email } except Exception as e: logger.error(f"Admin password reset failed for {user.email}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to reset password: {str(e)}" )