Spaces:
Running
Running
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from sqlalchemy.orm import Session | |
| from datetime import timedelta | |
| from src.infrastructure.database import get_db | |
| from src.core.domain.db_models import User | |
| from src.core.domain.schemas import TokenResponse, RefreshRequest, UserResponse | |
| from src.core.security import ( | |
| verify_password, create_access_token, create_refresh_token, | |
| rotate_refresh_token, revoke_all_refresh_tokens, | |
| get_current_user | |
| ) | |
| from src.core.config import settings | |
# Router object for the authentication handlers defined in this module.
router: APIRouter = APIRouter()
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Authenticate with email + password and issue an access/refresh token pair.

    The OAuth2 password form's ``username`` field is matched against
    ``User.email``.

    Args:
        form_data: OAuth2 password-flow form providing ``username`` and
            ``password``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``TokenResponse`` carrying the new access token and refresh token.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify; 403 when the matched user is not active.
    """
    user = db.query(User).filter(User.email == form_data.username).first()

    # One shared message for "unknown email" and "wrong password" so the
    # response does not reveal which of the two failed.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            # A 401 response is required to carry an authentication challenge.
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Checked only after the credentials verified, so the disabled state is
    # never disclosed to a caller who does not know the password.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is disabled",
        )

    access_token = create_access_token(
        data={"sub": user.email, "role": user.role},
        expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    refresh_token = create_refresh_token(user.id, db)
    return TokenResponse(access_token=access_token, refresh_token=refresh_token)
def refresh_tokens(body: RefreshRequest, db: Session = Depends(get_db)):
    """Trade a refresh token for a fresh access + refresh token pair.

    The presented token is handed to ``rotate_refresh_token``, which yields
    the replacement refresh token together with the owning user. How an
    invalid token is rejected is decided inside that helper (not visible
    here).

    Args:
        body: Request payload carrying ``refresh_token``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A ``TokenResponse`` with the newly minted access token and the
        rotated refresh token.
    """
    rotated_token, owner = rotate_refresh_token(body.refresh_token, db)

    # Same claim layout as the login handler: subject is the email, plus role.
    claims = {"sub": owner.email, "role": owner.role}
    lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    new_access = create_access_token(data=claims, expires_delta=lifetime)

    return TokenResponse(access_token=new_access, refresh_token=rotated_token)
def logout(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Perform a full logout by revoking every refresh token of the caller.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Nothing; the handler only delegates to ``revoke_all_refresh_tokens``.
    """
    user_id = current_user.id
    revoke_all_refresh_tokens(user_id, db)
def get_me(current_user: User = Depends(get_current_user)):
    """Return the profile of the user making the request.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same ``User`` object the dependency produced, unmodified.
    """
    profile = current_user
    return profile