richtext's picture
Upload folder using huggingface_hub
3674b4b verified
from fastapi import APIRouter, HTTPException, status, Request
from backend.core.dependencies import DbSession, CurrentUser
from backend.schemas.auth import LoginRequest, Token, RefreshRequest
from backend.schemas.user import UserResponse
from backend.services.auth import authenticate_user, create_token_for_user
from backend.core.security import verify_refresh_token
from backend.models import User, AuditLog
# Router on which the authentication endpoints below (/login, /me, /logout,
# /refresh) register themselves via the @router decorators.
router = APIRouter()
@router.post("/login", response_model=Token)
def login(request: LoginRequest, db: DbSession, req: Request):
    """Authenticate by email and password and issue tokens.

    A rejected credential check and a successful login are each written to
    the audit log, together with the caller's IP address when the request
    exposes one.

    Args:
        request: Login payload; ``email`` and ``password`` are read from it.
        db: Database session used for the credential check and audit rows.
        req: Incoming HTTP request, consulted only for the client address.

    Returns:
        Whatever ``create_token_for_user`` produces for the authenticated
        user (serialized through the ``Token`` response model).

    Raises:
        HTTPException: 401 when ``authenticate_user`` yields no user,
            403 when the authenticated account is not active.
    """
    account = authenticate_user(db, request.email, request.password)

    # The request may carry no client information; store None in that case.
    client = req.client
    caller_ip = client.host if client else None

    if not account:
        # Persist the failed attempt before rejecting the request.
        db.add(
            AuditLog(
                action="login_failed",
                details={"email": request.email},
                ip_address=caller_ip,
            )
        )
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): this rejection is not written to the audit log.
    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )

    # Persist the successful login before handing out tokens.
    db.add(
        AuditLog(
            user_id=account.id,
            action="login_success",
            details={"email": account.email},
            ip_address=caller_ip,
        )
    )
    db.commit()

    return create_token_for_user(db, account)
@router.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: CurrentUser, db: DbSession):
    """Return the profile of the authenticated caller.

    Args:
        current_user: The user resolved by the ``CurrentUser`` dependency.
        db: Database session dependency; not referenced directly in the body.

    Returns:
        A ``UserResponse`` built from the user's own fields, with ``groups``
        set to the names of the groups reached through ``current_user.groups``.
    """
    me = current_user

    # Each entry of ``groups`` links to a group object; the response only
    # carries the group names.
    names = [membership.group.name for membership in me.groups]

    return UserResponse(
        id=me.id,
        email=me.email,
        full_name=me.full_name,
        is_admin=me.is_admin,
        is_active=me.is_active,
        created_at=me.created_at,
        last_login=me.last_login,
        groups=names,
    )
@router.post("/logout")
def logout(current_user: CurrentUser, db: DbSession):
    """Log the caller out by advancing their token version.

    The /refresh endpoint in this file rejects any refresh token whose
    ``token_version`` claim differs from the stored value, so incrementing
    it revokes outstanding refresh tokens.
    NOTE(review): access tokens are presumably checked the same way inside
    the ``CurrentUser`` dependency -- confirm.

    Returns:
        A dict with a single ``message`` key confirming the logout.
    """
    current_user.token_version = current_user.token_version + 1
    db.commit()

    confirmation = {"message": "Successfully logged out"}
    return confirmation
def _refresh_unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


@router.post("/refresh", response_model=Token)
def refresh_token(request: RefreshRequest, db: DbSession):
    """Exchange a valid refresh token for new access + refresh tokens.

    Args:
        request: Payload whose ``refresh_token`` field holds the token to
            exchange.
        db: Database session used to load the user named by the token.

    Returns:
        Whatever ``create_token_for_user`` produces for the token's user
        (serialized through the ``Token`` response model).

    Raises:
        HTTPException: 401 (always with a ``WWW-Authenticate: Bearer``
            header) when the token fails verification, has no ``sub``
            claim, names an unknown user, or carries a stale
            ``token_version``; 403 when the user account is not active.
    """
    payload = verify_refresh_token(request.refresh_token)
    if payload is None:
        raise _refresh_unauthorized("Invalid or expired refresh token")

    user_id = payload.get("sub")
    if not user_id:
        raise _refresh_unauthorized("Invalid token payload")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise _refresh_unauthorized("User not found")

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )

    # A refresh token is only honoured while its version claim matches the
    # stored one. The logout endpoint in this file bumps the stored version;
    # the original note says a password change does too (not visible here).
    token_version = payload.get("token_version")
    if token_version is None or token_version != user.token_version:
        raise _refresh_unauthorized("Token has been revoked")

    # Generate new tokens
    return create_token_for_user(db, user)