orgstate / core /config.py
Legal-i's picture
Stage 187: α acceleration signal
f1ea47c verified
"""
core.config — the per-entity-type configuration schema.
Stage 1 goal: kill the hardcoded magic numbers. In the legacy engine the
metric set, the change-detection scales, which metrics feed which signal, the
drift weights and the severity thresholds were all baked into the pipeline
code (run_demo_pipeline.py). Here they become *data*.
A new customer or vertical should be config + a thin schema adapter — never a
new copy of the pipeline.
This module is pure (stdlib only). ``load_yaml_config`` is a convenience
loader that needs PyYAML; everything else works from plain dicts, so the core
test suite stays dependency-free.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from .drift import DEFAULT_DRIFT_WEIGHTS, DEFAULT_SEVERITY_THRESHOLDS
# The two canonical values a metric's ``direction`` may take.
HIGHER_IS_WORSE = "higher_is_worse"
LOWER_IS_WORSE = "lower_is_worse"
_DIRECTIONS = {LOWER_IS_WORSE, HIGHER_IS_WORSE}

# Legacy data contracts phrase the direction as "..._is_better"; each alias
# maps onto the equivalent canonical "..._is_worse" value.
_DIRECTION_ALIASES = dict(
    higher_is_better=LOWER_IS_WORSE,
    lower_is_better=HIGHER_IS_WORSE,
)
@dataclass
class MetricConfig:
    """How one metric participates in the signal computation.

    ``name``              — the metric's identifier; used in error messages
                            and as its key in the owning config.
    ``direction``         — which way is bad (drives the directional Δ).
                            Legacy ``*_is_better`` aliases are accepted and
                            normalised to the canonical ``*_is_worse`` value.
    ``weight``            — relative weight inside the Δ aggregate; must be a
                            non-negative number (NaN is rejected).
    ``feeds_stability``   — included in the ψ (stability) computation.
    ``feeds_anomaly``     — included in the ξ (anomaly) computation.
    ``feeds_acceleration``— included in the α (acceleration / second
                            derivative) computation. Off by default
                            so legacy configs are unaffected by
                            Stage 187.
    ``latency_target``    — if set, the metric contributes to γ against this SLA.
    ``coherence_target``  — if set, distance from this target contributes to κ.
    """

    name: str
    direction: str
    weight: float = 1.0
    feeds_stability: bool = False
    feeds_anomaly: bool = True
    feeds_acceleration: bool = False
    latency_target: Optional[float] = None
    coherence_target: Optional[float] = None

    def __post_init__(self):
        """Normalise ``direction`` and validate the fields.

        Raises ``ValueError`` for an unknown direction or for a weight that
        is negative or NaN.
        """
        # Fold legacy spellings into the canonical ones first, so everything
        # downstream only ever sees the two "*_is_worse" values.
        self.direction = _DIRECTION_ALIASES.get(self.direction, self.direction)
        if self.direction not in _DIRECTIONS:
            raise ValueError(
                f"metric {self.name!r}: direction must be one of "
                f"{sorted(_DIRECTIONS)} (or a legacy alias), got {self.direction!r}"
            )
        # Written as ``not (w >= 0)`` instead of ``w < 0``: NaN compares False
        # against everything, so the ``<`` form would silently let it through.
        if not self.weight >= 0:
            raise ValueError(f"metric {self.name!r}: weight must be >= 0")

    @classmethod
    def from_dict(cls, name: str, d: Dict) -> "MetricConfig":
        """Build a metric config from its plain-dict form.

        ``name`` is supplied separately because the serialised form stores it
        as the mapping key rather than inside ``d``. Only ``direction`` is
        required; every other key falls back to the dataclass default.

        Raises ``KeyError`` (naming the metric) when ``direction`` is missing,
        and ``ValueError`` when validation in ``__post_init__`` fails.
        """
        try:
            direction = d["direction"]
        except KeyError:
            # Same exception type as a bare lookup, but the message says which
            # metric is at fault.
            raise KeyError(
                f"metric {name!r}: missing required key 'direction'"
            ) from None
        return cls(
            name=name,
            direction=direction,
            weight=float(d.get("weight", 1.0)),
            feeds_stability=bool(d.get("feeds_stability", False)),
            feeds_anomaly=bool(d.get("feeds_anomaly", True)),
            feeds_acceleration=bool(d.get("feeds_acceleration", False)),
            latency_target=_opt_float(d.get("latency_target")),
            coherence_target=_opt_float(d.get("coherence_target")),
        )

    def to_dict(self) -> Dict:
        """Return the plain-dict form accepted by ``from_dict``.

        ``name`` is deliberately left out: the caller uses it as the key under
        which this dict is stored.
        """
        return {
            "direction": self.direction,
            "weight": self.weight,
            "feeds_stability": self.feeds_stability,
            "feeds_anomaly": self.feeds_anomaly,
            "feeds_acceleration": self.feeds_acceleration,
            "latency_target": self.latency_target,
            "coherence_target": self.coherence_target,
        }
@dataclass
class EntityTypeConfig:
    """Everything the generic pipeline needs to analyse one entity type.

    Windows:
    ``baseline_window`` — warmup; no scoring until this many points exist.
    ``baseline_lag``    — baseline = mean of history[: i - baseline_lag].
    ``recent_window``   — window fed to the ψ stability computation.

    ``signal_weights`` and ``severity_thresholds`` default to fresh copies of
    ``DEFAULT_DRIFT_WEIGHTS`` / ``DEFAULT_SEVERITY_THRESHOLDS``, so mutating
    one config never leaks into another or into the module defaults.
    """

    entity_type: str
    metrics: List[MetricConfig]
    baseline_window: int = 14
    baseline_lag: int = 7
    recent_window: int = 14
    signal_weights: Dict[str, float] = field(
        default_factory=lambda: dict(DEFAULT_DRIFT_WEIGHTS)
    )
    severity_thresholds: Dict[str, float] = field(
        default_factory=lambda: dict(DEFAULT_SEVERITY_THRESHOLDS)
    )

    def __post_init__(self):
        """Validate the metric list and the three windows.

        Raises ``ValueError`` when there are no metrics, when a window is out
        of range, or when two metrics share a name.
        """
        if not self.metrics:
            raise ValueError(f"entity type {self.entity_type!r}: needs >=1 metric")
        if self.baseline_window < 2:
            raise ValueError("baseline_window must be >= 2")
        if not 0 < self.baseline_lag < self.baseline_window:
            raise ValueError("baseline_lag must satisfy 0 < lag < baseline_window")
        # A non-positive window can never hold a data point; reject it here
        # like the other two windows instead of letting it reach the pipeline.
        if self.recent_window < 1:
            raise ValueError("recent_window must be >= 1")
        names = [m.name for m in self.metrics]
        if len(names) != len(set(names)):
            raise ValueError(f"entity type {self.entity_type!r}: duplicate metric names")

    @property
    def metric_names(self) -> List[str]:
        """Metric names, in the order the metrics were configured."""
        return [m.name for m in self.metrics]

    def metric(self, name: str) -> MetricConfig:
        """Return the metric called ``name``; raise ``KeyError(name)`` if absent."""
        for m in self.metrics:
            if m.name == name:
                return m
        raise KeyError(name)

    @classmethod
    def from_dict(cls, d: Dict) -> "EntityTypeConfig":
        """Build a config from its plain-dict form.

        ``entity_type`` and ``metrics`` are required keys. The window sizes
        fall back to 14 / 7 / 14 when absent. ``signal_weights`` and
        ``severity_thresholds`` are overlaid on the module defaults, so a
        config only needs to list the entries it overrides.
        """
        # ``or {}``: a YAML key written with no value (``signal_weights:``)
        # loads as None, which can be neither iterated nor **-unpacked. Treat
        # it as an empty mapping. For ``metrics`` that then fails the
        # ">=1 metric" validation with a readable message.
        metric_specs = d["metrics"] or {}
        metrics = [MetricConfig.from_dict(name, md) for name, md in metric_specs.items()]
        return cls(
            entity_type=d["entity_type"],
            metrics=metrics,
            baseline_window=int(d.get("baseline_window", 14)),
            baseline_lag=int(d.get("baseline_lag", 7)),
            recent_window=int(d.get("recent_window", 14)),
            signal_weights={
                **DEFAULT_DRIFT_WEIGHTS,
                **(d.get("signal_weights") or {}),
            },
            severity_thresholds={
                **DEFAULT_SEVERITY_THRESHOLDS,
                **(d.get("severity_thresholds") or {}),
            },
        )

    def to_dict(self) -> Dict:
        """Return the plain-dict form accepted by ``from_dict``.

        Metrics are keyed by name; the weight and threshold mappings are
        shallow-copied so the caller cannot mutate this config through them.
        """
        return {
            "entity_type": self.entity_type,
            "metrics": {m.name: m.to_dict() for m in self.metrics},
            "baseline_window": self.baseline_window,
            "baseline_lag": self.baseline_lag,
            "recent_window": self.recent_window,
            "signal_weights": dict(self.signal_weights),
            "severity_thresholds": dict(self.severity_thresholds),
        }
@dataclass
class VerticalConfig:
    """A named collection of entity-type configs, keyed by entity-type name."""

    name: str
    entity_types: Dict[str, EntityTypeConfig]

    def entity_type(self, name: str) -> EntityTypeConfig:
        """Return the config stored under ``name``; ``KeyError`` if absent."""
        return self.entity_types[name]

    @classmethod
    def from_dict(cls, d: Dict) -> "VerticalConfig":
        """Build a vertical from its plain-dict form.

        Each entry of ``d["entity_types"]`` is handed to
        ``EntityTypeConfig.from_dict`` with its mapping key injected as
        ``entity_type``; a value for that key inside the entry itself takes
        precedence over the injected one.
        """
        parsed = {}
        for et_name, spec in d["entity_types"].items():
            payload = {"entity_type": et_name, **spec}
            parsed[et_name] = EntityTypeConfig.from_dict(payload)
        return cls(name=d["name"], entity_types=parsed)

    def to_dict(self) -> Dict:
        """Return the plain-dict form accepted by ``from_dict``.

        ``entity_type`` is dropped from each entry because the mapping key
        already carries it.
        """
        serialised = {}
        for key, cfg in self.entity_types.items():
            body = {}
            for field_name, value in cfg.to_dict().items():
                if field_name == "entity_type":
                    continue
                body[field_name] = value
            serialised[key] = body
        return {"name": self.name, "entity_types": serialised}
def load_yaml_config(path) -> VerticalConfig:
    """Load a VerticalConfig from a YAML file. Needs PyYAML (in requirements.txt).

    Kept thin on purpose: it only does file I/O + yaml.safe_load, then hands
    off to the pure ``VerticalConfig.from_dict``.

    ``path`` is passed to ``pathlib.Path`` and the file is read as UTF-8.

    Raises ``ImportError`` when PyYAML is not installed, and ``TypeError``
    when the document's top level is not a mapping (for example an empty
    file, which ``yaml.safe_load`` returns as ``None``).
    """
    try:
        import yaml
    except ImportError as exc:  # pragma: no cover
        raise ImportError(
            "load_yaml_config needs PyYAML — `pip install pyyaml`, or build "
            "the VerticalConfig from a dict via VerticalConfig.from_dict()."
        ) from exc
    from pathlib import Path

    raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    # Without this guard an empty or list-shaped file fails inside from_dict
    # with an opaque "object is not subscriptable" TypeError. Keep the
    # exception type, but name the file and what was actually found.
    if not isinstance(raw, dict):
        raise TypeError(
            f"{path}: expected a YAML mapping at the top level, "
            f"got {type(raw).__name__}"
        )
    return VerticalConfig.from_dict(raw)
def _opt_float(v) -> Optional[float]:
    """Coerce ``v`` to ``float``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return float(v)