Spaces:
Paused
Paused
| import os | |
| from datetime import datetime, timedelta, timezone | |
| import pyotp | |
| from fastapi import APIRouter, Depends, HTTPException, Request, Response, status | |
| from app.services.infrastructure.rbac_service import rbac_service | |
| from app.services.infrastructure.storage.database_service import db_service | |
| from core.config import settings | |
| from core.database import User | |
| from core.logging import logger | |
| from .schemas import ( | |
| LoginRequest, | |
| MFASetupResponse, | |
| MFAVerifyRequest, | |
| MFAVerifyResponse, | |
| PasswordResetConfirm, | |
| PasswordResetRequest, | |
| RegisterRequest, | |
| RegisterResponse, | |
| SecurityMetrics, | |
| SessionInfo, | |
| UserProfileResponse, | |
| ) | |
| from .service import auth_service | |
| router = APIRouter() | |
| # ===== AUTHENTICATION ENDPOINTS ===== | |
async def register(user_data: RegisterRequest):
    """
    Register a new user with password strength validation.

    Args:
        user_data: Registration payload. ``password``, ``username`` and
            ``email`` are read here; the whole object is handed to
            ``auth_service.create_user``.

    Returns:
        dict with ``user_id``, ``username``, ``email``, ``message`` and
        ``created_at`` taken from the newly created user.

    Raises:
        HTTPException: 400 when the password fails strength validation,
            409 when the username or email is already taken, 500 on any
            unexpected error.
    """
    try:
        # Validate password strength
        password_errors = auth_service.validate_password_strength(user_data.password)
        if password_errors:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "message": "Password does not meet security requirements",
                    "errors": password_errors,
                },
            )

        # Check if username already exists
        existing_user = auth_service.get_user_by_username(user_data.username)
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Username already exists"
            )

        # Check if email already exists
        existing_email = auth_service.get_user_by_email(user_data.email)
        if existing_email:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Email already registered"
            )

        # Create user
        new_user = auth_service.create_user(user_data)
        logger.info(f"New user registered: {new_user.username}")
        return {
            "user_id": new_user.id,
            "username": new_user.username,
            "email": new_user.email,
            "message": "User registered successfully",
            "created_at": new_user.created_at,
        }
    except HTTPException:
        # Deliberate 4xx responses raised above pass through unchanged.
        raise
    except Exception as e:
        # Keep the exception text in the server log only; echoing it in the
        # response body would expose internal details to the client.
        logger.error(f"Registration error: {e!s}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Registration failed",
        ) from e
