Spaces:
Running
Running
File size: 5,634 Bytes
690700c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
"""
Authentication API Endpoints
User registration, login, and token management
"""
from fastapi import APIRouter, HTTPException, status, Depends
from datetime import timedelta
from app.models.user import UserRegister, UserLogin, Token, UserResponse, TokenData
from app.db.repositories.user_repository import UserRepository
from app.utils.security import verify_password, create_access_token
from app.utils.dependencies import get_current_user
from app.config import settings
# Router on which this module's authentication endpoints
# (/register, /login, /me, /logout) are registered via the decorators below.
router = APIRouter()
@router.post("/register", response_model=Token, status_code=status.HTTP_201_CREATED)
async def register_user(user_data: UserRegister):
    """
    Register a new user.

    Creates the account through UserRepository.create_user, reloads the
    stored record and returns an access token so the client is logged in
    immediately.

    Args:
        user_data: User registration data (email, password, full_name)

    Returns:
        Token: Access token and user info

    Raises:
        HTTPException: 400 when create_user rejects the input with a
            ValueError (presumably a duplicate email - confirm against the
            repository); 500 for any other failure.
    """
    user_repo = UserRepository()

    try:
        # Only a ValueError raised by account creation is treated as a client
        # error. Keeping this inner try narrow stops ValueErrors raised later
        # (token creation, response-model construction) from being reported
        # as a 400 carrying internal error text.
        try:
            user_id = await user_repo.create_user(
                email=user_data.email,
                password=user_data.password,
                full_name=user_data.full_name,
            )
        except ValueError as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(e),
            ) from e

        # Reload the stored record; fail with a clear message rather than an
        # opaque TypeError if the lookup comes back empty.
        user = await user_repo.get_user_by_id(user_id)
        if not user:
            raise RuntimeError(f"user {user_id} not found after creation")

        # Generate access token
        access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = create_access_token(
            data={"user_id": user["user_id"], "email": user["email"]},
            expires_delta=access_token_expires,
        )

        # Return token and user info
        return Token(
            access_token=access_token,
            token_type="bearer",
            user=UserResponse(
                user_id=user["user_id"],
                email=user["email"],
                full_name=user["full_name"],
                created_at=user["created_at"],
            ),
        )
    except HTTPException:
        # Already a deliberate HTTP response (e.g. the 400 above); do not let
        # the generic handler below rewrite it into a 500.
        raise
    except Exception as e:
        print(f"❌ Registration error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to register user",
        ) from e
@router.post("/login", response_model=Token)
async def login_user(user_data: UserLogin):
    """
    Authenticate a user and issue an access token.

    Looks the account up by email, checks the supplied password against the
    stored hash, and refuses accounts that are not flagged active.

    Args:
        user_data: User login data (email, password)

    Returns:
        Token: Access token and user info

    Raises:
        HTTPException: 401 if the email is unknown or the password does not
            match; 403 if the account is inactive
    """
    repo = UserRepository()
    account = await repo.get_user_by_email(user_data.email)

    # Unknown email and wrong password deliberately produce the same response.
    # The password check only runs when an account was found.
    if not account or not verify_password(
        user_data.password, account["hashed_password"]
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # A missing "is_active" key counts as inactive.
    is_active = account.get("is_active", False)
    if not is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is inactive"
        )

    # Issue the token
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"user_id": account["user_id"], "email": account["email"]}
    token = create_access_token(data=claims, expires_delta=lifetime)

    # Assemble the response
    profile = UserResponse(
        user_id=account["user_id"],
        email=account["email"],
        full_name=account["full_name"],
        created_at=account["created_at"],
    )
    return Token(access_token=token, token_type="bearer", user=profile)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: TokenData = Depends(get_current_user)):
    """
    Return the profile of the caller identified by the request's token.

    The get_current_user dependency supplies the token data; the user record
    is then loaded by its user_id.

    Args:
        current_user: Current authenticated user (from token)

    Returns:
        UserResponse: Current user information

    Raises:
        HTTPException: 404 if no user record is found for the token's user_id
    """
    record = await UserRepository().get_user_by_id(current_user.user_id)

    if record:
        # Copy exactly the fields exposed by the response model.
        exposed = ("user_id", "email", "full_name", "created_at")
        return UserResponse(**{field: record[field] for field in exposed})

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.post("/logout")
async def logout_user(current_user: TokenData = Depends(get_current_user)):
    """
    Acknowledge a logout request.

    No token is revoked here: the handler only prints a log line and echoes
    the caller's user_id. Discarding the token is left to the client.

    Args:
        current_user: Current authenticated user (from token)

    Returns:
        dict: Success message and the caller's user_id
    """
    email = current_user.email
    print(f"👋 User logged out: {email}")

    payload = {}
    payload["message"] = "Successfully logged out"
    payload["user_id"] = current_user.user_id
    return payload
|