be / app /auth /view.py
Lucii1's picture
Wtire docker file
e750069
from .schemas import CreateUserRequest, UserResponse, LoginRequest
from .models import User, UserProfile
from fastapi import HTTPException, status
from passlib.context import CryptContext
from app.security import create_access_token, create_refresh_token
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthView:
async def login(self, req: LoginRequest):
user = await User.find_one(User.username == req.username)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
if not pwd_context.verify(req.password, user.password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password"
)
access_token = create_access_token(str(user.id))
refresh_token = create_refresh_token(str(user.id))
return {
"msg": f"User {user.username} logged in",
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
async def register(self, req: CreateUserRequest):
existingEmail = await User.find_one(User.email == req.email)
if existingEmail:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered")
existingUsername = await User.find_one(User.username == req.username)
if existingUsername:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already taken")
profile = None
if req.profile:
profile = UserProfile(**req.profile.model_dump())
await profile.insert()
hashed_password = pwd_context.hash(req.password)
user = User(
username=req.username,
email=req.email,
password=hashed_password,
profile=profile
)
await user.save()
return UserResponse.model_validate(user)
async def refresh(self):
# Placeholder for refresh token logic
return {"msg": "Token refreshed"}
async def logout(self):
# Placeholder for logout logic
return {"msg": "User logged out"}