zenith-backend / app /modules /auth /router.py
teoat's picture
Upload app/modules/auth/router.py with huggingface_hub
e7642ee verified
import os
from datetime import datetime, timedelta, timezone
import pyotp
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from app.services.infrastructure.rbac_service import rbac_service
from app.services.infrastructure.storage.database_service import db_service
from core.config import settings
from core.database import User
from core.logging import logger
from .schemas import (
LoginRequest,
MFASetupResponse,
MFAVerifyRequest,
MFAVerifyResponse,
PasswordResetConfirm,
PasswordResetRequest,
RegisterRequest,
RegisterResponse,
SecurityMetrics,
SessionInfo,
UserProfileResponse,
)
from .service import auth_service
# Router on which the endpoint decorators in this module register their routes.
router = APIRouter()
# ===== AUTHENTICATION ENDPOINTS =====
@router.post(
    "/register", response_model=RegisterResponse, status_code=status.HTTP_201_CREATED
)
async def register(user_data: RegisterRequest):
    """
    Register a new user after validating password strength and uniqueness.

    Args:
        user_data: Registration payload. ``username``, ``email`` and
            ``password`` are read here; the whole object is passed to
            ``auth_service.create_user``.

    Returns:
        A dict with the new user's id, username, email, creation timestamp
        and a confirmation message.

    Raises:
        HTTPException: 400 when the password fails the strength check, 409
            when the username or email is already taken, 500 on any other
            failure.
    """
    try:
        # Validate password strength
        password_errors = auth_service.validate_password_strength(user_data.password)
        if password_errors:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "message": "Password does not meet security requirements",
                    "errors": password_errors,
                },
            )

        # Check if username already exists
        if auth_service.get_user_by_username(user_data.username):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Username already exists"
            )

        # Check if email already exists
        if auth_service.get_user_by_email(user_data.email):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="Email already registered"
            )

        # Create user
        new_user = auth_service.create_user(user_data)
        logger.info(f"New user registered: {new_user.username}")
        return {
            "user_id": new_user.id,
            "username": new_user.username,
            "email": new_user.email,
            "message": "User registered successfully",
            "created_at": new_user.created_at,
        }
    except HTTPException:
        # Deliberate client-facing errors pass through untouched.
        raise
    except Exception as e:
        # Keep the exception text in the server log only; echoing it to the
        # client would expose internal details (queries, paths, etc.).
        logger.error(f"Registration error: {e!s}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal registration error",
        ) from e
