""" Node and edge metrics for adaptive routing. Tracking and updating: - Reliability — percentage of successful executions - Latency — average execution time - Cost — token/monetary expenditure - Quality — response quality score - Throughput """ from collections import deque from datetime import UTC, datetime from typing import Any import torch from pydantic import BaseModel, Field __all__ = [ "EdgeMetrics", "ExponentialMovingAverage", # Aggregators "MetricAggregator", "MetricHistory", "MetricSnapshot", # Main tracker "MetricsTracker", # Data classes "NodeMetrics", "SlidingWindowAverage", "compute_composite_score", # Utility "compute_reliability_score", ] class MetricSnapshot(BaseModel): timestamp: datetime value: float metadata: dict[str, Any] = Field(default_factory=dict) class MetricHistory(BaseModel): max_size: int = 1000 snapshots: list[MetricSnapshot] = Field(default_factory=list) def add(self, value: float, metadata: dict[str, Any] | None = None) -> None: """Add a value snapshot, trimming history to max_size.""" snapshot = MetricSnapshot( timestamp=datetime.now(UTC), value=value, metadata=metadata or {}, ) self.snapshots.append(snapshot) if len(self.snapshots) > self.max_size: self.snapshots = self.snapshots[-self.max_size :] def get_recent(self, n: int = 10) -> list[MetricSnapshot]: """Get the last n snapshots.""" return self.snapshots[-n:] def get_since(self, since: datetime) -> list[MetricSnapshot]: """Return snapshots collected after the given time.""" return [s for s in self.snapshots if s.timestamp >= since] def get_values(self) -> list[float]: """Return a list of numeric history values.""" return [s.value for s in self.snapshots] @property def mean(self) -> float: """Mean of the history or 0.0 if empty.""" values = self.get_values() return float(torch.mean(torch.tensor(values))) if values else 0.0 @property def std(self) -> float: """Standard deviation of the history or 0.0 when insufficient data.""" values = self.get_values() return float(torch.std(torch.tensor(values))) if len(values) > 1 else 0.0 @property def last(self) -> float | None: """Last value or None if the history is empty.""" return self.snapshots[-1].value if self.snapshots else None class NodeMetrics(BaseModel): node_id: str total_executions: int = 0 successful_executions: int = 0 failed_executions: int = 0 reliability: float = 1.0 avg_latency_ms: float = 0.0 avg_cost_tokens: float = 0.0 avg_cost_usd: float = 0.0 avg_quality: float = 1.0 latency_history: MetricHistory = Field(default_factory=MetricHistory) quality_history: MetricHistory = Field(default_factory=MetricHistory) cost_history: MetricHistory = Field(default_factory=MetricHistory) last_execution: datetime | None = None last_success: datetime | None = None last_failure: datetime | None = None tags: set[str] = Field(default_factory=set) custom_metrics: dict[str, float] = Field(default_factory=dict) def record_execution( self, success: bool, latency_ms: float, cost_tokens: int = 0, cost_usd: float = 0.0, quality: float = 1.0, metadata: dict[str, Any] | None = None, ) -> None: """Record an agent execution and update reliability/latency/cost.""" now = datetime.now(UTC) self.total_executions += 1 self.last_execution = now if success: self.successful_executions += 1 self.last_success = now else: self.failed_executions += 1 self.last_failure = now self.reliability = self.successful_executions / self.total_executions self.latency_history.add(latency_ms, metadata) self.quality_history.add(quality, metadata) self.cost_history.add(float(cost_tokens), metadata) self.avg_latency_ms = self.latency_history.mean self.avg_quality = self.quality_history.mean self.avg_cost_tokens = self.cost_history.mean alpha = 0.1 self.avg_cost_usd = alpha * cost_usd + (1 - alpha) * self.avg_cost_usd def get_composite_score( self, reliability_weight: float = 0.4, latency_weight: float = 0.2, cost_weight: float = 0.2, quality_weight: float = 0.2, latency_baseline_ms: float = 1000.0, cost_baseline_tokens: float = 1000.0, ) -> float: """Combine node metrics into a single score with the given weights.""" latency_score = max(0.0, 1.0 - self.avg_latency_ms / latency_baseline_ms) cost_score = max(0.0, 1.0 - self.avg_cost_tokens / cost_baseline_tokens) return ( reliability_weight * self.reliability + latency_weight * latency_score + cost_weight * cost_score + quality_weight * self.avg_quality ) def to_dict(self) -> dict[str, Any]: """Serialize node metrics to a dict.""" return { "node_id": self.node_id, "total_executions": self.total_executions, "successful_executions": self.successful_executions, "failed_executions": self.failed_executions, "reliability": self.reliability, "avg_latency_ms": self.avg_latency_ms, "avg_cost_tokens": self.avg_cost_tokens, "avg_cost_usd": self.avg_cost_usd, "avg_quality": self.avg_quality, "last_execution": self.last_execution.isoformat() if self.last_execution else None, "tags": list(self.tags), "custom_metrics": self.custom_metrics, } class EdgeMetrics(BaseModel): source_id: str target_id: str total_transitions: int = 0 successful_transitions: int = 0 weight: float = 1.0 reliability: float = 1.0 avg_latency_ms: float = 0.0 avg_data_volume: float = 0.0 latency_history: MetricHistory = Field(default_factory=MetricHistory) last_transition: datetime | None = None custom_metrics: dict[str, float] = Field(default_factory=dict) @property def edge_key(self) -> tuple[str, str]: """Edge key (source, target).""" return (self.source_id, self.target_id) def record_transition( self, success: bool, latency_ms: float = 0.0, data_volume: float = 0.0, metadata: dict[str, Any] | None = None, ) -> None: """Record an edge transition and update reliability/latency/traffic.""" now = datetime.now(UTC) self.total_transitions += 1 self.last_transition = now if success: self.successful_transitions += 1 self.reliability = self.successful_transitions / self.total_transitions self.latency_history.add(latency_ms, metadata) self.avg_latency_ms = self.latency_history.mean alpha = 0.1 self.avg_data_volume = alpha * data_volume + (1 - alpha) * self.avg_data_volume def get_effective_weight( self, reliability_factor: float = 0.5, latency_factor: float = 0.3, base_factor: float = 0.2, latency_baseline_ms: float = 100.0, ) -> float: """Compute the effective edge weight considering reliability and latency.""" reliability_cost = 1.0 - self.reliability latency_cost = self.avg_latency_ms / latency_baseline_ms base_cost = 1.0 / max(self.weight, 0.01) return reliability_factor * reliability_cost + latency_factor * latency_cost + base_factor * base_cost def to_dict(self) -> dict[str, Any]: """Serialize edge metrics to a dict.""" return { "source_id": self.source_id, "target_id": self.target_id, "total_transitions": self.total_transitions, "successful_transitions": self.successful_transitions, "weight": self.weight, "reliability": self.reliability, "avg_latency_ms": self.avg_latency_ms, "avg_data_volume": self.avg_data_volume, "last_transition": self.last_transition.isoformat() if self.last_transition else None, "custom_metrics": self.custom_metrics, } class MetricAggregator: def update(self, value: float) -> None: raise NotImplementedError def get_value(self) -> float: raise NotImplementedError def reset(self) -> None: raise NotImplementedError class ExponentialMovingAverage(MetricAggregator): def __init__(self, alpha: float = 0.1, initial: float = 0.0): self.alpha = alpha self.value = initial self.initialized = False def update(self, value: float) -> None: """Update the EMA with a new value.""" if not self.initialized: self.value = value self.initialized = True else: self.value = self.alpha * value + (1 - self.alpha) * self.value def get_value(self) -> float: """Current EMA value.""" return self.value def reset(self) -> None: """Reset the EMA state.""" self.value = 0.0 self.initialized = False class SlidingWindowAverage(MetricAggregator): def __init__(self, window_size: int = 100): self.window_size = window_size self.values: deque[float] = deque(maxlen=window_size) def update(self, value: float) -> None: """Add a value and maintain a fixed-size window.""" self.values.append(value) def get_value(self) -> float: """Mean of the window values.""" return float(torch.mean(torch.tensor(self.values))) if self.values else 0.0 def reset(self) -> None: """Clear the window values.""" self.values.clear() class MetricsTracker: """Stores node/edge metrics and aggregates for routing.""" def __init__( self, history_size: int = 1000, ema_alpha: float = 0.1, ): """Create a tracker with the given history size and EMA coefficient.""" self._history_size = history_size self._ema_alpha = ema_alpha self._node_metrics: dict[str, NodeMetrics] = {} self._edge_metrics: dict[tuple[str, str], EdgeMetrics] = {} self._global_latency = ExponentialMovingAverage(alpha=ema_alpha) self._global_cost = ExponentialMovingAverage(alpha=ema_alpha) self._global_quality = ExponentialMovingAverage(alpha=ema_alpha) def _get_or_create_node(self, node_id: str) -> NodeMetrics: if node_id not in self._node_metrics: self._node_metrics[node_id] = NodeMetrics( node_id=node_id, latency_history=MetricHistory(max_size=self._history_size), quality_history=MetricHistory(max_size=self._history_size), cost_history=MetricHistory(max_size=self._history_size), ) return self._node_metrics[node_id] def _get_or_create_edge(self, source_id: str, target_id: str) -> EdgeMetrics: key = (source_id, target_id) if key not in self._edge_metrics: self._edge_metrics[key] = EdgeMetrics( source_id=source_id, target_id=target_id, latency_history=MetricHistory(max_size=self._history_size), ) return self._edge_metrics[key] def record_node_execution( self, node_id: str, success: bool, latency_ms: float, cost_tokens: int = 0, cost_usd: float = 0.0, quality: float = 1.0, metadata: dict[str, Any] | None = None, ) -> None: """Record a node execution and update global EMA metrics.""" node = self._get_or_create_node(node_id) node.record_execution( success=success, latency_ms=latency_ms, cost_tokens=cost_tokens, cost_usd=cost_usd, quality=quality, metadata=metadata, ) self._global_latency.update(latency_ms) self._global_cost.update(float(cost_tokens)) self._global_quality.update(quality) def record_edge_transition( self, source_id: str, target_id: str, success: bool, latency_ms: float = 0.0, data_volume: float = 0.0, metadata: dict[str, Any] | None = None, ) -> None: """Record an edge transition and update its metrics.""" edge = self._get_or_create_edge(source_id, target_id) edge.record_transition( success=success, latency_ms=latency_ms, data_volume=data_volume, metadata=metadata, ) def set_edge_weight(self, source_id: str, target_id: str, weight: float) -> None: """Set the edge weight in the tracker (without recording history).""" edge = self._get_or_create_edge(source_id, target_id) edge.weight = weight def add_node_tag(self, node_id: str, tag: str) -> None: """Add a tag to a node.""" node = self._get_or_create_node(node_id) node.tags.add(tag) def set_node_custom_metric(self, node_id: str, name: str, value: float) -> None: """Set an arbitrary custom metric for a node.""" node = self._get_or_create_node(node_id) node.custom_metrics[name] = value def set_edge_custom_metric(self, source_id: str, target_id: str, name: str, value: float) -> None: """Set a custom metric for an edge.""" edge = self._get_or_create_edge(source_id, target_id) edge.custom_metrics[name] = value def get_node_metrics(self, node_id: str) -> NodeMetrics | None: """Get node metrics or None.""" return self._node_metrics.get(node_id) def get_edge_metrics(self, source_id: str, target_id: str) -> EdgeMetrics | None: """Get edge metrics or None.""" return self._edge_metrics.get((source_id, target_id)) def get_all_node_metrics(self) -> dict[str, NodeMetrics]: """Copy of the dict of all node metrics.""" return dict(self._node_metrics) def get_all_edge_metrics(self) -> dict[tuple[str, str], EdgeMetrics]: """Copy of the dict of all edge metrics.""" return dict(self._edge_metrics) def get_node_reliability(self, node_id: str) -> float: """Node reliability or 1.0 if no metrics are available.""" node = self._node_metrics.get(node_id) return node.reliability if node else 1.0 def get_edge_reliability(self, source_id: str, target_id: str) -> float: """Edge reliability or 1.0 if no metrics are available.""" edge = self._edge_metrics.get((source_id, target_id)) return edge.reliability if edge else 1.0 def get_routing_weights( self, reliability_factor: float = 0.5, latency_factor: float = 0.3, base_factor: float = 0.2, ) -> dict[tuple[str, str], float]: """Compute effective edge weights based on their metrics.""" weights = {} for key, edge in self._edge_metrics.items(): weights[key] = edge.get_effective_weight( reliability_factor=reliability_factor, latency_factor=latency_factor, base_factor=base_factor, ) return weights def get_node_scores( self, reliability_weight: float = 0.4, latency_weight: float = 0.2, cost_weight: float = 0.2, quality_weight: float = 0.2, ) -> dict[str, float]: """Return composite scoring values for all nodes.""" scores = {} for node_id, node in self._node_metrics.items(): scores[node_id] = node.get_composite_score( reliability_weight=reliability_weight, latency_weight=latency_weight, cost_weight=cost_weight, quality_weight=quality_weight, ) return scores def get_unreliable_nodes(self, threshold: float = 0.5) -> list[str]: """List of nodes with reliability below the threshold.""" return [node_id for node_id, node in self._node_metrics.items() if node.reliability < threshold] def get_unreliable_edges(self, threshold: float = 0.5) -> list[tuple[str, str]]: """List of edges with reliability below the threshold.""" return [key for key, edge in self._edge_metrics.items() if edge.reliability < threshold] def suggest_pruning( self, node_reliability_threshold: float = 0.3, edge_reliability_threshold: float = 0.3, max_latency_ms: float = 5000.0, ) -> dict[str, Any]: """Suggest nodes/edges for removal based on the given thresholds.""" prune_nodes = [] prune_edges = [] slow_nodes = [] for node_id, node in self._node_metrics.items(): if node.reliability < node_reliability_threshold: prune_nodes.append(node_id) elif node.avg_latency_ms > max_latency_ms: slow_nodes.append(node_id) for key, edge in self._edge_metrics.items(): if edge.reliability < edge_reliability_threshold: prune_edges.append(key) return { "prune_nodes": prune_nodes, "prune_edges": prune_edges, "slow_nodes": slow_nodes, } def get_node_features(self, node_ids: list[str]) -> torch.Tensor: """Get the node feature matrix for GNN/routing.""" features = [] for node_id in node_ids: node = self._node_metrics.get(node_id) if node: features.append( [ node.reliability, node.avg_latency_ms / 1000.0, node.avg_cost_tokens / 1000.0, node.avg_quality, min(node.total_executions / 100.0, 1.0), ] ) else: features.append([1.0, 0.0, 0.0, 1.0, 0.0]) return torch.tensor(features, dtype=torch.float32) def get_edge_features(self, edges: list[tuple[str, str]]) -> torch.Tensor: """Get the edge feature matrix.""" features = [] for src, tgt in edges: edge = self._edge_metrics.get((src, tgt)) if edge: features.append( [ edge.weight, edge.reliability, edge.avg_latency_ms / 1000.0, edge.avg_data_volume / 1000.0, ] ) else: features.append([1.0, 1.0, 0.0, 0.0]) return torch.tensor(features, dtype=torch.float32) def to_dict(self) -> dict[str, Any]: """Serialize all metrics to a dict.""" return { "nodes": {k: v.to_dict() for k, v in self._node_metrics.items()}, "edges": {f"{k[0]}->{k[1]}": v.to_dict() for k, v in self._edge_metrics.items()}, "global": { "avg_latency_ms": self._global_latency.get_value(), "avg_cost_tokens": self._global_cost.get_value(), "avg_quality": self._global_quality.get_value(), }, } def reset(self) -> None: """Fully clear all metrics and aggregates.""" self._node_metrics.clear() self._edge_metrics.clear() self._global_latency.reset() self._global_cost.reset() self._global_quality.reset() def compute_reliability_score( successes: int, failures: int, prior_successes: int = 1, prior_failures: int = 1, ) -> float: """Beta-posterior reliability estimate with pseudo-counts.""" alpha = successes + prior_successes beta = failures + prior_failures return alpha / (alpha + beta) def compute_composite_score( reliability: float, latency_ms: float, cost: float, quality: float, weights: tuple[float, float, float, float] = (0.4, 0.2, 0.2, 0.2), latency_baseline: float = 1000.0, cost_baseline: float = 1000.0, ) -> float: """Combine metrics into a single score with normalised latency and cost.""" w_rel, w_lat, w_cost, w_qual = weights lat_score = max(0.0, 1.0 - latency_ms / latency_baseline) cost_score = max(0.0, 1.0 - cost / cost_baseline) return w_rel * reliability + w_lat * lat_score + w_cost * cost_score + w_qual * quality