orgstate / core /drift.py
Legal-i's picture
Stage 187: α acceleration signal
f1ea47c verified
from dataclasses import dataclass, field
from datetime import date
from typing import Dict, List
@dataclass
class DriftIssue:
    """Record describing one detected drift issue for a single entity.

    All fields except ``evidence`` are required and are accepted positionally
    in the order declared below. ``evidence`` defaults to a fresh empty list
    for every instance, so instances never share the same list object.
    """

    # Identity of the issue and of the entity it was raised against.
    issue_id: str
    entity_id: str
    entity_type: str

    # Day on which the issue was detected.
    detected_at: date

    # Numeric score and its label — presumably the outputs of ``drift_score``
    # and ``severity_from_score``; verify against the code that builds issues.
    score: float
    severity: str

    # Human-readable summary and longer description.
    title: str
    explanation: str

    # Supporting records; the dict schema is not defined in this module.
    evidence: List[Dict] = field(default_factory=list)
# Documented default weight for each drift signal.
#
# Stage 1 lets config override these per entity-type (see core/config.py).
# The defaults remain pinned in this module so that existing behaviour — and
# the tests that pin it — never changes silently.
DEFAULT_DRIFT_WEIGHTS: Dict[str, float] = {
    "delta": 0.25,
    "psi": 0.20,
    "xi": 0.25,
    "gamma": 0.15,
    "kappa": 0.15,
    # Stage 187 — α (acceleration / second derivative).
    # The default weight is 0, which makes the signal fully opt-in through
    # entity_type config: introducing it must not silently shift drift scores
    # for any pre-existing tenant.
    "alpha": 0.0,
}
# Default lower bounds (inclusive) a drift score must reach to earn each
# severity label; a score below the "medium" bound is reported as "low".
DEFAULT_SEVERITY_THRESHOLDS: Dict[str, float] = {
    "critical": 0.75,
    "high": 0.55,
    "medium": 0.35,
}
def drift_score(signals: Dict[str, float], weights: Dict[str, float] = None) -> float:
    """Combine the individual signals into one weighted drift score.

    Six signals are read from ``signals``:

    * ``delta``, ``xi``, ``gamma`` and ``alpha`` enter the sum directly and
      count as ``0.0`` when absent (``delta``, ``xi`` and ``gamma`` are the
      "stress" signals; ``alpha`` is the Stage 187 acceleration signal).
    * ``psi`` and ``kappa`` are "health" signals (1.0 = healthy); they enter
      as ``1 - value`` and count as ``1.0`` when absent.

    A missing signal therefore contributes nothing to the score.

    Args:
        signals: Mapping from signal name to its value.
        weights: Mapping from signal name to weight. ``None`` selects
            ``DEFAULT_DRIFT_WEIGHTS``; pass a config-derived dict to override
            per entity-type. Any key missing from a supplied mapping falls
            back to that signal's default weight.

    Returns:
        The weighted sum of the six contributions. The result is not clamped,
        so it lies in 0..1 only when the inputs and weights keep it there.
    """
    defaults = DEFAULT_DRIFT_WEIGHTS
    active = defaults if weights is None else weights

    # Direct contributors: absent means "no stress".
    delta_value = signals.get("delta", 0.0)
    # Health signals: absent means "fully healthy".
    psi_value = signals.get("psi", 1.0)
    xi_value = signals.get("xi", 0.0)
    gamma_value = signals.get("gamma", 0.0)
    kappa_value = signals.get("kappa", 1.0)
    alpha_value = signals.get("alpha", 0.0)  # Stage 187

    # Accumulate strictly left to right so floating-point rounding matches a
    # single chained ``a + b + c + ...`` expression.
    total = active.get("delta", defaults["delta"]) * delta_value
    total = total + active.get("psi", defaults["psi"]) * (1.0 - psi_value)
    total = total + active.get("xi", defaults["xi"]) * xi_value
    total = total + active.get("gamma", defaults["gamma"]) * gamma_value
    total = total + active.get("kappa", defaults["kappa"]) * (1.0 - kappa_value)
    total = total + active.get("alpha", defaults["alpha"]) * alpha_value
    return total
def severity_from_score(score: float, thresholds: Dict[str, float] = None) -> str:
    """Translate a drift score into its severity label.

    Args:
        score: The drift score to classify.
        thresholds: Mapping from severity label to the inclusive lower bound
            for that label. ``None`` selects ``DEFAULT_SEVERITY_THRESHOLDS``;
            pass a config-derived dict to override per entity-type. A label
            missing from a supplied mapping uses its default bound.

    Returns:
        ``"critical"``, ``"high"`` or ``"medium"`` for the first label, tested
        in that order, whose bound the score reaches; ``"low"`` otherwise.
    """
    defaults = DEFAULT_SEVERITY_THRESHOLDS
    cutoffs = defaults if thresholds is None else thresholds

    # Most severe label first: the first bound the score reaches wins.
    for label in ("critical", "high", "medium"):
        if score >= cutoffs.get(label, defaults[label]):
            return label
    return "low"