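"""Authentication routes: password login and self-service registration."""

from datetime import timedelta
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from ..core.security import create_access_token, verify_password, get_password_hash
from ..db.database import get_db
from ..db.models import User
from ..db.schemas import UserInDB

router = APIRouter()

# NOTE(review): tokenUrl is "token" while the credential endpoint in this file
# is "/login"; confirm the router prefix or an alias makes the advertised
# token URL resolve to the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
) -> Any:
    """Exchange an email/password pair for a bearer access token.

    The OAuth2 form's ``username`` field is matched against ``User.email``.
    Unknown accounts and wrong passwords produce the same 401 response, and
    both paths perform one password-hash computation so that response time
    does not reveal which emails are registered.

    Raises:
        HTTPException: 401 when the email is unknown or the password is wrong.
    """
    stmt = select(User).where(User.email == form_data.username)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if user is None:
        # Do comparable hashing work for unknown accounts. Returning early
        # here would make "no such user" measurably faster than "wrong
        # password", which discloses registered emails despite the identical
        # error body.
        get_password_hash(form_data.password)
        credentials_ok = False
    else:
        credentials_ok = verify_password(form_data.password, user.hashed_password)

    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
        )

    # NOTE(review): user.is_active is not consulted before a token is issued;
    # confirm the token-validation dependency rejects deactivated accounts.
    # NOTE(review): no attempt throttling is visible in this handler; confirm
    # rate limiting or lockout is enforced elsewhere.
    access_token = create_access_token(user.id)
    return {"access_token": access_token, "token_type": "bearer"}


# NOTE(review): the response model is named UserInDB; confirm that schema
# does not serialize hashed_password back to the client.
@router.post("/register", response_model=UserInDB)
async def register(
    user_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
) -> Any:
    """Create a new, non-superuser account from an email/password form.

    The form's ``username`` field is stored as email, username and full name.
    The password is stored only as the output of ``get_password_hash``.

    Raises:
        HTTPException: 400 when the email is already registered, including
            when a concurrent request registers it first.
    """
    # NOTE(review): neither email format nor password strength is validated
    # here; confirm that is handled before or inside get_password_hash.
    stmt = select(User).where(User.email == user_data.username)
    result = await db.execute(stmt)
    existing_user = result.scalar_one_or_none()

    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Privilege fields are fixed server-side and never taken from the request.
    user = User(
        email=user_data.username,
        hashed_password=get_password_hash(user_data.password),
        full_name=user_data.username,  # You might want to add this as a separate field in the form
        username=user_data.username,
        is_active=True,
        is_superuser=False,
        roles=["user"]
    )

    db.add(user)
    try:
        await db.commit()
    except IntegrityError:
        # The existence check above is not atomic with the insert: two
        # simultaneous registrations can both pass it. If the table enforces
        # uniqueness, the loser fails here; report it as a duplicate instead
        # of surfacing an unhandled 500.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        ) from None
    await db.refresh(user)

    return user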