Spaces:
Running
Running
File size: 2,513 Bytes
a63c61f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from src.infrastructure.database import get_db
from src.core.domain.db_models import User
from src.core.domain.schemas import TokenResponse, RefreshRequest, UserResponse
from src.core.security import (
verify_password, create_access_token, create_refresh_token,
rotate_refresh_token, revoke_all_refresh_tokens,
get_current_user
)
from src.core.config import settings
router = APIRouter()
@router.post("/login", response_model=TokenResponse)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""Login with email + password. Returns access + refresh tokens."""
user = db.query(User).filter(User.email == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect email or password")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Account is disabled")
access_token = create_access_token(
data={"sub": user.email, "role": user.role},
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
refresh_token = create_refresh_token(user.id, db)
return TokenResponse(access_token=access_token, refresh_token=refresh_token)
@router.post("/refresh", response_model=TokenResponse)
def refresh_tokens(body: RefreshRequest, db: Session = Depends(get_db)):
"""Exchange a valid refresh token for a new access + refresh token pair."""
new_refresh, user = rotate_refresh_token(body.refresh_token, db)
access_token = create_access_token(
data={"sub": user.email, "role": user.role},
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
return TokenResponse(access_token=access_token, refresh_token=new_refresh)
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT)
def logout(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
"""Revoke all refresh tokens for the current user (full logout)."""
revoke_all_refresh_tokens(current_user.id, db)
@router.get("/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_user)):
"""Get the currently authenticated user's profile."""
return current_user
|