Spaces:
Sleeping
Sleeping
| """ | |
| Authentication Endpoints - Supabase Auth Integration | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Request, Response | |
| from sqlalchemy.orm import Session | |
| from app.api.deps import get_db, get_current_active_user | |
| from app.core.rate_limit import limiter | |
| from app.schemas.auth import ( | |
| LoginRequest, TokenResponse, PasswordChange, | |
| RefreshTokenRequest, ForgotPasswordRequest, ResetPasswordRequest, MessageResponse | |
| ) | |
| from app.schemas.user import ( | |
| UserCreate, UserResponse, UserUpdate, UserProfile, | |
| AdminOTPRequest, AdminRegistrationRequest | |
| ) | |
| from app.schemas.user_preferences import ( | |
| UserPreferencesUpdate, UserPreferencesResponse, | |
| DEFAULT_DASHBOARD_WIDGETS | |
| ) | |
| from app.config.apps import ( | |
| get_available_apps_for_role, | |
| get_available_app_codes_for_role, | |
| get_default_favorites_for_role, | |
| get_meta_apps_for_role, | |
| validate_apps_for_role, | |
| get_app_by_code, | |
| APPS | |
| ) | |
| from app.models.user_preference import UserPreference | |
| from app.models.user import User | |
| from app.core.supabase_auth import supabase_auth | |
| from app.core.supabase_client import supabase_admin | |
| from app.services.audit_service import AuditService | |
| from app.services.password_reset_service import PasswordResetService | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/auth", tags=["Authentication"]) | |
| # 3 OTP requests per hour per IP | |
async def send_admin_registration_otp(
    request: Request,
    response: Response,
    otp_request: AdminOTPRequest,
    db: Session = Depends(get_db)
):
    """
    🔒 Step 1: Send OTP for Platform Admin Registration

    **USER-FRIENDLY FLOW:**
    User provides basic info (name, email, phone) → Admin receives OTP with details

    **PROCESS:**
    1. User submits: name, email, phone (NO PASSWORD YET)
    2. System sends OTP to configured admin email
    3. OTP email includes: registrant's name, email, phone
    4. Admin verifies identity offline and shares OTP with user
    5. User proceeds to /auth/register with OTP + password

    **SECURITY MODEL:**
    - Self-registration DISABLED for regular users (must use invitation flow)
    - Platform admin creation requires OTP verification
    - Admin email configured via PLATFORM_ADMIN_EMAIL environment variable
    - Password is NOT sent or stored at this stage

    **Request Body:**
    ```json
    {
        "email": "admin@example.com",
        "first_name": "John",
        "last_name": "Doe",
        "phone": "+1234567890"
    }
    ```

    **Errors:**
    - 400: Email already registered
    - 400: OTP could not be stored or sent (details are logged server-side only)

    **Note:** Regular users (field agents, managers, etc.) must be invited by existing admins.
    """
    from datetime import datetime, timezone

    from app.config import settings
    from app.services.otp_service import OTPService

    admin_email = settings.PLATFORM_ADMIN_EMAIL
    try:
        # Refuse early if an account already exists for this email.
        existing_user = db.query(User).filter(User.email == otp_request.email).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        otp_service = OTPService.get_instance()
        full_name = f"{otp_request.first_name} {otp_request.last_name}"

        # Request metadata is kept with the pending registration and repeated
        # in the OTP email so the admin can judge where the request came from.
        ip_address = request.client.host if request.client else "Unknown"
        # datetime.utcnow() is deprecated; an aware UTC "now" renders the same text.
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

        # Park the step-1 data for 30 minutes. The password is deliberately
        # absent: it is only supplied in step 2 (/auth/register).
        await otp_service.store_registration_data(
            identifier=otp_request.email,
            data={
                "email": otp_request.email,
                "name": full_name,
                "first_name": otp_request.first_name,
                "last_name": otp_request.last_name,
                "phone": otp_request.phone,
                "role": "platform_admin",
                "registration_type": "platform_admin_otp",
                "ip_address": ip_address,
                "timestamp": timestamp
            },
            ttl=1800  # 30 minutes
        )

        # The OTP is keyed by the registrant's email but delivered to the
        # configured platform admin address.
        await otp_service.send_otp(
            identifier=otp_request.email,  # Storage key
            channel="email",
            recipient=admin_email,  # Send to configured admin email
            purpose="Platform Admin Registration",
            db=db,
            additional_metadata={
                "admin_registration": True,
                "registrant_name": full_name,
                "registrant_email": otp_request.email,
                "registrant_phone": otp_request.phone or "Not provided",
                "ip_address": ip_address,
                "timestamp": timestamp
            }
        )

        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=otp_request.email,
            success=False,  # Registration is not completed yet
            request=request,
            reason=f"OTP sent to {admin_email}"
        )
        logger.info("Platform admin registration OTP sent for: %s", otp_request.email)
        return {
            "message": f"✅ Registration request received! An OTP code has been sent to {admin_email} with your details (name, email, phone). "
                       f"Once the admin verifies your identity, they will share the OTP code with you. Then use /auth/register with the OTP and your password to complete registration."
        }
    except HTTPException:
        raise
    except Exception as e:
        # Keep the full error (with traceback) in the server log and audit
        # trail, but never echo internal exception text to an
        # unauthenticated caller.
        logger.exception("OTP send error: %s", e)
        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=otp_request.email,
            success=False,
            request=request,
            reason=f"OTP send failed: {str(e)}"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Failed to send OTP. Please try again later."
        ) from e
| # 5 registration attempts per hour per IP | |
async def register(
    request: Request,
    response: Response,
    registration_data: AdminRegistrationRequest,
    db: Session = Depends(get_db)
):
    """
    🔒 Step 2: Complete Platform Admin Registration with OTP and Password

    **USER-FRIENDLY FLOW:**
    After receiving OTP from admin, complete registration with password.

    **PROCESS:**
    1. User previously called /auth/send-admin-otp with (name, email, phone)
    2. Admin received OTP email with user details
    3. Admin verified identity and shared OTP with user
    4. User now submits: name, email, phone, password, OTP code
    5. System verifies OTP and creates account
    6. User is logged in immediately

    **REQUEST BODY:**
    ```json
    {
        "email": "admin@example.com",
        "first_name": "John",
        "last_name": "Doe",
        "phone": "+1234567890",
        "password": "SecurePass123!",
        "otp_code": "123456"
    }
    ```

    **PASSWORD SECURITY:**
    - Send password as plain text over HTTPS (standard practice)
    - Backend hashes password using Supabase Auth (bcrypt)
    - Password must be 8+ chars with uppercase and digit

    **Errors:**
    - 400: Invalid/expired OTP, missing or mismatching step-1 data, or email already registered
    - 500: Account creation failed (details are logged server-side only)

    **Returns:** Access token for immediate login (`access_token` is null when
    the auth provider did not open a session, e.g. email confirmation pending)
    """
    from app.services.otp_service import OTPService

    # Id of a Supabase auth user that exists without a matching local row.
    # Set right after sign-up, cleared once the local row is committed.
    orphan_auth_user_id = None
    try:
        otp_service = OTPService.get_instance()

        # NOTE(review): verify_otp is called synchronously while the other
        # OTPService methods used in this file are awaited — confirm it is
        # not a coroutine.
        verification_result = otp_service.verify_otp(
            email=registration_data.email,
            phone=None,
            code=registration_data.otp_code,
            purpose="Platform Admin Registration",
            db=db
        )
        if not verification_result['verified']:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"❌ Invalid or expired OTP. {verification_result.get('attempts_remaining', 0)} attempts remaining."
            )

        # Step-1 data must still be present (it is stored with a TTL).
        stored_data = await otp_service.get_registration_data(registration_data.email)
        if not stored_data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="❌ Registration data not found or expired. Please start the registration process again via /auth/send-admin-otp."
            )
        if stored_data.get('registration_type') != 'platform_admin_otp':
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid registration type"
            )

        # The identity the admin approved in step 1 must be the one being created.
        full_name = f"{registration_data.first_name} {registration_data.last_name}"
        if stored_data.get('email') != registration_data.email or stored_data.get('name') != full_name:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="❌ Registration data mismatch. Please ensure you're using the same details from step 1."
            )

        # Safety check: the email may have been registered since step 1.
        existing_user = db.query(User).filter(User.email == registration_data.email).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        # Create the Supabase auth user with the password from this request.
        auth_response = await supabase_auth.sign_up(
            email=registration_data.email,
            password=registration_data.password,
            user_metadata={
                "first_name": registration_data.first_name,
                "last_name": registration_data.last_name,
                "phone": registration_data.phone,
                "full_name": full_name
            }
        )
        auth_user = auth_response["user"]
        orphan_auth_user_id = auth_user.id
        session = auth_response["session"]

        # Mirror the auth user in the application database.
        new_user = User(
            id=auth_user.id,
            email=registration_data.email,
            name=full_name,
            phone=registration_data.phone,
            is_active=True,
            role="platform_admin",
            status="active"  # Active after OTP verification
        )
        db.add(new_user)
        db.commit()
        # From here on the auth user has its local counterpart and must be kept.
        orphan_auth_user_id = None
        db.refresh(new_user)

        # The pending registration is consumed.
        await otp_service.delete_registration_data(registration_data.email)

        AuditService.log_action(
            db=db,
            action='create',
            entity_type='user',
            entity_id=str(new_user.id),
            description=f"Platform admin account created (OTP verified): {registration_data.email}",
            user=new_user,
            request=request,
            additional_metadata={
                'role': new_user.role,
                'verification_method': 'otp_email'
            }
        )
        logger.info("✅ Platform admin account created successfully: %s", registration_data.email)

        # Sign-up may return no session; do not crash after the account has
        # already been created.
        if session is None:
            logger.warning(
                "No session returned on sign-up for %s; user must log in separately",
                registration_data.email
            )
        return {
            "access_token": session.access_token if session is not None else None,
            "token_type": "bearer",
            "user": {
                "id": str(new_user.id),
                "email": new_user.email,
                "name": new_user.name,
                "full_name": new_user.full_name,
                "is_active": new_user.is_active,
                "role": new_user.role
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Registration error: %s", e)
        db.rollback()

        # If sign-up succeeded but the local row was never committed, remove
        # the auth user again; otherwise every retry would fail because the
        # email is already taken on the auth side.
        if orphan_auth_user_id is not None:
            try:
                supabase_admin.auth.admin.delete_user(str(orphan_auth_user_id))
            except Exception:
                logger.exception(
                    "Could not remove orphaned auth user %s after failed registration",
                    orphan_auth_user_id
                )

        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=registration_data.email,
            success=False,
            request=request,
            reason=f"Registration failed: {str(e)}"
        )
        # Internal error text stays in the log/audit trail, not in the response.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to complete registration. Please try again or contact support."
        ) from e
| # 10 login attempts per minute per IP | |
async def login(request: Request, response: Response, credentials: LoginRequest, db: Session = Depends(get_db)):
    """
    Login with email and password via Supabase Auth

    - **email**: User's email address
    - **password**: User's password

    Returns access token for authenticated requests

    **Errors:**
    - 401: Authentication with Supabase failed
    - 404: Authenticated, but no (non-deleted) user profile exists
    - 403: Account is inactive
    """
    # Only the authentication step may be reported as "invalid credentials".
    # Database or audit failures further down must not be disguised as a 401
    # nor recorded as a failed password attempt.
    try:
        auth_response = await supabase_auth.sign_in(
            email=credentials.email,
            password=credentials.password
        )
        auth_user = auth_response["user"]
        access_token = auth_response["access_token"]
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Login error: %s", e)
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Invalid credentials'
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        ) from e

    # Load the application profile that belongs to the authenticated identity.
    user = db.query(User).filter(
        User.id == auth_user.id,
        User.deleted_at.is_(None)
    ).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User profile not found"
        )

    if not user.is_active:
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Account inactive'
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is inactive. Please contact support."
        )

    AuditService.log_auth_event(
        db=db,
        action='login',
        user_email=credentials.email,
        success=True,
        request=request,
        user=user  # Pass authenticated user for proper audit trail
    )
    logger.info("User logged in successfully: %s", credentials.email)

    # The session object is optional in the auth response; fall back to no
    # refresh token and a one-hour lifetime when it is absent.
    session = auth_response.get("session")
    return {
        "access_token": access_token,
        "refresh_token": session.refresh_token if session else None,
        "expires_in": session.expires_in if session else 3600,
        "token_type": "bearer",
        "user": {
            "id": str(user.id),
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "full_name": user.full_name,
            "role": user.role,
            "is_active": user.is_active
        }
    }
async def login_full(
    request: Request,
    response: Response,
    credentials: LoginRequest,
    db: Session = Depends(get_db)
):
    """
    🚀 OPTIMIZED LOGIN - Returns everything in ONE request

    This endpoint combines:
    - /auth/login
    - /auth/me
    - /auth/me/preferences
    - /auth/me/preferences/available-apps

    **Reduces 4 API calls to 1 = 75% faster login**

    Use this instead of separate endpoints to reduce network latency.
    Especially important for high-latency connections (cross-region, mobile, etc.)

    **Errors:**
    - 401: Authentication with Supabase failed
    - 403: Account is inactive or has no (non-deleted) profile
    """
    # Step 1: Authenticate with Supabase. Only this step is mapped to 401;
    # later database errors must not masquerade as wrong credentials.
    try:
        auth_response = await supabase_auth.sign_in(
            email=credentials.email,
            password=credentials.password
        )
        auth_user = auth_response["user"]
        access_token = auth_response["access_token"]
        session = auth_response.get("session")
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Login error: %s", e)
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Invalid credentials'
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        ) from e

    # Step 2: Get user from database
    user = db.query(User).filter(
        User.id == auth_user.id,
        User.deleted_at.is_(None)
    ).first()
    if not user or not user.is_active:
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Account inactive or not found'
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is inactive or not found"
        )

    # Step 3: Get preferences, falling back to defaults when none are stored
    user_prefs = db.query(UserPreference).filter(
        UserPreference.user_id == user.id,
        UserPreference.deleted_at.is_(None)
    ).first()
    preferences = {
        "last_active_project_id": str(user_prefs.last_active_project_id) if user_prefs and user_prefs.last_active_project_id else None,
        "dashboard_widgets": user_prefs.dashboard_widgets if user_prefs else DEFAULT_DASHBOARD_WIDGETS,
        "favorite_apps": user_prefs.favorite_apps if user_prefs else get_default_favorites_for_role(user.role)
    }

    # Step 4: Apps available to the role, plus meta apps (usable outside a
    # project context)
    available_apps = get_available_apps_for_role(user.role)
    meta_apps = get_meta_apps_for_role(user.role)

    # Step 5: Audit the successful login (called inline, before responding)
    AuditService.log_auth_event(
        db=db,
        action='login',
        user_email=credentials.email,
        success=True,
        request=request,
        user=user
    )
    logger.info("User logged in successfully (full): %s", credentials.email)

    # Return everything in one response
    return {
        "access_token": access_token,
        "refresh_token": session.refresh_token if session else None,
        "expires_in": session.expires_in if session else 3600,
        "token_type": "bearer",
        "user": {
            "id": str(user.id),
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "full_name": user.full_name,
            "role": user.role,
            "is_active": user.is_active,
            "client_id": str(user.client_id) if user.client_id else None,
            "contractor_id": str(user.contractor_id) if user.contractor_id else None,
        },
        "preferences": preferences,
        "available_apps": available_apps,
        "meta_apps": meta_apps
    }
async def refresh_access_token(
    request_data: RefreshTokenRequest,
    db: Session = Depends(get_db)
):
    """
    Refresh access token using refresh token

    **Purpose:** Get a new access token without re-logging in

    **When to use:**
    - Access token expired (401 error)
    - Before token expires (recommended: 5 min before)
    - On app startup to check session validity

    **Request Body:**
    ```json
    {
        "refresh_token": "your-refresh-token-here"
    }
    ```

    **Response:** New access_token and refresh_token (rotated for security)

    **Supabase Behavior:**
    - Refresh tokens are automatically rotated on each use
    - Old refresh token becomes invalid after rotation
    - Refresh tokens last 30 days by default (Supabase Free tier)

    **Error Handling:**
    - 401: Invalid or expired refresh token → redirect to login
    - 404: User account no longer exists
    - 403: Account is inactive
    """
    try:
        # Refresh session with Supabase
        auth_response = await supabase_auth.refresh_session(request_data.refresh_token)
        if not auth_response or not auth_response.get("session"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Refresh token expired. Please log in again."
            )
        session = auth_response["session"]
        auth_user = auth_response["user"]

        # A valid token is not enough: the profile must still exist and be active.
        user = db.query(User).filter(
            User.id == auth_user.id,
            User.deleted_at.is_(None)
        ).first()
        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User account no longer exists"
            )
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is inactive. Contact support."
            )

        logger.info("✅ Token refreshed successfully for: %s", user.email)
        return {
            "access_token": session.access_token,
            "refresh_token": session.refresh_token,  # New rotated token (Supabase rotates on each use)
            "expires_in": session.expires_in,
            "token_type": "bearer",
            "user": {
                "id": str(user.id),
                "email": user.email,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "full_name": user.full_name,
                "role": user.role,
                "is_active": user.is_active
            }
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error("❌ Token refresh error: %s", e)
        # Classify the upstream error for a friendlier message, but never
        # return the raw exception text to the caller.
        error_msg = str(e).lower()
        if "expired" in error_msg or "invalid" in error_msg:
            detail = "Refresh token expired. Please log in again."
        else:
            detail = "Token refresh failed. Please log in again."
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail
        ) from e
async def get_current_user_profile(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get current user's profile with project context

    Returns:
    - User profile data
    - primary_project: First assigned project (for single-project users)
    - assigned_projects: All projects user is a member of
    - last_active_project_id: User's last selected project (from user_preferences)
    - profile_photo_url: URL of the linked profile photo, if any

    Platform admins have no project assignments, so their project context is
    always empty.

    Requires authentication token in header:
    Authorization: Bearer <token>
    """
    if current_user.role == 'platform_admin':
        primary_project, assigned_projects, last_active_project_id = None, [], None
    else:
        primary_project, assigned_projects, last_active_project_id = _load_project_context(
            db, current_user
        )

    return UserProfile(
        id=current_user.id,
        email=current_user.email,
        name=current_user.name,
        phone=current_user.phone,
        phone_alternate=current_user.phone_alternate,
        role=current_user.role,
        status=current_user.status,
        is_active=current_user.is_active,
        client_id=current_user.client_id,
        contractor_id=current_user.contractor_id,
        created_at=current_user.created_at,
        updated_at=current_user.updated_at,
        primary_project=primary_project,
        assigned_projects=assigned_projects,
        last_active_project_id=last_active_project_id,
        profile_photo_url=_resolve_profile_photo_url(db, current_user.id)
    )


