"""
Clerk authentication utilities and integration.

This module provides Clerk SDK integration, token validation, and user
information extraction for the FastAPI backend.
"""

import logging
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone

import jwt
from jwt import PyJWTError
from fastapi import HTTPException, status
from clerk_backend_api import Clerk
from clerk_backend_api.models import User as ClerkUser

from .config import get_settings

logger = logging.getLogger(__name__)
settings = get_settings()


class ClerkAuthError(Exception):
    """Custom exception for Clerk authentication errors."""


def _now_ms() -> int:
    """Return the current time as integer milliseconds since the Unix epoch.

    Uses a timezone-aware "now": calling ``.timestamp()`` on the naive value
    from ``datetime.utcnow()`` interprets it as local time and is therefore
    skewed by the host's UTC offset.
    """
    return int(datetime.now(timezone.utc).timestamp() * 1000)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string (no offset).

    The offset is stripped on purpose so the string keeps the same format
    that ``datetime.utcnow().isoformat()`` produced for existing callers.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _serialize_email(email_addr: Any) -> Dict[str, Any]:
    """Convert a Clerk email-address object into a plain dict."""
    # ``verification`` may be missing or None; getattr on None simply yields
    # the supplied defaults below.
    verification = getattr(email_addr, 'verification', None)
    return {
        "id": getattr(email_addr, 'id', None),
        "object": getattr(email_addr, 'object', 'email_address'),
        "email_address": getattr(email_addr, 'email_address', None),
        "verification": {
            "status": getattr(verification, 'status', 'unverified'),
            "strategy": getattr(verification, 'strategy', None),
            "attempts": getattr(verification, 'attempts', None),
            "expire_at": getattr(verification, 'expire_at', None),
        },
        "linked_to": getattr(email_addr, 'linked_to', []),
    }


def _serialize_user(user: Any, fallback_id: str) -> Dict[str, Any]:
    """Convert a Clerk user object into the dict structure used by the app.

    Args:
        user: Clerk user object; every field is read defensively via getattr
        fallback_id: ID to report when the object carries no ``id`` attribute

    Returns:
        Dict with the full user structure, including serialized email addresses
    """
    email_objects = getattr(user, 'email_addresses', None) or []
    return {
        "id": getattr(user, 'id', fallback_id),
        "object": getattr(user, 'object', 'user'),
        "username": getattr(user, 'username', None),
        "first_name": getattr(user, 'first_name', None),
        "last_name": getattr(user, 'last_name', None),
        "image_url": getattr(user, 'image_url', None),
        "has_image": getattr(user, 'has_image', False),
        "primary_email_address_id": getattr(user, 'primary_email_address_id', None),
        "primary_phone_number_id": getattr(user, 'primary_phone_number_id', None),
        "primary_web3_wallet_id": getattr(user, 'primary_web3_wallet_id', None),
        "password_enabled": getattr(user, 'password_enabled', False),
        "two_factor_enabled": getattr(user, 'two_factor_enabled', False),
        "email_addresses": [_serialize_email(addr) for addr in email_objects],
        "phone_numbers": getattr(user, 'phone_numbers', []),
        "web3_wallets": getattr(user, 'web3_wallets', []),
        "external_accounts": getattr(user, 'external_accounts', []),
        "public_metadata": getattr(user, 'public_metadata', {}),
        "private_metadata": getattr(user, 'private_metadata', {}),
        "unsafe_metadata": getattr(user, 'unsafe_metadata', {}),
        "delete_self_enabled": getattr(user, 'delete_self_enabled', True),
        "create_organization_enabled": getattr(user, 'create_organization_enabled', True),
        "last_sign_in_at": getattr(user, 'last_sign_in_at', None),
        "banned": getattr(user, 'banned', False),
        "locked": getattr(user, 'locked', False),
        "lockout_expires_in_seconds": getattr(user, 'lockout_expires_in_seconds', None),
        "verification_attempts_remaining": getattr(user, 'verification_attempts_remaining', 3),
        "updated_at": getattr(user, 'updated_at', None),
        "created_at": getattr(user, 'created_at', None),
        "last_active_at": getattr(user, 'last_active_at', None),
    }


