Sarp Bilgiç
general rate limit for all endpoints
791579e
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from sqlmodel.ext.asyncio.session import AsyncSession
from src.api.dependencies.clients import get_db
from src.api.dependencies.auth import get_auth_service, oauth2_scheme, get_current_user_required
from src.api.dependencies.clients import get_redis
from src.api.services.auth_service import AuthService
from src.api.selectors.user.get_user import get_user_by_email
from src.api.selectors.user.add_user import add_user
from src.api.models.user import User
from src.api.schemas.auth import RegisterRequest, LoginRequest, TokenResponse
import redis.asyncio as redis
from src.api.dependencies.rate_limit import login_rate_limiter
# Router for the authentication endpoints; every route registered on it is
# served under the /api/v1/auth prefix and tagged "auth".
router = APIRouter(prefix="/api/v1/auth", tags=["auth"])
@router.post("/register", response_model=TokenResponse, dependencies=[Depends(login_rate_limiter)])
async def register(
    request: RegisterRequest,
    auth_service: Annotated[AuthService, Depends(get_auth_service)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Create a local account and return an access token for it.

    The email is lower-cased before it is stored, so the duplicate check is
    run against that same lower-cased value rather than only the raw input;
    otherwise the check and the stored row can disagree for mixed-case input.

    Args:
        request: Registration payload (email, username, password).
        auth_service: Used to hash the password and mint the access token.
        db: Async database session passed to the user selectors.

    Returns:
        TokenResponse carrying an access token whose ``sub`` is the stored
        email.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    email = request.email.lower()

    # Check the value that will actually be written. The address exactly as
    # submitted is probed as well, so every request the raw-input check used
    # to reject is still rejected.
    # NOTE(review): whether get_user_by_email matches case-insensitively is
    # not visible from this file; if it does, the second lookup is redundant.
    candidates = (email,) if email == request.email else (email, request.email)
    for candidate in candidates:
        if await get_user_by_email(candidate, db):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered",
            )

    user = User(
        email=email,
        username=request.username,
        password_hash=auth_service.get_password_hash(request.password),
        provider="local",
    )
    user = await add_user(user, db)

    access_token = auth_service.create_access_token(data={"sub": user.email})
    return TokenResponse(access_token=access_token)
@router.post("/login", response_model=TokenResponse, dependencies=[Depends(login_rate_limiter)])
async def login(
    request: LoginRequest,
    auth_service: Annotated[AuthService, Depends(get_auth_service)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Authenticate with email and password and return an access token.

    Args:
        request: Login payload (email, password).
        auth_service: Used to verify the password and mint the access token.
        db: Async database session passed to the user selector.

    Returns:
        TokenResponse carrying an access token whose ``sub`` is the user's
        stored email.

    Raises:
        HTTPException: 401 "Invalid email or password" when the account is
            unknown, has no password hash, or the password does not match;
            401 "Inactive user" when the credentials are valid but the
            account is flagged inactive.
    """
    # Registration stores the address lower-cased, so look that form up
    # first; fall back to the address as submitted so any account that was
    # reachable with the raw value remains reachable.
    normalized_email = request.email.lower()
    user = await get_user_by_email(normalized_email, db)
    if not user and normalized_email != request.email:
        user = await get_user_by_email(request.email, db)

    # One generic failure for every credential problem so the response does
    # not reveal which part was wrong.
    if (
        not user
        or not user.password_hash
        or not auth_service.verify_password(request.password, user.password_hash)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Same inactive-account rule as the /token endpoint. It is applied only
    # after the password has been verified, so account state is disclosed
    # solely to a caller who knows the credentials.
    if user.is_active is False:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth_service.create_access_token(data={"sub": user.email})
    return TokenResponse(access_token=access_token)
@router.post("/token", response_model=TokenResponse, include_in_schema=False, dependencies=[Depends(login_rate_limiter)])
async def login_for_swagger(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[AsyncSession, Depends(get_db)],
    auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
    """Form-based login (OAuth2 password form) that returns an access token.

    The form's ``username`` field is treated as the account email. The route
    is excluded from the generated schema (``include_in_schema=False``).

    Args:
        form_data: OAuth2 password form; ``username`` holds the email.
        db: Async database session passed to the user selector.
        auth_service: Used to verify the password and mint the access token.

    Returns:
        TokenResponse carrying an access token whose ``sub`` is the user's
        stored email.

    Raises:
        HTTPException: 401 "Invalid email or password" when the account is
            unknown, has no password hash, or the password does not match;
            401 "Inactive user" when the credentials are valid but the
            account is flagged inactive.
    """
    # Registration stores the address lower-cased, so look that form up
    # first; fall back to the value as submitted so any account that was
    # reachable with the raw value remains reachable.
    submitted_email = form_data.username
    normalized_email = submitted_email.lower()
    user = await get_user_by_email(normalized_email, db)
    if not user and normalized_email != submitted_email:
        user = await get_user_by_email(submitted_email, db)

    # One generic failure for every credential problem so the response does
    # not reveal which part was wrong.
    if (
        not user
        or not user.password_hash
        or not auth_service.verify_password(form_data.password, user.password_hash)
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
            headers={"WWW-Authenticate": "Bearer"}
        )

    # Checked only after the password has been verified: reporting
    # "Inactive user" beforehand would tell an unauthenticated caller that
    # the account exists and what state it is in.
    if user.is_active is False:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = auth_service.create_access_token(data={"sub": user.email})
    return TokenResponse(access_token=access_token)
@router.post("/logout")
async def logout(
    user: Annotated[User, Depends(get_current_user_required)],
    token: Annotated[str, Depends(oauth2_scheme)],
    redis_client: Annotated[redis.Redis, Depends(get_redis)],
    auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
    """Record the caller's access token in Redis for its remaining lifetime.

    The token is decoded and, when it carries an ``exp`` claim that lies in
    the future, a ``blacklist:<token>`` key is written with a TTL equal to
    the whole seconds left until expiry. Tokens that cannot be decoded, have
    no ``exp`` claim, or are already expired cause no write. The same
    success message is returned in every case.

    Args:
        user: Not read in the body; declared for its dependency on
            ``get_current_user_required`` (presumably to require an
            authenticated caller -- confirm against that dependency).
        token: The bearer token supplied by ``oauth2_scheme``.
        redis_client: Async Redis client that receives the blacklist key.
        auth_service: Used to decode the token.

    Returns:
        ``{"message": "Successfully logged out"}``.
    """
    from datetime import datetime, timezone

    outcome = {"message": "Successfully logged out"}

    claims = auth_service.decode_access_token(token)
    if not claims or "exp" not in claims:
        # Nothing to time the entry against, so nothing is stored.
        return outcome

    expires_at = datetime.fromtimestamp(claims["exp"], tz=timezone.utc)
    remaining = expires_at - datetime.now(timezone.utc)
    seconds_left = int(remaining.total_seconds())

    # Only a positive TTL is written; an expired token needs no entry.
    if seconds_left > 0:
        await redis_client.setex(f"blacklist:{token}", seconds_left, "1")

    return outcome