"""
Authentication API endpoints for user signup and signin.

This module provides:
- POST /api/auth/signup - Create new user account
- POST /api/auth/signin - Authenticate existing user
"""
from fastapi import APIRouter, HTTPException, Depends
from sqlmodel import Session, select
from pydantic import BaseModel, EmailStr, Field
from ..models.user import User
from ..services.auth import hash_password, verify_password, create_access_token
from ..database import get_session
# Router on which this module's endpoints (/signup and /signin) are registered
# via the @router.post decorators below.
router = APIRouter()


# Request/Response Models
class SignUpRequest(BaseModel):
    """Request model for user signup."""
    # Typed as EmailStr, so the value is validated as an email address.
    email: EmailStr = Field(..., description="User email address")
    # Plain-text password as submitted by the client; min_length=8 is the only
    # constraint declared on it.
    password: str = Field(..., min_length=8, description="User password (minimum 8 characters)")
class SignInRequest(BaseModel):
    """Request model for user signin."""
    # Typed as EmailStr, so the value is validated as an email address.
    email: EmailStr = Field(..., description="User email address")
    # No length constraint is declared here (unlike SignUpRequest.password).
    password: str = Field(..., description="User password")
class UserResponse(BaseModel):
    """User data response model."""
    id: int
    email: str
    # Both timestamps are carried as strings; the endpoints in this module fill
    # them with the result of .isoformat() on the User's created_at/updated_at.
    created_at: str
    updated_at: str
class AuthResponse(BaseModel):
    """Authentication response with token and user data."""
    # Value returned by create_access_token for the authenticated user.
    token: str
    # Public view of the account the token was issued for.
    user: UserResponse
@router.post("/signup", response_model=AuthResponse, status_code=201)
async def signup(
    request: SignUpRequest,
    session: Session = Depends(get_session)
) -> AuthResponse:
    """
    Register a new user account and return a token for it.

    Args:
        request: Signup payload carrying the email and password
        session: Database session supplied by the get_session dependency

    Returns:
        AuthResponse holding the access token and the stored user's data

    Raises:
        HTTPException 400: If a user with this email already exists
        HTTPException 422: If the request body fails validation
    """
    # Refuse the signup when any row already carries this email.
    # NOTE(review): this lookup and the insert below are separate steps, so two
    # concurrent signups for one email are only rejected if the database itself
    # enforces uniqueness on User.email - confirm against the User model.
    email_lookup = select(User).where(User.email == request.email)
    if session.exec(email_lookup).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    # Only the hash of the password is stored on the row.
    account = User(
        email=request.email,
        hashed_password=hash_password(request.password),
    )
    session.add(account)
    session.commit()
    # Reload the row so the fields read below (id, created_at, updated_at)
    # reflect what was persisted.
    session.refresh(account)

    claims = {"user_id": account.id, "email": account.email}
    token = create_access_token(data=claims)

    profile = UserResponse(
        id=account.id,
        email=account.email,
        created_at=account.created_at.isoformat(),
        updated_at=account.updated_at.isoformat(),
    )
    return AuthResponse(token=token, user=profile)
@router.post("/signin", response_model=AuthResponse)
async def signin(
    request: SignInRequest,
    session: Session = Depends(get_session)
) -> AuthResponse:
    """
    Authenticate an existing user.

    The same 401 response is returned whether the email is unknown or the
    password is wrong, and a password hash is computed on both paths so the
    two cases are harder to tell apart by response time.

    Args:
        request: Signin request with email and password
        session: Database session

    Returns:
        AuthResponse with access token and user data

    Raises:
        HTTPException 401: If credentials are invalid
    """
    # Find user by email
    statement = select(User).where(User.email == request.email)
    user = session.exec(statement).first()

    if user is None:
        # Unknown email: do one throwaway hash so this path is not noticeably
        # faster than a failed password check, which would otherwise reveal
        # which emails have accounts. A fixed string is hashed (not the
        # submitted password) so this path cannot fail on unusual input.
        # NOTE(review): assumes hash_password costs roughly the same as
        # verify_password - confirm against services.auth.
        hash_password("signin-timing-equalizer")
        credentials_ok = False
    else:
        credentials_ok = verify_password(request.password, user.hashed_password)

    if not credentials_ok:
        raise HTTPException(
            status_code=401,
            detail="Invalid email or password"
        )

    # Create the access token for the authenticated user
    token = create_access_token(
        data={
            "user_id": user.id,
            "email": user.email
        }
    )

    # Return response; timestamps are serialized with isoformat()
    return AuthResponse(
        token=token,
        user=UserResponse(
            id=user.id,
            email=user.email,
            created_at=user.created_at.isoformat(),
            updated_at=user.updated_at.isoformat()
        )
    )
|