def _load_project_context(db: Session, user: User):
    """Return (primary_project, assigned_projects, last_active_project_id) for a user."""
    from app.models.project import Project
    from app.models.project_team import ProjectTeam
    from app.schemas.user import ProjectContext

    user_prefs = db.query(UserPreference).filter(
        UserPreference.user_id == user.id,
        UserPreference.deleted_at.is_(None)
    ).first()
    last_active_project_id = user_prefs.last_active_project_id if user_prefs else None

    # Current team memberships (neither soft-deleted nor removed).
    memberships = db.query(ProjectTeam).filter(
        ProjectTeam.user_id == user.id,
        ProjectTeam.deleted_at.is_(None),
        ProjectTeam.removed_at.is_(None)
    ).all()
    project_ids = [membership.project_id for membership in memberships]

    # Skip the IN query entirely when the user has no memberships.
    projects = db.query(Project).filter(
        Project.id.in_(project_ids),
        Project.deleted_at.is_(None)
    ).all() if project_ids else []
    assigned_projects = [ProjectContext(id=p.id, title=p.title) for p in projects]

    if not assigned_projects:
        return None, assigned_projects, last_active_project_id

    # Priority: last active project > project the user primarily manages >
    # first assigned project.
    if last_active_project_id:
        primary_project = next(
            (p for p in assigned_projects if p.id == last_active_project_id),
            assigned_projects[0]
        )
    else:
        managed_project = next(
            (p for p in projects if p.primary_manager_id == user.id),
            None
        )
        if managed_project:
            primary_project = ProjectContext(id=managed_project.id, title=managed_project.title)
        else:
            primary_project = assigned_projects[0]
    return primary_project, assigned_projects, last_active_project_id


