Spaces:
Sleeping
Sleeping
| """ | |
| Authentication Router | |
| Handles user registration, login, and current-user lookup | |
| """ | |
| from datetime import timedelta | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from sqlalchemy.orm import Session | |
| from app.database import get_db | |
| from app.models import User | |
| from app.schemas import UserCreate, UserResponse, Token | |
| from app.services.auth_service import ( | |
| get_password_hash, | |
| authenticate_user, | |
| create_access_token, | |
| get_current_user | |
| ) | |
| from app.config import ACCESS_TOKEN_EXPIRE_MINUTES | |
| router = APIRouter() | |
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """
    Register a new user

    - **username**: Unique username (3-50 characters)
    - **email**: Valid email address
    - **password**: Password (minimum 6 characters)

    The password is stored only as the value returned by
    `get_password_hash`. Returns the newly created user after it has been
    committed and refreshed from the database.

    Responds with 400 when the username or email is already registered,
    including when the conflict is only detected while committing.
    """
    # Local import: the module's visible import block does not provide this
    # name, and sqlalchemy is already a dependency of this file.
    from sqlalchemy.exc import IntegrityError

    # Check if username exists
    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered"
        )

    # Check if email exists
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Create new user
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=get_password_hash(user_data.password)
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The lookups above and this insert are not atomic: a concurrent
        # request can claim the same username/email in between. If the
        # database enforces uniqueness (NOTE(review): confirm the User model
        # declares unique constraints), the conflict surfaces here. Roll back
        # so the session stays usable and report a client error, not a 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email already registered"
        ) from exc
    db.refresh(new_user)
    return new_user
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """
    Exchange a username and password for an access token

    - **username**: Your username
    - **password**: Your password

    On success, returns the access token together with its token type
    ("bearer"). When the credentials are rejected, responds with 401 and a
    `WWW-Authenticate: Bearer` header.
    """
    account = authenticate_user(db, form_data.username, form_data.password)

    if account:
        # The token's subject is the username; its lifetime comes from the
        # configured ACCESS_TOKEN_EXPIRE_MINUTES.
        token = create_access_token(
            data={"sub": account.username},
            expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        )
        return {"access_token": token, "token_type": "bearer"}

    # authenticate_user returned a falsy value: reject the attempt.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    """
    Return the authenticated user's information

    The user is supplied by the `get_current_user` dependency and is
    returned unchanged.
    """
    return current_user