""" infra.api.auth — FastAPI authentication dependency for the HTTP API. Wires the ``infra.auth.keys`` storage into the request lifecycle: Authorization: Bearer osk_... -> ApiKey -> tenant scope The dependency raises ``ApiError`` (which the app's exception handler turns into a JSON 401/403) rather than HTTPException, so the error-shape contract stays consistent with the rest of the API. Pure helpers — no fastapi import — live alongside, so handler code can call them from non-HTTP contexts (CLI, tests) too. """ from __future__ import annotations from typing import Callable, Optional # Stage 152: Request hoisted to module scope so the PEP 563 # stringified annotation on the dependency function # (`request: Request`) resolves to fastapi.Request at # FastAPI's get_type_hints() introspection time. Without this, # FastAPI sees `request` as an unknown name and treats it as a # required query param (422 on missing). from fastapi import Request from infra.auth import ApiKey, get_admin_key from infra.auth.roles import role_satisfies from infra.service import OrgStateService from .errors import ApiError # --- pure helpers (no fastapi) ------------------------------------------- def parse_bearer(authorization: Optional[str]) -> Optional[str]: """``'Bearer foo '`` -> ``'foo'``; missing / malformed -> None.""" if not authorization: return None parts = authorization.strip().split(None, 1) if len(parts) != 2 or parts[0].lower() != "bearer": return None return parts[1].strip() or None def require_key(svc: OrgStateService, authorization: Optional[str]) -> ApiKey: """Resolve an Authorization header to an active ApiKey or raise 401. Stage 154: when the Authorization header is absent, fall through to the SSO session cookie (Stage 97 OIDC) before raising. The cookie travels in a contextvar stamped by the request-context middleware so this helper stays callable from legacy route handlers that don't take a FastAPI Request — which is most of them (12 sites using the require_tenant_or_admin pattern). Without this fallback every dashboard call on those routes returns 401 even though the user is signed in via Google. """ raw = parse_bearer(authorization) if raw is None: # Stage 154 fallback: SSO session cookie. from .request_context import current_request_cookies cookies = current_request_cookies() sess_token = cookies.get("orgstate_sso_session") if sess_token: info = svc.verify_sso_session(sess_token) if info is not None: # Synthetic ApiKey scoped to the SSO session's # tenant; role=admin so the user can perform the # same actions as a tenant admin (login already # gated who's allowed in via Google). try: from .request_context import set_tenant_context set_tenant_context(info["tenant_id"]) except ImportError: # pragma: no cover pass return ApiKey( key_id=f"sso:{sess_token[:12]}", tenant_id=info["tenant_id"], key_hash="", name=f"sso:{info.get('user_email','')}", created_at=info.get("created_at", ""), role="admin", ) raise ApiError( "auth_missing", "missing or malformed Authorization: Bearer header", status=401, ) key = svc.verify_api_key(raw) if key is None: raise ApiError( "auth_invalid", "unknown or revoked API key", status=401, ) # Stage 84: stamp the resolved tenant on the per-request # contextvar so subsequent log lines (handler, DB layer) carry # tenant_id automatically. Imported lazily so this module is # still importable when request_context isn't relevant (CLI). try: from .request_context import set_tenant_context set_tenant_context(key.tenant_id) except ImportError: # pragma: no cover pass return key def require_tenant_access(key: ApiKey, tenant_id: str) -> None: """The caller's key must be bound to ``tenant_id`` — else 403.""" if key.tenant_id != tenant_id: raise ApiError( "tenant_forbidden", f"api key is not authorised for tenant {tenant_id!r}", status=403, ) def require_role(key: ApiKey, minimum: str) -> None: """The caller's key must satisfy ``minimum`` role or above — else 403 with code ``role_forbidden``. Roles are ordered readonly < operator < admin (see :mod:`infra.auth.roles`).""" if not role_satisfies(key.role, minimum): raise ApiError( "role_forbidden", f"role {key.role!r} does not satisfy required {minimum!r}", status=403, ) def require_run_access(svc: OrgStateService, key: ApiKey, run_id: str) -> dict: """Fetch a run and check its tenant matches the caller's key. Returns the run dict on success. 404 if missing, 403 if cross-tenant — in that order, so cross-tenant probes cannot distinguish "exists for someone else" from "does not exist". """ run = svc.get_run(run_id) if run is None: raise ApiError("not_found", f"run {run_id!r} not found", status=404) if run["tenant_id"] != key.tenant_id: raise ApiError("not_found", f"run {run_id!r} not found", status=404) return run # --- admin (Stage 5b) ---------------------------------------------------- def is_admin(authorization: Optional[str], admin_key: Optional[str] = None, *, svc: Optional[OrgStateService] = None) -> bool: """Pure predicate: is the caller authenticated as admin? Sources, in order: 1. Explicit ``admin_key`` argument (test override). 2. ``ORGSTATE_ADMIN_KEY`` env var (Stage 5b — single-secret config). 3. ``admin_keys`` rows in the DB (Stage 5c — rotatable without restart), consulted only when ``svc`` is supplied. Returns False if admin auth is not configured at all *and* no DB key matches. Routes that *enforce* admin should use :func:`require_admin`, which knows the difference between "not configured" (open) and "configured but wrong key" (401). """ raw = parse_bearer(authorization) if raw is None: return False effective_env = admin_key if admin_key is not None else get_admin_key() if effective_env is not None and raw == effective_env: return True if svc is not None and svc.is_admin_credential(raw): return True return False def require_admin(authorization: Optional[str], admin_key: Optional[str] = None, *, svc: Optional[OrgStateService] = None) -> None: """Require an admin credential when admin auth is configured. If neither ``ORGSTATE_ADMIN_KEY`` is set NOR any ``admin_keys`` row exists (when ``svc`` is supplied to check), this is a no-op — admin endpoints stay open for the v1 dev/bootstrap workflow. Once *any* admin credential is configured, the Authorization header must carry a matching token. """ effective_env = admin_key if admin_key is not None else get_admin_key() has_any_db_key = ( svc is not None and bool(svc.list_admin_keys(include_revoked=False)) ) if effective_env is None and not has_any_db_key: return # admin not configured anywhere -> open (dev mode) if not is_admin(authorization, admin_key, svc=svc): raise ApiError( "admin_required", "admin credential required for this route", status=401, ) def require_tenant_or_admin(svc: OrgStateService, authorization: Optional[str], tenant_id: str, admin_key: Optional[str] = None) -> Optional[ApiKey]: """Allow admin OR a tenant key bound to ``tenant_id``. Returns the resolved ApiKey on the tenant path, or None when the caller is admin (no specific key was used). Raises 401/403 the same way :func:`require_key` / :func:`require_tenant_access` do. """ if is_admin(authorization, admin_key, svc=svc): return None key = require_key(svc, authorization) require_tenant_access(key, tenant_id) return key # --- fastapi dependency factory ------------------------------------------ def make_dependency(svc: OrgStateService) -> Callable: """Build the dependency that FastAPI route handlers can ``Depends(...)`` on. Built as a closure so the SDK that uses the API never has to think about fastapi's request-scoped DI for the service; the service is bound once at app construction time. Stage 152: dependency also accepts SSO session cookies (Stage 97 Google OIDC + Stage 102 SAML). Resolution order: 1. ``Authorization: Bearer `` (SDK / curl / non-browser) 2. ``orgstate_session`` cookie (browser dashboard after SSO) Cookie-side path derives a virtual ApiKey from the SSO session: tenant_id + role=admin (the company already gated who can sign in via Google; once they're in, they get the same access an API-key holder would). The virtual key is NOT persisted; it lives only for the request lifetime. """ from fastapi import Header async def _dep( request: Request, authorization: Optional[str] = Header(default=None), ) -> ApiKey: if authorization: return require_key(svc, authorization) # Fall through to SSO session cookie. Constant is # duplicated from infra/api/app.py (which defines it # inside create_app, not module-scope) — kept stable # across Stage 97 / 152. _SSO_SESSION_COOKIE = "orgstate_sso_session" from infra.auth.keys import ApiKey as _ApiKey sess_token = request.cookies.get(_SSO_SESSION_COOKIE) if sess_token: info = svc.verify_sso_session(sess_token) if info is not None: # Synthetic ApiKey for the request's lifetime. # key_id encodes the SSO origin so audit rows can # tell SDK calls apart from dashboard calls; # the truncated prefix matches Stage 12's # convention. return _ApiKey( key_id=f"sso:{sess_token[:12]}", tenant_id=info["tenant_id"], key_hash="", # never persisted name=f"sso:{info.get('user_email','')}", created_at=info.get("created_at", ""), role="admin", ) return require_key(svc, authorization) # raises 401 return _dep