Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, HTTPException, status, Request | |
| from backend.core.dependencies import DbSession, CurrentUser | |
| from backend.schemas.auth import LoginRequest, Token, RefreshRequest | |
| from backend.schemas.user import UserResponse | |
| from backend.services.auth import authenticate_user, create_token_for_user | |
| from backend.core.security import verify_refresh_token | |
| from backend.models import User, AuditLog | |
| router = APIRouter() | |
def login(request: LoginRequest, db: DbSession, req: Request):
    """Check a user's credentials and hand back tokens on success.

    A rejected credential check and a successful login are each written to
    the audit log (action ``login_failed`` / ``login_success``) along with
    the caller's IP address when the request exposes a client. A user whose
    credentials are valid but whose account is inactive is refused without
    an audit entry.

    Args:
        request: Login payload; its ``email`` and ``password`` are used.
        db: Database session used for the audit entry and token creation.
        req: Incoming HTTP request, consulted only for the client address.

    Returns:
        Whatever ``create_token_for_user(db, user)`` produces.

    Raises:
        HTTPException: 401 when ``authenticate_user`` yields no user,
            403 when the user is not active.
    """

    def caller_ip():
        # The request may carry no client information at all.
        client = req.client
        return client.host if client else None

    account = authenticate_user(db, request.email, request.password)

    if not account:
        # Record the failed attempt before rejecting the request.
        db.add(
            AuditLog(
                action="login_failed",
                details={"email": request.email},
                ip_address=caller_ip(),
            )
        )
        db.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )

    # Record the successful login, then issue the tokens.
    db.add(
        AuditLog(
            user_id=account.id,
            action="login_success",
            details={"email": account.email},
            ip_address=caller_ip(),
        )
    )
    db.commit()
    return create_token_for_user(db, account)
def get_current_user_info(current_user: CurrentUser, db: DbSession):
    """Describe the authenticated user as a ``UserResponse``.

    Args:
        current_user: The user resolved for the current request.
        db: Database session; accepted but not used directly here.

    Returns:
        A ``UserResponse`` carrying the user's basic fields and the names
        of the groups reached through ``current_user.groups``.
    """
    # Each entry of ``groups`` exposes the group itself via ``.group``;
    # only the group names go into the response.
    names = []
    for membership in current_user.groups:
        names.append(membership.group.name)

    fields = dict(
        id=current_user.id,
        email=current_user.email,
        full_name=current_user.full_name,
        is_admin=current_user.is_admin,
        is_active=current_user.is_active,
        created_at=current_user.created_at,
        last_login=current_user.last_login,
        groups=names,
    )
    return UserResponse(**fields)
def logout(current_user: CurrentUser, db: DbSession):
    """Log the user out by advancing their token version.

    ``refresh_token`` in this module rejects any refresh token whose
    embedded ``token_version`` differs from the user's current one, so
    bumping the counter revokes previously issued refresh tokens.
    NOTE(review): access-token validation is not visible here; presumably it
    checks the same counter -- confirm.

    Args:
        current_user: The user resolved for the current request.
        db: Database session used to persist the new version.

    Returns:
        A dict with a single ``"message"`` confirmation entry.
    """
    next_version = current_user.token_version + 1
    current_user.token_version = next_version
    db.commit()
    return {"message": "Successfully logged out"}
def refresh_token(request: RefreshRequest, db: DbSession):
    """Exchange a valid refresh token for new access + refresh tokens.

    The token is verified, its subject is resolved to a user, and the
    ``token_version`` embedded in the token is compared with the user's
    current value before new tokens are issued.

    Args:
        request: Payload carrying the ``refresh_token`` string.
        db: Database session used to look up the user and create tokens.

    Returns:
        Whatever ``create_token_for_user(db, user)`` produces.

    Raises:
        HTTPException: 401 (always with a ``WWW-Authenticate: Bearer``
            challenge) when the token is invalid or expired, has no
            ``sub`` claim, names an unknown user, or carries a stale
            ``token_version``; 403 when the user is not active.
    """

    def unauthorized(detail: str) -> HTTPException:
        # A 401 response must carry a WWW-Authenticate challenge (RFC 9110
        # section 15.5.2); building every 401 here keeps them consistent.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )

    payload = verify_refresh_token(request.refresh_token)
    if payload is None:
        raise unauthorized("Invalid or expired refresh token")

    user_id = payload.get("sub")
    if not user_id:
        raise unauthorized("Invalid token payload")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise unauthorized("User not found")

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )

    # Check token version: a token issued before the user's token_version
    # was last bumped is treated as revoked. logout() in this module bumps
    # it; presumably password changes do too (not visible here -- confirm).
    token_version = payload.get("token_version")
    if token_version is None or token_version != user.token_version:
        raise unauthorized("Token has been revoked")

    # Generate new tokens
    return create_token_for_user(db, user)