"""
Authentication Endpoints - Supabase Auth Integration
"""

import logging
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status, Request, Response
from sqlalchemy.orm import Session

from app.api.deps import get_db, get_current_active_user
from app.core.rate_limit import limiter
from app.schemas.auth import (
    LoginRequest,
    TokenResponse,
    PasswordChange,
    RefreshTokenRequest,
    ForgotPasswordRequest,
    ResetPasswordRequest,
    MessageResponse
)
from app.schemas.user import (
    UserCreate,
    UserResponse,
    UserUpdate,
    UserProfile,
    AdminOTPRequest,
    AdminRegistrationRequest
)
from app.schemas.user_preferences import (
    UserPreferencesUpdate,
    UserPreferencesResponse,
    DEFAULT_DASHBOARD_WIDGETS
)
from app.config.apps import (
    get_available_apps_for_role,
    get_available_app_codes_for_role,
    get_default_favorites_for_role,
    get_meta_apps_for_role,
    validate_apps_for_role,
    get_app_by_code,
    APPS
)
from app.models.user_preference import UserPreference
from app.models.user import User
from app.core.supabase_auth import supabase_auth
from app.core.supabase_client import supabase_admin
from app.services.audit_service import AuditService
from app.services.password_reset_service import PasswordResetService

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["Authentication"])

# Purpose string shared by the OTP send (step 1) and verify (step 2) calls.
ADMIN_OTP_PURPOSE = "Platform Admin Registration"
# How long step-1 registration data is kept by the OTP service (30 minutes).
REGISTRATION_DATA_TTL_SECONDS = 1800
# Fallback token lifetime reported when the auth response carries no session.
DEFAULT_TOKEN_EXPIRES_IN = 3600
# Lifetime requested for signed profile-photo URLs.
PROFILE_PHOTO_URL_TTL_SECONDS = 3600
# Upper bound on the number of favorite apps a user may store.
MAX_FAVORITE_APPS = 6


# ============================================
# INTERNAL HELPERS
# ============================================

def _get_existing_user(db: Session, user_id) -> Optional[User]:
    """Return the non-soft-deleted user with the given id, or None."""
    return db.query(User).filter(
        User.id == user_id,
        User.deleted_at.is_(None)
    ).first()


def _get_user_preferences(db: Session, user_id) -> Optional[UserPreference]:
    """Return the non-soft-deleted preferences row for the user, or None."""
    return db.query(UserPreference).filter(
        UserPreference.user_id == user_id,
        UserPreference.deleted_at.is_(None)
    ).first()


def _session_user_payload(user: User) -> dict:
    """Build the ``user`` object embedded in login/refresh token responses."""
    return {
        "id": str(user.id),
        "email": user.email,
        "first_name": user.first_name,
        "last_name": user.last_name,
        "full_name": user.full_name,
        "role": user.role,
        "is_active": user.is_active
    }


def _resolve_profile_photo_url(db: Session, user_id) -> Optional[str]:
    """
    Return the URL of the user's profile photo, or None if there is none.

    For documents stored in Supabase (``supabase://`` URLs) a fresh signed URL
    is generated when the document metadata carries both ``bucket`` and
    ``path``; otherwise the stored ``file_url`` is returned unchanged.
    """
    # Imported lazily, as in the original handler.
    from app.models.user_document_link import UserDocumentLink
    from app.models.document import Document

    doc_link = db.query(UserDocumentLink).filter(
        UserDocumentLink.user_id == user_id,
        UserDocumentLink.document_link_type == 'profile_photo'
    ).first()
    if not doc_link:
        return None

    document = db.query(Document).filter(
        Document.id == doc_link.document_id,
        Document.deleted_at.is_(None)
    ).first()
    if not document:
        return None

    file_url = document.file_url
    is_supabase_file = (
        document.storage_provider == 'supabase'
        and bool(file_url)
        and file_url.startswith('supabase://')
    )
    if is_supabase_file:
        from app.integrations.supabase import SupabaseStorageService

        # Metadata may be NULL on older rows; treat that as "no bucket/path".
        metadata = document.additional_metadata or {}
        bucket = metadata.get('bucket')
        path = metadata.get('path')
        if bucket and path:
            return SupabaseStorageService.get_signed_url(
                bucket, path, PROFILE_PHOTO_URL_TTL_SECONDS
            )

    return file_url