def _resolve_profile_photo_url(db: Session, user_id):
    """Return the URL of the user's linked profile photo, or None when there is none."""
    from app.models.document import Document
    from app.models.user_document_link import UserDocumentLink

    doc_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if not doc_link:
        return None

    document = db.query(Document).filter(
        Document.id == doc_link.document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not document:
        return None

    file_url = document.file_url
    # Supabase-hosted files are stored as supabase:// references; hand out a
    # freshly signed URL (3600 is passed as the expiry argument) instead.
    if document.storage_provider == 'supabase' and file_url and file_url.startswith('supabase://'):
        from app.integrations.supabase import SupabaseStorageService

        # Tolerate documents stored without metadata.
        metadata = document.additional_metadata or {}
        bucket = metadata.get('bucket')
        path = metadata.get('path')
        if bucket and path:
            return SupabaseStorageService.get_signed_url(bucket, path, 3600)
    return file_url
async def update_profile(
    profile_data: UserUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update current user's profile

    - **first_name**: Update first name
    - **last_name**: Update last name
    - **phone**: Update phone number

    Fields that are omitted, or whose value equals the stored one, are left
    untouched and are not written to the audit trail.

    **Errors:**
    - 400: Phone number already in use by another user

    Requires authentication token
    """
    # Old/new values of the fields that really changed, for the audit entry.
    changes = {'old': {}, 'new': {}}

    if profile_data.first_name is not None or profile_data.last_name is not None:
        # A missing part keeps its current value.
        new_first = profile_data.first_name if profile_data.first_name is not None else current_user.first_name
        new_last = profile_data.last_name if profile_data.last_name is not None else current_user.last_name
        # Join only the parts that are present so an absent part never shows
        # up as the literal text "None" in the stored name.
        new_name = " ".join(part for part in (new_first, new_last) if part).strip()
        if new_name != current_user.name:
            changes['old']['name'] = current_user.name
            current_user.name = new_name
            changes['new']['name'] = new_name

    if profile_data.phone is not None and profile_data.phone != current_user.phone:
        # The new number must not belong to a different user.
        existing_phone = db.query(User).filter(
            User.phone == profile_data.phone,
            User.id != current_user.id
        ).first()
        if existing_phone:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Phone number already in use"
            )
        changes['old']['phone'] = current_user.phone
        current_user.phone = profile_data.phone
        changes['new']['phone'] = profile_data.phone

    db.commit()
    db.refresh(current_user)

    # Only log if there were actual changes
    if changes['old']:
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='user',
            entity_id=str(current_user.id),
            description=f"User updated profile: {current_user.email}",
            user=current_user,
            request=request,
            changes=changes
        )
    return current_user
| # 5 password changes per hour per user | |
async def change_password(
    request: Request,
    response: Response,
    password_data: PasswordChange,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Change user's password via Supabase Auth

    - **current_password**: Current password for verification
    - **new_password**: New password (min 8 chars, 1 digit, 1 uppercase)

    Requires authentication token

    Note: This endpoint requires re-authentication with current password.
    Any failure (wrong current password, update error, audit error) is
    answered with a 400.
    """

    def _audit(description, **extra):
        # Both outcomes are recorded as 'update': there is no dedicated
        # password-change value in the AuditAction enum.
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='user',
            entity_id=str(current_user.id),
            description=description,
            user=current_user,
            request=request,
            **extra
        )

    try:
        # A successful sign-in proves the caller knows the current password.
        await supabase_auth.sign_in(
            email=current_user.email,
            password=password_data.current_password
        )
        # Set the new password through the Supabase Admin API.
        new_credentials = {"password": password_data.new_password}
        supabase_admin.auth.admin.update_user_by_id(str(current_user.id), new_credentials)

        _audit(f"User changed password: {current_user.email}")
        logger.info("Password changed for user: %s", current_user.email)
        return {"message": "Password changed successfully. Please login again with your new password."}
    except Exception as e:
        logger.error("Password change error: %s", str(e))
        _audit(
            f"Failed password change attempt: {current_user.email}",
            additional_metadata={'reason': str(e), 'status': 'failed'}
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect or password change failed"
        )
| # 3 password reset requests per hour per IP | |
async def forgot_password(
    request: Request,
    response: Response,
    request_data: ForgotPasswordRequest,
    db: Session = Depends(get_db)
):
    """
    Request password reset email

    - **email**: User's email address

    Sends password reset link to email if account exists.
    Always returns success to prevent email enumeration attacks.
    """
    target_email = request_data.email

    # Delegate the reset workflow to the service; its message is relayed as-is.
    outcome = await PasswordResetService().request_password_reset(
        email=target_email,
        db=db
    )

    # Record the request under 'create' (a reset request is being created).
    audit_fields = {
        'action': 'create',
        'entity_type': 'auth',
        'description': f"Password reset requested for: {target_email}",
        'additional_metadata': {'email': target_email},
    }
    AuditService.log_action(db=db, request=request, **audit_fields)

    return MessageResponse(message=outcome['message'])
# 5 password reset attempts per hour per IP
# NOTE(review): no rate-limit decorator is visible on this function in this
# view of the file -- confirm the limit described above is actually applied.
async def reset_password(
    request: Request,
    response: Response,
    reset_data: ResetPasswordRequest,
    db: Session = Depends(get_db),
):
    """
    Reset password using the token from the reset email.

    - **token**: Password reset token from email
    - **new_password**: New password

    The token and new password are passed to
    `PasswordResetService.reset_password`. Its message is returned on
    success. If it raises `HTTPException`, the failure is audited and the
    same exception is re-raised. Other exception types propagate without an
    audit entry.
    """
    # NOTE(review): password complexity and token expiry rules are not
    # checked here; presumably the schema/service enforce them -- confirm.

    def _audit(description, **extra):
        # Both outcomes use the 'update' audit action. Only the first eight
        # characters of the token are stored, never the whole token.
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='auth',
            description=description,
            request=request,
            additional_metadata={
                'token_prefix': reset_data.token[:8],
                **extra,
            },
        )

    service = PasswordResetService()
    try:
        outcome = await service.reset_password(
            token=reset_data.token,
            new_password=reset_data.new_password,
            db=db,
        )
        _audit("Password reset completed using token")
        return MessageResponse(message=outcome['message'])
    except HTTPException as exc:
        _audit(f"Password reset failed: {exc.detail}", reason=exc.detail)
        raise
