orgstate / infra /api /auth.py
Legal-i's picture
Stage 154 — SSO cookie fallback in require_key via contextvar
6b9fff4 verified
"""
infra.api.auth — FastAPI authentication dependency for the HTTP API.
Wires the ``infra.auth.keys`` storage into the request lifecycle:
Authorization: Bearer osk_... -> ApiKey -> tenant scope
The dependency raises ``ApiError`` (which the app's exception handler
turns into a JSON 401/403) rather than HTTPException, so the
error-shape contract stays consistent with the rest of the API.
Pure helpers — no fastapi import — live alongside, so handler code
can call them from non-HTTP contexts (CLI, tests) too.
"""
from __future__ import annotations
from typing import Callable, Optional
# Stage 152: Request hoisted to module scope so the PEP 563
# stringified annotation on the dependency function
# (`request: Request`) resolves to fastapi.Request at
# FastAPI's get_type_hints() introspection time. Without this,
# FastAPI sees `request` as an unknown name and treats it as a
# required query param (422 on missing).
from fastapi import Request
from infra.auth import ApiKey, get_admin_key
from infra.auth.roles import role_satisfies
from infra.service import OrgStateService
from .errors import ApiError
# --- pure helpers (no fastapi) -------------------------------------------
def parse_bearer(authorization: Optional[str]) -> Optional[str]:
"""``'Bearer foo '`` -> ``'foo'``; missing / malformed -> None."""
if not authorization:
return None
parts = authorization.strip().split(None, 1)
if len(parts) != 2 or parts[0].lower() != "bearer":
return None
return parts[1].strip() or None
def require_key(svc: OrgStateService, authorization: Optional[str]) -> ApiKey:
"""Resolve an Authorization header to an active ApiKey or raise 401.
Stage 154: when the Authorization header is absent, fall through to
the SSO session cookie (Stage 97 OIDC) before raising. The cookie
travels in a contextvar stamped by the request-context middleware
so this helper stays callable from legacy route handlers that
don't take a FastAPI Request — which is most of them (12 sites
using the require_tenant_or_admin pattern). Without this fallback
every dashboard call on those routes returns 401 even though the
user is signed in via Google.
"""
raw = parse_bearer(authorization)
if raw is None:
# Stage 154 fallback: SSO session cookie.
from .request_context import current_request_cookies
cookies = current_request_cookies()
sess_token = cookies.get("orgstate_sso_session")
if sess_token:
info = svc.verify_sso_session(sess_token)
if info is not None:
# Synthetic ApiKey scoped to the SSO session's
# tenant; role=admin so the user can perform the
# same actions as a tenant admin (login already
# gated who's allowed in via Google).
try:
from .request_context import set_tenant_context
set_tenant_context(info["tenant_id"])
except ImportError: # pragma: no cover
pass
return ApiKey(
key_id=f"sso:{sess_token[:12]}",
tenant_id=info["tenant_id"],
key_hash="",
name=f"sso:{info.get('user_email','')}",
created_at=info.get("created_at", ""),
role="admin",
)
raise ApiError(
"auth_missing", "missing or malformed Authorization: Bearer header",
status=401,
)
key = svc.verify_api_key(raw)
if key is None:
raise ApiError(
"auth_invalid", "unknown or revoked API key", status=401,
)
# Stage 84: stamp the resolved tenant on the per-request
# contextvar so subsequent log lines (handler, DB layer) carry
# tenant_id automatically. Imported lazily so this module is
# still importable when request_context isn't relevant (CLI).
try:
from .request_context import set_tenant_context
set_tenant_context(key.tenant_id)
except ImportError: # pragma: no cover
pass
return key
def require_tenant_access(key: ApiKey, tenant_id: str) -> None:
"""The caller's key must be bound to ``tenant_id`` — else 403."""
if key.tenant_id != tenant_id:
raise ApiError(
"tenant_forbidden",
f"api key is not authorised for tenant {tenant_id!r}",
status=403,
)
def require_role(key: ApiKey, minimum: str) -> None:
"""The caller's key must satisfy ``minimum`` role or above — else 403
with code ``role_forbidden``. Roles are ordered readonly < operator
< admin (see :mod:`infra.auth.roles`)."""
if not role_satisfies(key.role, minimum):
raise ApiError(
"role_forbidden",
f"role {key.role!r} does not satisfy required {minimum!r}",
status=403,
)
def require_run_access(svc: OrgStateService, key: ApiKey,
run_id: str) -> dict:
"""Fetch a run and check its tenant matches the caller's key.
Returns the run dict on success. 404 if missing, 403 if cross-tenant —
in that order, so cross-tenant probes cannot distinguish "exists for
someone else" from "does not exist".
"""
run = svc.get_run(run_id)
if run is None:
raise ApiError("not_found", f"run {run_id!r} not found", status=404)
if run["tenant_id"] != key.tenant_id:
raise ApiError("not_found", f"run {run_id!r} not found", status=404)
return run
# --- admin (Stage 5b) ----------------------------------------------------
def is_admin(authorization: Optional[str],
admin_key: Optional[str] = None,
*,
svc: Optional[OrgStateService] = None) -> bool:
"""Pure predicate: is the caller authenticated as admin?
Sources, in order:
1. Explicit ``admin_key`` argument (test override).
2. ``ORGSTATE_ADMIN_KEY`` env var (Stage 5b — single-secret config).
3. ``admin_keys`` rows in the DB (Stage 5c — rotatable without restart),
consulted only when ``svc`` is supplied.
Returns False if admin auth is not configured at all *and* no DB key
matches. Routes that *enforce* admin should use :func:`require_admin`,
which knows the difference between "not configured" (open) and
"configured but wrong key" (401).
"""
raw = parse_bearer(authorization)
if raw is None:
return False
effective_env = admin_key if admin_key is not None else get_admin_key()
if effective_env is not None and raw == effective_env:
return True
if svc is not None and svc.is_admin_credential(raw):
return True
return False
def require_admin(authorization: Optional[str],
admin_key: Optional[str] = None,
*,
svc: Optional[OrgStateService] = None) -> None:
"""Require an admin credential when admin auth is configured.
If neither ``ORGSTATE_ADMIN_KEY`` is set NOR any ``admin_keys`` row
exists (when ``svc`` is supplied to check), this is a no-op — admin
endpoints stay open for the v1 dev/bootstrap workflow. Once *any*
admin credential is configured, the Authorization header must carry
a matching token.
"""
effective_env = admin_key if admin_key is not None else get_admin_key()
has_any_db_key = (
svc is not None and bool(svc.list_admin_keys(include_revoked=False))
)
if effective_env is None and not has_any_db_key:
return # admin not configured anywhere -> open (dev mode)
if not is_admin(authorization, admin_key, svc=svc):
raise ApiError(
"admin_required", "admin credential required for this route",
status=401,
)
def require_tenant_or_admin(svc: OrgStateService,
authorization: Optional[str],
tenant_id: str,
admin_key: Optional[str] = None) -> Optional[ApiKey]:
"""Allow admin OR a tenant key bound to ``tenant_id``.
Returns the resolved ApiKey on the tenant path, or None when the
caller is admin (no specific key was used). Raises 401/403 the same
way :func:`require_key` / :func:`require_tenant_access` do.
"""
if is_admin(authorization, admin_key, svc=svc):
return None
key = require_key(svc, authorization)
require_tenant_access(key, tenant_id)
return key
# --- fastapi dependency factory ------------------------------------------
def make_dependency(svc: OrgStateService) -> Callable:
"""Build the dependency that FastAPI route handlers can ``Depends(...)`` on.
Built as a closure so the SDK that uses the API never has to think
about fastapi's request-scoped DI for the service; the service is
bound once at app construction time.
Stage 152: dependency also accepts SSO session cookies (Stage 97
Google OIDC + Stage 102 SAML). Resolution order:
1. ``Authorization: Bearer <api-key>`` (SDK / curl / non-browser)
2. ``orgstate_session`` cookie (browser dashboard after SSO)
Cookie-side path derives a virtual ApiKey from the SSO session:
tenant_id + role=admin (the company already gated who can sign in
via Google; once they're in, they get the same access an API-key
holder would). The virtual key is NOT persisted; it lives only
for the request lifetime.
"""
from fastapi import Header
async def _dep(
request: Request,
authorization: Optional[str] = Header(default=None),
) -> ApiKey:
if authorization:
return require_key(svc, authorization)
# Fall through to SSO session cookie. Constant is
# duplicated from infra/api/app.py (which defines it
# inside create_app, not module-scope) — kept stable
# across Stage 97 / 152.
_SSO_SESSION_COOKIE = "orgstate_sso_session"
from infra.auth.keys import ApiKey as _ApiKey
sess_token = request.cookies.get(_SSO_SESSION_COOKIE)
if sess_token:
info = svc.verify_sso_session(sess_token)
if info is not None:
# Synthetic ApiKey for the request's lifetime.
# key_id encodes the SSO origin so audit rows can
# tell SDK calls apart from dashboard calls;
# the truncated prefix matches Stage 12's
# convention.
return _ApiKey(
key_id=f"sso:{sess_token[:12]}",
tenant_id=info["tenant_id"],
key_hash="", # never persisted
name=f"sso:{info.get('user_email','')}",
created_at=info.get("created_at", ""),
role="admin",
)
return require_key(svc, authorization) # raises 401
return _dep