| """ |
| core.config — the per-entity-type configuration schema. |
| |
| Stage 1 goal: kill the hardcoded magic numbers. In the legacy engine the |
| metric set, the change-detection scales, which metrics feed which signal, the |
| drift weights and the severity thresholds were all baked into the pipeline |
| code (run_demo_pipeline.py). Here they become *data*. |
| |
| A new customer or vertical should be config + a thin schema adapter — never a |
| new copy of the pipeline. |
| |
| This module is pure (stdlib only). ``load_yaml_config`` is a convenience |
| loader that needs PyYAML; everything else works from plain dicts, so the core |
| test suite stays dependency-free. |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Dict, List, Optional |
|
|
| from .drift import DEFAULT_DRIFT_WEIGHTS, DEFAULT_SEVERITY_THRESHOLDS |
|
|
| |
| HIGHER_IS_WORSE = "higher_is_worse" |
| LOWER_IS_WORSE = "lower_is_worse" |
| _DIRECTIONS = {HIGHER_IS_WORSE, LOWER_IS_WORSE} |
|
|
| |
| _DIRECTION_ALIASES = { |
| "higher_is_better": LOWER_IS_WORSE, |
| "lower_is_better": HIGHER_IS_WORSE, |
| } |
|
|
|
|
| @dataclass |
| class MetricConfig: |
| """How one metric participates in the signal computation. |
| |
| ``direction`` — which way is bad (drives the directional Δ). |
| ``weight`` — relative weight inside the Δ aggregate. |
| ``feeds_stability`` — included in the ψ (stability) computation. |
| ``feeds_anomaly`` — included in the ξ (anomaly) computation. |
| ``feeds_acceleration``— included in the α (acceleration / second |
| derivative) computation. Off by default |
| so legacy configs are unaffected by |
| Stage 187. |
| ``latency_target`` — if set, the metric contributes to γ against this SLA. |
| ``coherence_target`` — if set, distance from this target contributes to κ. |
| """ |
| name: str |
| direction: str |
| weight: float = 1.0 |
| feeds_stability: bool = False |
| feeds_anomaly: bool = True |
| feeds_acceleration: bool = False |
| latency_target: Optional[float] = None |
| coherence_target: Optional[float] = None |
|
|
| def __post_init__(self): |
| self.direction = _DIRECTION_ALIASES.get(self.direction, self.direction) |
| if self.direction not in _DIRECTIONS: |
| raise ValueError( |
| f"metric {self.name!r}: direction must be one of " |
| f"{sorted(_DIRECTIONS)} (or a legacy alias), got {self.direction!r}" |
| ) |
| if self.weight < 0: |
| raise ValueError(f"metric {self.name!r}: weight must be >= 0") |
|
|
| @classmethod |
| def from_dict(cls, name: str, d: Dict) -> "MetricConfig": |
| return cls( |
| name=name, |
| direction=d["direction"], |
| weight=float(d.get("weight", 1.0)), |
| feeds_stability=bool(d.get("feeds_stability", False)), |
| feeds_anomaly=bool(d.get("feeds_anomaly", True)), |
| feeds_acceleration=bool(d.get("feeds_acceleration", False)), |
| latency_target=_opt_float(d.get("latency_target")), |
| coherence_target=_opt_float(d.get("coherence_target")), |
| ) |
|
|
| def to_dict(self) -> Dict: |
| return { |
| "direction": self.direction, |
| "weight": self.weight, |
| "feeds_stability": self.feeds_stability, |
| "feeds_anomaly": self.feeds_anomaly, |
| "feeds_acceleration": self.feeds_acceleration, |
| "latency_target": self.latency_target, |
| "coherence_target": self.coherence_target, |
| } |
|
|
|
|
| @dataclass |
| class EntityTypeConfig: |
| """Everything the generic pipeline needs to analyse one entity type. |
| |
| Windows: |
| ``baseline_window`` — warmup; no scoring until this many points exist. |
| ``baseline_lag`` — baseline = mean of history[: i - baseline_lag]. |
| ``recent_window`` — window fed to the ψ stability computation. |
| """ |
| entity_type: str |
| metrics: List[MetricConfig] |
| baseline_window: int = 14 |
| baseline_lag: int = 7 |
| recent_window: int = 14 |
| signal_weights: Dict[str, float] = field( |
| default_factory=lambda: dict(DEFAULT_DRIFT_WEIGHTS) |
| ) |
| severity_thresholds: Dict[str, float] = field( |
| default_factory=lambda: dict(DEFAULT_SEVERITY_THRESHOLDS) |
| ) |
|
|
| def __post_init__(self): |
| if not self.metrics: |
| raise ValueError(f"entity type {self.entity_type!r}: needs >=1 metric") |
| if self.baseline_window < 2: |
| raise ValueError("baseline_window must be >= 2") |
| if not 0 < self.baseline_lag < self.baseline_window: |
| raise ValueError("baseline_lag must satisfy 0 < lag < baseline_window") |
| names = [m.name for m in self.metrics] |
| if len(names) != len(set(names)): |
| raise ValueError(f"entity type {self.entity_type!r}: duplicate metric names") |
|
|
| @property |
| def metric_names(self) -> List[str]: |
| return [m.name for m in self.metrics] |
|
|
| def metric(self, name: str) -> MetricConfig: |
| for m in self.metrics: |
| if m.name == name: |
| return m |
| raise KeyError(name) |
|
|
| @classmethod |
| def from_dict(cls, d: Dict) -> "EntityTypeConfig": |
| metrics = [MetricConfig.from_dict(name, md) for name, md in d["metrics"].items()] |
| return cls( |
| entity_type=d["entity_type"], |
| metrics=metrics, |
| baseline_window=int(d.get("baseline_window", 14)), |
| baseline_lag=int(d.get("baseline_lag", 7)), |
| recent_window=int(d.get("recent_window", 14)), |
| signal_weights={**DEFAULT_DRIFT_WEIGHTS, **d.get("signal_weights", {})}, |
| severity_thresholds={**DEFAULT_SEVERITY_THRESHOLDS, |
| **d.get("severity_thresholds", {})}, |
| ) |
|
|
| def to_dict(self) -> Dict: |
| return { |
| "entity_type": self.entity_type, |
| "metrics": {m.name: m.to_dict() for m in self.metrics}, |
| "baseline_window": self.baseline_window, |
| "baseline_lag": self.baseline_lag, |
| "recent_window": self.recent_window, |
| "signal_weights": dict(self.signal_weights), |
| "severity_thresholds": dict(self.severity_thresholds), |
| } |
|
|
|
|
| @dataclass |
| class VerticalConfig: |
| """A vertical is just a named bundle of entity-type configs.""" |
| name: str |
| entity_types: Dict[str, EntityTypeConfig] |
|
|
| def entity_type(self, name: str) -> EntityTypeConfig: |
| return self.entity_types[name] |
|
|
| @classmethod |
| def from_dict(cls, d: Dict) -> "VerticalConfig": |
| ets = { |
| name: EntityTypeConfig.from_dict({"entity_type": name, **etd}) |
| for name, etd in d["entity_types"].items() |
| } |
| return cls(name=d["name"], entity_types=ets) |
|
|
| def to_dict(self) -> Dict: |
| return { |
| "name": self.name, |
| "entity_types": { |
| name: {k: v for k, v in et.to_dict().items() if k != "entity_type"} |
| for name, et in self.entity_types.items() |
| }, |
| } |
|
|
|
|
| def load_yaml_config(path) -> VerticalConfig: |
| """Load a VerticalConfig from a YAML file. Needs PyYAML (in requirements.txt). |
| |
| Kept thin on purpose: it only does file I/O + yaml.safe_load, then hands |
| off to the pure ``VerticalConfig.from_dict``. |
| """ |
| try: |
| import yaml |
| except ImportError as exc: |
| raise ImportError( |
| "load_yaml_config needs PyYAML — `pip install pyyaml`, or build " |
| "the VerticalConfig from a dict via VerticalConfig.from_dict()." |
| ) from exc |
| from pathlib import Path |
|
|
| raw = yaml.safe_load(Path(path).read_text(encoding="utf-8")) |
| return VerticalConfig.from_dict(raw) |
|
|
|
|
| def _opt_float(v) -> Optional[float]: |
| return None if v is None else float(v) |
|
|