Spaces:
Sleeping
Sleeping
| """ | |
| Authentication endpoints for token management | |
| """ | |
| from fastapi import APIRouter, HTTPException, Depends, Request | |
| from fastapi.security import HTTPBearer | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any | |
| import jwt | |
| import os | |
| import uuid | |
| import logging | |
| from ..schemas.auth_schemas import TokenRequest, TokenResponse, UserRegistrationRequest, UserResponse | |
| from ....core.config import settings | |
| from ....core.auth import authenticate_request, require_current_user | |
| from ....services.database import ( | |
| get_user_by_username, | |
| get_user_by_email, | |
| create_user, | |
| verify_password, | |
| update_last_login, | |
| create_user_session, | |
| get_user_session, | |
| revoke_user_session | |
| ) | |
| from ....services.auth_logger import AuthEventLogger, get_client_info | |
| # Setup logger for authentication events | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| security = HTTPBearer(auto_error=False) | |
| async def get_access_token(credentials: TokenRequest, request: Request): | |
| """ | |
| Exchange user credentials for a temporary access token | |
| This endpoint allows users to authenticate and receive a JWT token | |
| """ | |
| # Extract client information for logging | |
| client_ip, user_agent = get_client_info(request) | |
| # Validate user credentials against your Supabase database | |
| user = await validate_user_credentials(credentials.model_dump()) | |
| if not user: | |
| # Log failed login attempt | |
| AuthEventLogger.log_login_attempt( | |
| username=credentials.username, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Invalid credentials" | |
| ) | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| # Update last login timestamp | |
| await update_last_login(user["id"]) | |
| # Generate unique JTI (JWT ID) for session tracking | |
| jti = str(uuid.uuid4()) | |
| expires_at = datetime.utcnow() + timedelta(hours=24) | |
| # Create session in database | |
| await create_user_session(user["id"], jti, expires_at) | |
| # Generate a temporary JWT token with minimal payload | |
| payload = { | |
| "sub": credentials.username, # Subject (username) - needed for user lookup | |
| "jti": jti, # JWT ID for session tracking | |
| "exp": expires_at, # Expiration time | |
| "iat": datetime.utcnow(), # Issued at time | |
| "type": "access_token" # Token type | |
| # Note: user_id removed - will be looked up from database using username | |
| } | |
| secret_key = os.getenv("SECRET_KEY", "your-secret-key-change-this") | |
| token = jwt.encode(payload, secret_key, algorithm="HS256") | |
| # Log successful login | |
| AuthEventLogger.log_login_attempt( | |
| username=credentials.username, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=True, | |
| user_id=user["id"] | |
| ) | |
| return TokenResponse( | |
| access_token=token, | |
| token_type="bearer", | |
| expires_in=86400 # 24 hours | |
| ) | |
| async def refresh_access_token(refresh_token: str, request: Request): | |
| """ | |
| Refresh an expired access token | |
| """ | |
| # Extract client information for logging | |
| client_ip, user_agent = get_client_info(request) | |
| try: | |
| secret_key = os.getenv("SECRET_KEY", "your-secret-key-change-this") | |
| payload = jwt.decode(refresh_token, secret_key, algorithms=["HS256"]) | |
| if payload.get("type") != "refresh_token": | |
| AuthEventLogger.log_token_refresh( | |
| username=payload.get("sub", "unknown"), | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Invalid token type" | |
| ) | |
| raise HTTPException(status_code=401, detail="Invalid token type") | |
| # Generate new access token | |
| new_payload = { | |
| "sub": payload["sub"], | |
| "exp": datetime.utcnow() + timedelta(hours=24), | |
| "iat": datetime.utcnow(), | |
| "type": "access_token" | |
| } | |
| new_token = jwt.encode(new_payload, secret_key, algorithm="HS256") | |
| AuthEventLogger.log_token_refresh( | |
| username=payload["sub"], | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=True | |
| ) | |
| return TokenResponse( | |
| access_token=new_token, | |
| token_type="bearer", | |
| expires_in=86400 | |
| ) | |
| except jwt.InvalidTokenError as e: | |
| AuthEventLogger.log_token_refresh( | |
| username="unknown", | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason=f"Invalid refresh token: {str(e)}" | |
| ) | |
| raise HTTPException(status_code=401, detail="Invalid refresh token") | |
| async def register_user(user_data: UserRegistrationRequest, request: Request): | |
| """ | |
| Register a new user account | |
| """ | |
| # Extract client information for logging | |
| client_ip, user_agent = get_client_info(request) | |
| # Check if username already exists | |
| existing_user = await get_user_by_username(user_data.username) | |
| if existing_user: | |
| AuthEventLogger.log_registration_attempt( | |
| username=user_data.username, | |
| email=user_data.email, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Username already exists" | |
| ) | |
| raise HTTPException(status_code=400, detail="Username already exists") | |
| # Check if email already exists | |
| existing_email = await get_user_by_email(user_data.email) | |
| if existing_email: | |
| AuthEventLogger.log_registration_attempt( | |
| username=user_data.username, | |
| email=user_data.email, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Email already registered" | |
| ) | |
| raise HTTPException(status_code=400, detail="Email already registered") | |
| # Create new user | |
| new_user = await create_user(user_data.username, user_data.email, user_data.password) | |
| if not new_user: | |
| AuthEventLogger.log_registration_attempt( | |
| username=user_data.username, | |
| email=user_data.email, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Database error - failed to create user" | |
| ) | |
| raise HTTPException(status_code=500, detail="Failed to create user") | |
| AuthEventLogger.log_registration_attempt( | |
| username=user_data.username, | |
| email=user_data.email, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=True, | |
| user_id=new_user["id"] | |
| ) | |
| return UserResponse(**new_user) | |
| async def logout_user(request: Request, token: str = Depends(security)): | |
| """ | |
| Logout user by revoking their session | |
| """ | |
| # Extract client information for logging | |
| client_ip, user_agent = get_client_info(request) | |
| if not token: | |
| AuthEventLogger.log_logout( | |
| username="unknown", | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="No token provided" | |
| ) | |
| raise HTTPException(status_code=401, detail="No token provided") | |
| try: | |
| secret_key = os.getenv("SECRET_KEY", "your-secret-key-change-this") | |
| payload = jwt.decode(token.credentials, secret_key, algorithms=["HS256"]) | |
| jti = payload.get("jti") | |
| username = payload.get("sub") | |
| if jti: | |
| await revoke_user_session(jti) | |
| AuthEventLogger.log_logout( | |
| username=username, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=True | |
| ) | |
| return {"message": "Successfully logged out"} | |
| else: | |
| AuthEventLogger.log_logout( | |
| username=username, | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Invalid token format - missing JTI" | |
| ) | |
| raise HTTPException(status_code=400, detail="Invalid token format") | |
| except jwt.InvalidTokenError as e: | |
| AuthEventLogger.log_logout( | |
| username="unknown", | |
| client_ip=client_ip, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason=f"Invalid token: {str(e)}" | |
| ) | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| async def get_current_user_info(current_user: Dict[str, Any] = Depends(require_current_user)): | |
| """ | |
| Get current authenticated user information | |
| Requires valid JWT token authentication | |
| """ | |
| return UserResponse(**current_user) | |
| async def validate_user_credentials(credentials: Dict[str, Any]) -> Dict[str, Any] | None: | |
| """ | |
| Validate user credentials against your Supabase database | |
| Returns user data if valid, None if invalid | |
| """ | |
| username = credentials.get("username") | |
| password = credentials.get("password") | |
| if not username or not password: | |
| AuthEventLogger.log_credential_validation( | |
| username=username or "unknown", | |
| success=False, | |
| failure_reason="Missing username or password" | |
| ) | |
| return None | |
| # Get user from database | |
| user = await get_user_by_username(username) | |
| if not user: | |
| AuthEventLogger.log_credential_validation( | |
| username=username, | |
| success=False, | |
| failure_reason="User not found in database" | |
| ) | |
| return None | |
| # Verify password | |
| if not await verify_password(password, user["password_hash"]): | |
| AuthEventLogger.log_credential_validation( | |
| username=username, | |
| success=False, | |
| failure_reason="Invalid password" | |
| ) | |
| return None | |
| AuthEventLogger.log_credential_validation( | |
| username=username, | |
| success=True | |
| ) | |
| return user | |