File size: 7,441 Bytes
e5fce98 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
"""
Authentication API routes.
Provides endpoints for user registration, login, logout, and token verification.
"""
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session
from src.api.deps import get_current_user, get_db
from src.core.config import settings
from src.models.user import User
from src.schemas.auth import AuthResponse, LoginRequest, SignupRequest
from src.schemas.user import UserResponse
from src.services.auth_service import (
authenticate_user,
create_user,
create_user_token,
get_user_by_email,
)
router = APIRouter()
@router.post(
'/signup',
response_model=AuthResponse,
status_code=status.HTTP_201_CREATED,
summary='Register a new user',
description='Create a new user account with email and password',
)
async def signup(
user_data: SignupRequest,
db: Session = Depends(get_db),
):
"""
Register a new user.
Validates email format, checks for duplicate emails,
validates password strength, and creates a new user.
Args:
user_data: User registration data (name, email, password)
db: Database session
Returns:
AuthResponse: JWT token and user information
Raises:
HTTPException 400: If validation fails or email already exists
"""
print(f"DEBUG: Signup request received: {user_data.dict()}")
try:
# Create user
user = create_user(db, user_data)
# Generate JWT token
access_token = create_user_token(user.id)
# Return response
return AuthResponse(
access_token=access_token,
token_type='bearer',
user={
'id': str(user.id),
'name': user.name,
'email': user.email,
'avatar_url': user.avatar_url,
'created_at': user.created_at.isoformat(),
'updated_at': user.updated_at.isoformat(),
},
)
except ValueError as e:
# Handle validation errors
error_msg = str(e)
if 'already registered' in error_msg:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Email is already registered. Please use a different email or login.',
)
elif 'Password' in error_msg:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=error_msg,
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Validation failed: ' + error_msg,
)
except Exception as e:
# Handle unexpected errors
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='An error occurred while creating your account. Please try again.',
)
@router.post(
'/login',
response_model=AuthResponse,
summary='Login user',
description='Authenticate user with email and password',
)
async def login(
user_data: LoginRequest,
response: Response,
db: Session = Depends(get_db),
):
"""
Login a user.
Validates credentials and returns a JWT token.
Args:
user_data: Login credentials (email, password)
response: FastAPI response object
db: Database session
Returns:
AuthResponse: JWT token and user information
Raises:
HTTPException 401: If credentials are invalid
"""
print(f"DEBUG: Login request received: email={user_data.email}")
# Authenticate user
user = authenticate_user(db, user_data.email, user_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Invalid email or password',
headers={'WWW-Authenticate': 'Bearer'},
)
# Generate JWT token
access_token = create_user_token(user.id)
# Set httpOnly cookie (optional, for additional security)
response.set_cookie(
key='access_token',
value=access_token,
httponly=True,
secure=not settings.is_development, # HTTPS in production
samesite='lax',
max_age=settings.jwt_expiration_days * 24 * 60 * 60, # Convert days to seconds
)
# Return response
return AuthResponse(
access_token=access_token,
token_type='bearer',
user={
'id': str(user.id),
'name': user.name,
'email': user.email,
'avatar_url': user.avatar_url,
'created_at': user.created_at.isoformat(),
'updated_at': user.updated_at.isoformat(),
},
)
@router.post(
'/logout',
summary='Logout user',
description='Logout user and clear authentication token',
)
async def logout(response: Response):
"""
Logout a user.
Clears the authentication cookie.
Args:
response: FastAPI response object
Returns:
dict: Logout confirmation message
"""
# Clear authentication cookie
response.delete_cookie('access_token')
return {'message': 'Successfully logged out'}
@router.get(
'/me',
response_model=UserResponse,
summary='Get current user',
description='Get information about the currently authenticated user',
)
async def get_current_user_info(
current_user: User = Depends(get_current_user),
):
"""
Get current authenticated user.
Requires valid JWT token in Authorization header.
Args:
current_user: Current user from dependency
Returns:
UserResponse: Current user information
"""
return current_user
# OAuth2 compatible endpoint for token generation
@router.post(
'/token',
response_model=AuthResponse,
summary='Get access token',
description='OAuth2 compatible endpoint to get access token',
)
async def get_access_token(
response: Response,
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db),
):
"""
OAuth2 compatible token endpoint.
Used by OAuth2 clients to obtain access tokens.
Args:
form_data: OAuth2 password request form
response: FastAPI response object
db: Database session
Returns:
AuthResponse: JWT token and user information
"""
# Use login logic
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Incorrect email or password',
headers={'WWW-Authenticate': 'Bearer'},
)
access_token = create_user_token(user.id)
# Set cookie
response.set_cookie(
key='access_token',
value=access_token,
httponly=True,
secure=not settings.is_development,
samesite='lax',
max_age=settings.jwt_expiration_days * 24 * 60 * 60,
)
return AuthResponse(
access_token=access_token,
token_type='bearer',
user={
'id': str(user.id),
'name': user.name,
'email': user.email,
'avatar_url': user.avatar_url,
'created_at': user.created_at.isoformat(),
'updated_at': user.updated_at.isoformat(),
},
)
# Export router
__all__ = ['router']
|