# aifinanceadvisor / app / auth.py
# triflix's picture
# Upload 53 files
# 3c4e575 verified
from fastapi import APIRouter, Depends, HTTPException, status, Form
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from . import crud, models, schemas
from .core import security
from .database import get_db
# Router that collects the authentication endpoints declared in this module.
router = APIRouter()
# Dependency that extracts the bearer token from incoming requests.
# NOTE(review): the "/auth/token" URL assumes this router is mounted under an
# "/auth" prefix by the application -- confirm where the router is included.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token") # Points to the token login endpoint
# Access-token lifetime in minutes, taken from the security module so both
# login endpoints use the same value.
ACCESS_TOKEN_EXPIRE_MINUTES = security.ACCESS_TOKEN_EXPIRE_MINUTES
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> models.User:
    """Resolve the request's bearer token to the matching user record.

    The token is passed to ``security.decode_access_token``; the ``sub`` entry
    of the resulting payload is treated as the user's email and looked up via
    ``crud.get_user_by_email``.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The user whose email matches the token's ``sub`` entry.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            decoder returns ``None``, the payload has no ``sub`` entry, or no
            user exists for that email.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = security.decode_access_token(token)
    # A missing payload and a payload without "sub" are rejected identically.
    subject = None if claims is None else claims.get("sub")
    if subject is None:
        raise unauthorized

    account = crud.get_user_by_email(db, email=subject)
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(current_user: models.User = Depends(get_current_user)) -> models.User:
    """Return the authenticated user unchanged.

    This dependency currently performs no extra check; it exists as the place
    to reject inactive accounts once such a check is needed.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        The same user object that was passed in.
    """
    # Placeholder for a future active/inactive check, for example:
    #   if not current_user.is_active:
    #       raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
async def get_current_admin_user(current_user: models.User = Depends(get_current_active_user)) -> models.User:
    """Allow the request through only when the user is flagged as admin.

    Args:
        current_user: User resolved by the ``get_current_active_user``
            dependency.

    Returns:
        The same user object when its ``is_admin`` attribute is truthy.

    Raises:
        HTTPException: 403 when ``is_admin`` is falsy.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="The user doesn't have enough privileges"
    )
@router.post("/signup", response_model=schemas.User)
async def signup_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Register a new account from the submitted sign-up payload.

    Args:
        user: Sign-up payload; its ``email``, ``password`` and
            ``confirm_password`` fields are read here.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        Whatever ``crud.create_user`` returns for the new account.

    Raises:
        HTTPException: 400 when the email is already registered, or when the
            password and its confirmation differ. The email check runs first.
    """
    already_registered = crud.get_user_by_email(db, email=user.email)
    if already_registered:
        raise HTTPException(status_code=400, detail="Email already registered")

    if user.password != user.confirm_password:
        raise HTTPException(status_code=400, detail="Passwords do not match")

    new_account = crud.create_user(db=db, user=user)
    return new_account
def _issue_access_token(db: Session, email: str, password: str) -> dict:
    """Check the credentials and build the token response used by both login routes.

    Args:
        db: Database session used to look the user up by email.
        email: Email address submitted by the client.
        password: Plain-text password submitted by the client.

    Returns:
        A dict with ``access_token`` and ``token_type`` ("bearer") keys.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user matches the email or the password does not verify. Both cases
            share one message so the response does not say which one failed.
    """
    user = crud.get_user_by_email(db, email=email)
    if not user or not security.verify_password(password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # The email is stored under "sub"; get_current_user reads it back from there.
    access_token = security.create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@router.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Log in with an OAuth2 password-flow form and return a bearer token.

    This is the endpoint that ``oauth2_scheme`` names as its ``tokenUrl``.

    Args:
        form_data: OAuth2 password form; its ``username`` field carries the
            email address.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` keys.

    Raises:
        HTTPException: 401 when the email or password is incorrect.
    """
    # OAuth2 form uses 'username' for email
    return _issue_access_token(db, email=form_data.username, password=form_data.password)


# Two login routes are exposed and both return the same token response:
#   * /token takes the OAuth2 password-flow form (fields "username"/"password").
#   * /login takes plain "email"/"password" form fields.
@router.post("/login", response_model=schemas.Token)
async def login_user_form(email: str = Form(...), password: str = Form(...), db: Session = Depends(get_db)):
    """
    Login endpoint that takes email and password from form data.
    This is an alternative if not using OAuth2PasswordRequestForm directly in frontend JS.
    Frontend forms would post to this.

    Args:
        email: Email address from the ``email`` form field.
        password: Plain-text password from the ``password`` form field.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``access_token`` and ``token_type`` keys.

    Raises:
        HTTPException: 401 when the email or password is incorrect.
    """
    return _issue_access_token(db, email=email, password=password)
@router.get("/users/me", response_model=schemas.User)
async def read_users_me(current_user: models.User = Depends(get_current_active_user)):
    """Return the profile of the user making the request.

    Args:
        current_user: User resolved by the ``get_current_active_user``
            dependency.

    Returns:
        The same user object, serialized through ``schemas.User`` by the route.
    """
    return current_user