Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status, Form | |
| from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm | |
| from sqlalchemy.orm import Session | |
| from datetime import timedelta | |
| from . import crud, models, schemas | |
| from .core import security | |
| from .database import get_db | |
# Token lifetime (in minutes) is taken from the shared security settings so
# the login handlers below agree with the rest of the application.
ACCESS_TOKEN_EXPIRE_MINUTES = security.ACCESS_TOKEN_EXPIRE_MINUTES

# Router object for the authentication handlers defined in this module.
router = APIRouter()

# Bearer-token extractor used as a dependency; tokenUrl names the token login
# endpoint that clients are expected to obtain tokens from.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> models.User:
    """Resolve the bearer token of the current request to a user record.

    The token is decoded with ``security.decode_access_token``; its ``"sub"``
    claim is treated as the user's email and looked up through
    ``crud.get_user_by_email``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            decoder returns ``None``, the payload has no ``"sub"`` value, or
            no user matches that email.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = security.decode_access_token(token)
    # A missing payload and a missing "sub" claim are rejected identically.
    subject = None if claims is None else claims.get("sub")
    if subject is None:
        raise unauthorized

    account = crud.get_user_by_email(db, email=subject)
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """Return the authenticated user unchanged.

    No active/inactive check is performed here at present; this dependency is
    the place to add one (for example rejecting users whose account is marked
    inactive) if that becomes necessary.
    """
    return current_user
async def get_current_admin_user(
    current_user: models.User = Depends(get_current_active_user),
) -> models.User:
    """Return the authenticated user only if their ``is_admin`` flag is truthy.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="The user doesn't have enough privileges",
    )
async def signup_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Create a new user from the submitted sign-up data.

    The email is checked for an existing account first, then the password is
    compared with its confirmation; only if both checks pass is the user
    created through ``crud.create_user``, whose result is returned.

    Raises:
        HTTPException: 400 when the email is already registered, or when
            ``password`` and ``confirm_password`` differ.
    """
    existing_account = crud.get_user_by_email(db, email=user.email)
    if existing_account:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    if user.password != user.confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Passwords do not match",
        )

    return crud.create_user(db=db, user=user)
def _issue_access_token(db: Session, email: str, password: str) -> dict:
    """Check an email/password pair and build the bearer-token response.

    Shared by both login handlers so the credential check and token creation
    live in one place.

    Args:
        db: Database session used to look up the user.
        email: Email address identifying the account.
        password: Plain-text password to verify against the stored hash.

    Returns:
        A dict with ``"access_token"`` and ``"token_type"`` (``"bearer"``).

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header. The
            same error is raised whether the email is unknown or the password
            does not verify.
    """
    user = crud.get_user_by_email(db, email=email)
    if not user or not security.verify_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The token subject is the user's email; get_current_user reads it back
    # from the "sub" claim.
    access_token = security.create_access_token(
        data={"sub": user.email},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": access_token, "token_type": "bearer"}


async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Issue an access token from an OAuth2 password-flow form submission.

    The OAuth2 form carries the email in its ``username`` field.

    Returns:
        A dict with ``"access_token"`` and ``"token_type"`` (``"bearer"``).

    Raises:
        HTTPException: 401 when the email is unknown or the password is wrong.
    """
    return _issue_access_token(db, form_data.username, form_data.password)


# Alternative login handler for plain email/password form posts.
# NOTE(review): the OAuth2 scheme in this module advertises "/auth/token" as
# its token URL; route decorators are not visible in this section, so confirm
# which path each login handler is actually mounted on.
async def login_user_form(
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """
    Login endpoint that takes email and password from form data.

    This is an alternative for frontends that do not submit the
    OAuth2PasswordRequestForm fields directly; it performs the same
    credential check and returns the same token response as
    ``login_for_access_token``.

    Raises:
        HTTPException: 401 when the email is unknown or the password is wrong.
    """
    return _issue_access_token(db, email, password)
async def read_users_me(
    current_user: models.User = Depends(get_current_active_user),
):
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    return current_user