kamau1's picture
Fix audit logging by replacing invalid AuditAction values with UPDATE across all user management operations
c10c65e
"""
User Management Endpoints
Complete user CRUD operations with admin capabilities
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query, Request
from sqlalchemy.orm import Session
from typing import Optional, List
from uuid import UUID
import math
from app.api.deps import get_db, get_current_active_user
from app.models.user import User
from app.schemas.user import (
UserResponse, UserUpdateAdmin, UserStatusUpdate, UserRoleUpdate,
UserOrganizationUpdate, UserListResponse, BulkUserUpdate,
BulkUserStatusUpdate, BulkUserRoleUpdate, BulkOperationResult
)
from app.schemas.filters import UserFilters
from app.services.user_service import UserService
from app.services.audit_service import AuditService
from app.core.permissions import require_permission, require_role, AppRole
import logging
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/users", tags=["Users"])
def parse_user_filters(
client_id: Optional[UUID] = Query(None),
contractor_id: Optional[UUID] = Query(None),
role: Optional[str] = Query(None),
status: Optional[str] = Query(None),
is_active: Optional[bool] = Query(None),
email: Optional[str] = Query(None),
phone: Optional[str] = Query(None),
search: Optional[str] = Query(None),
sort_by: Optional[str] = Query(None),
sort_order: str = Query("desc"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=100),
from_date: Optional[str] = Query(None),
to_date: Optional[str] = Query(None),
) -> UserFilters:
"""Parse and convert query parameters to UserFilters"""
def parse_csv(value: Optional[str]) -> Optional[List[str]]:
if value is None:
return None
return [item.strip() for item in value.split(',') if item.strip()]
return UserFilters(
client_id=client_id,
contractor_id=contractor_id,
role=parse_csv(role),
status=parse_csv(status),
is_active=is_active,
email=email,
phone=phone,
search=search,
sort_by=sort_by,
sort_order=sort_order,
page=page,
page_size=page_size,
from_date=None, # Not used in users
to_date=None, # Not used in users
)
@router.get("", response_model=UserListResponse)
@require_permission("view_users")
async def list_users(
filters: UserFilters = Depends(parse_user_filters),
# Legacy support
skip: int = Query(None, ge=0, description="DEPRECATED: Use page instead"),
limit: int = Query(None, ge=1, le=100, description="DEPRECATED: Use page_size instead"),
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
List users with advanced filtering, sorting, and pagination
**Authorization:**
- Platform admins can see all users
- Org admins can see their organization's users
- Other users can see users in their organization
**Sorting:**
- Supported fields: created_at, updated_at, name, email, role, status
- Order: asc or desc
"""
# Handle backward compatibility
if skip is not None and filters.page == 1:
filters.page = (skip // filters.page_size) + 1
if limit is not None:
filters.page_size = limit
skip_calc = (filters.page - 1) * filters.page_size
users, total = UserService.search_users(
db=db,
current_user=current_user,
email=filters.email,
phone=filters.phone,
role=filters.role[0] if filters.role and len(filters.role) == 1 else None, # Temporary: only single value
status=filters.status[0] if filters.status and len(filters.status) == 1 else None, # Temporary: only single value
is_active=filters.is_active,
client_id=filters.client_id,
contractor_id=filters.contractor_id,
skip=skip_calc,
limit=filters.page_size,
sort_by=filters.sort_by or 'created_at',
sort_order=filters.sort_order
)
# Calculate pagination info
total_pages = math.ceil(total / filters.page_size) if filters.page_size > 0 else 0
return UserListResponse(
users=users,
total=total,
page=filters.page,
page_size=filters.page_size,
total_pages=total_pages
)
@router.get("/search", response_model=List[UserResponse])
async def search_users(
email: Optional[str] = Query(None, description="Search by email"),
phone: Optional[str] = Query(None, description="Search by phone"),
role: Optional[str] = Query(None, description="Filter by role"),
limit: int = Query(10, ge=1, le=100),
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Simple user search (legacy endpoint - use GET /users for full features)
Requires authentication. Platform admins can search all users.
Other users can only search within their organization.
"""
users, _ = UserService.search_users(
db=db,
current_user=current_user,
email=email,
phone=phone,
role=role,
skip=0,
limit=limit
)
return users
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: UUID,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Get user by ID
Requires authentication and appropriate permissions.
"""
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Permission check
if current_user.role == 'platform_admin':
# Platform admins can view anyone
return user
# Users can view their own profile
if current_user.id == user.id:
return user
# Org admins can view their org's users
if current_user.role in ['client_admin', 'contractor_admin']:
if current_user.client_id and current_user.client_id == user.client_id:
return user
if current_user.contractor_id and current_user.contractor_id == user.contractor_id:
return user
# Project managers, sales managers, and dispatchers can view users in their organization
if current_user.role in ['project_manager', 'sales_manager', 'dispatcher']:
# Check if they share the same organization (client or contractor)
if current_user.client_id and current_user.client_id == user.client_id:
return user
if current_user.contractor_id and current_user.contractor_id == user.contractor_id:
return user
# Also check if they share any projects (for cross-org collaboration)
from app.models.project_team import ProjectTeam
shared_projects = db.query(ProjectTeam.project_id).filter(
ProjectTeam.user_id == current_user.id,
ProjectTeam.deleted_at == None,
ProjectTeam.removed_at == None
).intersect(
db.query(ProjectTeam.project_id).filter(
ProjectTeam.user_id == user.id,
ProjectTeam.deleted_at == None,
ProjectTeam.removed_at == None
)
).count()
if shared_projects > 0:
return user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to view this user"
)
# ============================================
# USER UPDATE ENDPOINTS
# ============================================
@router.put("/{user_id}", response_model=UserResponse)
@require_permission("manage_org_users")
async def update_user(
user_id: UUID,
data: UserUpdateAdmin,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Update user (admin operation)
**Authorization:**
- Platform admin - can update any user
- Client admin - can update their client's users (except role/org changes)
- Contractor admin - can update their contractor's users (except role/org changes)
**Updatable Fields:**
- Basic info: name, email, phone, display_name, emergency contacts
- Role (platform admin only)
- Status (admins only)
- Organization (platform admin only)
- is_active flag
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Update user
updated_user, changes = UserService.update_user(user, data, current_user, db)
# Audit log
if changes['old']: # Only log if there were actual changes
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"Admin updated user: {user.email}",
user=current_user,
request=request,
changes=changes
)
return updated_user
@router.post("/{user_id}/status", response_model=UserResponse)
@require_permission("manage_org_users")
async def change_user_status(
user_id: UUID,
data: UserStatusUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Change user status
**Statuses:**
- invited: User has been invited but not accepted
- pending_setup: User accepted invitation but profile incomplete
- active: User is active and can use the system
- suspended: User is suspended and cannot login
**Authorization:**
- Platform admin - can change any user's status
- Org admins - can change their org's users' status
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
old_status = user.status
updated_user = UserService.change_user_status(user, data, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"User status changed from {old_status} to {data.status}",
user=current_user,
request=request,
additional_metadata={'reason': data.reason, 'old_status': old_status, 'new_status': data.status}
)
return updated_user
@router.post("/{user_id}/role", response_model=UserResponse)
@require_permission("manage_org_users")
async def change_user_role(
user_id: UUID,
data: UserRoleUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Change user role
**Roles:**
- platform_admin: Full system access
- client_admin: Admin for a client organization
- contractor_admin: Admin for a contractor organization
- sales_manager, project_manager, dispatcher: Management roles
- field_agent, sales_agent: Operational roles
**Authorization:**
- Platform admin - can assign any role
- Org admins - can assign roles within their org (except platform_admin)
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
old_role = user.role
updated_user = UserService.change_user_role(user, data, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"User role changed from {old_role} to {data.role}",
user=current_user,
request=request,
additional_metadata={'reason': data.reason, 'old_role': old_role, 'new_role': data.role}
)
return updated_user
@router.post("/{user_id}/organization", response_model=UserResponse)
@require_role(AppRole.PLATFORM_ADMIN) # Only platform admins can move users between orgs
async def change_user_organization(
user_id: UUID,
data: UserOrganizationUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Change user organization assignment
**Authorization:**
- Platform admin only
**Rules:**
- User can be assigned to either client OR contractor (not both)
- Role must be compatible with organization type
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
old_client = user.client_id
old_contractor = user.contractor_id
updated_user, cleanup_summary = UserService.change_user_organization(user, data, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"User organization changed (removed from {cleanup_summary['projects_removed']} projects, returned {cleanup_summary['assets_returned']} assets)",
user=current_user,
request=request,
additional_metadata={
'reason': data.reason,
'old_client_id': str(old_client) if old_client else None,
'old_contractor_id': str(old_contractor) if old_contractor else None,
'new_client_id': str(data.client_id) if data.client_id else None,
'new_contractor_id': str(data.contractor_id) if data.contractor_id else None,
'cleanup_summary': cleanup_summary
}
)
return updated_user
# ============================================
# USER ACTIVATION/DEACTIVATION
# ============================================
@router.post("/{user_id}/deactivate", response_model=UserResponse)
@require_permission("manage_org_users")
async def deactivate_user(
user_id: UUID,
reason: Optional[str] = Query(None, description="Reason for deactivation"),
request: Request = None,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Deactivate user (set is_active=False, status=suspended)
Deactivated users cannot login but their data is preserved.
**Authorization:**
- Platform admin - can deactivate any user
- Org admins - can deactivate their org's users
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
updated_user = UserService.deactivate_user(user, reason, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"User deactivated: {user.email}",
user=current_user,
request=request,
additional_metadata={'reason': reason}
)
return updated_user
@router.post("/{user_id}/activate", response_model=UserResponse)
@require_permission("manage_org_users")
async def activate_user(
user_id: UUID,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Activate user (set is_active=True, status=active)
Reactivates a previously deactivated user.
**Authorization:**
- Platform admin - can activate any user
- Org admins - can activate their org's users
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
updated_user = UserService.activate_user(user, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"User activated: {user.email}",
user=current_user,
request=request
)
return updated_user
# ============================================
# USER DELETION
# ============================================
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
@require_role(AppRole.PLATFORM_ADMIN) # Only platform admins can delete users
async def delete_user(
user_id: UUID,
reason: Optional[str] = Query(None, description="Reason for deletion"),
request: Request = None,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Soft delete user (set deleted_at timestamp)
**IMPORTANT:** This is a soft delete. User data is preserved but marked as deleted.
User cannot login and won't appear in normal queries.
**Authorization:**
- Platform admin only
**Restrictions:**
- Cannot delete yourself
- Cannot delete users with active assignments (future enhancement)
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Soft delete
UserService.soft_delete_user(user, reason, current_user, db)
# Audit log
AuditService.log_action(
db=db,
action='delete',
entity_type='user',
entity_id=str(user.id),
description=f"User deleted: {user.email}",
user=current_user,
request=request,
additional_metadata={'reason': reason}
)
return None
# ============================================
# BULK USER OPERATIONS
# ============================================
@router.post("/bulk/update", response_model=BulkOperationResult)
async def bulk_update_users(
data: BulkUserUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Bulk update multiple users
**Authorization:**
- Platform admin - can update any users
- Org admins - can update their org's users
**Note:** Updates are applied individually. Some may succeed while others fail.
Check the response for detailed results.
"""
from app.schemas.user import BulkOperationResult
results = UserService.bulk_update_users(
user_ids=data.user_ids,
updates=data.updates.dict(exclude_unset=True),
current_user=current_user,
db=db
)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
description=f"Bulk update of {results['total']} users ({results['successful']} successful, {results['failed']} failed)",
user=current_user,
request=request,
additional_metadata={
'reason': data.reason,
'results': results
}
)
return BulkOperationResult(**results)
@router.post("/bulk/status", response_model=BulkOperationResult)
async def bulk_change_status(
data: BulkUserStatusUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Bulk change user status
**Authorization:**
- Platform admin - can change any users' status
- Org admins - can change their org's users' status
"""
from app.schemas.user import BulkOperationResult
results = UserService.bulk_change_status(
user_ids=data.user_ids,
status=data.status,
reason=data.reason,
current_user=current_user,
db=db
)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
description=f"Bulk status change to '{data.status}' for {results['total']} users ({results['successful']} successful)",
user=current_user,
request=request,
additional_metadata={
'status': data.status,
'reason': data.reason,
'results': results
}
)
return BulkOperationResult(**results)
@router.post("/bulk/role", response_model=BulkOperationResult)
async def bulk_change_role(
data: BulkUserRoleUpdate,
request: Request,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Bulk change user role
**Authorization:**
- Platform admin - can change any users' role
- Org admins - can change their org's users' role (except platform_admin)
"""
from app.schemas.user import BulkOperationResult
results = UserService.bulk_change_role(
user_ids=data.user_ids,
role=data.role,
reason=data.reason,
current_user=current_user,
db=db
)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
description=f"Bulk role change to '{data.role}' for {results['total']} users ({results['successful']} successful)",
user=current_user,
request=request,
additional_metadata={
'role': data.role,
'reason': data.reason,
'results': results
}
)
return BulkOperationResult(**results)
# ============================================
# ADMIN PASSWORD RESET
# ============================================
from pydantic import BaseModel
from app.core.supabase_client import supabase_admin
class AdminPasswordResetRequest(BaseModel):
new_password: str
@router.post("/{user_id}/reset-password", status_code=status.HTTP_200_OK)
@require_permission("reset_user_password")
async def admin_reset_user_password(
user_id: UUID,
data: AdminPasswordResetRequest,
request: Request = None,
current_user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
Admin resets user's password (Supabase Admin API)
**Authorization:**
- Platform admin (all users)
- Org admins (users in their organization only)
**Use Cases:**
- User forgot password and verified via OTP
- Password recovery after identity verification
- Emergency access restoration
"""
# Get target user
user = db.query(User).filter(
User.id == user_id,
User.deleted_at == None
).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Org admins can only reset passwords for users in their org
if current_user.role in ['client_admin', 'contractor_admin']:
if current_user.client_id and user.client_id != current_user.client_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only reset passwords for users in your organization"
)
if current_user.contractor_id and user.contractor_id != current_user.contractor_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only reset passwords for users in your organization"
)
try:
# Use Supabase Admin API to update user password
response = supabase_admin.auth.admin.update_user_by_id(
str(user_id),
{"password": data.new_password}
)
# Audit log
AuditService.log_action(
db=db,
action='update',
entity_type='user',
entity_id=str(user.id),
description=f"Admin reset password for user: {user.email}",
user=current_user,
request=request,
additional_metadata={
'target_user_email': user.email,
'admin_email': current_user.email
}
)
logger.info(f"Admin {current_user.email} reset password for user {user.email}")
return {
"message": f"Password reset successfully for {user.email}",
"user_id": str(user.id),
"user_email": user.email
}
except Exception as e:
logger.error(f"Admin password reset failed for {user.email}: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to reset password: {str(e)}"
)