async def logout(
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """
    Logout current user.

    Requires authentication token.

    This handler does not invalidate anything itself: it records a
    successful 'logout' auth event for the user and writes a log line.
    With JWT tokens the actual logout happens client-side, by discarding
    the token.
    """
    email = current_user.email

    AuditService.log_auth_event(
        db=db,
        action='logout',
        user_email=email,
        success=True,
        request=request,
    )
    logger.info(f"User logged out: {email}")

    return MessageResponse(message="Logged out successfully")
| # ============================================ | |
| # USER PREFERENCES ENDPOINTS | |
| # ============================================ | |
async def get_my_preferences(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """
    Get current user's preferences from the user_preferences table.

    Looks up the user's non-soft-deleted preferences row. If none exists,
    a row is created and committed here with role-based defaults:
    the role's default favorite apps, the role's default dashboard widgets
    (falling back to a fixed three-widget list), theme 'light' and
    language 'en'.

    Returns the preferences serialized as `UserPreferencesResponse`.
    """
    # `.is_(None)` renders IS NULL explicitly and matches the soft-delete
    # filters used by the other queries in this module.
    preferences = (
        db.query(UserPreference)
        .filter(
            UserPreference.user_id == current_user.id,
            UserPreference.deleted_at.is_(None),
        )
        .first()
    )

    if preferences is None:
        role = current_user.role
        preferences = UserPreference(
            user_id=current_user.id,
            favorite_apps=get_default_favorites_for_role(role),
            dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(
                role,
                ['recent_tickets', 'team_performance', 'sla_metrics'],
            ),
            theme='light',
            language='en',
        )
        db.add(preferences)
        db.commit()
        # Reload so database-generated column values are present on the object.
        db.refresh(preferences)
        logger.info(f"✅ Created default preferences for user: {current_user.email} (role: {current_user.role})")

    return UserPreferencesResponse.from_orm(preferences)
async def update_my_preferences(
    preferences_data: UserPreferencesUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """
    Update current user's preferences in the user_preferences table.

    Only fields explicitly present in the request body are considered.

    - **favorite_apps**: at most 6 entries, each valid for the user's role
    - **last_active_project_id**: must be a project the user is assigned
      to; platform admins may only clear it (send null)
    - any other field is applied when its value is not null and differs
      from the stored value

    If the user has no non-soft-deleted preferences row yet, one is created
    with the same role-based defaults the read endpoint uses, then updated.

    Raises:
    - 400 if more than 6 favorite apps are given, an app is not available
      for the role, or a platform admin tries to set an active project
    - 403 if the user is not assigned to the requested project
    - 404 if the requested project does not exist or is soft-deleted

    An audit entry with old/new values is written when anything was recorded
    as changed. Returns the stored preferences as `UserPreferencesResponse`.
    """
    # `.is_(None)` matches the soft-delete filters used on ProjectTeam and
    # Project further down.
    preferences = (
        db.query(UserPreference)
        .filter(
            UserPreference.user_id == current_user.id,
            UserPreference.deleted_at.is_(None),
        )
        .first()
    )

    if preferences is None:
        # Same defaults as get_my_preferences, so a row created on first
        # update is indistinguishable from one created on first read.
        preferences = UserPreference(
            user_id=current_user.id,
            favorite_apps=get_default_favorites_for_role(current_user.role),
            dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(
                current_user.role,
                ['recent_tickets', 'team_performance', 'sla_metrics'],
            ),
            theme='light',
            language='en',
        )
        db.add(preferences)
        # Flush (not commit): a validation error below must not leave a
        # committed row behind.
        db.flush()

    # Old/new values per field, for the audit entry.
    changes = {'old': {}, 'new': {}}

    # Only fields the client actually sent.
    update_data = preferences_data.dict(exclude_unset=True)

    # --- favorite_apps -------------------------------------------------
    favorite_apps = update_data.get('favorite_apps')
    if favorite_apps is not None:
        if len(favorite_apps) > 6:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Maximum 6 favorite apps allowed",
            )

        is_valid, invalid_apps = validate_apps_for_role(favorite_apps, current_user.role)
        if not is_valid:
            available_apps = get_available_app_codes_for_role(current_user.role)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid apps for {current_user.role}: {', '.join(invalid_apps)}. "
                       f"Available apps: {', '.join(available_apps)}",
            )

        changes['old']['favorite_apps'] = preferences.favorite_apps
        changes['new']['favorite_apps'] = favorite_apps
        preferences.favorite_apps = favorite_apps

    # --- last_active_project_id ----------------------------------------
    # Key presence (not value) is tested so an explicit null clears the field.
    if 'last_active_project_id' in update_data:
        # Imported here rather than at module level, as in the original.
        # NOTE(review): presumably to avoid an import cycle -- confirm.
        from app.models.project import Project
        from app.models.project_team import ProjectTeam

        new_project_id = update_data['last_active_project_id']

        # Platform admins don't have project context.
        if current_user.role == 'platform_admin' and new_project_id is not None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Platform admins cannot set active project (no project assignments)",
            )

        if new_project_id is not None:
            # Membership is checked before existence, so a non-member gets
            # 403 regardless of whether the project exists.
            membership = (
                db.query(ProjectTeam)
                .filter(
                    ProjectTeam.user_id == current_user.id,
                    ProjectTeam.project_id == new_project_id,
                    ProjectTeam.deleted_at.is_(None),
                    ProjectTeam.removed_at.is_(None),
                )
                .first()
            )
            if not membership:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="You are not assigned to this project",
                )

            project = (
                db.query(Project)
                .filter(
                    Project.id == new_project_id,
                    Project.deleted_at.is_(None),
                )
                .first()
            )
            if not project:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Project not found",
                )

        changes['old']['last_active_project_id'] = (
            str(preferences.last_active_project_id)
            if preferences.last_active_project_id
            else None
        )
        changes['new']['last_active_project_id'] = str(new_project_id) if new_project_id else None
        preferences.last_active_project_id = new_project_id

    # --- remaining fields ------------------------------------------------
    for field, value in update_data.items():
        if field in ('favorite_apps', 'last_active_project_id') or value is None:
            continue
        old_value = getattr(preferences, field, None)
        if old_value != value:
            changes['old'][field] = old_value
            changes['new'][field] = value
            setattr(preferences, field, value)

    db.commit()
    db.refresh(preferences)

    # Skip the audit entry when nothing was recorded as changed.
    if changes['old']:
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='user_preferences',
            entity_id=str(preferences.id),
            description=f"User updated preferences: {current_user.email}",
            user=current_user,
            request=request,
            changes=changes,
        )

    logger.info(f"Preferences updated for user: {current_user.email}")
    return UserPreferencesResponse.from_orm(preferences)