def _dev_fallback_user(user_id: str) -> Dict[str, Any]:
    """Build the placeholder user returned in development when Clerk fails."""
    now_ms = _now_ms()
    return {
        "id": user_id,
        "object": "user",
        "username": "testuser",
        "first_name": "Test",
        "last_name": "User",
        "image_url": None,
        "has_image": False,
        "primary_email_address_id": "test_email_id",
        "primary_phone_number_id": None,
        "primary_web3_wallet_id": None,
        "password_enabled": False,
        "two_factor_enabled": False,
        "email_addresses": [
            {
                "id": "test_email_id",
                "object": "email_address",
                "email_address": "test@example.com",
                "verification": {
                    "status": "verified",
                    "strategy": "admin",
                    "attempts": None,
                    "expire_at": None,
                },
                "linked_to": [],
            }
        ],
        "phone_numbers": [],
        "web3_wallets": [],
        "external_accounts": [],
        "public_metadata": {},
        "private_metadata": {},
        "unsafe_metadata": {},
        "delete_self_enabled": True,
        "create_organization_enabled": True,
        "last_sign_in_at": now_ms,
        "banned": False,
        "locked": False,
        "lockout_expires_in_seconds": None,
        "verification_attempts_remaining": 3,
        "updated_at": now_ms,
        "created_at": now_ms,
        "last_active_at": now_ms,
    }


def _unwrap_user(response: Any) -> Optional[Any]:
    """Return the user object carried by a ``users.get`` response, or None.

    NOTE(review): the original code always read ``response.object``. Recent
    Clerk SDK releases appear to return the user model itself (where
    ``object`` is only a type tag) - confirm against the installed SDK
    version. Both shapes are accepted here: a response exposing ``id`` is
    treated as the user; otherwise ``response.object`` is used as before.
    """
    if not response:
        return None
    if getattr(response, 'id', None) is not None:
        return response
    return getattr(response, 'object', None) or None


def _has_verified_email(user_info: Dict[str, Any]) -> bool:
    """Tell whether a serialized user has a verified email address.

    The primary email address decides when it can be identified; otherwise
    any verified address counts. A truthy legacy ``email_verified`` key is
    also honoured.
    """
    if user_info.get("email_verified"):
        return True

    emails: List[Dict[str, Any]] = user_info.get("email_addresses") or []
    primary_id = user_info.get("primary_email_address_id")
    primary = [
        entry for entry in emails
        if primary_id is not None and entry.get("id") == primary_id
    ]

    for entry in primary or emails:
        verification_status = (entry.get("verification") or {}).get("status")
        # The status may be an enum member rather than a plain string.
        verification_status = getattr(verification_status, "value", verification_status)
        if verification_status == "verified":
            return True
    return False


