"""Authentication helpers for HF OAuth-backed requests.

Spec references:
- `specs/04_interfaces.md`: implements `get_current_user()`.
- `specs/07_security.md`: authentication is required and user identity scopes
  storage access.
- `specs/10_test_plan.md`: behavior is explicit and unit-testable.
"""

from __future__ import annotations

from typing import Any


class AuthError(Exception):
    """Base exception for authentication failures."""


class NotAuthenticatedError(AuthError):
    """Raised when the current request does not include an authenticated user."""


# Sentinel distinguishing "attribute could not be read" from a real ``None``.
_MISSING = object()

# Keys/attributes that may hold the username itself or a nested user record.
_IDENTITY_NAMES: tuple[str, ...] = ("username", "user", "hf_user", "current_user")
# Keys/attributes that may hold a nested context worth searching.
_CONTAINER_NAMES: tuple[str, ...] = ("request", "state", "session")
# Last-resort keys/attributes read directly off a user record, in priority order.
# NOTE(review): with some identity providers `name` is a display name rather
# than a unique handle; it is only consulted after `preferred_username`.
_FALLBACK_NAMES: tuple[str, ...] = ("preferred_username", "name", "login", "sub")


def _safe_getattr(container: object, attribute_name: str) -> Any:
    """Read an attribute without propagating framework-specific accessor errors.

    The plain ``object.__getattribute__`` lookup is tried first. Only when that
    raises ``AttributeError`` is the regular ``getattr`` protocol (which honours
    ``__getattr__`` and overridden ``__getattribute__``) attempted.

    Args:
        container: Object to read from.
        attribute_name: Name of the attribute to read.

    Returns:
        The attribute value, or ``_MISSING`` when either lookup raises.
    """
    try:
        return object.__getattribute__(container, attribute_name)
    except AttributeError:
        pass  # Fall through to the dynamic lookup below.
    except Exception:
        # Deliberate best-effort: accessors that fail for any other reason are
        # treated as absent rather than aborting the whole extraction.
        return _MISSING

    try:
        return getattr(container, attribute_name)
    except Exception:
        return _MISSING


def _extract_mapping_value(
    container: dict[str, Any],
    _active: frozenset[int] = frozenset(),
) -> str | None:
    """Extract a username from common mapping-based request contexts.

    Identity keys are checked first: a non-blank string is returned stripped,
    and a nested dict is searched as a user record. Values of any other type
    under those keys are ignored. Afterwards the ``request``, ``state`` and
    ``session`` keys are searched, but only when they hold dicts.

    Args:
        container: Mapping to search.
        _active: Internal. ``id()`` values of the containers currently being
            searched further up the call chain, used to stop on cycles.

    Returns:
        The stripped username, or ``None`` when nothing usable is found or
        ``container`` is already being searched (self-referential context).
    """
    if id(container) in _active:
        return None
    # A fresh frozenset per level means no cleanup is needed on the way out and
    # sibling branches never see each other's entries.
    inner = _active | {id(container)}

    for key in _IDENTITY_NAMES:
        value = container.get(key)
        if isinstance(value, str):
            stripped = value.strip()
            if stripped:
                return stripped
        elif isinstance(value, dict):
            found = _extract_user_from_candidate(value, inner)
            if found is not None:
                return found

    for key in _CONTAINER_NAMES:
        nested = container.get(key)
        if isinstance(nested, dict):
            found = _extract_mapping_value(nested, inner)
            if found is not None:
                return found

    return None


def _extract_object_value(
    container: object,
    _active: frozenset[int] = frozenset(),
) -> str | None:
    """Extract a username from object-based request contexts.

    Identity attributes are checked first: a non-blank string is returned
    stripped, any other readable value is searched as a candidate. Afterwards
    the ``request``, ``state`` and ``session`` attributes are searched.

    Args:
        container: Object to search.
        _active: Internal. ``id()`` values of the containers currently being
            searched further up the call chain, used to stop on cycles.

    Returns:
        The stripped username, or ``None`` when nothing usable is found or
        ``container`` is already being searched (self-referential context).
    """
    if id(container) in _active:
        return None
    inner = _active | {id(container)}

    for name in _IDENTITY_NAMES:
        value = _safe_getattr(container, name)
        if value is _MISSING:
            continue
        if isinstance(value, str) and value.strip():
            return value.strip()
        found = _extract_user_from_candidate(value, inner)
        if found is not None:
            return found

    for name in _CONTAINER_NAMES:
        nested = _safe_getattr(container, name)
        if nested is _MISSING:
            continue
        found = _extract_user_from_candidate(nested, inner)
        if found is not None:
            return found

    return None


def _extract_user_from_candidate(
    candidate: Any,
    _active: frozenset[int] = frozenset(),
) -> str | None:
    """Extract an authenticated username from one candidate context value.

    Strings are returned stripped (blank strings count as absent). Dicts and
    other objects are searched with the mapping/object helpers and then probed
    for the fallback names (``preferred_username``, ``name``, ``login``,
    ``sub``).

    Args:
        candidate: A string, mapping, arbitrary object, or ``None``.
        _active: Internal. ``id()`` values of the containers currently being
            searched further up the call chain, used to stop on cycles.

    Returns:
        The stripped username, or ``None`` when none can be extracted.
    """
    if isinstance(candidate, str):
        return candidate.strip() or None
    if candidate is None:
        return None
    if id(candidate) in _active:
        # Back-edge to a container we are already inside: following it again
        # would recurse forever, and its own fallback names will be probed by
        # the outer call that is still in progress.
        return None

    if isinstance(candidate, dict):
        found = _extract_mapping_value(candidate, _active)
        if found is not None:
            return found
        for key in _FALLBACK_NAMES:
            value = candidate.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        return None

    found = _extract_object_value(candidate, _active)
    if found is not None:
        return found
    for name in _FALLBACK_NAMES:
        value = _safe_getattr(candidate, name)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


def get_current_user(request_ctx: Any) -> str:
    """Return the authenticated HF OAuth username from the current request context.

    Spec references:
    - `specs/04_interfaces.md`: implements `get_current_user()`.
    - `specs/07_security.md`: rejects unauthenticated access.

    Args:
        request_ctx: Framework-specific request or auth context object.

    Returns:
        The authenticated username string used for per-user storage isolation.

    Raises:
        NotAuthenticatedError: If no authenticated user can be extracted,
            including when the context only refers back to itself.
    """
    username = _extract_user_from_candidate(request_ctx)
    if username is None:
        raise NotAuthenticatedError("Authenticated user not found in request context.")
    return username