Spaces:
Build error
Build error
File size: 4,625 Bytes
83caf02 edda8af 83caf02 c22ac93 edda8af 83caf02 791579e edda8af 791579e 83caf02 edda8af 83caf02 edda8af 83caf02 791579e 83caf02 edda8af 791579e 83caf02 edda8af 83caf02 6584789 83caf02 6584789 83caf02 edda8af 83caf02 edda8af 83caf02 c22ac93 83caf02 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from sqlmodel.ext.asyncio.session import AsyncSession
from src.api.dependencies.clients import get_db
from src.api.dependencies.auth import get_auth_service, oauth2_scheme, get_current_user_required
from src.api.dependencies.clients import get_redis
from src.api.services.auth_service import AuthService
from src.api.selectors.user.get_user import get_user_by_email
from src.api.selectors.user.add_user import add_user
from src.api.models.user import User
from src.api.schemas.auth import RegisterRequest, LoginRequest, TokenResponse
import redis.asyncio as redis
from src.api.dependencies.rate_limit import login_rate_limiter
router = APIRouter(
prefix="/api/v1/auth",
tags=["auth"]
)
@router.post("/register", response_model=TokenResponse, dependencies=[Depends(login_rate_limiter)])
async def register(
request: RegisterRequest,
auth_service: Annotated[AuthService, Depends(get_auth_service)],
db: Annotated[AsyncSession, Depends(get_db)],
):
existing_user = await get_user_by_email(request.email, db)
if existing_user:
raise HTTPException(
status_code=400,
detail="Email already registered"
)
user = User(
email=request.email.lower(),
username=request.username,
password_hash=auth_service.get_password_hash(request.password),
provider="local",
)
user = await add_user(user, db)
access_token = auth_service.create_access_token(data={"sub": user.email})
return TokenResponse(access_token=access_token)
@router.post("/login", response_model=TokenResponse, dependencies=[Depends(login_rate_limiter)])
async def login(
request: LoginRequest,
auth_service: Annotated[AuthService, Depends(get_auth_service)],
db: Annotated[AsyncSession, Depends(get_db)],
):
user = await get_user_by_email(request.email, db)
if not user or not user.password_hash:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not auth_service.verify_password(request.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = auth_service.create_access_token(data={"sub": user.email})
return TokenResponse(access_token=access_token)
@router.post("/token", response_model=TokenResponse, include_in_schema=False, dependencies=[Depends(login_rate_limiter)])
async def login_for_swagger(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Annotated[AsyncSession, Depends(get_db)],
auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
user = await get_user_by_email(form_data.username, db)
if not user or not user.password_hash:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
headers={"WWW-Authenticate": "Bearer"}
)
if user.is_active is False:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Inactive user",
headers={"WWW-Authenticate": "Bearer"},
)
if not auth_service.verify_password(form_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid email or password",
headers={"WWW-Authenticate": "Bearer"}
)
access_token = auth_service.create_access_token(data={"sub": user.email})
return TokenResponse(access_token=access_token)
@router.post("/logout")
async def logout(
user: Annotated[User, Depends(get_current_user_required)],
token: Annotated[str, Depends(oauth2_scheme)],
redis_client: Annotated[redis.Redis, Depends(get_redis)],
auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
payload = auth_service.decode_access_token(token)
if payload and "exp" in payload:
from datetime import datetime, timezone
exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
ttl = int((exp - datetime.now(timezone.utc)).total_seconds())
if ttl > 0:
await redis_client.setex(f"blacklist:{token}", ttl, "1")
return {"message": "Successfully logged out"} |