@router.post("/login", response_model=UserProfileResponse)
async def login(login_data: LoginRequest, request: Request, response: Response):
    """
    Authenticate user and set HttpOnly cookies.

    Args:
        login_data: Credentials; ``email``, ``password`` and the optional
            ``mfa_code`` are read.
        request: Incoming request, used for the client IP and user agent.
        response: Response object the auth cookies are set on.

    Returns:
        A dict describing the authenticated user's profile.

    Raises:
        HTTPException: 401 for invalid credentials or an invalid MFA code,
            403 when an MFA code is required but missing, 500 when MFA is
            required but the user has no secret or on any unexpected error.
    """
    try:
        # request.client can be None (no peer address available), so resolve
        # the IP once and safely instead of dereferencing it at every use.
        client_ip = (request.client.host if request.client else None) or "unknown"
        user_agent = request.headers.get("user-agent", "")

        user = auth_service.authenticate_user(login_data.email, login_data.password)
        if not user:
            # Record failed login attempt (best effort: never blocks the 401)
            try:
                from app.services.infrastructure.security_monitoring_service import (
                    security_monitoring,
                )

                await security_monitoring.record_login_attempt(
                    email=login_data.email,
                    user_id=None,
                    ip_address=client_ip,
                    user_agent=user_agent,
                    success=False,
                    failure_reason="Invalid credentials",
                )
            except Exception as e:
                logger.warning(f"Failed to record login attempt: {e}")
            logger.warning(
                f"Failed login attempt for email: {login_data.email} from IP: {client_ip}"
            )
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid username or password",
            )

        # CHECK MFA
        mfa_required = user.mfa_enabled
        # Enforce MFA for admins if configured in production
        if (
            not mfa_required
            and user.role in ["admin", "superuser"]
            and getattr(settings, "mfa_required_for_admin", False)
        ):
            logger.info(
                f"MFA enforcement: Requiring MFA for admin user {user.username}"
            )
            mfa_required = True

        if mfa_required:
            # No code supplied: 403 tells the frontend to prompt for one.
            if not login_data.mfa_code:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN, detail="MFA code required"
                )
            if not user.mfa_secret:
                logger.error(f"User {user.username} has MFA enabled but no secret")
                raise HTTPException(status_code=500, detail="MFA configuration error")
            totp = pyotp.TOTP(user.mfa_secret)
            if not totp.verify(login_data.mfa_code):
                logger.warning(f"Invalid MFA code for user {user.username}")
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid MFA code"
                )

        # Record successful login attempt (best effort)
        try:
            from app.services.infrastructure.security_monitoring_service import (
                security_monitoring,
            )

            await security_monitoring.record_login_attempt(
                email=user.email,
                user_id=user.id,
                ip_address=client_ip,
                user_agent=user_agent,
                success=True,
            )
        except Exception as e:
            logger.warning(f"Failed to record successful login: {e}")
        logger.info(
            f"Successful login for user: {user.username} (ID: {user.id}) from IP: {client_ip}"
        )

        # Track user journey (best effort)
        try:
            from app.services.business.user_journey_tracker import user_journey_tracker

            user_journey_tracker.track_event(
                user_id=user.id,
                event_type="login",
                metadata={"role": user.role, "mfa": user.mfa_enabled},
            )
        except Exception as e:
            logger.warning(f"Failed to track user journey event: {e}")

        # Create tokens
        # NOTE(review): "mfa_verified" mirrors user.mfa_enabled, so an admin who
        # passed an enforced MFA check without having MFA enabled still gets
        # False here - confirm whether downstream checks expect that.
        access_token = auth_service.create_access_token(
            {
                "sub": user.id,
                "username": user.username,
                "role": user.role,
                "mfa_verified": user.mfa_enabled,
            }
        )
        refresh_token = auth_service.create_refresh_token(user.id)

        # Create session record (best effort)
        try:
            from app.services.infrastructure.security_monitoring_service import (
                security_monitoring,
            )

            await security_monitoring.create_user_session(
                user_id=user.id,
                session_token=refresh_token,
                ip_address=client_ip,
                user_agent=user_agent,
                expires_at=datetime.now(timezone.utc) + timedelta(days=7),
            )
        except Exception as e:
            logger.warning(f"Failed to create session record: {e}")

        # Set HttpOnly cookies.
        # Secure=True in production (HTTPS), False in dev if needed
        secure_cookie = os.getenv("ENVIRONMENT", "development").lower() == "production"
        response.set_cookie(
            key="access_token",
            value=access_token,
            httponly=True,
            secure=secure_cookie,
            samesite="strict",
            max_age=1800,  # 30 minutes
        )
        response.set_cookie(
            key="refresh_token",
            value=refresh_token,
            httponly=True,
            secure=secure_cookie,
            samesite="strict",
            path="/api/v1/auth/refresh",
            max_age=7 * 24 * 60 * 60,  # 7 days
        )
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "full_name": user.full_name,
            "role": user.role,
            "is_active": user.is_active,
            "mfa_enabled": user.mfa_enabled,
            "created_at": user.created_at,
            "last_login": user.last_login,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Login error: {e!s}")
        raise HTTPException(status_code=500, detail="Internal login error")
@router.get("/mfa/setup", response_model=MFASetupResponse)
async def mfa_setup(current_user: User = Depends(auth_service.get_current_user)):
    """
    Start MFA enrolment for the authenticated user.

    A fresh base32 secret is generated and stored on the user's row; the
    ``mfa_enabled`` flag is not touched here. The secret and its otpauth
    provisioning URI are returned.

    Raises:
        HTTPException: 400 if MFA is already enabled, 404 if the user row
            cannot be found.
    """
    if current_user.mfa_enabled:
        raise HTTPException(status_code=400, detail="MFA is already enabled")

    new_secret = pyotp.random_base32()

    # Re-query the user inside this session so the change can be committed.
    with db_service.get_db() as session:
        db_user = session.query(User).filter(User.id == current_user.id).first()
        if not db_user:
            raise HTTPException(status_code=404, detail="User not found")
        db_user.mfa_secret = new_secret
        session.commit()

    totp = pyotp.totp.TOTP(new_secret)
    provisioning_uri = totp.provisioning_uri(
        name=current_user.email, issuer_name="Zenith Fraud Platform"
    )
    return {"secret": new_secret, "otpauth_url": provisioning_uri}
