| """Auth dependency — extracts user from JWT, API key, or scoped API key."""
|
|
|
| from fastapi import Depends, HTTPException, Request
|
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
| from sqlalchemy.ext.asyncio import AsyncSession
|
| from mac.database import get_db
|
| from mac.utils.security import decode_access_token
|
| from mac.services.auth_service import get_user_by_id, get_user_by_api_key
|
| from mac.models.user import User
|
|
|
| security = HTTPBearer()
|
|
|
|
|
| async def get_current_user(
|
| credentials: HTTPAuthorizationCredentials = Depends(security),
|
| db: AsyncSession = Depends(get_db),
|
| ) -> User:
|
| """Extract and validate the current user from Authorization header.
|
| Supports JWT access tokens, legacy API keys (mac_sk_live_xxx),
|
| and scoped API keys (mac_sk_xxx).
|
| """
|
| token = credentials.credentials
|
|
|
|
|
| if token.startswith("mac_sk_live_"):
|
| user = await get_user_by_api_key(db, token)
|
| if not user or not user.is_active:
|
| raise HTTPException(status_code=401, detail={
|
| "code": "authentication_failed",
|
| "message": "Invalid or inactive API key",
|
| })
|
| return user
|
|
|
|
|
| if token.startswith("mac_sk_"):
|
| from mac.services.scoped_key_service import get_key_by_hash
|
| scoped_key = await get_key_by_hash(db, token)
|
| if not scoped_key:
|
| raise HTTPException(status_code=401, detail={
|
| "code": "authentication_failed",
|
| "message": "Invalid, expired, or revoked API key",
|
| })
|
| user = await get_user_by_id(db, scoped_key.user_id)
|
| if not user or not user.is_active:
|
| raise HTTPException(status_code=401, detail={
|
| "code": "authentication_failed",
|
| "message": "User not found or inactive",
|
| })
|
|
|
| user._scoped_key = scoped_key
|
| return user
|
|
|
|
|
| payload = decode_access_token(token)
|
| if not payload:
|
| raise HTTPException(status_code=401, detail={
|
| "code": "authentication_failed",
|
| "message": "Invalid or expired access token",
|
| })
|
|
|
|
|
| jti = payload.get("jti")
|
| if jti:
|
| from mac.services.token_blacklist_service import is_blacklisted
|
| if await is_blacklisted(jti):
|
| raise HTTPException(status_code=401, detail={
|
| "code": "token_revoked",
|
| "message": "Token has been revoked. Please log in again.",
|
| })
|
|
|
| user = await get_user_by_id(db, payload["sub"])
|
| if not user or not user.is_active:
|
| raise HTTPException(status_code=401, detail={
|
| "code": "authentication_failed",
|
| "message": "User not found or inactive",
|
| })
|
|
|
| return user
|
|
|
|
|
| async def require_admin(user: User = Depends(get_current_user)) -> User:
|
| """Require admin role."""
|
| if user.role != "admin":
|
| raise HTTPException(status_code=403, detail={
|
| "code": "forbidden",
|
| "message": "Admin access required",
|
| })
|
| return user
|
|
|
|
|
| async def require_faculty_or_admin(user: User = Depends(get_current_user)) -> User:
|
| """Require faculty or admin role."""
|
| if user.role not in ("faculty", "admin"):
|
| raise HTTPException(status_code=403, detail={
|
| "code": "forbidden",
|
| "message": "Faculty or admin access required",
|
| })
|
| return user
|
|
|