MAC / mac /middleware /auth_middleware.py
Aaryan17's picture
chore: upload MAC codebase to HF Space
0e76632 verified
"""Auth dependency — extracts user from JWT, API key, or scoped API key."""
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from mac.database import get_db
from mac.utils.security import decode_access_token
from mac.services.auth_service import get_user_by_id, get_user_by_api_key
from mac.models.user import User
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db),
) -> User:
"""Extract and validate the current user from Authorization header.
Supports JWT access tokens, legacy API keys (mac_sk_live_xxx),
and scoped API keys (mac_sk_xxx).
"""
token = credentials.credentials
# Check if it's a legacy API key
if token.startswith("mac_sk_live_"):
user = await get_user_by_api_key(db, token)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail={
"code": "authentication_failed",
"message": "Invalid or inactive API key",
})
return user
# Check if it's a scoped API key (mac_sk_ but not mac_sk_live_)
if token.startswith("mac_sk_"):
from mac.services.scoped_key_service import get_key_by_hash
scoped_key = await get_key_by_hash(db, token)
if not scoped_key:
raise HTTPException(status_code=401, detail={
"code": "authentication_failed",
"message": "Invalid, expired, or revoked API key",
})
user = await get_user_by_id(db, scoped_key.user_id)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail={
"code": "authentication_failed",
"message": "User not found or inactive",
})
# Attach scoped key info to request state for downstream checks
user._scoped_key = scoped_key
return user
# Otherwise treat as JWT
payload = decode_access_token(token)
if not payload:
raise HTTPException(status_code=401, detail={
"code": "authentication_failed",
"message": "Invalid or expired access token",
})
# Check blacklist (handles logout)
jti = payload.get("jti")
if jti:
from mac.services.token_blacklist_service import is_blacklisted
if await is_blacklisted(jti):
raise HTTPException(status_code=401, detail={
"code": "token_revoked",
"message": "Token has been revoked. Please log in again.",
})
user = await get_user_by_id(db, payload["sub"])
if not user or not user.is_active:
raise HTTPException(status_code=401, detail={
"code": "authentication_failed",
"message": "User not found or inactive",
})
return user
async def require_admin(user: User = Depends(get_current_user)) -> User:
"""Require admin role."""
if user.role != "admin":
raise HTTPException(status_code=403, detail={
"code": "forbidden",
"message": "Admin access required",
})
return user
async def require_faculty_or_admin(user: User = Depends(get_current_user)) -> User:
"""Require faculty or admin role."""
if user.role not in ("faculty", "admin"):
raise HTTPException(status_code=403, detail={
"code": "forbidden",
"message": "Faculty or admin access required",
})
return user