class ClerkManager:
    """
    Clerk authentication manager.

    Provides centralized Clerk SDK integration, token validation, and user
    management for the FastAPI application.
    """

    def __init__(self):
        self._client: Optional[Clerk] = None
        self._is_initialized = False

    def initialize(self) -> None:
        """
        Initialize Clerk client.

        Raises:
            ClerkAuthError: If the secret key is missing or the client
                cannot be constructed
        """
        try:
            if not settings.clerk_secret_key:
                raise ClerkAuthError("Clerk secret key not configured")

            self._client = Clerk(bearer_auth=settings.clerk_secret_key)
            self._is_initialized = True
            logger.info("Clerk authentication initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize Clerk authentication: %s", e)
            self._is_initialized = False
            raise ClerkAuthError(f"Clerk initialization failed: {e}") from e

    @property
    def client(self) -> Clerk:
        """
        Get Clerk client instance.

        Raises:
            ClerkAuthError: If ``initialize`` has not completed successfully
        """
        if not self._client or not self._is_initialized:
            raise ClerkAuthError("Clerk client not initialized")
        return self._client

    @property
    def is_initialized(self) -> bool:
        """Check if Clerk is initialized."""
        return self._is_initialized

    async def verify_session_token(self, session_token: str) -> Dict[str, Any]:
        """
        Verify Clerk session token and extract claims.

        Args:
            session_token: Clerk session token from request

        Returns:
            Dict containing ``user_id``, ``session_id``, ``claims`` and
            ``verified_at``. When verification is disabled the dict describes
            a fixed development user and additionally carries ``sub``.

        Raises:
            ClerkAuthError: If token verification fails
        """
        try:
            if not settings.clerk_jwt_verification:
                logger.warning("JWT verification is disabled - this should only be used in development")
                # Skipping verification is NOT recommended for production.
                # "user_id" is included so callers that index it (such as
                # verify_clerk_token) work on this path too; "sub" is kept
                # for backward compatibility.
                return {
                    "sub": "dev_user",
                    "user_id": "dev_user",
                    "session_id": "dev_session",
                    "claims": {"sub": "dev_user", "sid": "dev_session"},
                    "verified_at": _utc_now_iso(),
                }

            # Note: a real implementation has to fetch Clerk's public keys
            # and verify the token signature. Until then the token is only
            # decoded, and only in development mode.
            if settings.is_development:
                # Development mode - decode without verification (NOT for production)
                decoded_token = jwt.decode(
                    session_token,
                    options={"verify_signature": False},
                )
            else:
                # Production mode - proper verification needed
                raise NotImplementedError(
                    "Production JWT verification not implemented. "
                    "Please implement proper JWT verification with Clerk's public keys."
                )

            # Extract user ID and session information
            user_id = decoded_token.get("sub")
            session_id = decoded_token.get("sid")

            if not user_id:
                raise ClerkAuthError("Invalid token: missing user ID")

            return {
                "user_id": user_id,
                "session_id": session_id,
                "claims": decoded_token,
                "verified_at": _utc_now_iso(),
            }
        except ClerkAuthError:
            # Already a domain error - propagate without re-wrapping.
            raise
        except PyJWTError as e:
            logger.error("JWT verification failed: %s", e)
            raise ClerkAuthError(f"Invalid session token: {e}") from e
        except Exception as e:
            logger.error("Token verification error: %s", e)
            raise ClerkAuthError(f"Token verification failed: {e}") from e

    def _fetch_user(self, user_id: str) -> Dict[str, Any]:
        """Fetch a user from Clerk and serialize it; raise if not found."""
        # NOTE(review): this looks like a synchronous SDK call made from
        # async code paths - confirm whether it should be moved off the
        # event loop.
        user_response = self.client.users.get(user_id=user_id)
        user: Optional[ClerkUser] = _unwrap_user(user_response)
        if user is None:
            raise ClerkAuthError(f"User not found: {user_id}")
        return _serialize_user(user, user_id)

    async def get_user_info(self, user_id: str) -> Dict[str, Any]:
        """
        Get user information from Clerk.

        In development mode any failure while talking to Clerk (including a
        missing user) is logged and replaced by a placeholder test user. In
        every other environment failures are raised.

        Args:
            user_id: Clerk user ID

        Returns:
            Dict containing user information

        Raises:
            ClerkAuthError: If user retrieval fails
        """
        try:
            if settings.is_development:
                try:
                    return self._fetch_user(user_id)
                except Exception as clerk_error:
                    logger.warning(
                        "Clerk API failed in development mode, using fallback: %s",
                        clerk_error,
                    )
                    # Fallback user info for development with proper structure
                    return _dev_fallback_user(user_id)

            # Production mode - strict Clerk API usage
            return self._fetch_user(user_id)
        except ClerkAuthError:
            raise
        except Exception as e:
            logger.error("Failed to get user info for %s: %s", user_id, e)
            raise ClerkAuthError(f"Failed to retrieve user information: {e}") from e

    async def validate_user_permissions(
        self, user_id: str, required_permission: Optional[str] = None
    ) -> bool:
        """
        Validate user permissions (placeholder for future implementation).

        Currently every user that can be retrieved and has a verified email
        address passes; ``required_permission`` is not evaluated yet.

        Args:
            user_id: Clerk user ID
            required_permission: Required permission (optional)

        Returns:
            True if user has required permissions, False otherwise
            (including when the lookup itself fails)
        """
        try:
            user_info = await self.get_user_info(user_id)

            # Basic validation - user exists and is verified. The verified
            # state lives in the email address entries; get_user_info does
            # not produce a top-level "email_verified" key.
            if not _has_verified_email(user_info):
                logger.warning("User %s email not verified", user_id)
                return False

            # TODO: Implement role-based permission checking
            # This would involve checking user roles/metadata in Clerk
            return True
        except Exception as e:
            logger.error("Permission validation failed for %s: %s", user_id, e)
            return False

    def health_check(self) -> Dict[str, Any]:
        """
        Perform Clerk authentication health check.

        Returns:
            Dict containing health status
        """
        try:
            if not self._is_initialized:
                return {
                    "status": "unhealthy",
                    "error": "Clerk client not initialized",
                }

            # Basic connectivity check
            # In a real implementation, you might want to make a test API call
            return {
                "status": "healthy",
                "initialized": self._is_initialized,
                "jwt_verification_enabled": settings.clerk_jwt_verification,
                "environment": settings.environment,
            }
        except Exception as e:
            logger.error("Clerk health check failed: %s", e)
            return {
                "status": "unhealthy",
                "error": str(e),
            }


