Spaces:
Running
Running
| """Authentication helpers for HF OAuth-backed requests. | |
| Spec references: | |
| - `specs/04_interfaces.md`: implements `get_current_user()`. | |
| - `specs/07_security.md`: authentication is required and user identity scopes storage access. | |
| - `specs/10_test_plan.md`: behavior is explicit and unit-testable. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| class AuthError(Exception): | |
| """Base exception for authentication failures.""" | |
| class NotAuthenticatedError(AuthError): | |
| """Raised when the current request does not include an authenticated user.""" | |
| _MISSING = object() | |
| def _safe_getattr(container: object, attribute_name: str) -> Any: | |
| """Read an attribute without propagating framework-specific accessor errors.""" | |
| try: | |
| return object.__getattribute__(container, attribute_name) | |
| except AttributeError: | |
| pass | |
| except Exception: | |
| return _MISSING | |
| try: | |
| return getattr(container, attribute_name) | |
| except Exception: | |
| return _MISSING | |
| def _extract_mapping_value(container: dict[str, Any]) -> str | None: | |
| """Extract a username from common mapping-based request contexts.""" | |
| direct_keys: tuple[str, ...] = ("username", "user", "hf_user", "current_user") | |
| for key in direct_keys: | |
| value: Any = container.get(key) | |
| if isinstance(value, str) and value.strip(): | |
| return value.strip() | |
| if isinstance(value, dict): | |
| nested_username: str | None = _extract_user_from_candidate(value) | |
| if nested_username is not None: | |
| return nested_username | |
| request: Any = container.get("request") | |
| if isinstance(request, dict): | |
| nested_username = _extract_mapping_value(request) | |
| if nested_username is not None: | |
| return nested_username | |
| state: Any = container.get("state") | |
| if isinstance(state, dict): | |
| nested_username = _extract_mapping_value(state) | |
| if nested_username is not None: | |
| return nested_username | |
| session: Any = container.get("session") | |
| if isinstance(session, dict): | |
| nested_username = _extract_mapping_value(session) | |
| if nested_username is not None: | |
| return nested_username | |
| return None | |
| def _extract_object_value(container: object) -> str | None: | |
| """Extract a username from object-based request contexts.""" | |
| attribute_names: tuple[str, ...] = ("username", "user", "hf_user", "current_user") | |
| for attribute_name in attribute_names: | |
| value: Any = _safe_getattr(container, attribute_name) | |
| if value is _MISSING: | |
| continue | |
| if isinstance(value, str) and value.strip(): | |
| return value.strip() | |
| nested_username: str | None = _extract_user_from_candidate(value) | |
| if nested_username is not None: | |
| return nested_username | |
| for attribute_name in ("request", "state", "session"): | |
| nested_container: Any = _safe_getattr(container, attribute_name) | |
| if nested_container is _MISSING: | |
| continue | |
| nested_username = _extract_user_from_candidate(nested_container) | |
| if nested_username is not None: | |
| return nested_username | |
| return None | |
| def _extract_user_from_candidate(candidate: Any) -> str | None: | |
| """Extract an authenticated username from one candidate context value.""" | |
| if isinstance(candidate, str): | |
| normalized: str = candidate.strip() | |
| return normalized or None | |
| if isinstance(candidate, dict): | |
| username_from_mapping: str | None = _extract_mapping_value(candidate) | |
| if username_from_mapping is not None: | |
| return username_from_mapping | |
| preferred_keys: tuple[str, ...] = ("preferred_username", "name", "login", "sub") | |
| for key in preferred_keys: | |
| value: Any = candidate.get(key) | |
| if isinstance(value, str) and value.strip(): | |
| return value.strip() | |
| return None | |
| if candidate is None: | |
| return None | |
| username_from_object: str | None = _extract_object_value(candidate) | |
| if username_from_object is not None: | |
| return username_from_object | |
| for attribute_name in ("preferred_username", "name", "login", "sub"): | |
| value: Any = _safe_getattr(candidate, attribute_name) | |
| if isinstance(value, str) and value.strip(): | |
| return value.strip() | |
| return None | |
| def get_current_user(request_ctx: Any) -> str: | |
| """Return the authenticated HF OAuth username from the current request context. | |
| Spec references: | |
| - `specs/04_interfaces.md`: implements `get_current_user()`. | |
| - `specs/07_security.md`: rejects unauthenticated access. | |
| Args: | |
| request_ctx: Framework-specific request or auth context object. | |
| Returns: | |
| The authenticated username string used for per-user storage isolation. | |
| Raises: | |
| NotAuthenticatedError: If no authenticated user can be extracted. | |
| """ | |
| username: str | None = _extract_user_from_candidate(request_ctx) | |
| if username is None: | |
| raise NotAuthenticatedError("Authenticated user not found in request context.") | |
| return username | |