Spaces:
Sleeping
Sleeping
| """ | |
| User Management Endpoints | |
| Complete user CRUD operations with admin capabilities | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Query, Request | |
| from sqlalchemy.orm import Session | |
| from typing import Optional, List | |
| from uuid import UUID | |
| import math | |
| from app.api.deps import get_db, get_current_active_user | |
| from app.models.user import User | |
| from app.schemas.user import ( | |
| UserResponse, UserUpdateAdmin, UserStatusUpdate, UserRoleUpdate, | |
| UserOrganizationUpdate, UserListResponse, BulkUserUpdate, | |
| BulkUserStatusUpdate, BulkUserRoleUpdate, BulkOperationResult | |
| ) | |
| from app.schemas.filters import UserFilters | |
| from app.services.user_service import UserService | |
| from app.services.audit_service import AuditService | |
| from app.core.permissions import require_permission, require_role, AppRole | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/users", tags=["Users"]) | |
def parse_user_filters(
    client_id: Optional[UUID] = Query(None),
    contractor_id: Optional[UUID] = Query(None),
    role: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    is_active: Optional[bool] = Query(None),
    email: Optional[str] = Query(None),
    phone: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    sort_by: Optional[str] = Query(None),
    sort_order: str = Query("desc"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=100),
    from_date: Optional[str] = Query(None),
    to_date: Optional[str] = Query(None),
) -> UserFilters:
    """Build a ``UserFilters`` object from the raw query-string parameters.

    ``role`` and ``status`` arrive as comma-separated strings and are turned
    into lists of trimmed, non-empty values (``None`` stays ``None``).
    ``from_date`` and ``to_date`` are accepted in the signature but are always
    forwarded as ``None``.
    """

    def split_csv(raw: Optional[str]) -> Optional[List[str]]:
        """Split a comma-separated string into trimmed, non-empty items."""
        if raw is None:
            return None
        trimmed = (piece.strip() for piece in raw.split(','))
        return [piece for piece in trimmed if piece]

    role_values = split_csv(role)
    status_values = split_csv(status)

    return UserFilters(
        client_id=client_id,
        contractor_id=contractor_id,
        role=role_values,
        status=status_values,
        is_active=is_active,
        email=email,
        phone=phone,
        search=search,
        sort_by=sort_by,
        sort_order=sort_order,
        page=page,
        page_size=page_size,
        from_date=None,  # Not used in users
        to_date=None,  # Not used in users
    )
async def list_users(
    filters: UserFilters = Depends(parse_user_filters),
    # Legacy support
    skip: Optional[int] = Query(None, ge=0, description="DEPRECATED: Use page instead"),
    limit: Optional[int] = Query(None, ge=1, le=100, description="DEPRECATED: Use page_size instead"),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    List users with advanced filtering, sorting, and pagination

    **Authorization:**
    - Platform admins can see all users
    - Org admins can see their organization's users
    - Other users can see users in their organization

    **Sorting:**
    - Supported fields: created_at, updated_at, name, email, role, status
    - Order: asc or desc

    **Pagination:**
    - Use `page` / `page_size`
    - `skip` / `limit` are deprecated: `limit` overrides `page_size`, and
      `skip` is converted to a page number (rounded down to a page boundary)
      only when `page` was left at 1
    """
    # Handle backward compatibility.
    # The legacy page size must be applied BEFORE deriving the page from
    # `skip`; otherwise the page is computed against the default page size and
    # a request such as skip=20&limit=10 would land on page 1 instead of 3.
    if limit is not None:
        filters.page_size = limit
    if skip is not None and filters.page == 1:
        filters.page = (skip // filters.page_size) + 1

    offset = (filters.page - 1) * filters.page_size

    # Temporary: the service accepts a single role/status value only, so a
    # multi-value filter is passed on as "no filter".
    single_role = filters.role[0] if filters.role and len(filters.role) == 1 else None
    single_status = filters.status[0] if filters.status and len(filters.status) == 1 else None

    users, total = UserService.search_users(
        db=db,
        current_user=current_user,
        email=filters.email,
        phone=filters.phone,
        role=single_role,
        status=single_status,
        is_active=filters.is_active,
        client_id=filters.client_id,
        contractor_id=filters.contractor_id,
        skip=offset,
        limit=filters.page_size,
        sort_by=filters.sort_by or 'created_at',
        sort_order=filters.sort_order
    )

    # Calculate pagination info (guard kept in case page_size is ever 0)
    total_pages = math.ceil(total / filters.page_size) if filters.page_size > 0 else 0

    return UserListResponse(
        users=users,
        total=total,
        page=filters.page,
        page_size=filters.page_size,
        total_pages=total_pages
    )
async def search_users(
    email: Optional[str] = Query(None, description="Search by email"),
    phone: Optional[str] = Query(None, description="Search by phone"),
    role: Optional[str] = Query(None, description="Filter by role"),
    limit: int = Query(10, ge=1, le=100),
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Lightweight user lookup (legacy endpoint - prefer GET /users for the full
    feature set)

    Requires authentication. Platform admins can search all users.
    Other users can only search within their organization.
    """
    # Always starts at offset 0; only the matching users are returned, the
    # total count reported by the service is discarded.
    criteria = dict(
        db=db,
        current_user=current_user,
        email=email,
        phone=phone,
        role=role,
        skip=0,
        limit=limit,
    )
    matches, _total = UserService.search_users(**criteria)
    return matches
async def get_user(
    user_id: UUID,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Fetch a single user by ID

    Requires authentication. Access is granted to platform admins, to the
    user themselves, to org admins and managers of the same client or
    contractor, and to managers who share an active project with the user.
    Responds 404 when the user does not exist (or is soft-deleted) and 403
    when none of the access rules match.
    """
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    viewer_role = current_user.role

    # Platform admins can view anyone; everybody can view their own profile
    if viewer_role == 'platform_admin' or current_user.id == target.id:
        return target

    org_admin_roles = ('client_admin', 'contractor_admin')
    # Project managers, sales managers, and dispatchers
    manager_roles = ('project_manager', 'sales_manager', 'dispatcher')

    # Org admins and managers can view users of their own client/contractor
    if viewer_role in org_admin_roles or viewer_role in manager_roles:
        if (
            (current_user.client_id and current_user.client_id == target.client_id)
            or (current_user.contractor_id and current_user.contractor_id == target.contractor_id)
        ):
            return target

    # Managers may additionally view users they share a project with
    # (cross-org collaboration)
    if viewer_role in manager_roles:
        from app.models.project_team import ProjectTeam

        def active_project_ids(member_id):
            """Query of project IDs where the member has a live team entry."""
            return db.query(ProjectTeam.project_id).filter(
                ProjectTeam.user_id == member_id,
                ProjectTeam.deleted_at.is_(None),
                ProjectTeam.removed_at.is_(None)
            )

        shared_count = (
            active_project_ids(current_user.id)
            .intersect(active_project_ids(target.id))
            .count()
        )
        if shared_count > 0:
            return target

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="You don't have permission to view this user"
    )
| # ============================================ | |
| # USER UPDATE ENDPOINTS | |
| # ============================================ | |
async def update_user(
    user_id: UUID,
    data: UserUpdateAdmin,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update a user (admin operation)

    **Authorization:**
    - Platform admin - can update any user
    - Client admin - can update their client's users (except role/org changes)
    - Contractor admin - can update their contractor's users (except role/org changes)

    **Updatable Fields:**
    - Basic info: name, email, phone, display_name, emergency contacts
    - Role (platform admin only)
    - Status (admins only)
    - Organization (platform admin only)
    - is_active flag
    """
    # NOTE(review): no permission check is visible in this function; the rules
    # above are presumably enforced inside UserService.update_user - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    updated_user, changes = UserService.update_user(target, data, current_user, db)

    # Nothing changed -> nothing to record in the audit trail
    if not changes['old']:
        return updated_user

    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"Admin updated user: {target.email}",
        user=current_user,
        request=request,
        changes=changes
    )
    return updated_user
async def change_user_status(
    user_id: UUID,
    data: UserStatusUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Set a user's status

    **Statuses:**
    - invited: User has been invited but not accepted
    - pending_setup: User accepted invitation but profile incomplete
    - active: User is active and can use the system
    - suspended: User is suspended and cannot login

    **Authorization:**
    - Platform admin - can change any user's status
    - Org admins - can change their org's users' status
    """
    # NOTE(review): permission enforcement is not visible here; presumably
    # handled by UserService.change_user_status - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Capture the status before the service call so the audit entry can
    # report the transition.
    previous_status = target.status
    updated_user = UserService.change_user_status(target, data, current_user, db)

    audit_details = {
        'reason': data.reason,
        'old_status': previous_status,
        'new_status': data.status,
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User status changed from {previous_status} to {data.status}",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return updated_user
async def change_user_role(
    user_id: UUID,
    data: UserRoleUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Assign a new role to a user

    **Roles:**
    - platform_admin: Full system access
    - client_admin: Admin for a client organization
    - contractor_admin: Admin for a contractor organization
    - sales_manager, project_manager, dispatcher: Management roles
    - field_agent, sales_agent: Operational roles

    **Authorization:**
    - Platform admin - can assign any role
    - Org admins - can assign roles within their org (except platform_admin)
    """
    # NOTE(review): permission enforcement is not visible here; presumably
    # handled by UserService.change_user_role - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Remember the role before the service call for the audit entry
    previous_role = target.role
    updated_user = UserService.change_user_role(target, data, current_user, db)

    audit_details = {
        'reason': data.reason,
        'old_role': previous_role,
        'new_role': data.role,
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User role changed from {previous_role} to {data.role}",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return updated_user
| # Only platform admins can move users between orgs | |
async def change_user_organization(
    user_id: UUID,
    data: UserOrganizationUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Move a user to a different organization

    **Authorization:**
    - Platform admin only

    **Rules:**
    - User can be assigned to either client OR contractor (not both)
    - Role must be compatible with organization type
    """
    # NOTE(review): neither the platform-admin restriction nor the rules above
    # are checked in this function; presumably enforced by a route dependency
    # and/or UserService.change_user_organization - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Snapshot the current assignment before the service call
    previous_client = target.client_id
    previous_contractor = target.contractor_id

    updated_user, cleanup_summary = UserService.change_user_organization(
        target, data, current_user, db
    )

    def id_or_none(value):
        """Stringify an ID for the audit metadata, keeping empty values as None."""
        return str(value) if value else None

    audit_details = {
        'reason': data.reason,
        'old_client_id': id_or_none(previous_client),
        'old_contractor_id': id_or_none(previous_contractor),
        'new_client_id': id_or_none(data.client_id),
        'new_contractor_id': id_or_none(data.contractor_id),
        'cleanup_summary': cleanup_summary
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User organization changed (removed from {cleanup_summary['projects_removed']} projects, returned {cleanup_summary['assets_returned']} assets)",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return updated_user
| # ============================================ | |
| # USER ACTIVATION/DEACTIVATION | |
| # ============================================ | |
async def deactivate_user(
    user_id: UUID,
    reason: Optional[str] = Query(None, description="Reason for deactivation"),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Deactivate a user (set is_active=False, status=suspended)

    Deactivated users cannot login but their data is preserved.

    **Authorization:**
    - Platform admin - can deactivate any user
    - Org admins - can deactivate their org's users
    """
    # NOTE(review): permission enforcement is not visible here; presumably
    # handled by UserService.deactivate_user - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    updated_user = UserService.deactivate_user(target, reason, current_user, db)

    # Record the deactivation together with the caller-supplied reason
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User deactivated: {target.email}",
        user=current_user,
        request=request,
        additional_metadata={'reason': reason}
    )
    return updated_user
async def activate_user(
    user_id: UUID,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Activate a user (set is_active=True, status=active)

    Reactivates a previously deactivated user.

    **Authorization:**
    - Platform admin - can activate any user
    - Org admins - can activate their org's users
    """
    # NOTE(review): permission enforcement is not visible here; presumably
    # handled by UserService.activate_user - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    updated_user = UserService.activate_user(target, current_user, db)

    # Record the activation in the audit trail
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User activated: {target.email}",
        user=current_user,
        request=request
    )
    return updated_user
| # ============================================ | |
| # USER DELETION | |
| # ============================================ | |
| # Only platform admins can delete users | |
async def delete_user(
    user_id: UUID,
    reason: Optional[str] = Query(None, description="Reason for deletion"),
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Soft delete a user (set deleted_at timestamp)

    **IMPORTANT:** This is a soft delete. User data is preserved but marked as deleted.
    User cannot login and won't appear in normal queries.

    **Authorization:**
    - Platform admin only

    **Restrictions:**
    - Cannot delete yourself
    - Cannot delete users with active assignments (future enhancement)
    """
    # NOTE(review): the platform-admin and self-delete restrictions are not
    # checked in this function; presumably enforced by a route dependency
    # and/or UserService.soft_delete_user - confirm.
    target = (
        db.query(User)
        .filter(User.id == user_id, User.deleted_at.is_(None))
        .first()
    )
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    UserService.soft_delete_user(target, reason, current_user, db)

    # Record the deletion together with the caller-supplied reason
    AuditService.log_action(
        db=db,
        action='delete',
        entity_type='user',
        entity_id=str(target.id),
        description=f"User deleted: {target.email}",
        user=current_user,
        request=request,
        additional_metadata={'reason': reason}
    )
    # No response body
    return None
| # ============================================ | |
| # BULK USER OPERATIONS | |
| # ============================================ | |
async def bulk_update_users(
    data: BulkUserUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Apply the same update to several users

    **Authorization:**
    - Platform admin - can update any users
    - Org admins - can update their org's users

    **Note:** Updates are applied individually. Some may succeed while others fail.
    Check the response for detailed results.
    """
    # Only the fields the caller explicitly set are forwarded to the service
    requested_changes = data.updates.dict(exclude_unset=True)

    results = UserService.bulk_update_users(
        user_ids=data.user_ids,
        updates=requested_changes,
        current_user=current_user,
        db=db
    )

    # One audit entry for the whole batch, carrying the per-batch results
    audit_details = {
        'reason': data.reason,
        'results': results
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        description=f"Bulk update of {results['total']} users ({results['successful']} successful, {results['failed']} failed)",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return BulkOperationResult(**results)
async def bulk_change_status(
    data: BulkUserStatusUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Set the same status on several users

    **Authorization:**
    - Platform admin - can change any users' status
    - Org admins - can change their org's users' status
    """
    results = UserService.bulk_change_status(
        user_ids=data.user_ids,
        status=data.status,
        reason=data.reason,
        current_user=current_user,
        db=db
    )

    # One audit entry for the whole batch, carrying the per-batch results
    audit_details = {
        'status': data.status,
        'reason': data.reason,
        'results': results
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        description=f"Bulk status change to '{data.status}' for {results['total']} users ({results['successful']} successful)",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return BulkOperationResult(**results)
async def bulk_change_role(
    data: BulkUserRoleUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Assign the same role to several users

    **Authorization:**
    - Platform admin - can change any users' role
    - Org admins - can change their org's users' role (except platform_admin)
    """
    results = UserService.bulk_change_role(
        user_ids=data.user_ids,
        role=data.role,
        reason=data.reason,
        current_user=current_user,
        db=db
    )

    # One audit entry for the whole batch, carrying the per-batch results
    audit_details = {
        'role': data.role,
        'reason': data.reason,
        'results': results
    }
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        description=f"Bulk role change to '{data.role}' for {results['total']} users ({results['successful']} successful)",
        user=current_user,
        request=request,
        additional_metadata=audit_details
    )
    return BulkOperationResult(**results)
| # ============================================ | |
| # ADMIN PASSWORD RESET | |
| # ============================================ | |
| from pydantic import BaseModel | |
| from app.core.supabase_client import supabase_admin | |
class AdminPasswordResetRequest(BaseModel):
    # Request body for the admin password-reset endpoint
    # (admin_reset_user_password).
    #
    # new_password: the replacement password. It is forwarded unchanged to the
    # Supabase admin API; no length or complexity constraint is declared here.
    new_password: str
async def admin_reset_user_password(
    user_id: UUID,
    data: AdminPasswordResetRequest,
    request: Request = None,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Admin resets user's password (Supabase Admin API)

    **Authorization:**
    - Platform admin (all users)
    - Org admins (users in their organization only)

    **Use Cases:**
    - User forgot password and verified via OTP
    - Password recovery after identity verification
    - Emergency access restoration

    **Errors:**
    - 403 if the caller is not a platform/org admin, or is an org admin
      targeting a user outside their organization
    - 404 if the target user does not exist or is soft-deleted
    - 500 if the Supabase password update fails
    """
    is_platform_admin = current_user.role == 'platform_admin'
    is_org_admin = current_user.role in ('client_admin', 'contractor_admin')

    # Explicit allow-list: previously only org admins were restricted, so any
    # other role fell straight through to the password update.
    if not (is_platform_admin or is_org_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to reset passwords"
        )

    # Get target user
    user = db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None)
    ).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Org admins can only reset passwords for users in their org
    if is_org_admin:
        # Every organization the admin belongs to must match the target's.
        # An admin with no organization at all matches nobody (previously
        # such an admin bypassed both checks).
        org_scope = [
            (own_id, target_id)
            for own_id, target_id in (
                (current_user.client_id, user.client_id),
                (current_user.contractor_id, user.contractor_id),
            )
            if own_id
        ]
        if not org_scope or any(own_id != target_id for own_id, target_id in org_scope):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You can only reset passwords for users in your organization"
            )

    # Keep the try body limited to the Supabase call so that a later failure
    # (e.g. in audit logging) is not reported as a failed password reset after
    # the password has in fact been changed.
    try:
        # Use Supabase Admin API to update user password
        supabase_admin.auth.admin.update_user_by_id(
            str(user_id),
            {"password": data.new_password}
        )
    except Exception as e:
        # logger.exception records the traceback alongside the message
        logger.exception("Admin password reset failed for %s: %s", user.email, e)
        # NOTE(review): the upstream error text is returned to the caller,
        # as before; consider a generic message if it may expose internals.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to reset password: {str(e)}"
        ) from e

    # Audit log
    AuditService.log_action(
        db=db,
        action='update',
        entity_type='user',
        entity_id=str(user.id),
        description=f"Admin reset password for user: {user.email}",
        user=current_user,
        request=request,
        additional_metadata={
            'target_user_email': user.email,
            'admin_email': current_user.email
        }
    )
    logger.info("Admin %s reset password for user %s", current_user.email, user.email)

    return {
        "message": f"Password reset successfully for {user.email}",
        "user_id": str(user.id),
        "user_email": user.email
    }