def _build_user_profile(
    user: User,
    *,
    primary_project,
    assigned_projects,
    last_active_project_id,
    profile_photo_url: Optional[str]
) -> UserProfile:
    """Assemble the ``UserProfile`` response from a user and project context."""
    return UserProfile(
        id=user.id,
        email=user.email,
        name=user.name,
        phone=user.phone,
        phone_alternate=user.phone_alternate,
        role=user.role,
        status=user.status,
        is_active=user.is_active,
        client_id=user.client_id,
        contractor_id=user.contractor_id,
        created_at=user.created_at,
        updated_at=user.updated_at,
        primary_project=primary_project,
        assigned_projects=assigned_projects,
        last_active_project_id=last_active_project_id,
        profile_photo_url=profile_photo_url
    )


# ============================================
# REGISTRATION ENDPOINTS
# ============================================

@router.post("/send-admin-otp", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("3/hour")  # 3 OTP requests per hour per IP
async def send_admin_registration_otp(
    request: Request,
    response: Response,
    otp_request: AdminOTPRequest,
    db: Session = Depends(get_db)
):
    """
    🔒 Step 1: Send OTP for Platform Admin Registration

    **USER-FRIENDLY FLOW:**
    User provides basic info (name, email, phone) → Admin receives OTP with details

    **PROCESS:**
    1. User submits: name, email, phone (NO PASSWORD YET)
    2. System sends OTP to configured admin email
    3. OTP email includes: registrant's name, email, phone
    4. Admin verifies identity offline and shares OTP with user
    5. User proceeds to /auth/register with OTP + password

    **SECURITY MODEL:**
    - Self-registration DISABLED for regular users (must use invitation flow)
    - Platform admin creation requires OTP verification
    - Admin email configured via PLATFORM_ADMIN_EMAIL environment variable
    - Password is NOT sent or stored at this stage

    **Request Body:**
    ```json
    {
        "email": "admin@example.com",
        "first_name": "John",
        "last_name": "Doe",
        "phone": "+1234567890"
    }
    ```

    **Note:** Regular users (field agents, managers, etc.) must be invited by existing admins.
    """
    from app.config import settings
    from app.services.otp_service import OTPService

    platform_admin_email = settings.PLATFORM_ADMIN_EMAIL

    try:
        # Check if user already exists
        existing_user = db.query(User).filter(User.email == otp_request.email).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        otp_service = OTPService.get_instance()

        # NOTE: Password is NOT stored here - it is provided in step 2.
        full_name = f"{otp_request.first_name} {otp_request.last_name}"

        # Capture request metadata for security audit.
        ip_address = request.client.host if request.client else "Unknown"
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

        # Store basic registration data temporarily so step 2 can verify it.
        await otp_service.store_registration_data(
            identifier=otp_request.email,
            data={
                "email": otp_request.email,
                "name": full_name,
                "first_name": otp_request.first_name,
                "last_name": otp_request.last_name,
                "phone": otp_request.phone,
                "role": "platform_admin",
                "registration_type": "platform_admin_otp",
                "ip_address": ip_address,
                "timestamp": timestamp
            },
            ttl=REGISTRATION_DATA_TTL_SECONDS
        )

        # Generate and send OTP to the admin email with registration context.
        await otp_service.send_otp(
            identifier=otp_request.email,  # Storage key
            channel="email",
            recipient=platform_admin_email,  # Send to configured admin email
            purpose=ADMIN_OTP_PURPOSE,
            db=db,
            additional_metadata={
                "admin_registration": True,
                "registrant_name": full_name,
                "registrant_email": otp_request.email,
                "registrant_phone": otp_request.phone or "Not provided",
                "ip_address": ip_address,
                "timestamp": timestamp
            }
        )

        # Audit log
        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=otp_request.email,
            success=False,  # Not completed yet
            request=request,
            reason=f"OTP sent to {platform_admin_email}"
        )

        logger.info("Platform admin registration OTP sent for: %s", otp_request.email)

        return {
            "message": f"✅ Registration request received! An OTP code has been sent to {platform_admin_email} with your details (name, email, phone). "
                       f"Once the admin verifies your identity, they will share the OTP code with you. Then use /auth/register with the OTP and your password to complete registration."
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error("OTP send error: %s", e)
        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=otp_request.email,
            success=False,
            request=request,
            reason=f"OTP send failed: {str(e)}"
        )
        # NOTE(review): the raw exception text is returned to the caller here;
        # consider a generic message if internal details should not leak.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Failed to send OTP: {str(e)}"
        )


