""" core.config — the per-entity-type configuration schema. Stage 1 goal: kill the hardcoded magic numbers. In the legacy engine the metric set, the change-detection scales, which metrics feed which signal, the drift weights and the severity thresholds were all baked into the pipeline code (run_demo_pipeline.py). Here they become *data*. A new customer or vertical should be config + a thin schema adapter — never a new copy of the pipeline. This module is pure (stdlib only). ``load_yaml_config`` is a convenience loader that needs PyYAML; everything else works from plain dicts, so the core test suite stays dependency-free. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, List, Optional from .drift import DEFAULT_DRIFT_WEIGHTS, DEFAULT_SEVERITY_THRESHOLDS # directions a metric can move in HIGHER_IS_WORSE = "higher_is_worse" LOWER_IS_WORSE = "lower_is_worse" _DIRECTIONS = {HIGHER_IS_WORSE, LOWER_IS_WORSE} # the legacy data contracts use these — accepted as aliases _DIRECTION_ALIASES = { "higher_is_better": LOWER_IS_WORSE, "lower_is_better": HIGHER_IS_WORSE, } @dataclass class MetricConfig: """How one metric participates in the signal computation. ``direction`` — which way is bad (drives the directional Δ). ``weight`` — relative weight inside the Δ aggregate. ``feeds_stability`` — included in the ψ (stability) computation. ``feeds_anomaly`` — included in the ξ (anomaly) computation. ``feeds_acceleration``— included in the α (acceleration / second derivative) computation. Off by default so legacy configs are unaffected by Stage 187. ``latency_target`` — if set, the metric contributes to γ against this SLA. ``coherence_target`` — if set, distance from this target contributes to κ. """ name: str direction: str weight: float = 1.0 feeds_stability: bool = False feeds_anomaly: bool = True feeds_acceleration: bool = False latency_target: Optional[float] = None coherence_target: Optional[float] = None def __post_init__(self): self.direction = _DIRECTION_ALIASES.get(self.direction, self.direction) if self.direction not in _DIRECTIONS: raise ValueError( f"metric {self.name!r}: direction must be one of " f"{sorted(_DIRECTIONS)} (or a legacy alias), got {self.direction!r}" ) if self.weight < 0: raise ValueError(f"metric {self.name!r}: weight must be >= 0") @classmethod def from_dict(cls, name: str, d: Dict) -> "MetricConfig": return cls( name=name, direction=d["direction"], weight=float(d.get("weight", 1.0)), feeds_stability=bool(d.get("feeds_stability", False)), feeds_anomaly=bool(d.get("feeds_anomaly", True)), feeds_acceleration=bool(d.get("feeds_acceleration", False)), latency_target=_opt_float(d.get("latency_target")), coherence_target=_opt_float(d.get("coherence_target")), ) def to_dict(self) -> Dict: return { "direction": self.direction, "weight": self.weight, "feeds_stability": self.feeds_stability, "feeds_anomaly": self.feeds_anomaly, "feeds_acceleration": self.feeds_acceleration, "latency_target": self.latency_target, "coherence_target": self.coherence_target, } @dataclass class EntityTypeConfig: """Everything the generic pipeline needs to analyse one entity type. Windows: ``baseline_window`` — warmup; no scoring until this many points exist. ``baseline_lag`` — baseline = mean of history[: i - baseline_lag]. ``recent_window`` — window fed to the ψ stability computation. """ entity_type: str metrics: List[MetricConfig] baseline_window: int = 14 baseline_lag: int = 7 recent_window: int = 14 signal_weights: Dict[str, float] = field( default_factory=lambda: dict(DEFAULT_DRIFT_WEIGHTS) ) severity_thresholds: Dict[str, float] = field( default_factory=lambda: dict(DEFAULT_SEVERITY_THRESHOLDS) ) def __post_init__(self): if not self.metrics: raise ValueError(f"entity type {self.entity_type!r}: needs >=1 metric") if self.baseline_window < 2: raise ValueError("baseline_window must be >= 2") if not 0 < self.baseline_lag < self.baseline_window: raise ValueError("baseline_lag must satisfy 0 < lag < baseline_window") names = [m.name for m in self.metrics] if len(names) != len(set(names)): raise ValueError(f"entity type {self.entity_type!r}: duplicate metric names") @property def metric_names(self) -> List[str]: return [m.name for m in self.metrics] def metric(self, name: str) -> MetricConfig: for m in self.metrics: if m.name == name: return m raise KeyError(name) @classmethod def from_dict(cls, d: Dict) -> "EntityTypeConfig": metrics = [MetricConfig.from_dict(name, md) for name, md in d["metrics"].items()] return cls( entity_type=d["entity_type"], metrics=metrics, baseline_window=int(d.get("baseline_window", 14)), baseline_lag=int(d.get("baseline_lag", 7)), recent_window=int(d.get("recent_window", 14)), signal_weights={**DEFAULT_DRIFT_WEIGHTS, **d.get("signal_weights", {})}, severity_thresholds={**DEFAULT_SEVERITY_THRESHOLDS, **d.get("severity_thresholds", {})}, ) def to_dict(self) -> Dict: return { "entity_type": self.entity_type, "metrics": {m.name: m.to_dict() for m in self.metrics}, "baseline_window": self.baseline_window, "baseline_lag": self.baseline_lag, "recent_window": self.recent_window, "signal_weights": dict(self.signal_weights), "severity_thresholds": dict(self.severity_thresholds), } @dataclass class VerticalConfig: """A vertical is just a named bundle of entity-type configs.""" name: str entity_types: Dict[str, EntityTypeConfig] def entity_type(self, name: str) -> EntityTypeConfig: return self.entity_types[name] @classmethod def from_dict(cls, d: Dict) -> "VerticalConfig": ets = { name: EntityTypeConfig.from_dict({"entity_type": name, **etd}) for name, etd in d["entity_types"].items() } return cls(name=d["name"], entity_types=ets) def to_dict(self) -> Dict: return { "name": self.name, "entity_types": { name: {k: v for k, v in et.to_dict().items() if k != "entity_type"} for name, et in self.entity_types.items() }, } def load_yaml_config(path) -> VerticalConfig: """Load a VerticalConfig from a YAML file. Needs PyYAML (in requirements.txt). Kept thin on purpose: it only does file I/O + yaml.safe_load, then hands off to the pure ``VerticalConfig.from_dict``. """ try: import yaml except ImportError as exc: # pragma: no cover raise ImportError( "load_yaml_config needs PyYAML — `pip install pyyaml`, or build " "the VerticalConfig from a dict via VerticalConfig.from_dict()." ) from exc from pathlib import Path raw = yaml.safe_load(Path(path).read_text(encoding="utf-8")) return VerticalConfig.from_dict(raw) def _opt_float(v) -> Optional[float]: return None if v is None else float(v)