async def get_available_apps_for_user(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db),
):
    """
    Get list of apps available for user to favorite based on their role.

    **Returns:**
    - `role`: the user's role
    - `current_favorites`: stored favorite app codes, or the role defaults
      when the user has no preferences row
    - `available_apps`: all apps available to the role, with full metadata
    - `default_favorites`: default favorites for the role
    - `meta_apps`: apps available outside project context
    - `max_favorites`: maximum favorites allowed (6)

    **Use Cases:**
    - Populate app picker in settings UI
    - Show which apps can be added/removed from favorites
    - Display role-appropriate app options with icons and descriptions
    """
    role = current_user.role

    # `.is_(None)` matches the soft-delete filters used elsewhere in this
    # module.
    preferences = (
        db.query(UserPreference)
        .filter(
            UserPreference.user_id == current_user.id,
            UserPreference.deleted_at.is_(None),
        )
        .first()
    )

    # Looked up once; also serves as the fallback for current favorites.
    default_favorites = get_default_favorites_for_role(role)
    current_favorites = preferences.favorite_apps if preferences else default_favorites

    # Unlike get_my_preferences, this read does not create a missing row.
    available_apps_detail = [app.to_dict() for app in get_available_apps_for_role(role)]

    return {
        "role": role,
        "current_favorites": current_favorites,
        "available_apps": available_apps_detail,  # Full app metadata
        "default_favorites": default_favorites,
        "meta_apps": get_meta_apps_for_role(role),  # Apps available outside project context
        "max_favorites": 6,
    }
