Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| import secrets | |
| from dataclasses import dataclass | |
| from typing import Any | |
| from urllib.parse import urlencode | |
| import httpx | |
| from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer | |
class HFOAuthError(RuntimeError):
    """Error raised by this module's Hugging Face OAuth helpers.

    Used for missing client configuration as well as failed token or
    userinfo requests.
    """
@dataclass
class HFOAuthSettings:
    """Resolved Hugging Face OAuth client configuration.

    The ``@dataclass`` decorator generates the keyword ``__init__`` that
    ``get_hf_oauth_settings`` relies on; without it, constructing the class
    with keyword arguments raises ``TypeError``.
    """

    # OAuth client credentials.
    client_id: str
    client_secret: str
    # Provider endpoints used for the authorization-code flow.
    authorize_url: str
    token_url: str
    userinfo_url: str
    # Scope string sent with the authorization request.
    scope: str
# Salt passed to the serializer that signs OAuth state tokens.
OAUTH_STATE_SALT = "hf-oauth-state"
# Fallback signing key used when APP_SESSION_SECRET is unset or blank.
# NOTE(review): the value is hard-coded in source, so it is not a real secret.
DEFAULT_OAUTH_STATE_SECRET = "dev-only-session-secret-change-me"
def _oauth_state_serializer() -> URLSafeTimedSerializer:
    """Build the serializer used to sign and verify OAuth ``state`` tokens.

    The signing key is read from the ``APP_SESSION_SECRET`` environment
    variable. When that variable is unset or blank the function still falls
    back to ``DEFAULT_OAUTH_STATE_SECRET``, but now emits a
    ``RuntimeWarning``: a key that is hard-coded in the source lets anyone
    forge a state token that passes validation.

    Returns:
        A ``URLSafeTimedSerializer`` keyed with the resolved secret and
        salted with ``OAUTH_STATE_SALT``.
    """
    secret = os.getenv("APP_SESSION_SECRET", "").strip()
    if not secret:
        # Function-scope import keeps this block self-contained.
        import warnings

        warnings.warn(
            "APP_SESSION_SECRET is not set; OAuth state tokens are being "
            "signed with the built-in development secret.",
            RuntimeWarning,
            stacklevel=2,
        )
        secret = DEFAULT_OAUTH_STATE_SECRET
    return URLSafeTimedSerializer(secret_key=secret, salt=OAUTH_STATE_SALT)
def _env_or_default(name: str, default: str) -> str:
    """Return the stripped value of env var *name*, or *default* if unset or blank."""
    return os.getenv(name, "").strip() or default


def get_hf_oauth_settings() -> HFOAuthSettings:
    """Load the Hugging Face OAuth configuration from environment variables.

    ``HF_OAUTH_CLIENT_ID`` and ``HF_OAUTH_CLIENT_SECRET`` are required. The
    endpoint URLs and the scope are optional; a variable that is unset *or*
    set to a blank string falls back to its default, so an empty override
    can no longer produce an empty URL.

    Returns:
        The populated ``HFOAuthSettings``.

    Raises:
        HFOAuthError: If the client id or client secret is missing or blank.
    """
    client_id = os.getenv("HF_OAUTH_CLIENT_ID", "").strip()
    client_secret = os.getenv("HF_OAUTH_CLIENT_SECRET", "").strip()
    if not client_id or not client_secret:
        raise HFOAuthError("HF OAuth client configuration is missing.")
    return HFOAuthSettings(
        client_id=client_id,
        client_secret=client_secret,
        authorize_url=_env_or_default(
            "HF_OAUTH_AUTHORIZE_URL", "https://huggingface.co/oauth/authorize"
        ),
        token_url=_env_or_default(
            "HF_OAUTH_TOKEN_URL", "https://huggingface.co/oauth/token"
        ),
        userinfo_url=_env_or_default(
            "HF_OAUTH_USERINFO_URL", "https://huggingface.co/oauth/userinfo"
        ),
        scope=_env_or_default("HF_OAUTH_SCOPE", "openid profile email"),
    )
def generate_oauth_state() -> str:
    """Return a signed ``state`` token that wraps a freshly generated nonce."""
    serializer = _oauth_state_serializer()
    return serializer.dumps({"nonce": secrets.token_urlsafe(32)})