async def login(login_data: LoginRequest, request: Request, response: Response):
    """
    Authenticate user and set HttpOnly cookies.

    Steps, in order:
      1. Check credentials with ``auth_service.authenticate_user``.
      2. Require a TOTP code when the user has MFA enabled, or when the user
         is an ``admin``/``superuser`` and ``settings.mfa_required_for_admin``
         is truthy.
      3. Record the login attempt, the journey event and the session. Each
         of these is best-effort: a failure is logged and login continues.
      4. Issue access and refresh tokens as HttpOnly cookies.

    Args:
        login_data: Credentials (``email``, ``password``) and optional
            ``mfa_code``.
        request: Incoming request; used for client IP and user agent.
        response: Response object the cookies are set on.

    Returns:
        dict with the authenticated user's profile fields.

    Raises:
        HTTPException: 401 for invalid credentials or an invalid MFA code,
            403 when an MFA code is required but missing, 500 when MFA is
            required but no secret is stored or on any unexpected error.
    """
    try:
        # Resolve client metadata once. ``request.client`` may be None; the
        # previous per-call ``request.client.host`` raised AttributeError in
        # that case, which the best-effort handlers swallowed, silently
        # dropping the audit/session records.
        client_ip = (request.client.host if request.client else None) or "unknown"
        user_agent = request.headers.get("user-agent", "")

        user = auth_service.authenticate_user(login_data.email, login_data.password)
        if not user:
            # Record failed login attempt
            try:
                from app.services.infrastructure.security_monitoring_service import (
                    security_monitoring,
                )

                await security_monitoring.record_login_attempt(
                    email=login_data.email,
                    user_id=None,
                    ip_address=client_ip,
                    user_agent=user_agent,
                    success=False,
                    failure_reason="Invalid credentials",
                )
            except Exception as e:
                logger.warning(f"Failed to record login attempt: {e}")

            logger.warning(
                f"Failed login attempt for email: {login_data.email} from IP: {client_ip}"
            )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password",
            )

        # CHECK MFA
        mfa_required = user.mfa_enabled

        # Enforce MFA for admins if configured in production
        if (
            not mfa_required
            and user.role in ["admin", "superuser"]
            and getattr(settings, "mfa_required_for_admin", False)
        ):
            logger.info(
                f"MFA enforcement: Requiring MFA for admin user {user.username}"
            )
            mfa_required = True

        if mfa_required:
            # No code supplied: 403 tells the frontend to prompt for one.
            if not login_data.mfa_code:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN, detail="MFA code required"
                )
            if not user.mfa_secret:
                logger.error(f"User {user.username} has MFA enabled but no secret")
                raise HTTPException(status_code=500, detail="MFA configuration error")

            totp = pyotp.TOTP(user.mfa_secret)
            if not totp.verify(login_data.mfa_code):
                logger.warning(f"Invalid MFA code for user {user.username}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MFA code"
                )

        # Record successful login attempt
        try:
            from app.services.infrastructure.security_monitoring_service import (
                security_monitoring,
            )

            await security_monitoring.record_login_attempt(
                email=user.email,
                user_id=user.id,
                ip_address=client_ip,
                user_agent=user_agent,
                success=True,
            )
        except Exception as e:
            logger.warning(f"Failed to record successful login: {e}")

        logger.info(
            f"Successful login for user: {user.username} (ID: {user.id}) from IP: {client_ip}"
        )

        # Track user journey
        try:
            from app.services.business.user_journey_tracker import user_journey_tracker

            user_journey_tracker.track_event(
                user_id=user.id,
                event_type="login",
                metadata={"role": user.role, "mfa": user.mfa_enabled},
            )
        except Exception as e:
            logger.warning(f"Failed to track user journey event: {e}")

        # Create tokens
        # NOTE(review): ``mfa_verified`` mirrors ``user.mfa_enabled`` rather
        # than whether a code was checked in this request; confirm that is
        # intended for admins whose MFA is enforced by configuration.
        access_token = auth_service.create_access_token(
            {
                "sub": user.id,
                "username": user.username,
                "role": user.role,
                "mfa_verified": user.mfa_enabled,
            }
        )
        refresh_token = auth_service.create_refresh_token(user.id)

        # Create session record
        try:
            from app.services.infrastructure.security_monitoring_service import (
                security_monitoring,
            )

            await security_monitoring.create_user_session(
                user_id=user.id,
                session_token=refresh_token,
                ip_address=client_ip,
                user_agent=user_agent,
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
            )
        except Exception as e:
            logger.warning(f"Failed to create session record: {e}")

        # Set HttpOnly Cookies
        # Secure=True in production (HTTPS), False in dev if needed
        is_production = os.getenv("ENVIRONMENT", "development").lower() == "production"
        secure_cookie = is_production

        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            secure=secure_cookie,
            samesite="strict",
            max_age=1800,  # 30 minutes
        )
        response.set_cookie(
            key="refresh_token",
            value=refresh_token,
            httponly=True,
            secure=secure_cookie,
            samesite="strict",
            path="/api/v1/auth/refresh",
            max_age=7 * 24 * 60 * 60,  # 7 days
        )

        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "full_name": user.full_name,
            "role": user.role,
            "is_active": user.is_active,
            "mfa_enabled": user.mfa_enabled,
            "created_at": user.created_at,
            "last_login": user.last_login,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Login error: {e!s}")
        raise HTTPException(status_code=500, detail="Internal login error")
async def mfa_setup(current_user: User = Depends(auth_service.get_current_user)):
    """
    Start MFA enrolment for the authenticated user.

    A new base32 secret is generated and stored on the user's row; MFA is
    not switched on here. The secret and its ``otpauth://`` provisioning
    URI are returned so the client can configure an authenticator app.

    Raises:
        HTTPException: 400 when MFA is already enabled, 404 when the user
            row cannot be found.
    """
    if current_user.mfa_enabled:
        raise HTTPException(status_code=400, detail="MFA is already enabled")

    new_secret = pyotp.random_base32()

    # ``current_user`` is not updated directly: a row is re-queried inside
    # this session and the secret is written to that instance.
    with db_service.get_db() as session:
        db_user = session.query(User).filter(User.id == current_user.id).first()
        if not db_user:
            raise HTTPException(status_code=404, detail="User not found")
        db_user.mfa_secret = new_secret
        session.commit()

    totp = pyotp.totp.TOTP(new_secret)
    provisioning_uri = totp.provisioning_uri(
        name=current_user.email,
        issuer_name="Zenith Fraud Platform",
    )
    return {"secret": new_secret, "otpauth_url": provisioning_uri}
