File size: 2,584 Bytes
c9abf3f 76a77bc c9abf3f 2eb6f6c 96b7a3d 2eb6f6c c9abf3f 96b7a3d c9abf3f 96b7a3d c9abf3f 76a77bc 2eb6f6c c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f 76a77bc c9abf3f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import timedelta
from app.schema import UserCreate, LoginRequest
from app.schema.models import LoginResponse
from app.models import User
from app.core import verify_password, get_password_hash, create_access_token
from app.api.deps import get_db
from app.config import settings
router = APIRouter()
@router.post("/register", response_model=dict)
async def register(user: UserCreate, db: AsyncSession = Depends(get_db)):
try:
result = await db.execute(select(User).filter(User.email == user.email))
existing_user = result.scalar_one_or_none()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Username already registered'
)
new_user = User(
username=user.username,
email=user.email,
hashed_password=get_password_hash(user.password)
)
db.add(new_user)
await db.commit()
return {"message": "User registered sucessfully", "username": user.username}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'registered failed: {str(e)}'
)
@router.post("/login", response_model=LoginResponse)
async def login(request: LoginRequest, db: AsyncSession = Depends(get_db)):
try:
result = await db.execute(select(User).filter(User.email == request.email))
user = result.scalar_one_or_none()
if not user or not verify_password(request.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_deltas=access_token_expires
)
return LoginResponse(
access_token=access_token,
token_type="bearer",
username=user.username
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Login failed: {str(e)}"
) |