# Global Clerk manager instance
clerk_manager = ClerkManager()


def get_clerk_manager() -> ClerkManager:
    """
    Get Clerk manager instance.

    Returns:
        ClerkManager instance
    """
    return clerk_manager


class AuthenticationError(HTTPException):
    """Custom authentication error with consistent formatting."""

    def __init__(self, detail: str = "Authentication failed"):
        super().__init__(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )


class AuthorizationError(HTTPException):
    """Custom authorization error with consistent formatting."""

    def __init__(self, detail: str = "Insufficient permissions"):
        super().__init__(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=detail,
        )


def extract_bearer_token(authorization_header: str) -> str:
    """
    Extract bearer token from Authorization header.

    Args:
        authorization_header: Authorization header value

    Returns:
        Bearer token string, stripped of surrounding whitespace

    Raises:
        AuthenticationError: If token extraction fails
    """
    if not authorization_header:
        raise AuthenticationError("Missing authorization header")

    scheme, separator, token = authorization_header.partition(" ")
    if not separator:
        # No space at all: there is no "<scheme> <token>" structure.
        raise AuthenticationError("Invalid authorization header format")

    if scheme.lower() != "bearer":
        raise AuthenticationError("Invalid authentication scheme")

    # Tolerate extra spaces around the token ("Bearer   abc ").
    token = token.strip()
    if not token:
        raise AuthenticationError("Missing bearer token")

    return token


async def verify_clerk_token(token: str) -> Dict[str, Any]:
    """
    Verify Clerk session token and return user information.

    Args:
        token: Clerk session token

    Returns:
        Dict containing user information and token claims

    Raises:
        AuthenticationError: If token verification fails
    """
    try:
        # Verify token with Clerk
        token_info = await clerk_manager.verify_session_token(token)

        # Get user information
        user_info = await clerk_manager.get_user_info(token_info["user_id"])

        return {
            "token_info": token_info,
            "user_info": user_info,
        }
    except ClerkAuthError as e:
        raise AuthenticationError(str(e)) from e
    except Exception as e:
        logger.error("Token verification failed: %s", e)
        raise AuthenticationError("Token verification failed") from e