""" Authentication API endpoints for user signup and login. """ from datetime import datetime from datetime import timedelta from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel, EmailStr, Field from sqlmodel import select from sqlmodel.ext.asyncio.session import AsyncSession from src.db.firebase import get_firebase_db from src.db.models import User from src.auth.security import hash_password, verify_password, create_access_token from src.utils.logger import setup_logger from src.utils.config import settings logger = setup_logger(__name__) router = APIRouter(prefix="/auth", tags=["Authentication"]) # Request/Response Models class SignupRequest(BaseModel): """Request model for user signup.""" email: EmailStr = Field(..., description="User email address") username: str = Field(..., min_length=3, max_length=50, description="Username") password: str = Field(..., min_length=6, description="Password (min 6 characters)") age: Optional[int] = Field(None, ge=0, le=120, description="User age") gender: Optional[str] = Field(None, max_length=20, description="User gender") class Config: json_schema_extra = { "example": { "email": "student@example.com", "username": "Student123", "password": "secure_password", } } class UserResponse(BaseModel): """Response model for user data (without password).""" id: str email: str username: str role: str age: Optional[int] = None gender: Optional[str] = None created_at: str class Config: json_schema_extra = { "example": { "id": "abc-123", "email": "student@example.com", "username": "Student123", "role": "user", "created_at": "2024-01-27T05:00:00", } } class TokenResponse(BaseModel): """Response model for login token.""" access_token: str = Field(..., description="JWT access token") token_type: str = Field(default="bearer", description="Token type") expires_in: int = Field(..., description="Token expiration time in minutes") class Config: json_schema_extra = { "example": { "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type": "bearer", "expires_in": 60, } } @router.post( "/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED ) async def signup( signup_data: SignupRequest ): """ Register a new user using Firestore. """ db = get_firebase_db() if db is None: raise HTTPException(status_code=500, detail="Firebase not configured") # Check if email or username already exists users_ref = db.collection("users") email_check = users_ref.where("email", "==", signup_data.email).limit(1).stream() username_check = users_ref.where("username", "==", signup_data.username).limit(1).stream() if next(email_check, None) or next(username_check, None): raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email or Username already registered", ) # Create new user with hashed password hashed_password_value = hash_password(signup_data.password) user_dict = { "email": signup_data.email, "username": signup_data.username, "password_hash": hashed_password_value, "role": "user", "age": signup_data.age, "gender": signup_data.gender, "created_at": datetime.utcnow() } _, new_user_ref = users_ref.add(user_dict) logger.info(f"New user registered in Firestore: {signup_data.email}") return UserResponse( id=new_user_ref.id, email=signup_data.email, username=signup_data.username, role="user", age=signup_data.age, gender=signup_data.gender, created_at=str(user_dict["created_at"]), ) @router.post("/login", response_model=TokenResponse) async def login( form_data: OAuth2PasswordRequestForm = Depends() ): """ Authenticate user and return JWT access token using Firestore. """ db = get_firebase_db() if db is None: raise HTTPException(status_code=500, detail="Firebase not configured") users_ref = db.collection("users") # Find user by username query = users_ref.where("username", "==", form_data.username).limit(1).stream() user_doc = next(query, None) # If not found by username, try finding by email if not user_doc: query = users_ref.where("email", "==", form_data.username).limit(1).stream() user_doc = next(query, None) # Verify user exists and password is correct if not user_doc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) user_data = user_doc.to_dict() if not verify_password(form_data.password, user_data["password_hash"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Create access token access_token_expires = timedelta(minutes=settings.access_token_expire_minutes) access_token = create_access_token( data={"sub": user_data["username"]}, expires_delta=access_token_expires ) logger.info(f"User logged in from Firestore: {user_data['username']}") return TokenResponse( access_token=access_token, token_type="bearer", expires_in=settings.access_token_expire_minutes, )