| from fastapi import APIRouter, Depends, HTTPException, status |
| from fastapi.security import OAuth2PasswordRequestForm |
| from sqlalchemy.orm import Session |
| from typing import Annotated |
| from datetime import timedelta |
|
|
| from app.models.user import UserLogin |
| from app.models.token import Token |
| from app.db.base import get_db |
| from app.db.models import User |
| from app.utils.security import verify_password |
| from app.utils.auth import create_access_token, get_current_user |
| from app.core.config import settings |
|
|
| router = APIRouter( |
| prefix="/auth", |
| tags=["Authentication"] |
| ) |
|
|
| @router.post("/login", response_model=Token) |
| async def login( |
| form_data: Annotated[OAuth2PasswordRequestForm, Depends()], |
| db: Annotated[Session, Depends(get_db)] |
| ): |
| """ |
| OAuth2 compatible token login, get an access token for future requests. |
| """ |
| |
| user = db.query(User).filter(User.email == form_data.username).first() |
| |
| if not user: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Incorrect email or password", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| |
| if not verify_password(form_data.password, user.hashed_password): |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Incorrect email or password", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| |
| access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) |
| access_token = create_access_token( |
| data={"sub": user.id}, |
| expires_delta=access_token_expires |
| ) |
| |
| |
| return {"access_token": access_token, "token_type": "bearer"} |
|
|
|
|
| @router.post("/login/email", response_model=Token) |
| async def login_with_email( |
| user_credentials: UserLogin, |
| db: Annotated[Session, Depends(get_db)] |
| ): |
| """ |
| Login with email and password, get an access token for future requests. |
| """ |
| |
| user = db.query(User).filter(User.email == user_credentials.email).first() |
| |
| if not user: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Incorrect email or password", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| |
| if not verify_password(user_credentials.password, user.hashed_password): |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Incorrect email or password", |
| headers={"WWW-Authenticate": "Bearer"}, |
| ) |
| |
| |
| access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) |
| access_token = create_access_token( |
| data={"sub": user.id}, |
| expires_delta=access_token_expires |
| ) |
| |
| |
| return {"access_token": access_token, "token_type": "bearer"} |
|
|
| @router.post("/refresh", response_model=Token) |
| async def refresh_token( |
| current_user: Annotated[User, Depends(get_current_user)], |
| ): |
| """ |
| Refresh the access token before it expires. |
| """ |
| try: |
| print(f"[refresh_token] Processing refresh request for user: {current_user.id}") |
| |
| |
| access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) |
| access_token = create_access_token( |
| data={"sub": current_user.id}, |
| expires_delta=access_token_expires |
| ) |
| |
| print(f"[refresh_token] Successfully refreshed token for user: {current_user.id}") |
| |
| |
| return {"access_token": access_token, "token_type": "bearer"} |
| except Exception as e: |
| print(f"[refresh_token] Error refreshing token for user {current_user.id}: {str(e)}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Error refreshing token: {str(e)}", |
| ) |