Spaces:
Sleeping
Sleeping
File size: 2,934 Bytes
c09e844 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
"""
Authentication Router
Handles user registration, login, and current-user lookup
"""
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.database import get_db
from app.models import User
from app.schemas import UserCreate, UserResponse, Token
from app.services.auth_service import (
get_password_hash,
authenticate_user,
create_access_token,
get_current_user
)
from app.config import ACCESS_TOKEN_EXPIRE_MINUTES
# Router that the endpoints in this module attach to via @router.post / @router.get.
router = APIRouter()
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """
    Register a new user

    - **username**: Unique username (3-50 characters)
    - **email**: Valid email address
    - **password**: Password (minimum 6 characters)

    Returns the newly created user. Responds with 400 if the username or
    email is already registered.
    """
    # Function-scope import keeps this handler self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    # Friendly pre-checks so the common duplicate cases get a specific message.
    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Only the hash of the password is stored on the new row.
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password)
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The lookups above and this insert are not atomic: a concurrent
        # request can claim the same username/email in between. Without this
        # handler that surfaces as an unhandled 500 and a session stuck in a
        # failed transaction.
        # NOTE(review): assumes the User table enforces uniqueness on
        # username/email - confirm against the model definition.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email already registered"
        ) from exc

    # Reload so database-populated attributes are present in the response.
    db.refresh(new_user)
    return new_user
@router.post("/login", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """
    Exchange a username and password for an access token

    - **username**: Your username
    - **password**: Your password

    Returns a JWT access token to use for authentication
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if not account:
        # One generic message for every failed attempt; the header tells the
        # client that a Bearer credential is expected.
        rejection = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise rejection

    # Token lifetime comes from configuration; the username is carried in the
    # token payload under the "sub" key.
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": account.username}
    token = create_access_token(data=claims, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    """
    Return the profile of the currently authenticated user
    """
    # The get_current_user dependency resolves the user; this handler only
    # hands that object back to be serialized as UserResponse.
    return current_user
|