orgstate / infra /api /telemetry.py
Legal-i's picture
Initial OrgState deploy via Stage 150 free-tier stack
d2d1903 verified
"""
infra.api.telemetry — error aggregation (Stage 83).
After production sprint 78-82 closed the security/compliance
blockers, the next gap is *visibility*. Today an unhandled 5xx
exception goes to the JSON log line and that's it — no
aggregation, no alerting, no metric. An operator only finds out
about a regression when a customer complains. Stage 83 adds:
1. **In-process error counter** (always on, zero deps) exposed
via /metrics as ``orgstate_http_errors_total`` with status
label. Visible immediately in Prometheus + Grafana.
2. **Optional Sentry integration** (no hard dep on sentry-sdk).
If ``SENTRY_DSN`` is set AND ``sentry_sdk`` is importable,
``init_sentry()`` initializes it with sensible defaults:
- PII scrubbing on Authorization headers + ``secret*`` fields
- traces_sample_rate from ``SENTRY_TRACES_SAMPLE_RATE`` (default 0)
- environment from ``SENTRY_ENVIRONMENT`` (default "production")
3. **Middleware hook** that increments the counter on 5xx and
re-raises so FastAPI's default 500 handler still runs (so
Sentry's auto-instrumentation also fires if installed).
Module is stdlib-only by default; sentry-sdk import is lazy.
"""
from __future__ import annotations
import os
import threading
from collections import defaultdict
from typing import Any, Dict, Optional
# --- error counter --------------------------------------------------
class ErrorCounter:
    """Lock-protected tally of HTTP error responses, keyed by status code.

    Callers record an error with ``increment(code)`` and read the
    current tallies with ``snapshot()`` or ``total()``. Per the module
    docstring, the intended producer is the error middleware and the
    intended reader is the /metrics endpoint.

    It is a class rather than a module-level dict so that:
      1. tests can build an isolated counter and inject it — no
         cross-test pollution;
      2. several apps in one process can each own a separate counter.
    """

    def __init__(self) -> None:
        # One lock serialises every read and write of the tally table.
        self._lock = threading.Lock()
        self._counts: Dict[int, int] = defaultdict(int)

    def increment(self, status_code: int) -> None:
        """Add one to the tally for ``status_code``."""
        with self._lock:
            self._counts[status_code] = self._counts[status_code] + 1

    def snapshot(self) -> Dict[int, int]:
        """Return a plain-dict copy of the tallies.

        The copy is taken under the lock, so the caller can iterate it
        without racing against concurrent ``increment`` calls.
        """
        with self._lock:
            frozen = dict(self._counts)
        return frozen

    def total(self) -> int:
        """Return the sum of all tallies across every status code."""
        with self._lock:
            running = 0
            for hits in self._counts.values():
                running += hits
        return running

    def reset(self) -> None:
        """Drop every tally. Intended for tests only.

        In production the counter is meant to live for the process
        lifetime and start from zero again on restart.
        """
        with self._lock:
            self._counts.clear()
def should_record(status_code: int) -> bool:
    """Tell the middleware whether a response counts as a platform error.

    Anything below 500 is treated as success or a caller-side problem
    (bad request, auth failure, rate limiting) and is not recorded,
    so the error signal stays limited to server-side failures.
    Every code from 500 upwards is recorded.
    """
    below_server_error_range = status_code < 500
    return not below_server_error_range
# --- PII scrubbing --------------------------------------------------
# Header names whose values may carry credentials or PII. Stored
# lowercased; callers lowercase the candidate name before the lookup
# to get a case-insensitive match.
_SCRUBBED_HEADERS = frozenset({
    "authorization",
    "x-api-key",
    "cookie",
    "set-cookie",
    "x-orgstate-signature",  # the HMAC of webhook deliveries
})

# Substrings that mark a body field name as sensitive (matched
# case-insensitively). The VALUE is replaced with "[scrubbed]" rather
# than dropping the key — this keeps the payload structure intact
# for debugging.
_SCRUBBED_FIELD_SUBSTRINGS = (
    "secret",
    "password",
    "token",
    "api_key",
    "raw_key",
)
def _is_sensitive_field(name: str) -> bool:
    """Return True when ``name`` contains any sensitive-field marker.

    The comparison is case-insensitive: ``name`` is lowercased and then
    checked for each entry of ``_SCRUBBED_FIELD_SUBSTRINGS`` as a
    substring.
    """
    folded = name.lower()
    for marker in _SCRUBBED_FIELD_SUBSTRINGS:
        if marker in folded:
            return True
    return False
def scrub_dict(d: Any) -> Any:
    """Return a copy of ``d`` with sensitive values replaced by ``"[scrubbed]"``.

    Used by the Sentry ``before_send`` hook and by any other code that
    wants to log a payload without leaking secrets. The input is never
    modified; dicts, lists and tuples are rebuilt, everything else is
    returned as-is.

    Args:
        d: Arbitrary payload. Dicts, lists and plain tuples are walked
            recursively; any other value is treated as a leaf.

    Returns:
        The scrubbed structure. For a dict, a value is replaced by the
        string ``"[scrubbed]"`` when its key is a ``str`` that either
        matches ``_is_sensitive_field`` or, lowercased, is listed in
        ``_SCRUBBED_HEADERS``. The whole value is replaced, including
        any nested structure. Dict subclasses come back as plain dicts.
    """
    if isinstance(d, dict):
        out: Dict[Any, Any] = {}
        for k, v in d.items():
            # Non-string keys cannot be matched by name; recurse into
            # their values instead.
            if isinstance(k, str) and (
                _is_sensitive_field(k) or k.lower() in _SCRUBBED_HEADERS
            ):
                out[k] = "[scrubbed]"
            else:
                out[k] = scrub_dict(v)
        return out
    if isinstance(d, list):
        return [scrub_dict(item) for item in d]
    # Plain tuples can hold dicts with secrets just like lists can, so
    # they must be walked too. Only exact tuples are rebuilt: tuple
    # subclasses (e.g. namedtuples) may have constructors that do not
    # accept an iterable, so they are left untouched.
    if type(d) is tuple:
        return tuple(scrub_dict(item) for item in d)
    return d
