Spaces:
Sleeping
Sleeping
| """ | |
| Role-Based Access Control (RBAC) Definitions | |
| This module provides centralized permission management for all roles. | |
| All route handlers should use @require_permission or @require_role decorators | |
| to enforce consistent access control. | |
| Usage Examples: | |
| from app.core.permissions import require_permission, require_role, AppRole | |
| @router.get("/users") | |
| @require_permission("view_users") | |
| async def list_users(current_user: User = Depends(get_current_active_user)): | |
| ... | |
| @router.delete("/users/{id}") | |
| @require_role(AppRole.PLATFORM_ADMIN) | |
| async def delete_user(current_user: User = Depends(get_current_active_user)): | |
| ... | |
| Permission Categories: | |
| - User Management: view_users, manage_org_users, invite_users | |
| - Documents: view_documents, upload_documents | |
| - Projects: manage_projects, create_tickets | |
| - Financial: view_payroll, approve_expenses | |
| - Reports: view_reports, export_reports | |
| """ | |
| from enum import Enum | |
| from typing import List, Dict, Callable | |
| from functools import wraps | |
| from fastapi import HTTPException, status, Depends | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class AppRole(str, Enum):
    """
    Roles a user of the application can hold.

    The ``str`` mixin means each member is also a plain string, so a member
    compares equal to its value (``AppRole.DISPATCHER == "dispatcher"``).

    Hierarchy, from most to least privileged:
        1. PLATFORM_ADMIN                   - full system access
        2. CLIENT_ADMIN / CONTRACTOR_ADMIN  - organization-level admin
        3. SALES_MANAGER / PROJECT_MANAGER  - department managers
        4. DISPATCHER                       - operations coordinator
        5. FIELD_AGENT / SALES_AGENT        - individual contributors
    """

    # Tier 1: platform-wide administration
    PLATFORM_ADMIN = "platform_admin"

    # Tier 2: organization administrators
    CLIENT_ADMIN = "client_admin"
    CONTRACTOR_ADMIN = "contractor_admin"

    # Tier 3: department managers
    SALES_MANAGER = "sales_manager"
    PROJECT_MANAGER = "project_manager"

    # Tier 4: operations coordination
    DISPATCHER = "dispatcher"

    # Tier 5: individual contributors
    FIELD_AGENT = "field_agent"
    SALES_AGENT = "sales_agent"