def is_valid_oauth_state(state: str) -> bool:
    """Check that *state* is a correctly signed, unexpired OAuth state token.

    The maximum age comes from ``AUTH_OAUTH_STATE_TTL_SECONDS`` (default
    600). A blank or non-numeric value falls back to the default instead of
    raising ``ValueError`` in the middle of a login callback.

    Args:
        state: The ``state`` value received on the OAuth callback.

    Returns:
        ``True`` only if the token verifies within the TTL and carries a
        non-blank string ``nonce``; ``False`` otherwise, including when
        *state* is missing or not a string.
    """
    # A missing callback parameter is simply invalid; reject it before it
    # reaches the serializer.
    if not isinstance(state, str) or not state:
        return False

    default_ttl = 600
    try:
        ttl_seconds = int(os.getenv("AUTH_OAUTH_STATE_TTL_SECONDS", str(default_ttl)))
    except ValueError:
        ttl_seconds = default_ttl

    try:
        payload = _oauth_state_serializer().loads(state, max_age=ttl_seconds)
    except (BadSignature, SignatureExpired):
        return False

    if not isinstance(payload, dict):
        return False
    nonce = payload.get("nonce")
    return isinstance(nonce, str) and bool(nonce.strip())
def build_hf_authorize_url(redirect_uri: str, state: str) -> str:
    """Compose the authorization URL the user is redirected to.

    Args:
        redirect_uri: Callback URL sent as the ``redirect_uri`` parameter.
        state: State token sent as the ``state`` parameter.

    Returns:
        The configured authorize URL followed by ``?`` and the URL-encoded
        query parameters.
    """
    cfg = get_hf_oauth_settings()
    params = [
        ("client_id", cfg.client_id),
        ("redirect_uri", redirect_uri),
        ("response_type", "code"),
        ("scope", cfg.scope),
        ("state", state),
    ]
    return cfg.authorize_url + "?" + urlencode(params)
def _decode_json_object(response: httpx.Response, label: str) -> dict[str, Any]:
    """Parse *response* as a JSON object, raising ``HFOAuthError`` otherwise."""
    try:
        payload = response.json()
    except ValueError as exc:
        raise HFOAuthError(f"HF {label} response was not valid JSON.") from exc
    if not isinstance(payload, dict):
        raise HFOAuthError(f"HF {label} response was not a JSON object.")
    return payload


def _profile_from_userinfo(userinfo: dict[str, Any]) -> dict[str, Any]:
    """Map a userinfo payload onto the profile dict returned to callers."""
    # The "email" slot falls back to the username and then the subject id,
    # so it is an identity key that is not guaranteed to be an address.
    email = str(
        userinfo.get("email")
        or userinfo.get("preferred_username")
        or userinfo.get("sub")
        or ""
    ).strip()
    if not email:
        raise HFOAuthError("Unable to resolve user identity from HF OAuth response.")
    display_name = userinfo.get("name") or userinfo.get("preferred_username") or None
    avatar_url = userinfo.get("picture") or None
    return {
        "email": email,
        "display_name": str(display_name) if display_name else None,
        "avatar_url": str(avatar_url) if avatar_url else None,
        "provider_sub": str(userinfo.get("sub") or ""),
    }


async def exchange_code_for_hf_user(code: str, redirect_uri: str) -> dict[str, Any]:
    """Exchange an authorization code for the user's profile.

    Posts the code to the configured token endpoint, then calls the userinfo
    endpoint with the returned access token. Bodies that are not valid JSON
    objects are reported as ``HFOAuthError`` rather than leaking
    ``ValueError`` or ``AttributeError`` to the caller.

    Args:
        code: Authorization code received on the OAuth callback.
        redirect_uri: Redirect URI sent along with the token request.

    Returns:
        A dict with keys ``email``, ``display_name``, ``avatar_url`` and
        ``provider_sub``.

    Raises:
        HFOAuthError: If configuration is missing, either request returns a
            status of 400 or above, a body is not a JSON object, the access
            token is absent, or no identity can be derived from userinfo.

    Transport-level exceptions raised by httpx are not caught here.
    """
    settings = get_hf_oauth_settings()
    timeout = httpx.Timeout(20.0, connect=10.0)
    async with httpx.AsyncClient(timeout=timeout) as client:
        token_response = await client.post(
            settings.token_url,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": redirect_uri,
                "client_id": settings.client_id,
                "client_secret": settings.client_secret,
            },
            headers={"Accept": "application/json"},
        )
        if token_response.status_code >= 400:
            raise HFOAuthError(
                f"HF token exchange failed: {token_response.status_code} {token_response.text}"
            )
        access_token = _decode_json_object(token_response, "token").get("access_token")
        if not access_token:
            raise HFOAuthError("HF token response missing access_token.")

        userinfo_response = await client.get(
            settings.userinfo_url,
            headers={"Authorization": f"Bearer {access_token}", "Accept": "application/json"},
        )
        if userinfo_response.status_code >= 400:
            raise HFOAuthError(
                f"HF userinfo failed: {userinfo_response.status_code} {userinfo_response.text}"
            )
        userinfo = _decode_json_object(userinfo_response, "userinfo")

    return _profile_from_userinfo(userinfo)