@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("5/hour")  # 5 registration attempts per hour per IP
async def register(
    request: Request,
    response: Response,
    registration_data: AdminRegistrationRequest,
    db: Session = Depends(get_db)
):
    """
    🔒 Step 2: Complete Platform Admin Registration with OTP and Password

    **USER-FRIENDLY FLOW:**
    After receiving OTP from admin, complete registration with password.

    **PROCESS:**
    1. User previously called /auth/send-admin-otp with (name, email, phone)
    2. Admin received OTP email with user details
    3. Admin verified identity and shared OTP with user
    4. User now submits: name, email, phone, password, OTP code
    5. System verifies OTP and creates account
    6. User is logged in immediately

    **REQUEST BODY:**
    ```json
    {
        "email": "admin@example.com",
        "first_name": "John",
        "last_name": "Doe",
        "phone": "+1234567890",
        "password": "SecurePass123!",
        "otp_code": "123456"
    }
    ```

    **PASSWORD SECURITY:**
    - Send password as plain text over HTTPS (standard practice)
    - Backend hashes password using Supabase Auth (bcrypt)
    - Password must be 8+ chars with uppercase and digit

    **Returns:** Access token for immediate login
    """
    from app.services.otp_service import OTPService

    try:
        otp_service = OTPService.get_instance()

        # Verify OTP.
        # NOTE(review): every other OTPService call in this module is awaited;
        # confirm verify_otp is really synchronous.
        verification_result = otp_service.verify_otp(
            email=registration_data.email,
            phone=None,
            code=registration_data.otp_code,
            purpose=ADMIN_OTP_PURPOSE,
            db=db
        )

        if not verification_result['verified']:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"❌ Invalid or expired OTP. {verification_result.get('attempts_remaining', 0)} attempts remaining."
            )

        # Retrieve stored registration data from step 1
        stored_data = await otp_service.get_registration_data(registration_data.email)

        if not stored_data:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="❌ Registration data not found or expired. Please start the registration process again via /auth/send-admin-otp."
            )

        # Verify registration type
        if stored_data.get('registration_type') != 'platform_admin_otp':
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid registration type"
            )

        # Verify that submitted data matches stored data from step 1
        full_name = f"{registration_data.first_name} {registration_data.last_name}"
        email_matches = stored_data.get('email') == registration_data.email
        name_matches = stored_data.get('name') == full_name

        if not (email_matches and name_matches):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="❌ Registration data mismatch. Please ensure you're using the same details from step 1."
            )

        # Check if user already exists (safety check)
        existing_user = db.query(User).filter(User.email == registration_data.email).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

        # Create Supabase auth user with the password from the current request
        auth_response = await supabase_auth.sign_up(
            email=registration_data.email,
            password=registration_data.password,  # Password provided in this step
            user_metadata={
                "first_name": registration_data.first_name,
                "last_name": registration_data.last_name,
                "phone": registration_data.phone,
                "full_name": full_name
            }
        )

        auth_user = auth_response["user"]
        # NOTE(review): if sign_up can return no session (e.g. email
        # confirmation enabled), the token access below fails after the user
        # row has already been committed - confirm against supabase_auth.
        session = auth_response["session"]

        # Create user record in our database
        new_user = User(
            id=auth_user.id,
            email=registration_data.email,
            name=full_name,
            phone=registration_data.phone,
            is_active=True,
            role="platform_admin",
            status="active"  # Active after OTP verification
        )

        db.add(new_user)
        db.commit()
        db.refresh(new_user)

        # Clean up stored registration data
        await otp_service.delete_registration_data(registration_data.email)

        # Audit log
        AuditService.log_action(
            db=db,
            action='create',
            entity_type='user',
            entity_id=str(new_user.id),
            description=f"Platform admin account created (OTP verified): {registration_data.email}",
            user=new_user,
            request=request,
            additional_metadata={
                'role': new_user.role,
                'verification_method': 'otp_email'
            }
        )

        logger.info("✅ Platform admin account created successfully: %s", registration_data.email)

        return {
            "access_token": session.access_token,
            "token_type": "bearer",
            "user": {
                "id": str(new_user.id),
                "email": new_user.email,
                "name": new_user.name,
                "full_name": new_user.full_name,
                "is_active": new_user.is_active,
                "role": new_user.role
            }
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error("Registration error: %s", e)
        db.rollback()
        AuditService.log_auth_event(
            db=db,
            action='register',
            user_email=registration_data.email,
            success=False,
            request=request,
            reason=f"Registration failed: {str(e)}"
        )
        # NOTE(review): the raw exception text is returned to the caller here.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to complete registration: {str(e)}"
        )


# ============================================
# LOGIN / TOKEN ENDPOINTS
# ============================================

