GrowthOps_OS / app /security.py
Corin1998's picture
Update app/security.py
d9936d7 verified
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
from passlib.hash import bcrypt
from fastapi import HTTPException, status, Depends
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from .config import Settings
from .database import get_db
from .models import User
# Application settings, built once at import time; the token helpers in this
# module read ``settings.jwt_secret`` from this instance.
settings = Settings()

# Bearer-token dependency consumed by get_current_user. ``tokenUrl`` is the
# relative path of the login endpoint declared to FastAPI.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
def hash_password(password: str) -> str:
    """Hash a plain-text password with passlib's bcrypt handler.

    Args:
        password: Plain-text password to hash.

    Returns:
        The hash string produced by ``bcrypt.hash``.
    """
    hashed = bcrypt.hash(password)
    return hashed
def verify_password(password: str, hashed: str) -> bool:
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        password: Plain-text password supplied by the caller.
        hashed: Previously stored hash to compare against.

    Returns:
        The result of ``bcrypt.verify`` for the given pair.
    """
    is_match = bcrypt.verify(password, hashed)
    return is_match
def create_access_token(sub: str, expires_in: int = 3600) -> str:
    """Issue an HS256-signed JWT for the given subject.

    Args:
        sub: Value stored in the token's ``sub`` claim.
        expires_in: Token lifetime in seconds, counted from the current UTC time.

    Returns:
        The encoded token, signed with ``settings.jwt_secret``.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    claims = {"sub": sub, "exp": expires_at}
    return jwt.encode(claims, settings.jwt_secret, algorithm="HS256")
def parse_token(token: str) -> dict:
    """Decode a JWT signed with ``settings.jwt_secret`` using HS256.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claims dictionary returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with detail "Invalid token" when ``jwt.decode``
            raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
    except JWTError as exc:
        # Chain the decoding error so the root cause stays visible in
        # tracebacks, and include the Bearer challenge that a 401 response
        # is required to carry.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the request's bearer token to an active ``User``.

    The token's ``sub`` claim is matched against ``User.email``.

    Args:
        db: Database session injected through ``get_db``.
        token: Bearer token injected through ``oauth2_scheme``.

    Returns:
        The matching user whose ``is_active`` flag is truthy.

    Raises:
        HTTPException: 401 "Invalid token" when the token cannot be parsed or
            has no ``sub`` claim; 401 "Inactive user" when no user matches or
            the matching user is not active.
    """
    payload = parse_token(token)
    email = payload.get("sub")
    if not email:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = db.query(User).filter(User.email == email).first()
    # Unknown and deactivated accounts deliberately share one response.
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user
def require_tenant_admin(user: User = Depends(get_current_user)) -> User:
    """Allow the request through only for tenant administrators.

    Args:
        user: Authenticated user injected through ``get_current_user``.

    Returns:
        The same user when ``user.is_tenant_admin`` is truthy.

    Raises:
        HTTPException: 403 "Admin required" otherwise.
    """
    if user.is_tenant_admin:
        return user
    raise HTTPException(status_code=403, detail="Admin required")