"""Authentication service for user signup and signin.""" from sqlmodel import Session, select from fastapi import HTTPException, status from src.models.user import User from src.schemas.auth import SignupRequest, SigninRequest, SignupResponse, TokenResponse, UserProfile from src.core.security import hash_password, verify_password, create_jwt_token from src.core.config import settings from datetime import datetime import re class AuthService: """Service for handling authentication operations.""" def __init__(self, db: Session): self.db = db def signup(self, signup_data: SignupRequest) -> SignupResponse: """ Create a new user account. Args: signup_data: User signup information Returns: SignupResponse with created user details Raises: HTTPException: 409 if email already exists HTTPException: 400 if validation fails """ # Validate email format (RFC 5322) if not self._validate_email(signup_data.email): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid email format", ) # Validate password strength password_errors = self._validate_password(signup_data.password) if password_errors: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Password does not meet requirements", headers={"X-Password-Errors": ", ".join(password_errors)} ) # Check if email already exists existing_user = self.db.exec( select(User).where(User.email == signup_data.email) ).first() if existing_user: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email already registered", ) # Hash password password_hash = hash_password(signup_data.password) # Create user user = User( email=signup_data.email, name=signup_data.name, password_hash=password_hash, created_at=datetime.utcnow(), updated_at=datetime.utcnow(), ) self.db.add(user) self.db.commit() self.db.refresh(user) return SignupResponse( id=user.id, email=user.email, name=user.name, created_at=user.created_at, ) def signin(self, signin_data: SigninRequest) -> TokenResponse: """ Authenticate user and issue JWT token. Args: signin_data: User signin credentials Returns: TokenResponse with JWT token and user profile Raises: HTTPException: 401 if credentials are invalid """ # Find user by email user = self.db.exec( select(User).where(User.email == signin_data.email) ).first() # Verify password if not user or not verify_password(signin_data.password, user.password_hash): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", ) # Create JWT token token = create_jwt_token( user_id=user.id, email=user.email, secret=settings.BETTER_AUTH_SECRET, expiration_days=settings.JWT_EXPIRATION_DAYS, ) # Calculate expiration in seconds (7 days = 604800 seconds) expires_in = settings.JWT_EXPIRATION_DAYS * 24 * 60 * 60 return TokenResponse( access_token=token, token_type="bearer", expires_in=expires_in, user=UserProfile( id=user.id, email=user.email, name=user.name, created_at=user.created_at, ), ) def _validate_email(self, email: str) -> bool: """ Validate email format (RFC 5322). Args: email: Email address to validate Returns: True if valid, False otherwise """ # Simplified RFC 5322 email validation pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not None def _validate_password(self, password: str) -> list[str]: """ Validate password strength. Requirements: - Minimum 8 characters - At least one uppercase letter - At least one lowercase letter - At least one number Args: password: Password to validate Returns: List of validation error messages (empty if valid) """ errors = [] if len(password) < 8: errors.append("Password must be at least 8 characters") if not re.search(r'[A-Z]', password): errors.append("Password must contain at least one uppercase letter") if not re.search(r'[a-z]', password): errors.append("Password must contain at least one lowercase letter") if not re.search(r'\d', password): errors.append("Password must contain at least one number") return errors