# Role -> permission table.
# PLATFORM_ADMIN carries the "*" wildcard (all permissions); every other role
# is granted only the permission strings listed for it. Entries are grouped by
# functional area; the scoping notes in the comments (own organization, own
# records, ...) describe intent and are not enforced by this table itself.
ROLE_PERMISSIONS: Dict[AppRole, List[str]] = {
    AppRole.PLATFORM_ADMIN: [
        "*",  # wildcard: all permissions
        # Listed explicitly for clarity even though "*" already covers them
        "reset_user_password", "view_audit_logs",
        "view_organizations", "view_billing",
    ],
    AppRole.CLIENT_ADMIN: [
        # Users, password resets and audit logs within their organization
        "view_users", "invite_users", "manage_org_users",
        "reset_user_password", "view_audit_logs",
        # Organizations: onboarding, own organization and its billing
        "create_clients", "create_contractors",
        "view_organizations", "view_billing",
        # Projects
        "manage_projects", "view_reports",
        # Timesheets (team view, create/update)
        "view_timesheets", "manage_timesheets",
        # Inventory (lists/stats, create/update/delete)
        "view_inventory", "manage_inventory",
        # Sales & customers
        "manage_sales_orders", "manage_customers", "view_customers",
        # Documents
        "view_documents", "upload_documents",
    ],
    AppRole.CONTRACTOR_ADMIN: [
        # Users, password resets and audit logs within their organization
        "view_users", "invite_users", "manage_org_users", "manage_agents",
        "reset_user_password", "view_audit_logs",
        # Organizations: onboarding, own organization and its billing
        "create_clients", "create_contractors",
        "view_organizations", "view_billing",
        # Projects
        "accept_projects", "view_payroll", "manage_tickets",
        # Timesheets (team view, create/update)
        "view_timesheets", "manage_timesheets",
        # Inventory, including collections/returns recorded on behalf of agents
        "view_inventory", "manage_inventory",
        "record_collection", "return_inventory",
        # Customers
        "manage_customers", "view_customers",
        # Documents
        "view_documents", "upload_documents",
    ],
    AppRole.PROJECT_MANAGER: [
        # Users within their organization / projects
        "view_users", "invite_users", "manage_org_users",
        "reset_user_password", "view_team_performance",
        # Projects they manage
        "manage_projects",
        # Tasks
        "view_tasks", "manage_tasks",
        # Tickets
        "create_tickets", "manage_tickets", "approve_expenses",
        # Timesheets (team view, create/update)
        "view_timesheets", "manage_timesheets",
        # Inventory, including collections/returns recorded on behalf of agents
        "view_inventory", "manage_inventory",
        "record_collection", "return_inventory",
        # Invoices (view, generate from tickets, update)
        "view_invoices", "create_invoices", "manage_invoices",
        # Reports (view, export to CSV)
        "view_reports", "export_reports",
        # Customers
        "manage_customers", "view_customers",
        # Documents
        "view_documents", "upload_documents",
    ],
    AppRole.DISPATCHER: [
        # Users within their organization
        "view_users", "invite_users", "manage_org_users",
        "reset_user_password", "view_agent_locations",
        # Projects in their organization
        "manage_projects",
        # Tickets and field operations
        "assign_tickets", "manage_tickets",
        "issue_equipment", "approve_expenses",
        # Timesheets (team view, create/update)
        "view_timesheets", "manage_timesheets",
        # Inventory: distributions, assignments, collections and returns
        "view_inventory", "manage_inventory",
        "record_collection", "return_inventory",
        # Invoices (view, generate from tickets, update)
        "view_invoices", "create_invoices", "manage_invoices",
        # Reports (view, export to CSV)
        "view_reports", "export_reports",
        # Customers
        "manage_customers", "view_customers",
        # Documents (view only)
        "view_documents",
    ],
    AppRole.FIELD_AGENT: [
        # Tasks and tickets assigned to them
        "view_tasks", "view_tickets", "update_ticket_status", "log_expenses",
        # Timesheets (self-service: own timesheets only)
        "view_timesheets",
        # Inventory (self-service: hub collections, availability, returns)
        "record_collection", "view_inventory", "return_inventory",
        # Customers (view only)
        "view_customers",
        # Documents (photos, expense receipts, own documents)
        "upload_photos", "upload_documents", "view_own_documents",
    ],
    AppRole.SALES_AGENT: [
        # Tasks related to their sales
        "view_tasks",
        # Sales & customers
        "create_orders", "manage_customers", "view_customers",
        "view_own_performance",
    ],
    AppRole.SALES_MANAGER: [
        # Users within their organization
        "view_users", "invite_users", "manage_org_users",
        "reset_user_password", "manage_sales_agents",
        # Projects in their organization
        "manage_projects",
        # Tasks
        "view_tasks", "manage_tasks",
        # Sales & customers
        "manage_sales_orders", "manage_customers", "view_customers",
        # Invoices (view, generate from tickets, update)
        "view_invoices", "create_invoices", "manage_invoices",
        # Reports (team performance, view, export to CSV)
        "view_team_performance", "view_reports", "export_reports",
    ],
}
def has_permission(role: AppRole, permission: str) -> bool:
    """
    Tell whether ``role`` is granted ``permission``.

    The platform admin role is always granted everything. Any other role is
    looked up in ROLE_PERMISSIONS and passes when its list contains either
    the requested permission or the "*" wildcard. A role with no entry in
    the table is treated as having no permissions.

    Args:
        role: User's role
        permission: Permission to check (e.g., 'manage_projects', 'invite_users')

    Returns:
        True if role has permission, False otherwise
    """
    if role != AppRole.PLATFORM_ADMIN:
        granted = ROLE_PERMISSIONS.get(role, [])
        # Exact match first, then the wildcard.
        return any(candidate in granted for candidate in (permission, "*"))

    # Platform admins bypass the table lookup entirely.
    return True
def require_permission(permission: str):
    """
    Decorator to enforce permission checks on route handlers

    The handler must receive the authenticated user as a ``current_user``
    keyword argument (typically via Depends(get_current_active_user)).
    Both ``async def`` and plain ``def`` handlers are supported.

    Usage:
        @router.get("/users")
        @require_permission("view_users")
        async def list_users(current_user: User = Depends(get_current_active_user)):
            ...

    Args:
        permission: Required permission string

    Raises:
        HTTPException 401: If no current_user keyword argument was supplied
        HTTPException 403: If the user's role is unknown or doesn't have the
            required permission
    """
    import inspect

    def decorator(func: Callable):
        # Decide once, at decoration time, whether the handler must be awaited.
        is_async = inspect.iscoroutinefunction(func)

        # wraps() keeps the handler's name/docstring and sets __wrapped__, so
        # signature inspection (which FastAPI uses to resolve Depends(...)
        # parameters) sees the real handler signature, not (*args, **kwargs).
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract current_user from kwargs (injected by Depends(get_current_active_user))
            current_user = kwargs.get('current_user')
            if not current_user:
                logger.error(f"Permission check failed: No current_user in kwargs for {func.__name__}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            # An unrecognized role value must be denied, not crash the request:
            # AppRole(...) raises ValueError for values that are not members.
            user_role = current_user.role
            try:
                role = AppRole(user_role)
            except ValueError:
                role = None

            if role is None or not has_permission(role, permission):
                logger.warning(
                    f"Permission denied: User {current_user.email} (role={user_role}) "
                    f"attempted {func.__name__} requiring '{permission}'"
                )
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Insufficient permissions. Required: {permission}"
                )

            # User has permission, execute the route handler
            if is_async:
                return await func(*args, **kwargs)
            return func(*args, **kwargs)

        return wrapper

    return decorator