def scrub_event(event: Dict[str, Any], _hint: Optional[Dict] = None
                ) -> Optional[Dict[str, Any]]:
    """Sentry ``before_send`` callback that strips secrets from an event.

    Delegates to ``scrub_dict``, which builds and returns a scrubbed
    copy; the ``event`` passed in is not modified. Values under
    credential-bearing header names and under keys containing a
    sensitive substring come back as ``"[scrubbed]"``.

    Args:
        event: The event payload handed over by Sentry.
        _hint: Accepted to match the ``before_send`` signature; unused.

    Returns:
        The scrubbed event. This function never returns ``None``
        itself; per the original note, returning ``None`` from a
        ``before_send`` hook would drop the event entirely.
    """
    sanitized = scrub_dict(event)
    return sanitized
# --- Sentry init ----------------------------------------------------
class _SentryInitResult:
    """Outcome of ``init_sentry``, as a plain attribute holder.

    Attributes (all keyword-only at construction):
        configured: Whether Sentry was initialized.
        reason: Human-readable explanation of the outcome.
        dsn_set: Whether a DSN was supplied.
        sdk_present: Whether the SDK was found importable.

    ``configured`` is always present so a caller or test can check
    which branch of ``init_sentry`` fired.
    """

    def __init__(self, *, configured: bool, reason: str,
                 dsn_set: bool, sdk_present: bool):
        self.configured, self.reason = configured, reason
        self.dsn_set, self.sdk_present = dsn_set, sdk_present

    def __repr__(self) -> str:
        # Compact form meant for readable test-failure output.
        return "<SentryInit configured={} reason={!r}>".format(
            self.configured, self.reason
        )
def init_sentry(env: Optional[Dict[str, str]] = None) -> _SentryInitResult:
    """Initialize sentry-sdk when both a DSN and the SDK are available.

    A no-op in tests / dev — operators opt in by exporting
    ``SENTRY_DSN``. This function never raises because of Sentry
    misconfiguration; every failure mode is reported through the
    returned result so app startup is not interrupted.

    Args:
        env: Mapping to read settings from. Defaults to ``os.environ``;
            pass a dict in tests to control behavior without
            monkeypatching the process environment. Keys read:
            ``SENTRY_DSN``, ``SENTRY_TRACES_SAMPLE_RATE`` (default
            0.0), ``SENTRY_ENVIRONMENT`` (default ``"production"``).

    Returns:
        A ``_SentryInitResult`` whose ``configured`` flag is True only
        when ``sentry_sdk.init`` completed; ``reason`` explains the
        outcome otherwise.
    """
    env = env if env is not None else os.environ
    dsn = env.get("SENTRY_DSN", "").strip()
    if not dsn:
        # The SDK import is not attempted on this path, so
        # sdk_present is reported as False without being checked.
        return _SentryInitResult(
            configured=False, reason="SENTRY_DSN not set",
            dsn_set=False, sdk_present=False,
        )
    try:
        import sentry_sdk  # type: ignore
    except ImportError:
        # SDK not installed — operator set the DSN but didn't
        # `pip install sentry-sdk`. Treat as misconfiguration and
        # report it via reason rather than crashing app startup.
        return _SentryInitResult(
            configured=False,
            reason="sentry-sdk not installed but SENTRY_DSN is set "
                   "— `pip install sentry-sdk`",
            dsn_set=True, sdk_present=False,
        )
    # Sample rate from env. 0.0 = no traces (default); 1.0 = all.
    try:
        traces_rate = float(env.get("SENTRY_TRACES_SAMPLE_RATE", "0.0"))
    except ValueError:
        traces_rate = 0.0
    # float() accepts "nan", "inf" and values outside [0, 1]; none of
    # those is a meaningful sample rate. NaN fails both comparisons, so
    # this one check covers it. Fall back to the same default used for
    # unparsable input.
    if not 0.0 <= traces_rate <= 1.0:
        traces_rate = 0.0
    # An exported-but-empty variable should not override the default.
    environment = env.get("SENTRY_ENVIRONMENT", "").strip() or "production"
    try:
        sentry_sdk.init(
            dsn=dsn,
            environment=environment,
            traces_sample_rate=traces_rate,
            before_send=scrub_event,
            # Auto-capturing of exceptions is the SDK default, so no
            # extra integration wiring is requested here.
        )
    except Exception as exc:  # boundary: optional telemetry must not kill startup
        # If init raises (e.g. on a malformed DSN), surface it the same
        # way as the other misconfiguration branches. Only the
        # exception type is reported so no part of the DSN can end up
        # in the reason string.
        return _SentryInitResult(
            configured=False,
            reason=f"sentry_sdk.init failed: {type(exc).__name__}",
            dsn_set=True, sdk_present=True,
        )
    return _SentryInitResult(
        configured=True, reason="initialized",
        dsn_set=True, sdk_present=True,
    )