import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from sqlmodel.ext.asyncio.session import AsyncSession

from auth.jwt_handler import create_access_token, create_refresh_token, verify_token
from database.session import get_session_dep
from models.user import User, UserCreate
from services.user_service import UserService
from utils.logging import get_logger
router = APIRouter()
# Module logger. The handlers below log email addresses and user ids at INFO
# and WARNING, i.e. personal data ends up in whatever sink this logger feeds.
logger = get_logger(__name__)
# Models for auth endpoints
class UserLogin(BaseModel):
    """Request body for ``POST /auth/login``."""

    email: str
    # Plaintext credential as submitted by the client; server-side it is only
    # ever meant to be compared against a stored hash, never stored itself.
    # NOTE(review): the login handler in this file currently never reads it.
    password: str
class UserRegister(BaseModel):
    """Request body for ``POST /auth/register``."""

    email: str
    # Plaintext credential as submitted; a real deployment hashes it before it
    # is persisted. NOTE(review): register_user in this file forwards only
    # ``email`` and ``name`` to the service, so this value is currently dropped.
    password: str
    name: str
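class AuthResponse(BaseModel):
    """Response body shared by the register, login and refresh endpoints.

    Attributes:
        user: Public-facing user fields. Each handler assembles this dict
            itself, so its exact keys depend on the route.
        token: Newly issued access token.
        refresh_token: Newly issued refresh token. Every handler in this file
            sets it, but it stays optional so the schema admits ``null``.
    """

    user: dict
    token: str
    # ``str = None`` declared a non-optional field with a None default, which
    # type checkers and the generated OpenAPI schema disagree about; make the
    # optionality explicit.
    refresh_token: Optional[str] = None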
# Bearer-header extractor used by the logout route. HTTPBearer only checks that
# an ``Authorization: Bearer <credential>`` header is present and well-formed;
# decoding or verifying the credential itself is up to the handler.
security = HTTPBearer()
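@router.post("/auth/register", response_model=AuthResponse, status_code=status.HTTP_201_CREATED)
async def register_user(
    user_data: UserRegister,
    session: AsyncSession = Depends(get_session_dep)
):
    """
    Register a new user and return a JWT access/refresh token pair.

    Args:
        user_data: User registration data (email, password, name)
        session: Database session

    Returns:
        AuthResponse with the created user's fields and fresh tokens

    Raises:
        HTTPException: whatever ``UserService.create_user`` raises (the original
            comment mentions a 400 for a duplicate email) is passed through
            unchanged; any other failure becomes a generic 500 so internal
            details do not reach the client.
    """
    try:
        # NOTE(review): only email and name are forwarded -- ``user_data.password``
        # is never handed to the service, so the account is created without a
        # credential. Whether UserCreate even has a password field is not visible
        # here; confirm before relying on login to check passwords.
        user_create_data = UserCreate(
            email=user_data.email,
            name=user_data.name
        )
        created_user = await UserService.create_user(session, user_create_data)

        # Both tokens carry the same claims; the signing helpers decide what
        # else (expiry, token type) goes in.
        token_data = {"sub": str(created_user.id), "email": created_user.email}
        token = create_access_token(data=token_data)
        refresh_token = create_refresh_token(data=token_data)

        # Lazy %-args: the message is only formatted when INFO is enabled.
        logger.info("Successfully registered user %s with email %s", created_user.id, created_user.email)

        # NOTE(review): model_dump() serialises every field of the User model,
        # whereas login_user builds an explicit allow-list dict. If User ever
        # gains a sensitive column, this is the response that would expose it.
        return AuthResponse(
            user=created_user.model_dump(),
            token=token,
            refresh_token=refresh_token
        )
    except HTTPException:
        # Re-raise HTTP exceptions (like 400 for duplicate email)
        raise
    except Exception as e:
        # logger.exception keeps the traceback that the previous f-string
        # .error() call discarded; the client still only sees the generic detail.
        logger.exception("Error registering user with email %s", user_data.email)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error registering user"
        ) from e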
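@router.post("/auth/login", response_model=AuthResponse)
async def login_user(
    user_data: UserLogin,
    session: AsyncSession = Depends(get_session_dep)
):
    """
    Login a user and return a JWT access/refresh token pair.

    Args:
        user_data: User login data (email, password)
        session: Database session

    Returns:
        AuthResponse with an explicit subset of the user's fields and fresh tokens

    Raises:
        HTTPException: 401 with a deliberately uniform message when the email
            is unknown; 500 for any unexpected failure.
    """
    try:
        user = await UserService.get_user_by_email(session, user_data.email)
        if not user:
            # Same detail text a wrong password would get, so the response body
            # does not reveal which emails are registered.
            logger.warning("Login attempt with non-existent email: %s", user_data.email)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password"
            )

        # NOTE(review): ``user_data.password`` is never compared against
        # anything, so a known registered email is currently sufficient to
        # obtain tokens for that account. No password-verification helper is
        # imported in this file; the check belongs right here, between the
        # lookup and token issuance, once one exists.

        token_data = {"sub": str(user.id), "email": user.email}
        token = create_access_token(data=token_data)
        refresh_token = create_refresh_token(data=token_data)

        logger.info("Successfully logged in user %s with email %s", user.id, user.email)

        # Explicit allow-list of user fields for the response (contrast with
        # register_user, which dumps the whole model).
        user_dict = {
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "created_at": user.created_at
        }
        return AuthResponse(
            user=user_dict,
            token=token,
            refresh_token=refresh_token
        )
    except HTTPException:
        # Re-raise HTTP exceptions (like 401 for invalid credentials)
        raise
    except Exception as e:
        # Keep the traceback in the server log; the client gets a generic 500.
        logger.exception("Error logging in user with email %s", user_data.email)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error during login"
        ) from e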
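@router.post("/auth/logout")
async def logout_user(
    token: HTTPAuthorizationCredentials = Depends(security)
):
    """
    Logout endpoint.

    ``security`` (HTTPBearer) only requires that an ``Authorization: Bearer ...``
    header be present and well-formed; nothing in this handler decodes or
    verifies the credential, and ``token`` is otherwise unused. In a real
    application this would add the token (or its jti) to a revocation store.
    For this implementation it just returns a success message, so -- assuming
    the tokens are stateless JWTs -- a token stays usable until it expires.

    Returns:
        A constant success message.
    """
    try:
        # Previously an f-string with no placeholders (ruff F541).
        logger.info("User logged out successfully")
        return {"message": "Successfully logged out"}
    except Exception as e:
        logger.exception("Error during logout")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error during logout"
        ) from e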
class RefreshTokenRequest(BaseModel):
    """Request body for ``POST /auth/refresh``: the refresh token to exchange."""

    refresh_token: str
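@router.post("/auth/refresh", response_model=AuthResponse)
async def refresh_token(
    refresh_request: RefreshTokenRequest
):
    """
    Refresh the access token using a valid refresh token.

    Args:
        refresh_request: Contains the refresh token to exchange for a new token pair

    Returns:
        AuthResponse with a new access token, a new refresh token and the
        minimal user data (id, email) carried by the presented token

    Raises:
        HTTPException: 401 when the token's ``type`` claim is not ``refresh``,
            when ``sub`` or ``email`` is missing, or when ``verify_token``
            fails for any reason (signature, expiry, malformed input).
    """
    try:
        payload = verify_token(refresh_request.refresh_token)

        # An access token must not be usable in place of a refresh token.
        # NOTE(review): the new tokens below are built without a ``type``
        # claim, so create_refresh_token presumably adds it -- verify.
        token_type = payload.get("type")
        if token_type != "refresh":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type for refresh",
                headers={"WWW-Authenticate": "Bearer"},
            )

        user_id = payload.get("sub")
        user_email = payload.get("email")
        if not user_id or not user_email:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token",
                headers={"WWW-Authenticate": "Bearer"},
            )

        # NOTE(review): no database session is involved, so a token for a
        # deleted or disabled account is refreshed as long as it verifies.
        # The presented refresh token is also not invalidated when the new one
        # is issued (no rotation); no revocation store is referenced in this file.
        token_data = {"sub": user_id, "email": user_email}
        new_access_token = create_access_token(data=token_data)
        new_refresh_token = create_refresh_token(data=token_data)

        logger.info("Successfully refreshed token for user %s", user_id)

        # Only the claims carried by the token are available here, hence the
        # smaller user dict compared with login/register.
        user_dict = {
            "id": user_id,
            "email": user_email
        }
        return AuthResponse(
            user=user_dict,
            token=new_access_token,
            refresh_token=new_refresh_token
        )
    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        # Any verification failure lands here and is reported as 401 without
        # leaking the underlying reason to the client.
        logger.error("Error refreshing token: %s", e)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e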