def require_any_permission(*permissions: str):
    """
    Decorator to enforce that user has at least one of the specified permissions

    The handler must receive the authenticated user as a ``current_user``
    keyword argument (typically via Depends(get_current_active_user)).
    Both ``async def`` and plain ``def`` handlers are supported.

    Usage:
        @router.get("/reports")
        @require_any_permission("view_reports", "export_reports")
        async def get_reports(current_user: User = Depends(get_current_active_user)):
            ...

    Args:
        *permissions: List of acceptable permissions (user needs at least one)

    Raises:
        HTTPException 401: If no current_user keyword argument was supplied
        HTTPException 403: If the user's role is unknown or doesn't have any
            of the required permissions
    """
    import inspect

    def decorator(func: Callable):
        # Decide once, at decoration time, whether the handler must be awaited.
        is_async = inspect.iscoroutinefunction(func)

        # wraps() keeps the handler's name/docstring and sets __wrapped__, so
        # signature inspection (which FastAPI uses to resolve Depends(...)
        # parameters) sees the real handler signature, not (*args, **kwargs).
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_user = kwargs.get('current_user')
            if not current_user:
                logger.error(f"Permission check failed: No current_user in kwargs for {func.__name__}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            # An unrecognized role value must be denied, not crash the request:
            # AppRole(...) raises ValueError for values that are not members.
            try:
                user_role = AppRole(current_user.role)
            except ValueError:
                user_role = None

            # Check if user has any of the permissions
            has_any = user_role is not None and any(
                has_permission(user_role, perm) for perm in permissions
            )
            if not has_any:
                logger.warning(
                    f"Permission denied: User {current_user.email} (role={current_user.role}) "
                    f"attempted {func.__name__} requiring one of: {', '.join(permissions)}"
                )
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Insufficient permissions. Required one of: {', '.join(permissions)}"
                )

            if is_async:
                return await func(*args, **kwargs)
            return func(*args, **kwargs)

        return wrapper

    return decorator
def require_role(*roles):
    """
    Decorator to enforce specific role requirements (simpler than permission-based)

    The handler must receive the authenticated user as a ``current_user``
    keyword argument (typically via Depends(get_current_active_user)).
    Both ``async def`` and plain ``def`` handlers are supported.

    Usage:
        @router.post("/admin/settings")
        @require_role(AppRole.PLATFORM_ADMIN)
        async def update_settings(current_user: User = Depends(get_current_active_user)):
            ...

        # Or with strings:
        @require_role("platform_admin")
        @require_role(["platform_admin", "client_admin"])

    Args:
        *roles: Acceptable roles (AppRole enums, strings, or a
            list/tuple/set of either)

    Raises:
        HTTPException 401: If no current_user keyword argument was supplied
        HTTPException 403: If user doesn't have one of the required roles
    """
    import inspect

    def _role_name(role) -> str:
        # Enum members (AppRole included) expose .value; plain strings do not
        # and pass through unchanged. Anything else is stringified so the
        # ', '.join(...) in the error messages cannot fail.
        value = getattr(role, 'value', role)
        return value if isinstance(value, str) else str(value)

    # Normalize roles to a flat list of strings once, at decoration time,
    # instead of on every request.
    allowed_roles = []
    for role in roles:
        if isinstance(role, (list, tuple, set, frozenset)):
            # Handle collections: ["platform_admin", AppRole.CLIENT_ADMIN]
            allowed_roles.extend(_role_name(item) for item in role)
        else:
            # Handle a single string or enum member
            allowed_roles.append(_role_name(role))

    def decorator(func: Callable):
        # Decide once, at decoration time, whether the handler must be awaited.
        is_async = inspect.iscoroutinefunction(func)

        # wraps() keeps the handler's name/docstring and sets __wrapped__, so
        # signature inspection (which FastAPI uses to resolve Depends(...)
        # parameters) sees the real handler signature, not (*args, **kwargs).
        @wraps(func)
        async def wrapper(*args, **kwargs):
            current_user = kwargs.get('current_user')
            if not current_user:
                logger.error(f"Role check failed: No current_user in kwargs for {func.__name__}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            # Compare on the underlying value so an enum-typed user role
            # matches the normalized strings in allowed_roles.
            user_role = getattr(current_user.role, 'value', current_user.role)

            # Check if user has one of the required roles
            if user_role not in allowed_roles:
                logger.warning(
                    f"Role check failed: User {current_user.email} (role={current_user.role}) "
                    f"attempted {func.__name__} requiring role: {', '.join(allowed_roles)}"
                )
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Insufficient privileges. Required role: {', '.join(allowed_roles)}"
                )

            # Call function - handle both sync and async
            if is_async:
                return await func(*args, **kwargs)
            return func(*args, **kwargs)

        return wrapper

    return decorator