@router.post("/login", response_model=TokenResponse)
@limiter.limit("10/minute")  # 10 login attempts per minute per IP
async def login(
    request: Request,
    response: Response,
    credentials: LoginRequest,
    db: Session = Depends(get_db)
):
    """
    Login with email and password via Supabase Auth

    - **email**: User's email address
    - **password**: User's password

    Returns access token for authenticated requests
    """
    try:
        # Authenticate with Supabase Auth
        auth_response = await supabase_auth.sign_in(
            email=credentials.email,
            password=credentials.password
        )

        auth_user = auth_response["user"]
        access_token = auth_response["access_token"]

        user = _get_existing_user(db, auth_user.id)

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User profile not found"
            )

        # Check if user is active
        if not user.is_active:
            # Audit failed login
            AuditService.log_auth_event(
                db=db,
                action='login',
                user_email=credentials.email,
                success=False,
                request=request,
                reason='Account inactive'
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is inactive. Please contact support."
            )

        # Audit successful login
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=True,
            request=request,
            user=user  # Pass authenticated user for proper audit trail
        )

        logger.info("User logged in successfully: %s", credentials.email)

        # Refresh token and lifetime live on the session object, if present
        session = auth_response.get("session")

        return {
            "access_token": access_token,
            "refresh_token": session.refresh_token if session else None,
            "expires_in": session.expires_in if session else DEFAULT_TOKEN_EXPIRES_IN,
            "token_type": "bearer",
            "user": _session_user_payload(user)
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error("Login error: %s", e)

        # Audit failed login
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Invalid credentials'
        )

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        )


@router.post("/login-full", response_model=dict)
@limiter.limit("10/minute")
async def login_full(
    request: Request,
    response: Response,
    credentials: LoginRequest,
    db: Session = Depends(get_db)
):
    """
    🚀 OPTIMIZED LOGIN - Returns everything in ONE request

    This endpoint combines:
    - /auth/login
    - /auth/me
    - /auth/me/preferences
    - /auth/me/preferences/available-apps

    **Reduces 4 API calls to 1 = 75% faster login**

    Use this instead of separate endpoints to reduce network latency.
    Especially important for high-latency connections (cross-region, mobile, etc.)
    """
    try:
        # Step 1: Authenticate with Supabase
        auth_response = await supabase_auth.sign_in(
            email=credentials.email,
            password=credentials.password
        )

        auth_user = auth_response["user"]
        access_token = auth_response["access_token"]
        session = auth_response.get("session")

        # Step 2: Get user from database
        user = _get_existing_user(db, auth_user.id)

        if not user or not user.is_active:
            # Audit the blocked login, mirroring what /auth/login records.
            AuditService.log_auth_event(
                db=db,
                action='login',
                user_email=credentials.email,
                success=False,
                request=request,
                reason='Account inactive or not found'
            )
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is inactive or not found"
            )

        # Step 3: Get preferences
        user_prefs = _get_user_preferences(db, user.id)

        if user_prefs:
            last_project = user_prefs.last_active_project_id
            preferences = {
                "last_active_project_id": str(last_project) if last_project else None,
                "dashboard_widgets": user_prefs.dashboard_widgets,
                "favorite_apps": user_prefs.favorite_apps
            }
        else:
            preferences = {
                "last_active_project_id": None,
                # NOTE(review): other handlers index DEFAULT_DASHBOARD_WIDGETS
                # by role via .get(); here the whole object is returned.
                # Confirm which shape clients of this endpoint expect.
                "dashboard_widgets": DEFAULT_DASHBOARD_WIDGETS,
                "favorite_apps": get_default_favorites_for_role(user.role)
            }

        # Step 4: Get available apps
        available_apps = get_available_apps_for_role(user.role)

        # Step 4b: Get meta apps (apps available outside project context)
        meta_apps = get_meta_apps_for_role(user.role)

        # Step 5: Audit log (called synchronously before the response is built)
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=True,
            request=request,
            user=user
        )

        logger.info("User logged in successfully (full): %s", credentials.email)

        user_payload = _session_user_payload(user)
        user_payload["client_id"] = str(user.client_id) if user.client_id else None
        user_payload["contractor_id"] = str(user.contractor_id) if user.contractor_id else None

        # Return everything in one response
        return {
            "access_token": access_token,
            "refresh_token": session.refresh_token if session else None,
            "expires_in": session.expires_in if session else DEFAULT_TOKEN_EXPIRES_IN,
            "token_type": "bearer",
            "user": user_payload,
            "preferences": preferences,
            "available_apps": available_apps,
            "meta_apps": meta_apps
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error("Login error: %s", e)

        # Audit failed login, consistent with /auth/login.
        AuditService.log_auth_event(
            db=db,
            action='login',
            user_email=credentials.email,
            success=False,
            request=request,
            reason='Invalid credentials'
        )

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        )


