| """Authentication helpers for HF OAuth-backed requests. |
| |
| Spec references: |
| - `specs/04_interfaces.md`: implements `get_current_user()`. |
| - `specs/07_security.md`: authentication is required and user identity scopes storage access. |
| - `specs/10_test_plan.md`: behavior is explicit and unit-testable. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Any |
|
|
|
|
class AuthError(Exception):
    """Root of this module's authentication exception hierarchy.

    Catching this type also catches every more specific authentication
    error that derives from it.
    """
|
|
|
|
class NotAuthenticatedError(AuthError):
    """Signal that no authenticated user could be found for the request.

    Raised by `get_current_user()` when the request context yields no
    usable username.
    """
|
|
|
|
# Identity-compared sentinel returned by `_safe_getattr` when an attribute
# cannot be read. A dedicated object is used instead of ``None`` so that an
# attribute whose real value is ``None`` is not mistaken for a failed lookup.
_MISSING: object = object()
|
|
|
|
def _safe_getattr(container: object, attribute_name: str) -> Any:
    """Look up ``attribute_name`` on ``container`` without ever raising.

    The lookup is attempted in two stages:

    1. ``object.__getattribute__`` -- the default lookup, which bypasses any
       ``__getattribute__`` override defined by the container's class.
    2. Only if stage 1 raises ``AttributeError``: the regular ``getattr``,
       which also honours ``__getattribute__`` overrides and ``__getattr__``.

    Args:
        container: Object to read from.
        attribute_name: Name of the attribute to read.

    Returns:
        The attribute value, or the ``_MISSING`` sentinel when the attribute
        does not exist or when reading it raises any ``Exception``.
    """
    try:
        result = object.__getattribute__(container, attribute_name)
    except AttributeError:
        # Not reachable through the default lookup; give dynamic accessors
        # a chance before giving up.
        try:
            result = getattr(container, attribute_name)
        except Exception:
            result = _MISSING
    except Exception:
        # A descriptor/property raised something other than AttributeError:
        # treat the attribute as unavailable and do not retry.
        result = _MISSING
    return result
|
|
|
|
def _extract_mapping_value(container: dict[str, Any]) -> str | None:
    """Search a mapping-based request context for a username.

    The identity keys ``username``, ``user``, ``hf_user`` and
    ``current_user`` are checked first, in that order. A non-blank string is
    returned stripped; a nested dict is handed to
    `_extract_user_from_candidate`. If none of them yields a username, dict
    values stored under ``request``, ``state`` and ``session`` are searched
    recursively, in that order. Values of any other type are ignored.

    Args:
        container: Mapping to inspect.

    Returns:
        The stripped username, or ``None`` when nothing usable is found.
    """
    for identity_key in ("username", "user", "hf_user", "current_user"):
        entry = container.get(identity_key)
        if isinstance(entry, str):
            stripped = entry.strip()
            if stripped:
                return stripped
        elif isinstance(entry, dict):
            found = _extract_user_from_candidate(entry)
            if found is not None:
                return found

    # Nested scopes are only followed when they are themselves dicts.
    for scope_key in ("request", "state", "session"):
        scope = container.get(scope_key)
        if not isinstance(scope, dict):
            continue
        found = _extract_mapping_value(scope)
        if found is not None:
            return found

    return None
|
|
|
|
def _extract_object_value(container: object) -> str | None:
    """Search an object-based request context for a username.

    Attributes are read through `_safe_getattr`, so accessors that raise are
    treated as absent. The identity attributes (``username``, ``user``,
    ``hf_user``, ``current_user``) are tried first, followed by the nested
    scopes (``request``, ``state``, ``session``). Each readable value is
    resolved with `_extract_user_from_candidate`.

    Args:
        container: Object to inspect.

    Returns:
        The first username found, or ``None`` when no attribute yields one.
    """
    identity_attrs = ("username", "user", "hf_user", "current_user")
    scope_attrs = ("request", "state", "session")

    for attr in identity_attrs + scope_attrs:
        attr_value = _safe_getattr(container, attr)
        if attr_value is _MISSING:
            continue
        # `_extract_user_from_candidate` returns a stripped string for
        # non-blank string values, so strings need no special case here.
        found = _extract_user_from_candidate(attr_value)
        if found is not None:
            return found

    return None
|
|
|
|
def _extract_user_from_candidate(candidate: Any) -> str | None:
    """Resolve a single context value to a username, if it holds one.

    Dispatch depends on the candidate's type:

    * ``None`` -> ``None``.
    * ``str`` -> the stripped string, or ``None`` if it is blank.
    * ``dict`` -> `_extract_mapping_value`, then the fallback keys.
    * anything else -> `_extract_object_value`, then the fallback attributes
      (read through `_safe_getattr`).

    The fallback names are ``preferred_username``, ``name``, ``login`` and
    ``sub``, tried in that order; the first non-blank string wins.

    NOTE(review): ``name`` may be a display name rather than a unique handle
    in some providers -- confirm it is acceptable as a storage-scoping
    identity.

    Args:
        candidate: Value taken from a request or auth context.

    Returns:
        The stripped username, or ``None`` when none can be extracted.
    """
    if candidate is None:
        return None
    if isinstance(candidate, str):
        return candidate.strip() or None

    fallback_names = ("preferred_username", "name", "login", "sub")

    if isinstance(candidate, dict):
        found = _extract_mapping_value(candidate)
        if found is not None:
            return found
        # Lazy generator: keys are read one at a time, only as needed.
        fallback_values = (candidate.get(name) for name in fallback_names)
    else:
        found = _extract_object_value(candidate)
        if found is not None:
            return found
        # Lazy generator: attributes are read one at a time, only as needed.
        fallback_values = (
            _safe_getattr(candidate, name) for name in fallback_names
        )

    for fallback in fallback_values:
        if isinstance(fallback, str):
            stripped = fallback.strip()
            if stripped:
                return stripped

    return None
|
|
|
|
def get_current_user(request_ctx: Any) -> str:
    """Return the authenticated HF OAuth username from the current request context.

    Spec references:
    - `specs/04_interfaces.md`: implements `get_current_user()`.
    - `specs/07_security.md`: rejects unauthenticated access.

    The context is searched by `_extract_user_from_candidate`, which follows
    nested dicts and object attributes recursively. That traversal has no
    cycle detection, so a self-referential context (for example a dict whose
    ``"request"`` entry is the dict itself) exhausts the interpreter stack.
    Such a context is treated as unauthenticated, so callers only ever have
    to handle `NotAuthenticatedError`.

    Args:
        request_ctx: Framework-specific request or auth context object.

    Returns:
        The authenticated username string used for per-user storage isolation.

    Raises:
        NotAuthenticatedError: If no authenticated user can be extracted,
            including when the context is too deeply nested or cyclic to
            be traversed.
    """
    try:
        username: str | None = _extract_user_from_candidate(request_ctx)
    except RecursionError as exc:
        # Fail closed: a context that cannot be traversed must never crash
        # the request with a non-auth error or be treated as authenticated.
        raise NotAuthenticatedError(
            "Authenticated user not found in request context."
        ) from exc

    if username is None:
        raise NotAuthenticatedError("Authenticated user not found in request context.")
    return username
|
|