Spaces:
Build error
Build error
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from typing import Annotated | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from src.api.dependencies.clients import get_db | |
| from src.api.dependencies.auth import get_auth_service, oauth2_scheme, get_current_user_required | |
| from src.api.dependencies.clients import get_redis | |
| from src.api.services.auth_service import AuthService | |
| from src.api.selectors.user.get_user import get_user_by_email | |
| from src.api.selectors.user.add_user import add_user | |
| from src.api.models.user import User | |
| from src.api.schemas.auth import RegisterRequest, LoginRequest, TokenResponse | |
| import redis.asyncio as redis | |
| from src.api.dependencies.rate_limit import login_rate_limiter | |
| router = APIRouter( | |
| prefix="/api/v1/auth", | |
| tags=["auth"] | |
| ) | |
| async def register( | |
| request: RegisterRequest, | |
| auth_service: Annotated[AuthService, Depends(get_auth_service)], | |
| db: Annotated[AsyncSession, Depends(get_db)], | |
| ): | |
| existing_user = await get_user_by_email(request.email, db) | |
| if existing_user: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Email already registered" | |
| ) | |
| user = User( | |
| email=request.email.lower(), | |
| username=request.username, | |
| password_hash=auth_service.get_password_hash(request.password), | |
| provider="local", | |
| ) | |
| user = await add_user(user, db) | |
| access_token = auth_service.create_access_token(data={"sub": user.email}) | |
| return TokenResponse(access_token=access_token) | |
| async def login( | |
| request: LoginRequest, | |
| auth_service: Annotated[AuthService, Depends(get_auth_service)], | |
| db: Annotated[AsyncSession, Depends(get_db)], | |
| ): | |
| user = await get_user_by_email(request.email, db) | |
| if not user or not user.password_hash: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| if not auth_service.verify_password(request.password, user.password_hash): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| access_token = auth_service.create_access_token(data={"sub": user.email}) | |
| return TokenResponse(access_token=access_token) | |
| async def login_for_swagger( | |
| form_data: Annotated[OAuth2PasswordRequestForm, Depends()], | |
| db: Annotated[AsyncSession, Depends(get_db)], | |
| auth_service: Annotated[AuthService, Depends(get_auth_service)] | |
| ): | |
| user = await get_user_by_email(form_data.username, db) | |
| if not user or not user.password_hash: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password", | |
| headers={"WWW-Authenticate": "Bearer"} | |
| ) | |
| if user.is_active is False: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Inactive user", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| if not auth_service.verify_password(form_data.password, user.password_hash): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid email or password", | |
| headers={"WWW-Authenticate": "Bearer"} | |
| ) | |
| access_token = auth_service.create_access_token(data={"sub": user.email}) | |
| return TokenResponse(access_token=access_token) | |
| async def logout( | |
| user: Annotated[User, Depends(get_current_user_required)], | |
| token: Annotated[str, Depends(oauth2_scheme)], | |
| redis_client: Annotated[redis.Redis, Depends(get_redis)], | |
| auth_service: Annotated[AuthService, Depends(get_auth_service)] | |
| ): | |
| payload = auth_service.decode_access_token(token) | |
| if payload and "exp" in payload: | |
| from datetime import datetime, timezone | |
| exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc) | |
| ttl = int((exp - datetime.now(timezone.utc)).total_seconds()) | |
| if ttl > 0: | |
| await redis_client.setex(f"blacklist:{token}", ttl, "1") | |
| return {"message": "Successfully logged out"} |