Spaces:
Runtime error
Runtime error
| from jose import jwt, JWTError | |
| from datetime import datetime, timedelta, timezone | |
| from passlib.hash import bcrypt | |
| from fastapi import HTTPException, status, Depends | |
| from fastapi.security import OAuth2PasswordBearer | |
| from sqlalchemy.orm import Session | |
| from .config import Settings | |
| from .database import get_db | |
| from .models import User | |
# Settings instance created once at import; its jwt_secret signs and verifies tokens.
settings = Settings()

# Dependency that supplies the `token` argument of get_current_user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
def hash_password(password: str) -> str:
    """Return the bcrypt hash of *password* as produced by passlib's ``bcrypt.hash``."""
    hashed = bcrypt.hash(password)
    return hashed
def verify_password(password: str, hashed: str) -> bool:
    """Check *password* against a stored bcrypt hash.

    Args:
        password: The plaintext candidate password.
        hashed: The previously stored hash to compare against.

    Returns:
        The result of passlib's ``bcrypt.verify`` for the pair.
    """
    is_match = bcrypt.verify(password, hashed)
    return is_match
def create_access_token(sub: str, expires_in: int = 3600) -> str:
    """Build a signed JWT carrying a subject and an expiry.

    Args:
        sub: Value stored in the token's ``sub`` claim.
        expires_in: Lifetime in seconds, counted from the current UTC time.

    Returns:
        The token encoded with ``settings.jwt_secret`` using HS256.
    """
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    claims = {"sub": sub, "exp": expires_at}
    return jwt.encode(claims, settings.jwt_secret, algorithm="HS256")
def parse_token(token: str) -> dict:
    """Decode and verify a JWT, returning its claims.

    The token is decoded with ``settings.jwt_secret`` and only the HS256
    algorithm is accepted.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dictionary.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when decoding
            raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
    except JWTError as exc:
        # Chain the cause so the underlying decode failure stays visible in
        # tracebacks, and send the Bearer challenge expected on a 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)) -> User:
    """Resolve the bearer token to an active ``User`` row.

    The token's ``sub`` claim is treated as the user's email and looked up
    in the database.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The matching user whose ``is_active`` flag is truthy.

    Raises:
        HTTPException: 401 ``"Invalid token"`` when the token cannot be
            parsed or has no ``sub`` claim; 401 ``"Inactive user"`` when no
            user matches the email or the matched user is not active.
    """
    # Challenge header sent with every 401 raised here.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    payload = parse_token(token)
    email = payload.get("sub")
    if not email:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers=bearer_challenge,
        )

    user = db.query(User).filter(User.email == email).first()
    # A missing user and a deactivated user are deliberately reported the
    # same way; the detail text is kept as-is for existing clients.
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Inactive user",
            headers=bearer_challenge,
        )
    return user
def require_tenant_admin(user: User = Depends(get_current_user)) -> User:
    """Allow the request through only for tenant administrators.

    Args:
        user: The user resolved by the ``get_current_user`` dependency.

    Returns:
        The same user when ``user.is_tenant_admin`` is truthy.

    Raises:
        HTTPException: 403 ``"Admin required"`` otherwise.
    """
    if user.is_tenant_admin:
        return user
    raise HTTPException(status_code=403, detail="Admin required")