@router.post("/refresh-token", response_model=TokenResponse)
async def refresh_access_token(
    request_data: RefreshTokenRequest,
    db: Session = Depends(get_db)
):
    """
    Refresh access token using refresh token

    **Purpose:** Get a new access token without re-logging in

    **When to use:**
    - Access token expired (401 error)
    - Before token expires (recommended: 5 min before)
    - On app startup to check session validity

    **Request Body:**
    ```json
    {
        "refresh_token": "your-refresh-token-here"
    }
    ```

    **Response:** New access_token and refresh_token (rotated for security)

    **Supabase Behavior:**
    - Refresh tokens are automatically rotated on each use
    - Old refresh token becomes invalid after rotation
    - Refresh tokens last 30 days by default (Supabase Free tier)

    **Error Handling:**
    - 401: Invalid or expired refresh token → redirect to login
    """
    try:
        # Refresh session with Supabase
        auth_response = await supabase_auth.refresh_session(request_data.refresh_token)

        if not auth_response or not auth_response.get("session"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Refresh token expired. Please log in again."
            )

        session = auth_response["session"]
        auth_user = auth_response["user"]

        # Verify the user still exists and is active
        user = _get_existing_user(db, auth_user.id)

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User account no longer exists"
            )

        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is inactive. Contact support."
            )

        logger.info("✅ Token refreshed successfully for: %s", user.email)

        return {
            "access_token": session.access_token,
            "refresh_token": session.refresh_token,  # New rotated token (Supabase rotates on each use)
            "expires_in": session.expires_in,
            "token_type": "bearer",
            "user": _session_user_payload(user)
        }

    except HTTPException:
        raise
    except Exception as e:
        error_msg = str(e).lower()
        logger.error("❌ Token refresh error: %s", e)

        # Parse Supabase error messages for better UX
        if "expired" in error_msg or "invalid" in error_msg:
            detail = "Refresh token expired. Please log in again."
        else:
            detail = f"Token refresh failed: {str(e)}"

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail
        )


# ============================================
# PROFILE ENDPOINTS
# ============================================

@router.get("/me", response_model=UserProfile)
async def get_current_user_profile(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get current user's profile with project context

    Returns:
    - User profile data
    - primary_project: First assigned project (for single-project users)
    - assigned_projects: All projects user is a member of
    - last_active_project_id: User's last selected project (from user_preferences)

    Requires authentication token in header: Authorization: Bearer <token>
    """
    from app.models.project import Project
    from app.models.project_team import ProjectTeam
    from app.schemas.user import ProjectContext

    profile_photo_url = _resolve_profile_photo_url(db, current_user.id)

    # Platform admins don't have project assignments: return empty context.
    if current_user.role == 'platform_admin':
        return _build_user_profile(
            current_user,
            primary_project=None,
            assigned_projects=[],
            last_active_project_id=None,
            profile_photo_url=profile_photo_url
        )

    # Get user preferences
    user_prefs = _get_user_preferences(db, current_user.id)
    last_active_project_id = user_prefs.last_active_project_id if user_prefs else None

    # Get all projects user is assigned to via project_team
    project_memberships = db.query(ProjectTeam).filter(
        ProjectTeam.user_id == current_user.id,
        ProjectTeam.deleted_at.is_(None),
        ProjectTeam.removed_at.is_(None)
    ).all()

    # Get project details (skip the query entirely when there are no memberships)
    project_ids = [membership.project_id for membership in project_memberships]
    projects = db.query(Project).filter(
        Project.id.in_(project_ids),
        Project.deleted_at.is_(None)
    ).all() if project_ids else []

    # Build project context list
    assigned_projects = [
        ProjectContext(id=project.id, title=project.title)
        for project in projects
    ]

    # Determine primary project.
    # Priority: last_active_project_id > primary_manager_id > first project
    primary_project = None
    if assigned_projects:
        if last_active_project_id:
            primary_project = next(
                (ctx for ctx in assigned_projects if ctx.id == last_active_project_id),
                assigned_projects[0]
            )
        else:
            # Check if user is primary manager of any project
            managed_project = next(
                (project for project in projects if project.primary_manager_id == current_user.id),
                None
            )
            if managed_project:
                primary_project = ProjectContext(id=managed_project.id, title=managed_project.title)
            else:
                primary_project = assigned_projects[0]

    return _build_user_profile(
        current_user,
        primary_project=primary_project,
        assigned_projects=assigned_projects,
        last_active_project_id=last_active_project_id,
        profile_photo_url=profile_photo_url
    )


@router.put("/me", response_model=UserProfile)
async def update_profile(
    profile_data: UserUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update current user's profile

    - **first_name**: Update first name
    - **last_name**: Update last name
    - **phone**: Update phone number

    Requires authentication token
    """
    # Track changes for audit
    changes = {'old': {}, 'new': {}}

    # Update name if first_name or last_name provided
    if profile_data.first_name is not None or profile_data.last_name is not None:
        # Use new values or keep the current name parts
        new_first = profile_data.first_name if profile_data.first_name is not None else current_user.first_name
        new_last = profile_data.last_name if profile_data.last_name is not None else current_user.last_name

        # Drop missing parts so a None part never becomes the text "None".
        old_name = current_user.name
        current_user.name = " ".join(part for part in (new_first, new_last) if part).strip()
        changes['old']['name'] = old_name
        changes['new']['name'] = current_user.name

    if profile_data.phone is not None:
        # Check if phone is already taken by another user
        existing_phone = db.query(User).filter(
            User.phone == profile_data.phone,
            User.id != current_user.id
        ).first()

        if existing_phone:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Phone number already in use"
            )

        changes['old']['phone'] = current_user.phone
        current_user.phone = profile_data.phone
        changes['new']['phone'] = profile_data.phone

    db.commit()
    db.refresh(current_user)

    # Audit profile update, only if a field was actually submitted
    if changes['old']:
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='user',
            entity_id=str(current_user.id),
            description=f"User updated profile: {current_user.email}",
            user=current_user,
            request=request,
            changes=changes
        )

    return current_user


