# hibatorrahmen's picture
# Add backend application and Dockerfile
# 8ae78b0
import logging
from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from app.models.user import UserLogin
from app.models.token import Token
from app.db.base import get_db
from app.db.models import User
from app.utils.security import verify_password
from app.utils.auth import create_access_token, get_current_user
from app.core.config import settings
# Router collecting the authentication endpoints; every route declared on it
# is served under the "/auth" prefix and tagged "Authentication".
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/login", response_model=Token)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[Session, Depends(get_db)],
):
    """
    OAuth2 compatible token login, get an access token for future requests.

    The ``username`` field of the OAuth2 form is matched against
    ``User.email``.

    Args:
        form_data: OAuth2 password-flow form carrying ``username`` and
            ``password``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``"bearer"``),
        serialized through the ``Token`` response model.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user has the given email or the password does not verify.
    """
    # Find the user by email
    user = db.query(User).filter(User.email == form_data.username).first()

    # An unknown email and a wrong password produce the same response, so the
    # reply does not distinguish between the two cases. The password is only
    # verified when a user row was found (short-circuit).
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token. The JWT "sub" claim is defined as a string, and
    # strict decoders reject other types, so the id is coerced explicitly.
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id)},
        expires_delta=access_token_expires,
    )

    # Return token
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/login/email", response_model=Token)
async def login_with_email(
    user_credentials: UserLogin,
    db: Annotated[Session, Depends(get_db)],
):
    """
    Login with email and password, get an access token for future requests.

    Args:
        user_credentials: Request body; its ``email`` and ``password``
            attributes are read here.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``"bearer"``),
        serialized through the ``Token`` response model.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user has the given email or the password does not verify.
    """
    # Find the user by email
    user = db.query(User).filter(User.email == user_credentials.email).first()

    # An unknown email and a wrong password produce the same response, so the
    # reply does not distinguish between the two cases. The password is only
    # verified when a user row was found (short-circuit).
    if not user or not verify_password(
        user_credentials.password, user.hashed_password
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token. The JWT "sub" claim is defined as a string, and
    # strict decoders reject other types, so the id is coerced explicitly.
    access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(user.id)},
        expires_delta=access_token_expires,
    )

    # Return token
    return {"access_token": access_token, "token_type": "bearer"}
@router.post("/refresh", response_model=Token)
async def refresh_token(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """
    Refresh the access token before it expires.

    Issues a new access token for the user resolved by the
    ``get_current_user`` dependency.

    Args:
        current_user: The user resolved by ``get_current_user``.

    Returns:
        A dict with the new ``access_token`` and ``token_type``
        (``"bearer"``), serialized through the ``Token`` response model.

    Raises:
        HTTPException: 500 when creating the new token fails. The response
            carries a generic message; the underlying error is only logged.
    """
    logger = logging.getLogger(__name__)
    user_id = current_user.id
    logger.info("[refresh_token] Processing refresh request for user: %s", user_id)

    try:
        # Create a new access token with the current user's ID. The JWT "sub"
        # claim is defined as a string, so the id is coerced explicitly.
        access_token_expires = timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
        access_token = create_access_token(
            data={"sub": str(user_id)},
            expires_delta=access_token_expires,
        )
    except Exception as exc:
        # Log the full traceback server-side, but keep internal error text
        # out of the HTTP response body.
        logger.exception(
            "[refresh_token] Error refreshing token for user %s", user_id
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error refreshing token",
        ) from exc

    logger.info("[refresh_token] Successfully refreshed token for user: %s", user_id)
    # Return the new token
    return {"access_token": access_token, "token_type": "bearer"}