from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.hash import bcrypt
from sqlalchemy.orm import Session

from .config import Settings
from .database import get_db
from .models import User

# Signing algorithm shared by token creation and validation; keeping it in one
# place prevents the encode and decode sides from drifting apart.
_JWT_ALGORITHM = "HS256"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
settings = Settings()


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response that carries the bearer-token challenge header."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password*."""
    return bcrypt.hash(password)


def verify_password(password: str, hashed: str) -> bool:
    """Return whether *password* matches the bcrypt hash *hashed*."""
    return bcrypt.verify(password, hashed)


def create_access_token(sub: str, expires_in: int = 3600) -> str:
    """Create a signed JWT for the subject *sub*.

    Args:
        sub: Value stored in the token's ``sub`` claim. ``get_current_user``
            looks it up as the user's email.
        expires_in: Token lifetime in seconds, counted from the current UTC
            time and stored in the ``exp`` claim.

    Returns:
        The encoded token, signed with ``settings.jwt_secret``.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    claims = {"sub": sub, "exp": expires_at}
    return jwt.encode(claims, settings.jwt_secret, algorithm=_JWT_ALGORITHM)


def parse_token(token: str) -> dict:
    """Decode *token* and return its claims.

    Raises:
        HTTPException: 401 "Invalid token" when decoding raises ``JWTError``.
            The original error is kept as ``__cause__`` for debugging.
    """
    try:
        return jwt.decode(
            token, settings.jwt_secret, algorithms=[_JWT_ALGORITHM]
        )
    except JWTError as exc:
        raise _unauthorized("Invalid token") from exc


def get_current_user(
    db: Session = Depends(get_db),
    token: str = Depends(oauth2_scheme),
) -> User:
    """Resolve the bearer token to an active ``User``.

    The token's ``sub`` claim is treated as the user's email address.

    Raises:
        HTTPException: 401 "Invalid token" when the token cannot be decoded or
            has no ``sub`` claim; 401 "Inactive user" when no user has that
            email or the user is not active.
    """
    payload = parse_token(token)
    email = payload.get("sub")
    if not email:
        raise _unauthorized("Invalid token")

    user = db.query(User).filter(User.email == email).first()
    # Unknown and deactivated accounts deliberately get the same response.
    if not user or not user.is_active:
        raise _unauthorized("Inactive user")
    return user


def require_tenant_admin(user: User = Depends(get_current_user)) -> User:
    """Return *user* only when they are a tenant admin.

    Raises:
        HTTPException: 403 "Admin required" when ``user.is_tenant_admin`` is
            falsy.
    """
    if not user.is_tenant_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Admin required"
        )
    return user