Spaces:
Running
Running
| """ | |
| Authentication API endpoints for user signup and login. | |
| """ | |
| from datetime import datetime | |
| from datetime import timedelta | |
| from typing import Optional | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from pydantic import BaseModel, EmailStr, Field | |
| from sqlmodel import select | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from src.db.firebase import get_firebase_db | |
| from src.db.models import User | |
| from src.auth.security import hash_password, verify_password, create_access_token | |
| from src.utils.logger import setup_logger | |
| from src.utils.config import settings | |
| logger = setup_logger(__name__) | |
| router = APIRouter(prefix="/auth", tags=["Authentication"]) | |
| # Request/Response Models | |
class SignupRequest(BaseModel):
    """Request model for user signup."""

    # Required credentials; the length limits are enforced by the Field
    # constraints below.
    email: EmailStr = Field(default=..., description="User email address")
    username: str = Field(
        default=...,
        min_length=3,
        max_length=50,
        description="Username",
    )
    password: str = Field(
        default=...,
        min_length=6,
        description="Password (min 6 characters)",
    )

    # Optional profile attributes; both default to None when omitted.
    age: Optional[int] = Field(default=None, ge=0, le=120, description="User age")
    gender: Optional[str] = Field(
        default=None,
        max_length=20,
        description="User gender",
    )

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = dict(
            example=dict(
                email="student@example.com",
                username="Student123",
                password="secure_password",
            ),
        )
class UserResponse(BaseModel):
    """Response model for user data (without password)."""

    # Identity fields that are always present in the response.
    id: str
    email: str
    username: str
    role: str

    # Optional profile attributes; None when not supplied.
    age: Optional[int] = None
    gender: Optional[str] = None

    # Creation timestamp, carried as text.
    created_at: str

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = dict(
            example=dict(
                id="abc-123",
                email="student@example.com",
                username="Student123",
                role="user",
                created_at="2024-01-27T05:00:00",
            ),
        )
class TokenResponse(BaseModel):
    """Response model for login token."""

    access_token: str = Field(default=..., description="JWT access token")
    # Defaults to "bearer" when the caller does not pass a token type.
    token_type: str = Field("bearer", description="Token type")
    expires_in: int = Field(
        default=...,
        description="Token expiration time in minutes",
    )

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = dict(
            example=dict(
                access_token="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                token_type="bearer",
                expires_in=60,
            ),
        )
async def signup(
    signup_data: SignupRequest
):
    """
    Register a new user using Firestore.
    """
    # Maintainer notes:
    #   signup_data -- validated signup payload.
    #   Returns a UserResponse for the stored user (no password hash).
    #   Raises HTTPException 500 when get_firebase_db() returns None, and
    #   HTTPException 409 when the email or the username is already stored.
    #   NOTE(review): the existence check and the insert are two separate
    #   operations, so two concurrent signups with the same email/username can
    #   both pass the check; uniqueness is not enforced atomically here.
    import asyncio
    from datetime import timezone

    db = get_firebase_db()
    if db is None:
        raise HTTPException(status_code=500, detail="Firebase not configured")

    users_ref = db.collection("users")

    def _exists(field: str, value: str) -> bool:
        """Return True when a user document has ``field`` equal to ``value``."""
        matches = users_ref.where(field, "==", value).limit(1).stream()
        return next(matches, None) is not None

    # The Firestore calls below are synchronous; run them in a worker thread
    # so this coroutine does not stall the event loop while they are in
    # flight.  The username query is skipped when the email already matches.
    email_taken = await asyncio.to_thread(_exists, "email", signup_data.email)
    if email_taken or await asyncio.to_thread(
        _exists, "username", signup_data.username
    ):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email or Username already registered",
        )

    # Hashing is synchronous work as well, so it is also pushed off the loop.
    password_hash = await asyncio.to_thread(hash_password, signup_data.password)

    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated and naive).
    created_at = datetime.now(timezone.utc)
    user_dict = {
        "email": signup_data.email,
        "username": signup_data.username,
        "password_hash": password_hash,
        "role": "user",
        "age": signup_data.age,
        "gender": signup_data.gender,
        "created_at": created_at,
    }

    # add() returns a pair; only the new document reference is needed.
    _, new_user_ref = await asyncio.to_thread(users_ref.add, user_dict)
    logger.info(f"New user registered in Firestore: {signup_data.email}")

    return UserResponse(
        id=new_user_ref.id,
        email=signup_data.email,
        username=signup_data.username,
        role="user",
        age=signup_data.age,
        gender=signup_data.gender,
        # ISO-8601 text, matching the format shown in UserResponse's example.
        created_at=created_at.isoformat(),
    )
async def login(
    form_data: OAuth2PasswordRequestForm = Depends()
):
    """
    Authenticate user and return JWT access token using Firestore.
    """
    # Maintainer notes:
    #   form_data -- submitted login form; its ``username`` value is matched
    #   against the stored "username" field first and the "email" field second.
    #   Returns a TokenResponse carrying the token and its lifetime setting.
    #   Raises HTTPException 500 when get_firebase_db() returns None, and
    #   HTTPException 401 when no user matches, the stored record has no
    #   password hash, or the password does not verify.
    import asyncio

    db = get_firebase_db()
    if db is None:
        raise HTTPException(status_code=500, detail="Firebase not configured")

    users_ref = db.collection("users")

    def _find_user(identifier: str):
        """Return the first document matching by username, then by email."""
        for field in ("username", "email"):
            matches = users_ref.where(field, "==", identifier).limit(1).stream()
            found = next(matches, None)
            if found is not None:
                return found
        return None

    # The Firestore client is synchronous; keep its network calls off the
    # event loop by running the lookup in a worker thread.
    user_doc = await asyncio.to_thread(_find_user, form_data.username)
    user_data = user_doc.to_dict() if user_doc is not None else None

    # A record without a password hash is treated as a failed login rather
    # than surfacing a KeyError as an HTTP 500.
    stored_hash = user_data.get("password_hash") if user_data else None
    authenticated = bool(stored_hash) and await asyncio.to_thread(
        verify_password, form_data.password, stored_hash
    )
    if not authenticated:
        # Same response for "unknown user" and "wrong password".
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token
    access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
    access_token = create_access_token(
        data={"sub": user_data["username"]}, expires_delta=access_token_expires
    )
    logger.info(f"User logged in from Firestore: {user_data['username']}")

    return TokenResponse(
        access_token=access_token,
        token_type="bearer",
        expires_in=settings.access_token_expire_minutes,
    )