@router.post("/mfa/verify", response_model=MFAVerifyResponse)
async def mfa_verify(
    verify_data: MFAVerifyRequest,
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Confirm a TOTP code against the stored secret and switch MFA on.

    Returns a message dict; when MFA is already enabled the call is a no-op
    that reports so.

    Raises:
        HTTPException: 400 when no secret has been stored for the user or
            when the supplied code does not verify.
    """
    if current_user.mfa_enabled:
        return {"message": "MFA is already enabled"}

    # The secret is read from a freshly queried row in this session.
    with db_service.get_db() as session:
        db_user = session.query(User).filter(User.id == current_user.id).first()
        if not (db_user and db_user.mfa_secret):
            raise HTTPException(
                status_code=400, detail="MFA setup not initiated (no secret found)"
            )

        code_ok = pyotp.TOTP(db_user.mfa_secret).verify(verify_data.code)
        if not code_ok:
            raise HTTPException(status_code=400, detail="Invalid code")

        # Code accepted: enable MFA on the account.
        db_user.mfa_enabled = True
        session.commit()
        logger.info(f"MFA enabled for user {current_user.username}")
        return {"message": "MFA enabled successfully"}
@router.post("/refresh")
async def refresh_token(request: Request, response: Response):
    """
    Refresh access token using HttpOnly cookie.

    Args:
        request: Incoming request; the ``refresh_token`` cookie is read.
        response: Response object the new ``access_token`` cookie is set on.

    Returns:
        ``{"message": "Token refreshed"}`` on success. On any failure while
        validating the token, a 401 JSON response with detail
        ``"Invalid refresh token"`` that also clears both auth cookies.

    Raises:
        HTTPException: 401 when the refresh cookie is missing.
    """
    # Local import keeps this block self-contained.
    from fastapi.responses import JSONResponse

    token_cookie = request.cookies.get("refresh_token")
    if not token_cookie:
        raise HTTPException(status_code=401, detail="Refresh cookie missing")

    try:
        # Verify refresh token
        payload = auth_service.decode_token(token_cookie)
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Invalid token type")
        user_id = payload.get("sub")

        user = db_service.get_user_by_id(user_id)
        # Never issue tokens for deleted or deactivated accounts.
        if not user or not getattr(user, "is_active", True):
            logger.warning(
                f"Refresh attempt for non-existent or inactive user: {user_id}"
            )
            raise HTTPException(
                status_code=401, detail="User no longer exists or is inactive"
            )

        new_access_token = auth_service.create_access_token(
            {
                "sub": user_id,
                "username": user.username,
                "role": user.role,
                "mfa_verified": user.mfa_enabled,
            }
        )
    except Exception as e:
        logger.warning(f"Refresh failed: {e}")
        # Headers set on the injected `response` are dropped when an
        # HTTPException is raised, so the cookie deletions must travel on the
        # response object that is actually returned.
        failure = JSONResponse(
            status_code=401, content={"detail": "Invalid refresh token"}
        )
        failure.delete_cookie("access_token")
        failure.delete_cookie("refresh_token", path="/api/v1/auth/refresh")
        return failure

    # Same Secure policy as the login endpoint: HTTPS-only in production.
    secure_cookie = os.getenv("ENVIRONMENT", "development").lower() == "production"
    response.set_cookie(
        key="access_token",
        value=new_access_token,
        httponly=True,
        secure=secure_cookie,
        samesite="strict",
        max_age=1800,  # 30 minutes
    )
    return {"message": "Token refreshed"}
@router.post("/logout")
async def logout(response: Response):
    """
    Log the caller out by deleting both auth cookies on the response.

    The refresh cookie is deleted with the same path it was set under.
    """
    cookies_to_clear = (
        ("access_token", {}),
        ("refresh_token", {"path": "/api/v1/auth/refresh"}),
    )
    for cookie_name, delete_kwargs in cookies_to_clear:
        response.delete_cookie(cookie_name, **delete_kwargs)
    return {"message": "Logged out successfully"}
@router.get("/me", response_model=UserProfileResponse)
async def get_current_user_profile(
    current_user: User = Depends(auth_service.get_current_user),
):
    """Return the profile fields of the authenticated user as a dict."""
    # Attribute names double as the response keys, in output order.
    profile_fields = (
        "id",
        "username",
        "email",
        "full_name",
        "role",
        "is_active",
        "mfa_enabled",
        "created_at",
        "last_login",
    )
    return {field: getattr(current_user, field) for field in profile_fields}
# ===== PASSWORD RESET ENDPOINTS =====
@router.post("/password/reset-request")
async def request_password_reset(request_data: PasswordResetRequest):
    """
    Request a password reset link via email.

    The response is the same message whether or not the email is known and
    whether or not sending succeeded, so the endpoint does not reveal which
    addresses are registered.

    Args:
        request_data: Payload whose ``email`` identifies the account.

    Returns:
        A dict with a generic confirmation message.
    """
    generic_reply = {"message": "If the email exists, a reset link has been sent"}
    try:
        # Check if user exists
        user = auth_service.get_user_by_email(request_data.email)
        if not user:
            # Don't reveal if email exists for security
            logger.info(
                f"Password reset requested for unknown email: {request_data.email}"
            )
            return generic_reply

        # Generate reset token (expires in 1 hour)
        reset_token = auth_service.create_access_token(
            {"sub": user.id, "type": "password_reset"}, expires_delta=timedelta(hours=1)
        )
        reset_url = f"{os.getenv('FRONTEND_URL', 'http://localhost:3000')}/reset-password?token={reset_token}"

        # Send email with reset link
        try:
            from app.services.infrastructure.email_service import email_service

            email_sent = await email_service.send_password_reset_email(
                user.email, reset_url
            )
            if not email_sent:
                logger.warning(f"Failed to send password reset email to {user.email}")
        except ImportError:
            # The reset URL embeds a live credential. Only write it to the log
            # as a development convenience, never in production.
            if os.getenv("ENVIRONMENT", "development").lower() == "production":
                logger.error(
                    f"Email service not available; password reset email not sent to {user.email}"
                )
            else:
                logger.warning("Email service not available, logging reset URL instead")
                logger.info(f"Password reset URL for {user.email}: {reset_url}")
        return generic_reply
    except Exception as e:
        logger.error(f"Password reset request error: {e}")
        # Don't reveal internal errors for security
        return generic_reply
@router.post("/password/reset")
async def reset_password(request_data: PasswordResetConfirm):
    """
    Reset password using reset token.

    Args:
        request_data: Payload carrying the reset ``token`` and the new
            ``password``.

    Returns:
        A dict with a success message.

    Raises:
        HTTPException: 400 when the token cannot be decoded, is not a
            password-reset token, refers to no user, or the new password
            fails the strength check; 500 on any other failure.
    """
    try:
        # Verify token. A token that fails to decode is bad client input, so
        # report it as 400 instead of letting it surface as a 500.
        try:
            payload = auth_service.decode_token(request_data.token)
        except HTTPException:
            raise
        except Exception as e:
            logger.warning(f"Password reset token rejected: {e}")
            raise HTTPException(status_code=400, detail="Invalid reset token") from e

        if not payload or payload.get("type") != "password_reset":
            raise HTTPException(status_code=400, detail="Invalid reset token")
        user_id = payload.get("sub")
        user = db_service.get_user_by_id(user_id)
        if not user:
            raise HTTPException(status_code=400, detail="Invalid reset token")

        # Validate new password
        password_errors = auth_service.validate_password_strength(request_data.password)
        if password_errors:
            raise HTTPException(
                status_code=400,
                detail={
                    "message": "Password does not meet security requirements",
                    "errors": password_errors,
                },
            )

        # Update password
        hashed_password = auth_service.hash_password(request_data.password)
        with db_service.get_db() as db:
            db_user = db.query(User).filter(User.id == user_id).first()
            if not db_user:
                # The row vanished between the two lookups; treat the token as
                # no longer valid instead of failing on a None attribute.
                raise HTTPException(status_code=400, detail="Invalid reset token")
            db_user.password_hash = hashed_password
            db_user.updated_at = datetime.now(timezone.utc)
            db.commit()
        logger.info(f"Password reset successful for user: {user.email}")

        # Send confirmation email (best effort)
        try:
            from app.services.infrastructure.email_service import email_service

            await email_service.send_password_reset_confirmation(user.email)
        except Exception as e:
            logger.warning(f"Failed to send password reset confirmation email: {e}")
        return {"message": "Password reset successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Password reset error: {e}")
        raise HTTPException(status_code=500, detail="Password reset failed")
# ===== SESSION MANAGEMENT ENDPOINTS =====
@router.get("/sessions", response_model=list[SessionInfo])
async def get_user_sessions(
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    List the sessions the security monitoring service reports for the caller.

    Any failure is logged and an empty list is returned instead of an error.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        return await security_monitoring.get_user_sessions(current_user.id)
    except Exception as err:
        logger.error(f"Failed to get user sessions: {err}")
        return []
@router.delete("/sessions/{session_id}")
async def revoke_session(
    session_id: str,
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Revoke one session, identified by ``session_id``, for the current user.

    Raises:
        HTTPException: 404 when the monitoring service reports no revocation,
            500 on any unexpected failure.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        revoked = await security_monitoring.revoke_session(session_id, current_user.id)
        if revoked:
            return {"message": "Session revoked successfully"}
        raise HTTPException(status_code=404, detail="Session not found")
    except HTTPException:
        raise
    except Exception as err:
        logger.error(f"Failed to revoke session: {err}")
        raise HTTPException(status_code=500, detail="Failed to revoke session")
@router.delete("/sessions")
async def revoke_all_other_sessions(
    current_user: User = Depends(auth_service.get_current_user),
):
    """
    Ask the monitoring service to revoke the caller's sessions, passing
    ``except_current=True``, and report how many were revoked.

    Raises:
        HTTPException: 500 when the revocation fails.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        revoked_count = await security_monitoring.revoke_all_user_sessions(
            current_user.id, except_current=True
        )
    except Exception as err:
        logger.error(f"Failed to revoke all sessions: {err}")
        raise HTTPException(status_code=500, detail="Failed to revoke sessions")

    try:
        logger.info(
            f"All other sessions revoked for user: {current_user.email} ({revoked_count} sessions)"
        )
        summary = f"All other sessions revoked successfully ({revoked_count} sessions)"
        return {"message": summary}
    except Exception as err:
        logger.error(f"Failed to revoke all sessions: {err}")
        raise HTTPException(status_code=500, detail="Failed to revoke sessions")
# ===== SECURITY MONITORING ENDPOINTS =====
@router.get("/admin/security/metrics", response_model=SecurityMetrics)
async def get_security_metrics(
    current_user: User = Depends(rbac_service.require_role("admin")),
):
    """
    Return the metrics produced by the security monitoring service.

    Access is gated by the ``admin`` role dependency.

    Raises:
        HTTPException: 500 when the metrics cannot be retrieved.
    """
    try:
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        return await security_monitoring.get_security_metrics()
    except Exception as err:
        logger.error(f"Failed to get security metrics: {err}")
        raise HTTPException(
            status_code=500, detail="Failed to retrieve security metrics"
        )
@router.get("/admin/security/login-attempts")
async def get_login_attempts(
    limit: int = 50,
    offset: int = 0,
    current_user: User = Depends(rbac_service.require_role("admin")),
):
    """
    Get detailed login attempt logs (Admin only).

    Args:
        limit: Maximum number of attempts to return; negative values are
            treated as 0.
        offset: Number of attempts to skip; negative values are treated as 0.
        current_user: Resolved by the ``admin`` role dependency.

    Returns:
        A dict with the requested page under ``attempts``, the ``total``
        number of recent attempts, and the effective ``limit`` and ``offset``.

    Raises:
        HTTPException: 500 when the attempts cannot be retrieved.
    """
    # Negative values would otherwise be read as from-the-end slice indices
    # and silently return the wrong page.
    limit = max(limit, 0)
    offset = max(offset, 0)
    try:
        # Get recent attempts from security monitoring service
        from app.services.infrastructure.security_monitoring_service import (
            security_monitoring,
        )

        metrics = await security_monitoring.get_security_metrics()
        attempts = metrics.get("recent_attempts", [])
        return {
            "attempts": attempts[offset : offset + limit],
            "total": len(attempts),
            "limit": limit,
            "offset": offset,
        }
    except Exception as e:
        logger.error(f"Failed to get login attempts: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve login attempts")