# ============================================
# PASSWORD ENDPOINTS
# ============================================

@router.post("/change-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("5/hour")  # 5 password changes per hour per user
async def change_password(
    request: Request,
    response: Response,
    password_data: PasswordChange,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Change user's password via Supabase Auth

    - **current_password**: Current password for verification
    - **new_password**: New password (min 8 chars, 1 digit, 1 uppercase)

    Requires authentication token

    Note: This endpoint requires re-authentication with current password
    """
    try:
        # Verify current password by attempting to sign in
        await supabase_auth.sign_in(
            email=current_user.email,
            password=password_data.current_password
        )

        # Password is correct, now update to new password using Supabase Admin API
        supabase_admin.auth.admin.update_user_by_id(
            str(current_user.id),
            {"password": password_data.new_password}
        )

        # Audit password change
        AuditService.log_action(
            db=db,
            action='update',  # Using 'update' as password_change is not in AuditAction enum
            entity_type='user',
            entity_id=str(current_user.id),
            description=f"User changed password: {current_user.email}",
            user=current_user,
            request=request
        )

        logger.info("Password changed for user: %s", current_user.email)

        return {"message": "Password changed successfully. Please login again with your new password."}

    except Exception as e:
        logger.error("Password change error: %s", e)

        # Audit failed password change
        AuditService.log_action(
            db=db,
            action='update',  # Using 'update' as password_change_failed is not in AuditAction enum
            entity_type='user',
            entity_id=str(current_user.id),
            description=f"Failed password change attempt: {current_user.email}",
            user=current_user,
            request=request,
            additional_metadata={'reason': str(e), 'status': 'failed'}
        )

        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect or password change failed"
        )


@router.post("/forgot-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("3/hour")  # 3 password reset requests per hour per IP
async def forgot_password(
    request: Request,
    response: Response,
    request_data: ForgotPasswordRequest,
    db: Session = Depends(get_db)
):
    """
    Request password reset email

    - **email**: User's email address

    Sends password reset link to email if account exists.
    Always returns success to prevent email enumeration attacks.
    """
    password_reset_service = PasswordResetService()

    result = await password_reset_service.request_password_reset(
        email=request_data.email,
        db=db
    )

    # Audit password reset request
    AuditService.log_action(
        db=db,
        action='create',  # Valid AuditAction enum value (creating a reset request)
        entity_type='auth',
        description=f"Password reset requested for: {request_data.email}",
        request=request,
        additional_metadata={'email': request_data.email}
    )

    return MessageResponse(message=result['message'])


@router.post("/reset-password", response_model=MessageResponse, status_code=status.HTTP_200_OK)
@limiter.limit("5/hour")  # 5 password reset attempts per hour per IP
async def reset_password(
    request: Request,
    response: Response,
    reset_data: ResetPasswordRequest,
    db: Session = Depends(get_db)
):
    """
    Reset password using token from email

    - **token**: Password reset token from email
    - **new_password**: New password (min 8 chars, 1 digit, 1 uppercase)

    Resets password if token is valid and not expired.
    """
    password_reset_service = PasswordResetService()

    # Only a short prefix of the token is written to the audit trail.
    token_prefix = reset_data.token[:8]

    try:
        result = await password_reset_service.reset_password(
            token=reset_data.token,
            new_password=reset_data.new_password,
            db=db
        )

        # Audit successful password reset
        AuditService.log_action(
            db=db,
            action='update',  # Valid AuditAction enum value (updating password)
            entity_type='auth',
            description="Password reset completed using token",
            request=request,
            additional_metadata={'token_prefix': token_prefix}
        )

        return MessageResponse(message=result['message'])

    except HTTPException as e:
        # Audit failed password reset, then let the original error propagate
        AuditService.log_action(
            db=db,
            action='update',  # Valid AuditAction enum value (failed update attempt)
            entity_type='auth',
            description=f"Password reset failed: {e.detail}",
            request=request,
            additional_metadata={
                'token_prefix': token_prefix,
                'reason': e.detail
            }
        )
        raise


@router.post("/logout", response_model=MessageResponse, status_code=status.HTTP_200_OK)
async def logout(
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Logout current user

    Requires authentication token.

    Note: With JWT tokens, actual logout is handled client-side by removing the token.
    This endpoint is for audit logging purposes.
    """
    # Audit logout
    AuditService.log_auth_event(
        db=db,
        action='logout',
        user_email=current_user.email,
        success=True,
        request=request,
        user=current_user  # Same audit attribution as the login events
    )

    logger.info("User logged out: %s", current_user.email)

    return MessageResponse(message="Logged out successfully")


# ============================================
# USER PREFERENCES ENDPOINTS
# ============================================

@router.get("/me/preferences", response_model=UserPreferencesResponse)
async def get_my_preferences(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get current user's preferences from user_preferences table

    Returns user preferences with role-based defaults if preferences don't exist yet.
    If no preferences record exists, one is created here with those defaults.
    """
    preferences = _get_user_preferences(db, current_user.id)

    # If no preferences exist, create with role-based defaults
    if not preferences:
        preferences = UserPreference(
            user_id=current_user.id,
            favorite_apps=get_default_favorites_for_role(current_user.role),
            dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(
                current_user.role,
                ['recent_tickets', 'team_performance', 'sla_metrics']
            ),
            theme='light',
            language='en'
        )
        db.add(preferences)
        db.commit()
        db.refresh(preferences)
        logger.info(
            "✅ Created default preferences for user: %s (role: %s)",
            current_user.email,
            current_user.role
        )

    return UserPreferencesResponse.from_orm(preferences)


@router.put("/me/preferences", response_model=UserPreferencesResponse)
async def update_my_preferences(
    preferences_data: UserPreferencesUpdate,
    request: Request,
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Update current user's preferences in user_preferences table

    Updates user preferences including favorite apps, theme, language, and notification settings.
    Favorite apps are validated against role-specific available apps (max 6).
    """
    # Get or create preferences
    preferences = _get_user_preferences(db, current_user.id)

    if not preferences:
        # Create new preferences record with role-based defaults
        preferences = UserPreference(
            user_id=current_user.id,
            favorite_apps=get_default_favorites_for_role(current_user.role),
            dashboard_widgets=DEFAULT_DASHBOARD_WIDGETS.get(current_user.role, [])
        )
        db.add(preferences)
        db.flush()

    # Track changes for audit
    changes = {'old': {}, 'new': {}}

    # Update only provided fields
    update_data = preferences_data.dict(exclude_unset=True)

    # Validate favorite apps if provided
    new_favorites = update_data.get('favorite_apps')
    if new_favorites is not None:
        # Check max limit
        if len(new_favorites) > MAX_FAVORITE_APPS:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Maximum {MAX_FAVORITE_APPS} favorite apps allowed"
            )

        # Validate against role-specific available apps
        is_valid, invalid_apps = validate_apps_for_role(new_favorites, current_user.role)
        if not is_valid:
            available_apps = get_available_app_codes_for_role(current_user.role)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid apps for {current_user.role}: {', '.join(invalid_apps)}. "
                       f"Available apps: {', '.join(available_apps)}"
            )

        changes['old']['favorite_apps'] = preferences.favorite_apps
        changes['new']['favorite_apps'] = new_favorites
        preferences.favorite_apps = new_favorites

    # Validate last_active_project_id if provided (None is allowed: it clears it)
    if 'last_active_project_id' in update_data:
        from app.models.project import Project
        from app.models.project_team import ProjectTeam

        new_project_id = update_data['last_active_project_id']

        # Platform admins don't have project context
        if current_user.role == 'platform_admin' and new_project_id is not None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Platform admins cannot set active project (no project assignments)"
            )

        if new_project_id is not None:
            # Verify user is actually assigned to this project
            is_member = db.query(ProjectTeam).filter(
                ProjectTeam.user_id == current_user.id,
                ProjectTeam.project_id == new_project_id,
                ProjectTeam.deleted_at.is_(None),
                ProjectTeam.removed_at.is_(None)
            ).first()

            if not is_member:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="You are not assigned to this project"
                )

            # Verify project exists and is active
            project = db.query(Project).filter(
                Project.id == new_project_id,
                Project.deleted_at.is_(None)
            ).first()

            if not project:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Project not found"
                )

        old_project_id = preferences.last_active_project_id
        changes['old']['last_active_project_id'] = str(old_project_id) if old_project_id else None
        changes['new']['last_active_project_id'] = str(new_project_id) if new_project_id else None
        preferences.last_active_project_id = new_project_id

    # Update other fields (None values are ignored, so they cannot clear a field)
    handled_fields = ('favorite_apps', 'last_active_project_id')
    for field, value in update_data.items():
        if field in handled_fields or value is None:
            continue
        old_value = getattr(preferences, field, None)
        if old_value != value:
            changes['old'][field] = old_value
            changes['new'][field] = value
            setattr(preferences, field, value)

    db.commit()
    db.refresh(preferences)

    # Audit log
    if changes['old']:
        AuditService.log_action(
            db=db,
            action='update',
            entity_type='user_preferences',
            entity_id=str(preferences.id),
            description=f"User updated preferences: {current_user.email}",
            user=current_user,
            request=request,
            changes=changes
        )

    logger.info("Preferences updated for user: %s", current_user.email)

    return UserPreferencesResponse.from_orm(preferences)


