# Source: fuzik/app/routers/authentication.py
# Commit d2726bc by date3k2: "Add new files and models"
from fastapi import APIRouter, Depends, HTTPException, status, Security, Body, Query
from fastapi.security import OAuth2PasswordRequestForm
from models.user import UserSignup
from db.supabase_service import get_supabase
from typing import Annotated
from supabase import Client
from utils.auth import get_id
from fastapi.encoders import jsonable_encoder
from models.enums import Role
from gotrue.errors import AuthApiError
from crud.user import update_user
from utils.exceptions import BAD_REQUEST, CONFLICT, NOT_FOUND
# Router for the endpoints defined in this module; they are grouped under the
# "Authentication" tag.
router = APIRouter(tags=["Authentication"])
@router.post("/login", description="Login user using email and password")
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    supabase: Annotated[Client, Depends(get_supabase)],
):
    """Sign a user in with email and password and return a bearer token.

    The OAuth2 password form has no dedicated email field, so the form's
    ``username`` value is used as the email.

    Args:
        form_data: OAuth2 password form; ``username`` is treated as the email.
        supabase: Supabase client injected through ``get_supabase``.

    Returns:
        A dict with ``access_token`` (taken from the sign-in session) and
        ``token_type`` set to ``"bearer"``.

    Raises:
        HTTPException: 400 when the sign-in call fails or the response does
            not carry a session with an access token.
    """
    credentials = {"email": form_data.username, "password": form_data.password}
    try:
        auth_response = supabase.auth.sign_in_with_password(credentials)
        # Reading the token inside the ``try`` keeps a missing session mapped
        # to the same 400 response as a rejected password.
        access_token = auth_response.session.access_token
    except Exception as exc:
        # ``Exception`` (not a bare ``except``) so task cancellation and
        # interpreter-exit signals are not rewritten into a 400 response.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect email or password",
        ) from exc
    return {
        "access_token": access_token,
        "token_type": "bearer",
    }
@router.post(
    "/signup", description="Sign up new user", status_code=status.HTTP_201_CREATED
)
async def signup(
    user: Annotated[UserSignup, Body()],
    supabase: Annotated[Client, Depends(get_supabase)],
):
    """Register a new user through Supabase auth.

    The role is always forced to ``Role.user`` regardless of what the request
    body contains. Every field of ``user`` except ``password`` and ``email``
    is sent along as sign-up metadata.

    Args:
        user: Sign-up payload from the request body.
        supabase: Supabase client injected through ``get_supabase``.

    Returns:
        ``{"detail": "User created"}`` on success.

    Raises:
        HTTPException: 400 when the password is shorter than 6 characters.
        CONFLICT: when the sign-up call raises ``AuthApiError``.
        BAD_REQUEST: when any other error occurs while signing up.
    """
    email = user.email
    password = user.password
    if len(password) < 6:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password should be at least 6 characters",
        )
    try:
        # Overwrite any client-supplied role before it is encoded below.
        user.role = Role.user
        metadata = jsonable_encoder(user, exclude=("password", "email"))
        supabase.auth.sign_up(
            {
                "email": email,
                "password": password,
                "options": {"data": metadata},
            }
        )
    except AuthApiError as exc:
        raise CONFLICT from exc
    except Exception as exc:
        # ``Exception`` (not a bare ``except``) so task cancellation and
        # interpreter-exit signals are not rewritten into a client error.
        raise BAD_REQUEST from exc
    return {"detail": "User created"}
@router.post("/reset_password", description="Send reset password email")
async def reset_password(
    supabase: Annotated[Client, Depends(get_supabase)],
    email: str,
):
    """Send a password-reset email if the address belongs to a known user.

    The ``get_user_id_by_email`` RPC is called first; the reset email is only
    requested when that call returns a non-empty result.

    Args:
        supabase: Supabase client injected through ``get_supabase``.
        email: Address to send the reset email to.

    Returns:
        ``{"detail": "Reset password email sent"}`` on success.

    Raises:
        NOT_FOUND: when the lookup returns nothing, or when the lookup or the
            email request fails for any reason.
    """
    # NOTE(review): every failure, including transport errors, is reported as
    # NOT_FOUND, matching the previous behaviour of this endpoint.
    try:
        lookup = supabase.rpc("get_user_id_by_email", {"email": email}).execute()
        user_exists = len(lookup.data) > 0
    except Exception as exc:
        raise NOT_FOUND from exc

    # Guard outside the ``try`` so this raise is not caught and re-raised by
    # the handler above.
    if not user_exists:
        raise NOT_FOUND

    try:
        supabase.auth.reset_password_email(email)
    except Exception as exc:
        raise NOT_FOUND from exc
    return {"detail": "Reset password email sent"}
@router.post("/reset_password_confirm", description="Reset password")
async def reset_password_confirm(
    supabase: Annotated[Client, Depends(get_supabase)],
    email: Annotated[str, Query(description="Email", title="Email")],
    new_password: Annotated[
        str, Query(min_length=6, description="New password", title="New password")
    ],
    token: Annotated[
        str, Query(description="Reset password token", title="Reset password token")
    ],
):
    """Verify a reset token and set the user's new password.

    The token is verified with ``supabase.auth.verify_otp`` and, if that
    succeeds, ``update_user`` is awaited with the new password.

    Args:
        supabase: Supabase client injected through ``get_supabase``.
        email: Email address the token was issued for.
        new_password: Replacement password (at least 6 characters).
        token: Reset token supplied by the user.

    Returns:
        ``{"detail": "Your password has been reset"}`` on success.

    Raises:
        HTTPException: 400 with "Incorrect token" when verification or the
            password update fails.
    """
    # NOTE(review): ``new_password`` is a query parameter, so it can end up in
    # URLs and access logs; moving it to the request body would change the
    # endpoint's interface and is left for a separate change.
    try:
        # NOTE(review): the OTP is verified with type "email"; confirm this
        # matches the kind of token issued by the reset-password email.
        supabase.auth.verify_otp(
            {
                "email": email,
                "token": token,
                "type": "email",
            }
        )
        await update_user(supabase, password=new_password)
    except Exception as exc:
        # ``Exception`` (not a bare ``except``) so cancellation of the awaited
        # update is propagated instead of being reported as a bad token.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Incorrect token",
        ) from exc
    return {"detail": "Your password has been reset"}