import os from datetime import datetime, timedelta, timezone import pyotp from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from app.services.infrastructure.rbac_service import rbac_service from app.services.infrastructure.storage.database_service import db_service from core.config import settings from core.database import User from core.logging import logger from .schemas import ( LoginRequest, MFASetupResponse, MFAVerifyRequest, MFAVerifyResponse, PasswordResetConfirm, PasswordResetRequest, RegisterRequest, RegisterResponse, SecurityMetrics, SessionInfo, UserProfileResponse, ) from .service import auth_service router = APIRouter() # ===== AUTHENTICATION ENDPOINTS ===== @router.post( "/register", response_model=RegisterResponse, status_code=status.HTTP_201_CREATED ) async def register(user_data: RegisterRequest): """ Register a new user with password strength validation """ try: # Validate password strength password_errors = auth_service.validate_password_strength(user_data.password) if password_errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail={ "message": "Password does not meet security requirements", "errors": password_errors, }, ) # Check if username already exists existing_user = auth_service.get_user_by_username(user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Username already exists" ) # Check if email already exists existing_email = auth_service.get_user_by_email(user_data.email) if existing_email: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email already registered" ) # Create user new_user = auth_service.create_user(user_data) logger.info(f"New user registered: {new_user.username}") return { "user_id": new_user.id, "username": new_user.username, "email": new_user.email, "message": "User registered successfully", "created_at": new_user.created_at, } except HTTPException: raise except Exception as e: logger.error(f"Registration error: {e!s}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Registration error: {e!s}", ) @router.post("/login", response_model=UserProfileResponse) async def login(login_data: LoginRequest, request: Request, response: Response): """ Authenticate user and set HttpOnly cookies. """ try: user = auth_service.authenticate_user(login_data.email, login_data.password) if not user: # Record failed login attempt try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) await security_monitoring.record_login_attempt( email=login_data.email, user_id=None, ip_address=request.client.host or "unknown", user_agent=request.headers.get("user-agent", ""), success=False, failure_reason="Invalid credentials", ) except Exception as e: logger.warning(f"Failed to record login attempt: {e}") logger.warning( f"Failed login attempt for email: {login_data.email} from IP: {request.client.host if request.client else 'unknown'}" ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", ) # CHECK MFA mfa_required = user.mfa_enabled # Enforce MFA for admins if configured in production if ( not mfa_required and user.role in ["admin", "superuser"] and getattr(settings, "mfa_required_for_admin", False) ): # In production, we require MFA for administrative roles logger.info( f"MFA enforcement: Requiring MFA for admin user {user.username}" ) mfa_required = True if mfa_required: # If MFA enabled or required and no code provided, return 403 to trigger frontend prompt if not login_data.mfa_code: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="MFA code required" ) if not user.mfa_secret: logger.error(f"User {user.username} has MFA enabled but no secret") raise HTTPException(status_code=500, detail="MFA configuration error") totp = pyotp.TOTP(user.mfa_secret) if not totp.verify(login_data.mfa_code): logger.warning(f"Invalid MFA code for user {user.username}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MFA code" ) # Record successful login attempt try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) await security_monitoring.record_login_attempt( email=user.email, user_id=user.id, ip_address=request.client.host or "unknown", user_agent=request.headers.get("user-agent", ""), success=True, ) except Exception as e: logger.warning(f"Failed to record successful login: {e}") logger.info( f"Successful login for user: {user.username} (ID: {user.id}) from IP: {request.client.host if request.client else 'unknown'}" ) # Track user journey try: from app.services.business.user_journey_tracker import user_journey_tracker user_journey_tracker.track_event( user_id=user.id, event_type="login", metadata={"role": user.role, "mfa": user.mfa_enabled}, ) except Exception as e: logger.warning(f"Failed to track user journey event: {e}") # Create tokens access_token = auth_service.create_access_token( { "sub": user.id, "username": user.username, "role": user.role, "mfa_verified": user.mfa_enabled, } ) refresh_token = auth_service.create_refresh_token(user.id) # Create session record try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) await security_monitoring.create_user_session( user_id=user.id, session_token=refresh_token, ip_address=request.client.host or "unknown", user_agent=request.headers.get("user-agent", ""), expires_at=datetime.now(timezone.utc) + timedelta(days=7), ) except Exception as e: logger.warning(f"Failed to create session record: {e}") # Set HttpOnly Cookies # Set HttpOnly Cookies # Secure=True in production (HTTPS), False in dev if needed is_production = os.getenv("ENVIRONMENT", "development").lower() == "production" secure_cookie = is_production response.set_cookie( key="access_token", value=access_token, httponly=True, secure=secure_cookie, samesite="strict", max_age=1800, # 30 minutes ) response.set_cookie( key="refresh_token", value=refresh_token, httponly=True, secure=secure_cookie, samesite="strict", path="/api/v1/auth/refresh", max_age=7 * 24 * 60 * 60, # 7 days ) return { "id": user.id, "username": user.username, "email": user.email, "full_name": user.full_name, "role": user.role, "is_active": user.is_active, "mfa_enabled": user.mfa_enabled, "created_at": user.created_at, "last_login": user.last_login, } except HTTPException: raise except Exception as e: logger.error(f"Login error: {e!s}") raise HTTPException(status_code=500, detail="Internal login error") @router.get("/mfa/setup", response_model=MFASetupResponse) async def mfa_setup(current_user: User = Depends(auth_service.get_current_user)): """Generate MFA secret and QR code URI for setup""" if current_user.mfa_enabled: raise HTTPException(status_code=400, detail="MFA is already enabled") # Generate secret secret = pyotp.random_base32() # Save secret to DB (but don't enable yet) # We must fetch a fresh user instance attached to a session to update with db_service.get_db() as db: user = db.query(User).filter(User.id == current_user.id).first() if not user: raise HTTPException(status_code=404, detail="User not found") user.mfa_secret = secret db.commit() # Generate Provisioning URI uri = pyotp.totp.TOTP(secret).provisioning_uri( name=current_user.email, issuer_name="Zenith Fraud Platform" ) return {"secret": secret, "otpauth_url": uri} @router.post("/mfa/verify", response_model=MFAVerifyResponse) async def mfa_verify( verify_data: MFAVerifyRequest, current_user: User = Depends(auth_service.get_current_user), ): """Verify MFA code and enable MFA for the account""" if current_user.mfa_enabled: return {"message": "MFA is already enabled"} # We need the secret from the DB # Fetch fresh user with db_service.get_db() as db: user = db.query(User).filter(User.id == current_user.id).first() if not user or not user.mfa_secret: raise HTTPException( status_code=400, detail="MFA setup not initiated (no secret found)" ) totp = pyotp.TOTP(user.mfa_secret) if not totp.verify(verify_data.code): raise HTTPException(status_code=400, detail="Invalid code") # Enable MFA user.mfa_enabled = True db.commit() logger.info(f"MFA enabled for user {current_user.username}") return {"message": "MFA enabled successfully"} @router.post("/refresh") async def refresh_token(request: Request, response: Response): """ Refresh access token using HttpOnly cookie. """ refresh_token = request.cookies.get("refresh_token") if not refresh_token: raise HTTPException(status_code=401, detail="Refresh cookie missing") try: # Verify refresh token payload = auth_service.decode_token(refresh_token) if payload.get("type") != "refresh": raise HTTPException(status_code=401, detail="Invalid token type") user_id = payload.get("sub") # Determine claims # We can just call get_user directly from db_service as we have the ID, or use auth_service helper? # auth_service.get_user isn't explicitly defined above in the new file, it was just db_service.get_user wrapped # But auth_service.get_current_user does it. # Let's use db_service directly here for clarity or add a helper to auth_service. # The original code had: user = auth_service.get_user(user_id) if hasattr(auth_service, "get_user") else None # Let's fix this properly. user = db_service.get_user_by_id(user_id) if not user: # Security fix: Do not issue tokens for non-existent/inactive users # Previously this fell back to "unknown" user which allowed access after deletion logger.warning(f"Refresh attempt for non-existent user: {user_id}") raise HTTPException( status_code=401, detail="User no longer exists or is inactive" ) claims = { "sub": user_id, "username": user.username, "role": user.role, "mfa_verified": user.mfa_enabled, } new_access_token = auth_service.create_access_token(claims) # Set new access token cookie response.set_cookie( key="access_token", value=new_access_token, httponly=True, secure=True, samesite="strict", max_age=1800, # 30 minutes ) return {"message": "Token refreshed"} except Exception as e: logger.warning(f"Refresh failed: {e}") # Clear cookies on failure response.delete_cookie("access_token") response.delete_cookie("refresh_token", path="/api/v1/auth/refresh") raise HTTPException(status_code=401, detail="Invalid refresh token") @router.post("/logout") async def logout(response: Response): """ Logout user by clearing cookies. """ response.delete_cookie("access_token") response.delete_cookie("refresh_token", path="/api/v1/auth/refresh") return {"message": "Logged out successfully"} @router.get("/me", response_model=UserProfileResponse) async def get_current_user_profile( current_user: User = Depends(auth_service.get_current_user), ): """Get current user profile""" return { "id": current_user.id, "username": current_user.username, "email": current_user.email, "full_name": current_user.full_name, "role": current_user.role, "is_active": current_user.is_active, "mfa_enabled": current_user.mfa_enabled, "created_at": current_user.created_at, "last_login": current_user.last_login, } # ===== PASSWORD RESET ENDPOINTS ===== @router.post("/password/reset-request") async def request_password_reset(request_data: PasswordResetRequest): """ Request a password reset link via email """ try: # Check if user exists user = auth_service.get_user_by_email(request_data.email) if not user: # Don't reveal if email exists for security logger.info( f"Password reset requested for unknown email: {request_data.email}" ) return {"message": "If the email exists, a reset link has been sent"} # Generate reset token (expires in 1 hour) reset_token = auth_service.create_access_token( {"sub": user.id, "type": "password_reset"}, expires_delta=timedelta(hours=1) ) # Send email with reset link reset_url = f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/reset-password?token={reset_token}" # Import email service try: from app.services.infrastructure.email_service import email_service email_sent = await email_service.send_password_reset_email( user.email, reset_url ) if not email_sent: logger.warning(f"Failed to send password reset email to {user.email}") except ImportError: logger.warning("Email service not available, logging reset URL instead") logger.info(f"Password reset URL for {user.email}: {reset_url}") # In production, integrate with email service: # await email_service.send_password_reset(user.email, reset_url) return {"message": "If the email exists, a reset link has been sent"} except Exception as e: logger.error(f"Password reset request error: {e}") # Don't reveal internal errors for security return {"message": "If the email exists, a reset link has been sent"} @router.post("/password/reset") async def reset_password(request_data: PasswordResetConfirm): """ Reset password using reset token """ try: # Verify token payload = auth_service.decode_token(request_data.token) if payload.get("type") != "password_reset": raise HTTPException(status_code=400, detail="Invalid reset token") user_id = payload.get("sub") user = db_service.get_user_by_id(user_id) if not user: raise HTTPException(status_code=400, detail="Invalid reset token") # Validate new password password_errors = auth_service.validate_password_strength(request_data.password) if password_errors: raise HTTPException( status_code=400, detail={ "message": "Password does not meet security requirements", "errors": password_errors, }, ) # Update password hashed_password = auth_service.hash_password(request_data.password) with db_service.get_db() as db: db_user = db.query(User).filter(User.id == user_id).first() db_user.password_hash = hashed_password db_user.updated_at = datetime.now(timezone.utc) db.commit() logger.info(f"Password reset successful for user: {user.email}") # Send confirmation email try: from app.services.infrastructure.email_service import email_service await email_service.send_password_reset_confirmation(user.email) except Exception as e: logger.warning(f"Failed to send password reset confirmation email: {e}") return {"message": "Password reset successfully"} except HTTPException: raise except Exception as e: logger.error(f"Password reset error: {e}") raise HTTPException(status_code=500, detail="Password reset failed") # ===== SESSION MANAGEMENT ENDPOINTS ===== @router.get("/sessions", response_model=list[SessionInfo]) async def get_user_sessions( current_user: User = Depends(auth_service.get_current_user), ): """ Get all active sessions for the current user """ try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) sessions = await security_monitoring.get_user_sessions(current_user.id) return sessions except Exception as e: logger.error(f"Failed to get user sessions: {e}") return [] @router.delete("/sessions/{session_id}") async def revoke_session( session_id: str, current_user: User = Depends(auth_service.get_current_user), ): """ Revoke a specific session """ try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) success = await security_monitoring.revoke_session(session_id, current_user.id) if not success: raise HTTPException(status_code=404, detail="Session not found") return {"message": "Session revoked successfully"} except HTTPException: raise except Exception as e: logger.error(f"Failed to revoke session: {e}") raise HTTPException(status_code=500, detail="Failed to revoke session") @router.delete("/sessions") async def revoke_all_other_sessions( current_user: User = Depends(auth_service.get_current_user), ): """ Revoke all sessions except the current one """ try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) count = await security_monitoring.revoke_all_user_sessions( current_user.id, except_current=True ) logger.info( f"All other sessions revoked for user: {current_user.email} ({count} sessions)" ) return { "message": f"All other sessions revoked successfully ({count} sessions)" } except Exception as e: logger.error(f"Failed to revoke all sessions: {e}") raise HTTPException(status_code=500, detail="Failed to revoke sessions") # ===== SECURITY MONITORING ENDPOINTS ===== @router.get("/admin/security/metrics", response_model=SecurityMetrics) async def get_security_metrics( current_user: User = Depends(rbac_service.require_role("admin")), ): """ Get security metrics and login attempt statistics (Admin only) """ try: from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) metrics = await security_monitoring.get_security_metrics() return metrics except Exception as e: logger.error(f"Failed to get security metrics: {e}") raise HTTPException( status_code=500, detail="Failed to retrieve security metrics" ) @router.get("/admin/security/login-attempts") async def get_login_attempts( limit: int = 50, offset: int = 0, current_user: User = Depends(rbac_service.require_role("admin")), ): """ Get detailed login attempt logs (Admin only) """ try: # Get recent attempts from security monitoring service from app.services.infrastructure.security_monitoring_service import ( security_monitoring, ) metrics = await security_monitoring.get_security_metrics() attempts = metrics.get("recent_attempts", []) total = len(attempts) return { "attempts": attempts[offset : offset + limit], "total": total, "limit": limit, "offset": offset, } except Exception as e: logger.error(f"Failed to get login attempts: {e}") raise HTTPException(status_code=500, detail="Failed to retrieve login attempts")