async def get_all_apps_for_user(
    current_user: User = Depends(get_current_active_user),
):
    """
    Get ALL apps in the system with role-based access information.

    **Returns:**
    - `user_role`: the user's role
    - `total_apps`: number of apps in the system
    - `accessible_apps`: number of app codes available to the role
    - `apps`: every app's metadata plus a `has_access` flag
    - `apps_by_category`: the same entries grouped under each category value
    - `categories`: all category values

    **Use Cases:**
    - Render main navigation menu with all accessible apps
    - Show app launcher/drawer with categories
    - Display app directory or marketplace
    - Generate sitemap for user's accessible routes

    **Frontend Integration:**
    - Filter apps by `has_access: true` to show only accessible apps
    - Group apps by `category` for organized navigation
    - Show disabled state for apps where `has_access: false`
    """
    from app.config.apps import get_all_apps, AppCategory

    all_apps = get_all_apps()

    # A set gives constant-time membership tests while flagging each app.
    accessible_app_codes = set(get_available_app_codes_for_role(current_user.role))

    def _with_access_flag(app):
        # Serialize the app and mark whether this role may open it.
        entry = app.to_dict()
        entry['has_access'] = app.code in accessible_app_codes
        return entry

    apps_with_access = [_with_access_flag(app) for app in all_apps]

    # Every category gets a key, even when no app falls under it.
    category_values = [category.value for category in AppCategory]
    apps_by_category = {
        value: [entry for entry in apps_with_access if entry['category'] == value]
        for value in category_values
    }

    return {
        "user_role": current_user.role,
        "total_apps": len(all_apps),
        "accessible_apps": len(accessible_app_codes),
        "apps": apps_with_access,
        "apps_by_category": apps_by_category,
        "categories": category_values,
    }