# AIdea-Server / src /api /auth_routes.py
# amanyelfiky's picture
# Update src/api/auth_routes.py
# d03ed31 verified
"""
Authentication API endpoints for user signup and login.
"""
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.concurrency import run_in_threadpool
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr, Field
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from src.auth.security import hash_password, verify_password, create_access_token
from src.db.firebase import get_firebase_db
from src.db.models import User
from src.utils.config import settings
from src.utils.logger import setup_logger
# Module-level logger, obtained from the project's setup_logger helper and
# named after this module.
logger = setup_logger(__name__)
# Router collecting the endpoints defined in this file; every route is served
# under the "/auth" prefix and tagged "Authentication".
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Request/Response Models
class SignupRequest(BaseModel):
    """Payload accepted by the signup endpoint.

    ``email``, ``username`` and ``password`` are required. ``age`` and
    ``gender`` are optional and default to ``None``.
    """

    # Required account credentials.
    email: EmailStr = Field(..., description="User email address")
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        description="Username",
    )
    password: str = Field(
        ...,
        min_length=6,
        description="Password (min 6 characters)",
    )

    # Optional profile details.
    age: Optional[int] = Field(None, ge=0, le=120, description="User age")
    gender: Optional[str] = Field(None, max_length=20, description="User gender")

    class Config:
        # Example payload supplied as extra JSON-schema metadata.
        json_schema_extra = {
            "example": {
                "email": "student@example.com",
                "username": "Student123",
                "password": "secure_password",
            }
        }
class UserResponse(BaseModel):
    """Public view of a user record; it carries no password field."""

    # Identity fields.
    id: str
    email: str
    username: str
    role: str

    # Optional profile details; both default to None.
    age: Optional[int] = None
    gender: Optional[str] = None

    # Creation time, carried as a string.
    created_at: str

    class Config:
        # Example payload supplied as extra JSON-schema metadata.
        json_schema_extra = {
            "example": {
                "id": "abc-123",
                "email": "student@example.com",
                "username": "Student123",
                "role": "user",
                "created_at": "2024-01-27T05:00:00",
            }
        }
class TokenResponse(BaseModel):
    """Body returned by the login endpoint after a successful authentication."""

    access_token: str = Field(
        ...,
        description="JWT access token",
    )
    # Defaults to "bearer" when not given explicitly.
    token_type: str = Field(
        default="bearer",
        description="Token type",
    )
    expires_in: int = Field(
        ...,
        description="Token expiration time in minutes",
    )

    class Config:
        # Example payload supplied as extra JSON-schema metadata.
        json_schema_extra = {
            "example": {
                "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "token_type": "bearer",
                "expires_in": 60,
            }
        }
def _signup_identity_taken(users_ref, email: str, username: str) -> bool:
    """Return True if any user document already has this email or username.

    Runs blocking Firestore queries, so it must be called from a worker
    thread rather than directly on the event loop.
    """
    for field_name, value in (("email", email), ("username", username)):
        matches = users_ref.where(field_name, "==", value).limit(1).stream()
        if next(matches, None) is not None:
            return True
    return False


@router.post(
    "/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED
)
async def signup(
    signup_data: SignupRequest,
):
    """
    Register a new user using Firestore.

    Args:
        signup_data: Validated signup payload (email, username, password and
            optional age/gender).

    Returns:
        UserResponse describing the stored user, with ``created_at`` as an
        ISO-8601 UTC timestamp. The password hash is never returned.

    Raises:
        HTTPException: 500 if the Firestore client is not available; 409 if
            the email or username is already registered.
    """
    db = get_firebase_db()
    if db is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Firebase not configured",
        )

    users_ref = db.collection("users")

    # The Firestore client used here is synchronous; run it in the threadpool
    # so the event loop is not blocked while waiting on the network.
    # NOTE(review): this check-then-add sequence is not atomic, so two
    # concurrent signups with the same identity can both pass the check.
    already_taken = await run_in_threadpool(
        _signup_identity_taken,
        users_ref,
        signup_data.email,
        signup_data.username,
    )
    if already_taken:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email or Username already registered",
        )

    # Only the hash is stored; the plain-text password never leaves this scope.
    hashed_password_value = hash_password(signup_data.password)
    # Timezone-aware UTC timestamp (datetime.utcnow() is naive and deprecated).
    created_at = datetime.now(timezone.utc)
    user_dict = {
        "email": signup_data.email,
        "username": signup_data.username,
        "password_hash": hashed_password_value,
        "role": "user",
        "age": signup_data.age,
        "gender": signup_data.gender,
        "created_at": created_at,
    }

    _, new_user_ref = await run_in_threadpool(users_ref.add, user_dict)
    logger.info("New user registered in Firestore: %s", signup_data.email)

    return UserResponse(
        id=new_user_ref.id,
        email=signup_data.email,
        username=signup_data.username,
        role="user",
        age=signup_data.age,
        gender=signup_data.gender,
        # ISO-8601 ("T" separator), matching the format shown in the schema example.
        created_at=created_at.isoformat(),
    )
def _find_user_document(users_ref, identifier: str):
    """Return the first user document matching *identifier*, or None.

    The identifier is tried as a username first and then as an email. Runs
    blocking Firestore queries, so it must be called from a worker thread
    rather than directly on the event loop.
    """
    for field_name in ("username", "email"):
        matches = users_ref.where(field_name, "==", identifier).limit(1).stream()
        found = next(matches, None)
        if found is not None:
            return found
    return None


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    """
    Authenticate user and return JWT access token using Firestore.

    Args:
        form_data: OAuth2 password form; ``username`` may hold either the
            username or the email address of the account.

    Returns:
        TokenResponse with the access token, the "bearer" token type and the
        configured lifetime in minutes.

    Raises:
        HTTPException: 500 if the Firestore client is not available; 401 if
            no matching user exists, the stored record has no password hash,
            or the password does not verify.
    """
    db = get_firebase_db()
    if db is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Firebase not configured",
        )

    # The Firestore client used here is synchronous; run the lookup in the
    # threadpool so the event loop is not blocked while waiting on the network.
    user_doc = await run_in_threadpool(
        _find_user_document, db.collection("users"), form_data.username
    )
    user_data = user_doc.to_dict() if user_doc is not None else None

    # A record without a password hash is treated like a wrong password
    # instead of surfacing as a KeyError (HTTP 500).
    stored_hash = user_data.get("password_hash") if user_data else None
    if not stored_hash or not verify_password(form_data.password, stored_hash):
        # One shared response for every failure mode, so the reply does not
        # reveal whether the account exists.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token; the token subject is always the stored username,
    # even when the user logged in with their email.
    access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
    access_token = create_access_token(
        data={"sub": user_data["username"]}, expires_delta=access_token_expires
    )
    logger.info("User logged in from Firestore: %s", user_data["username"])

    return TokenResponse(
        access_token=access_token,
        token_type="bearer",
        expires_in=settings.access_token_expire_minutes,
    )