async def mfa_verify(
    verify_data: MFAVerifyRequest,
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Confirm MFA enrolment by checking a TOTP code, then enable MFA.

    Returns a message dict; if MFA is already enabled nothing is changed.

    Raises:
        HTTPException: 400 when no secret has been stored for the user or
            when the supplied code does not verify.
    """
    if current_user.mfa_enabled:
        return {"message": "MFA is already enabled"}

    # The stored secret is read from a freshly queried row.
    with db_service.get_db() as session:
        db_user = session.query(User).filter(User.id == current_user.id).first()
        if not db_user or not db_user.mfa_secret:
            raise HTTPException(
                status_code=400, detail="MFA setup not initiated (no secret found)"
            )

        code_is_valid = pyotp.TOTP(db_user.mfa_secret).verify(verify_data.code)
        if not code_is_valid:
            raise HTTPException(status_code=400, detail="Invalid code")

        # Code verified: switch MFA on for this account.
        db_user.mfa_enabled = True
        session.commit()

    logger.info(f"MFA enabled for user {current_user.username}")
    return {"message": "MFA enabled successfully"}
async def refresh_token(request: Request, response: Response):
    """
    Refresh access token using HttpOnly cookie.

    Reads the ``refresh_token`` cookie, checks that it decodes to a token
    of type ``refresh`` for an existing, active user, and sets a new
    ``access_token`` cookie.

    Args:
        request: Incoming request carrying the refresh cookie.
        response: Response object the new access cookie is set on.

    Returns:
        ``{"message": "Token refreshed"}`` on success.

    Raises:
        HTTPException: 401 when the cookie is missing, or (with a generic
            detail) when the token is invalid, of the wrong type, or its
            user is missing or inactive.
    """
    # Named ``token`` so the local does not shadow this function's name.
    token = request.cookies.get("refresh_token")
    if not token:
        raise HTTPException(status_code=401, detail="Refresh cookie missing")

    try:
        # Verify refresh token
        payload = auth_service.decode_token(token)
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Invalid token type")

        user_id = payload.get("sub")
        user = db_service.get_user_by_id(user_id)

        # Do not issue tokens for missing or deactivated users. Only an
        # explicit ``is_active is False`` counts as deactivated, so rows
        # with an unset flag keep working.
        if not user or getattr(user, "is_active", True) is False:
            logger.warning(
                f"Refresh attempt for non-existent or inactive user: {user_id}"
            )
            raise HTTPException(
                status_code=401, detail="User no longer exists or is inactive"
            )

        claims = {
            "sub": user_id,
            "username": user.username,
            "role": user.role,
            "mfa_verified": user.mfa_enabled,
        }
        new_access_token = auth_service.create_access_token(claims)

        # Same Secure-flag rule as login: on only when ENVIRONMENT is
        # production, so the refreshed cookie matches the one login set.
        is_production = os.getenv("ENVIRONMENT", "development").lower() == "production"

        # Set new access token cookie
        response.set_cookie(
            key="access_token",
            value=new_access_token,
            httponly=True,
            secure=is_production,
            samesite="strict",
            max_age=1800,  # 30 minutes
        )
        return {"message": "Token refreshed"}
    except Exception as e:
        # Every failure, including the HTTPExceptions raised above, is
        # collapsed into one generic 401 so the reason is not disclosed.
        logger.warning(f"Refresh failed: {e}")
        # Clear cookies on failure
        # NOTE(review): headers set on the injected ``response`` may not be
        # sent when an exception is raised instead of returning - confirm
        # these deletions actually reach the client.
        response.delete_cookie("access_token")
        response.delete_cookie("refresh_token", path="/api/v1/auth/refresh")
        raise HTTPException(status_code=401, detail="Invalid refresh token")
async def logout(response: Response):
    """
    Log the user out by deleting both authentication cookies.

    The refresh cookie is deleted with the same path it was set under.
    """
    cookies_to_clear = (
        ("access_token", {}),
        ("refresh_token", {"path": "/api/v1/auth/refresh"}),
    )
    for cookie_name, delete_kwargs in cookies_to_clear:
        response.delete_cookie(cookie_name, **delete_kwargs)
    return {"message": "Logged out successfully"}
async def get_current_user_profile(
    current_user: User = Depends(auth_service.get_current_user),
):
    """Return the profile fields of the authenticated user as a dict."""
    profile_fields = (
        "id",
        "username",
        "email",
        "full_name",
        "role",
        "is_active",
        "mfa_enabled",
        "created_at",
        "last_login",
    )
    # Each response key is read from the attribute of the same name.
    return {field: getattr(current_user, field) for field in profile_fields}
| # ===== PASSWORD RESET ENDPOINTS ===== | |
async def request_password_reset(request_data: PasswordResetRequest):
    """
    Request a password reset link via email.

    The same message is returned whether or not the email is known and
    whether or not sending succeeded, so the response does not reveal
    which addresses are registered.

    Args:
        request_data: Payload carrying the account ``email``.

    Returns:
        ``{"message": "If the email exists, a reset link has been sent"}``
        in every case.
    """
    generic_reply = {"message": "If the email exists, a reset link has been sent"}
    try:
        # Check if user exists
        user = auth_service.get_user_by_email(request_data.email)
        if not user:
            # Don't reveal if email exists for security
            logger.info(
                f"Password reset requested for unknown email: {request_data.email}"
            )
            return generic_reply

        # Generate reset token (expires in 1 hour)
        reset_token = auth_service.create_access_token(
            {"sub": user.id, "type": "password_reset"}, expires_delta=timedelta(hours=1)
        )

        # Send email with reset link
        reset_url = f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/reset-password?token={reset_token}"

        try:
            from app.services.infrastructure.email_service import email_service

            email_sent = await email_service.send_password_reset_email(
                user.email, reset_url
            )
            if not email_sent:
                logger.warning(f"Failed to send password reset email to {user.email}")
        except ImportError:
            # The reset URL embeds a live token. Writing it to the log is
            # only acceptable as a development fallback; in production
            # anyone with log access could use it to take over the account.
            if os.getenv("ENVIRONMENT", "development").lower() == "production":
                logger.error(
                    "Email service not available; password reset email was not sent"
                )
            else:
                logger.warning("Email service not available, logging reset URL instead")
                logger.info(f"Password reset URL for {user.email}: {reset_url}")

        return generic_reply
    except Exception as e:
        logger.error(f"Password reset request error: {e}")
        # Don't reveal internal errors for security
        return generic_reply
async def reset_password(request_data: PasswordResetConfirm):
    """
    Reset password using reset token.

    Args:
        request_data: Payload carrying the reset ``token`` and the new
            ``password``.

    Returns:
        ``{"message": "Password reset successfully"}`` on success.

    Raises:
        HTTPException: 400 when the token cannot be decoded, is not of
            type ``password_reset``, refers to a missing user, or the new
            password fails strength validation; 500 on unexpected errors.
    """
    try:
        # Verify token. A token that fails to decode is a client error, so
        # it is reported as 400 instead of falling through to the generic
        # 500 handler below.
        try:
            payload = auth_service.decode_token(request_data.token)
        except HTTPException:
            raise
        except Exception as e:
            logger.warning(f"Password reset token rejected: {e}")
            raise HTTPException(status_code=400, detail="Invalid reset token") from e

        if not payload or payload.get("type") != "password_reset":
            raise HTTPException(status_code=400, detail="Invalid reset token")

        user_id = payload.get("sub")
        user = db_service.get_user_by_id(user_id)
        if not user:
            raise HTTPException(status_code=400, detail="Invalid reset token")

        # Validate new password
        password_errors = auth_service.validate_password_strength(request_data.password)
        if password_errors:
            raise HTTPException(
                status_code=400,
                detail={
                    "message": "Password does not meet security requirements",
                    "errors": password_errors,
                },
            )

        # Update password
        hashed_password = auth_service.hash_password(request_data.password)
        with db_service.get_db() as db:
            db_user = db.query(User).filter(User.id == user_id).first()
            if db_user is None:
                # The row was not found in this session even though the
                # lookup above succeeded; treat the token as invalid rather
                # than failing on attribute access.
                raise HTTPException(status_code=400, detail="Invalid reset token")
            db_user.password_hash = hashed_password
            db_user.updated_at = datetime.now(timezone.utc)
            db.commit()

        logger.info(f"Password reset successful for user: {user.email}")

        # Send confirmation email (best-effort: the reset already succeeded)
        try:
            from app.services.infrastructure.email_service import email_service

            await email_service.send_password_reset_confirmation(user.email)
        except Exception as e:
            logger.warning(f"Failed to send password reset confirmation email: {e}")

        return {"message": "Password reset successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        raise HTTPException(status_code=500, detail="Password reset failed")
| # ===== SESSION MANAGEMENT ENDPOINTS ===== | |
async def get_user_sessions(
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    List the sessions the security monitoring service reports for the
    authenticated user.

    Any failure (including a failed import of the monitoring service) is
    logged and an empty list is returned instead of an error.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        return await security_monitoring.get_user_sessions(current_user.id)
    except Exception as exc:
        logger.error(f"Failed to get user sessions: {exc}")
        return []
async def revoke_session(
    session_id: str,
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Revoke one session by id on behalf of the authenticated user.

    Raises:
        HTTPException: 404 when the monitoring service reports no session
            was revoked, 500 on any unexpected error.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        revoked = await security_monitoring.revoke_session(session_id, current_user.id)
        if revoked:
            return {"message": "Session revoked successfully"}
        raise HTTPException(status_code=404, detail="Session not found")
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as exc:
        logger.error(f"Failed to revoke session: {exc}")
        raise HTTPException(status_code=500, detail="Failed to revoke session")
async def revoke_all_other_sessions(
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Ask the security monitoring service to revoke the authenticated
    user's sessions with ``except_current=True``, and report how many
    were revoked.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        revoked_count = await security_monitoring.revoke_all_user_sessions(
            current_user.id, except_current=True
        )
    except Exception as exc:
        logger.error(f"Failed to revoke all sessions: {exc}")
        raise HTTPException(status_code=500, detail="Failed to revoke sessions")

    try:
        logger.info(
            f"All other sessions revoked for user: {current_user.email} ({revoked_count} sessions)"
        )
        return {
            "message": f"All other sessions revoked successfully ({revoked_count} sessions)"
        }
    except Exception as exc:
        logger.error(f"Failed to revoke all sessions: {exc}")
        raise HTTPException(status_code=500, detail="Failed to revoke sessions")
| # ===== SECURITY MONITORING ENDPOINTS ===== | |
async def get_security_metrics(
    current_user: User = Depends(rbac_service.require_role("admin")),
):
    """
    Return the metrics produced by the security monitoring service
    (admin role required by the dependency).

    Raises:
        HTTPException: 500 when the metrics cannot be retrieved.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        return await security_monitoring.get_security_metrics()
    except Exception as exc:
        logger.error(f"Failed to get security metrics: {exc}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve security metrics"
        )
async def get_login_attempts(
    limit: int = 50,
    offset: int = 0,
    current_user: User = Depends(rbac_service.require_role("admin")),
):
    """
    Get detailed login attempt logs (Admin only).

    The attempts come from the ``recent_attempts`` entry of the security
    metrics and are paginated in memory.

    Args:
        limit: Maximum number of attempts to return; must be >= 0.
        offset: Number of attempts to skip; must be >= 0.
        current_user: Resolved by the admin-role dependency.

    Returns:
        dict with the ``attempts`` page, the ``total`` number of recent
        attempts, and the ``limit`` and ``offset`` that were applied.

    Raises:
        HTTPException: 400 when ``limit`` or ``offset`` is negative, 500
            when the attempts cannot be retrieved.
    """
    # Negative values would be interpreted by the slice below as indices
    # counted from the end of the list and silently return the wrong page.
    if limit < 0 or offset < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="limit and offset must be non-negative",
        )

    try:
        # Get recent attempts from security monitoring service
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        metrics = await security_monitoring.get_security_metrics()
        attempts = metrics.get("recent_attempts", [])
        total = len(attempts)
        return {
            "attempts": attempts[offset : offset + limit],
            "total": total,
            "limit": limit,
            "offset": offset,
        }
    except Exception as e:
        logger.error(f"Failed to get login attempts: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve login attempts")