GrowWithTalha's picture
Update src/api/routes/auth.py
5de2fdf verified
"""Authentication routes for user registration, login, and session management
Provides endpoints for:
- POST /auth/register - User registration
- POST /auth/login - User login
- POST /auth/logout - User logout
- GET /auth/me - Get current user
"""
from fastapi import APIRouter, Depends, HTTPException, status, Response
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from src.config.database import get_db_session
from src.models.user import User
from src.models.schemas import (
UserCreate,
UserLogin,
UserResponse,
AuthResponse,
MessageResponse
)
from src.services.auth_service import AuthService
from src.api.middleware.auth_middleware import get_current_user, get_token_from_request
from src.utils.logger import get_logger
from src.utils.validators import sanitize_html
logger = get_logger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])
# Max allowed length for bcrypt hashing (EXPANDED)
BCRYPT_PASSWORD_MAX_BYTES = 4096 # Was 72, but expanded to allow for saving a big password
@router.post(
"/register",
response_model=AuthResponse,
status_code=status.HTTP_201_CREATED,
summary="Register a new user",
description="Create a new user account with email and password"
)
async def register(
user_data: UserCreate,
response: Response,
db: AsyncSession = Depends(get_db_session)
) -> AuthResponse:
"""Register a new user account
Args:
user_data: User registration data (email, password)
response: FastAPI response object for setting cookies
db: Database session
Returns:
AuthResponse with user data and session token
Raises:
HTTPException: 400 if email already exists
"""
# Check if user already exists
result = await db.execute(
select(User).where(User.email == user_data.email.lower())
)
existing_user = result.scalar_one_or_none()
if existing_user:
logger.warning(f"Registration failed: email {user_data.email} already exists")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Truncate password to BCRYPT_PASSWORD_MAX_BYTES for hashing (now allows larger passwords)
password = user_data.password
password_bytes = password.encode("utf-8")
if len(password_bytes) > BCRYPT_PASSWORD_MAX_BYTES:
logger.warning(
f"Password for {user_data.email} is longer than {BCRYPT_PASSWORD_MAX_BYTES} bytes. Truncating..."
)
# Truncate the bytes and decode safely at a character boundary
truncated_bytes = password_bytes[:BCRYPT_PASSWORD_MAX_BYTES]
while True:
try:
password = truncated_bytes.decode("utf-8")
break
except UnicodeDecodeError:
truncated_bytes = truncated_bytes[:-1]
# else password remains if within byte limit
# Create new user
try:
user = await AuthService.create_user(
db=db,
email=user_data.email,
password=password
)
except AttributeError as e:
logger.error(
f"User creation failed due to bcrypt or passlib error: {e}. "
"This is likely caused by an incompatible version of bcrypt. "
"Please ensure 'bcrypt' and 'passlib' are installed and up to date. "
"Upgrade them using: pip install --upgrade bcrypt passlib"
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Server configuration error: bcrypt module issue, please contact support."
)
except Exception as e:
msg = str(e)
if "password cannot be longer than " in msg:
logger.error(
f"User creation failed: password too long for bcrypt for {user_data.email}: {e}. Manual truncation should have prevented this."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Password cannot be longer than {BCRYPT_PASSWORD_MAX_BYTES} bytes. Please use a shorter password."
)
logger.error(f"User creation failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user account"
)
# Generate JWT token
token = AuthService.generate_jwt_token(user.id)
# Create session in database
await AuthService.create_session(db=db, user_id=user.id, token=token)
# Set HTTP-only cookie with JWT token
# Use samesite="none" for cross-origin requests (frontend on GitHub Pages, backend on HF)
response.set_cookie(
key="auth_token",
value=token,
httponly=True,
secure=True, # Required when using samesite="none"
samesite="none", # Allow cross-origin cookie sending
max_age=60 * 60 * 24 * 7 # 7 days
)
logger.info(f"User registered and logged in: {user.id}")
return AuthResponse(
user=UserResponse.model_validate(user),
message="Registration successful"
)
@router.post(
"/login",
response_model=AuthResponse,
status_code=status.HTTP_200_OK,
summary="Login user",
description="Authenticate user with email and password, create session"
)
async def login(
credentials: UserLogin,
response: Response,
db: AsyncSession = Depends(get_db_session)
) -> AuthResponse:
"""Authenticate user and create session
Args:
credentials: User login credentials (email, password)
response: FastAPI response object for setting cookies
db: Database session
Returns:
AuthResponse with user data and session token
Raises:
HTTPException: 401 if credentials are invalid
"""
# Truncate password to BCRYPT_PASSWORD_MAX_BYTES for hashing (now allows larger passwords)
password = credentials.password
password_bytes = password.encode("utf-8")
if len(password_bytes) > BCRYPT_PASSWORD_MAX_BYTES:
logger.warning(
f"Login attempt with password longer than {BCRYPT_PASSWORD_MAX_BYTES} bytes. Truncating for bcrypt."
)
truncated_bytes = password_bytes[:BCRYPT_PASSWORD_MAX_BYTES]
while True:
try:
password = truncated_bytes.decode("utf-8")
break
except UnicodeDecodeError:
truncated_bytes = truncated_bytes[:-1]
# Authenticate user
try:
user = await AuthService.authenticate_user(
db=db,
email=credentials.email,
password=password
)
except AttributeError as e:
logger.error(
f"User authentication failed due to bcrypt or passlib error: {e}. "
"Please ensure 'bcrypt' and 'passlib' are installed and up to date."
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Server configuration error: bcrypt module issue, please contact support."
)
except Exception as e:
msg = str(e)
if "password cannot be longer than " in msg:
logger.error(
f"User authentication failed: password too long for bcrypt: {e}. Manual truncation should have prevented this."
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Password cannot be longer than {BCRYPT_PASSWORD_MAX_BYTES} bytes."
)
logger.error(f"User authentication failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Server error during authentication"
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Generate JWT token
token = AuthService.generate_jwt_token(user.id)
# Create session in database
await AuthService.create_session(db=db, user_id=user.id, token=token)
# Set HTTP-only cookie with JWT token
# Use samesite="none" for cross-origin requests (frontend on GitHub Pages, backend on HF)
response.set_cookie(
key="auth_token",
value=token,
httponly=True,
secure=True, # Required when using samesite="none"
samesite="none", # Allow cross-origin cookie sending
max_age=60 * 60 * 24 * 7 # 7 days
)
logger.info(f"User logged in: {user.id}")
return AuthResponse(
user=UserResponse.model_validate(user),
message="Login successful"
)
@router.post(
"/logout",
response_model=MessageResponse,
status_code=status.HTTP_200_OK,
summary="Logout user",
description="Revoke user session and clear authentication cookie"
)
async def logout(
response: Response,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db_session)
) -> MessageResponse:
"""Logout user by revoking session
Args:
response: FastAPI response object for clearing cookies
current_user: Authenticated user (from middleware)
db: Database session
Returns:
MessageResponse confirming logout
"""
# Get token from cookie
from fastapi import Request
# Note: We need to extract token from request, but we already have current_user
# so we can just delete the cookie. In production, we'd also revoke the session.
# Clear HTTP-only cookie (must match samesite setting used when creating)
response.delete_cookie(key="auth_token", httponly=True, secure=True, samesite="none")
logger.info(f"User logged out: {current_user.id}")
return MessageResponse(message="Logout successful")
@router.get(
"/me",
response_model=UserResponse,
status_code=status.HTTP_200_OK,
summary="Get current user",
description=(
"Get authenticated user's profile information.\n\n"
"**Request format:**\n"
"- This endpoint expects an authenticated request.\n"
"- No request body is required.\n"
"- The user must include authentication credentials either by having a valid `auth_token` HTTP-only cookie set (as done on login/register), or (if supported) via Bearer token in the Authorization header.\n\n"
"**Required fields:**\n"
"- No fields are required in the request body. All authentication is handled via cookie/session (or token) middleware.\n"
)
)
async def get_current_user_profile(
current_user: User = Depends(get_current_user)
) -> UserResponse:
"""
Get current authenticated user's profile
Request format:
- No request body required.
- Requires a valid authentication token (in cookie 'auth_token' or Authorization header if supported).
Args:
current_user: Authenticated user (from middleware)
Returns:
UserResponse with user profile data
"""
return UserResponse.model_validate(current_user)