@router.get("/me/preferences/available-apps", response_model=dict)
async def get_available_apps_for_user(
    current_user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    Get list of apps available for user to favorite based on their role

    **Returns:**
    - Current favorite app codes
    - All available apps with full metadata (name, icon, route, etc.)
    - Default favorites for the role
    - Maximum favorites allowed (6)

    **Use Cases:**
    - Populate app picker in settings UI
    - Show which apps can be added/removed from favorites
    - Display role-appropriate app options with icons and descriptions
    """
    role = current_user.role

    # Default favorites double as the fallback when no preferences row exists
    default_favorites = get_default_favorites_for_role(role)

    preferences = _get_user_preferences(db, current_user.id)
    current_favorites = preferences.favorite_apps if preferences else default_favorites

    # Get available apps with full metadata
    available_apps_detail = [app.to_dict() for app in get_available_apps_for_role(role)]

    # Get meta apps (apps available outside project context)
    meta_apps = get_meta_apps_for_role(role)

    return {
        "role": role,
        "current_favorites": current_favorites,
        "available_apps": available_apps_detail,  # Full app metadata
        "default_favorites": default_favorites,
        "meta_apps": meta_apps,  # Apps available outside project context
        "max_favorites": MAX_FAVORITE_APPS
    }
@router.get("/apps", response_model=dict)
async def get_all_apps_for_user(
    current_user: User = Depends(get_current_active_user)
):
    """
    List every app in the system, flagged with whether the caller's role can access it

    **Returns:**
    - `user_role`: the current user's role
    - `total_apps`: number of apps returned by `get_all_apps()`
    - `accessible_apps`: number of distinct app codes available to the role
    - `apps`: each app's `to_dict()` metadata plus a boolean `has_access`
    - `apps_by_category`: the same app entries grouped by `AppCategory` value
    - `categories`: every `AppCategory` value, in enum order

    **Frontend Integration:**
    - Filter apps by `has_access: true` to show only accessible apps
    - Group apps by `category` for organized navigation
    - Show disabled state for apps where `has_access: false`
    """
    from app.config.apps import get_all_apps, AppCategory

    catalog = get_all_apps()

    # Set membership keeps the per-app access check cheap
    permitted_codes = set(get_available_app_codes_for_role(current_user.role))

    def _with_access_flag(entry):
        # Serialize one app and mark whether this role may open it
        details = entry.to_dict()
        details['has_access'] = entry.code in permitted_codes
        return details

    annotated_apps = [_with_access_flag(entry) for entry in catalog]

    # One bucket per category, in enum order; empty categories keep an empty list
    category_values = [category.value for category in AppCategory]
    grouped_apps = {
        value: [item for item in annotated_apps if item['category'] == value]
        for value in category_values
    }

    return {
        "user_role": current_user.role,
        "total_apps": len(catalog),
        "accessible_apps": len(permitted_codes),
        "apps": annotated_apps,
        "apps_by_category": grouped_apps,
        "categories": category_values
    }