"""
Metrics Dashboard & Optimization System
========================================

Advanced metrics computation and dashboard system for context engineering,
including real-time metrics, optimization recommendations, and performance
monitoring.
"""

import asyncio
import logging
import threading
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional

import numpy as np

try:
    # These names are currently unused in this module; the guard keeps the
    # module importable when the framework package is absent.
    from ai_agent_framework.core.context_engineering_agent import (
        ContextElement, ContextModality, ContextDimension, ContextEngineeringAgent
    )
except ImportError:
    ContextElement = ContextModality = ContextDimension = ContextEngineeringAgent = None

logger = logging.getLogger(__name__)


class MetricType(Enum):
    """Types of metrics to compute."""
    CONTEXT_RETENTION_ACCURACY = "context_retention_accuracy"
    CONTEXT_RELEVANCE_PRECISION = "context_relevance_precision"
    CONTEXT_ADAPTATION_SPEED = "context_adaptation_speed"
    CONTEXTUAL_REASONING_QUALITY = "contextual_reasoning_quality"
    USER_SATISFACTION = "user_satisfaction"
    CONTEXT_UTILIZATION_EFFICIENCY = "context_utilization_efficiency"
    PROCESSING_LATENCY = "processing_latency"
    MEMORY_USAGE = "memory_usage"
    ERROR_RATE = "error_rate"
    SYSTEM_THROUGHPUT = "system_throughput"


class OptimizationTarget(Enum):
    """Optimization targets."""
    PERFORMANCE = "performance"
    ACCURACY = "accuracy"
    EFFICIENCY = "efficiency"
    USER_EXPERIENCE = "user_experience"
    RESOURCE_USAGE = "resource_usage"
    ADAPTABILITY = "adaptability"


class AlertLevel(Enum):
    """Alert severity levels."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class MetricValue:
    """Individual metric value with metadata."""
    metric_type: MetricType
    value: float
    timestamp: datetime
    confidence: float
    sample_size: int
    metadata: Dict[str, Any]

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow()
        if not self.metadata:
            self.metadata = {}


@dataclass
class MetricSeries:
    """Time series of metric values."""
    metric_type: MetricType
    values: List[MetricValue]
    window_size: int
    last_updated: datetime
    aggregation_method: str

    def __post_init__(self):
        if not self.values:
            self.values = []
        if not self.last_updated:
            self.last_updated = datetime.utcnow()
        if not self.aggregation_method:
            self.aggregation_method = "mean"


@dataclass
class OptimizationRecommendation:
    """System optimization recommendation."""
    recommendation_id: str
    target: OptimizationTarget
    priority: int
    description: str
    expected_impact: float
    implementation_effort: str
    metrics_affected: List[MetricType]
    created_at: datetime

    def __post_init__(self):
        if not self.recommendation_id:
            self.recommendation_id = f"opt_rec_{int(time.time())}"
        if not self.created_at:
            self.created_at = datetime.utcnow()


@dataclass
class Alert:
    """System alert."""
    alert_id: str
    level: AlertLevel
    title: str
    description: str
    affected_metrics: List[MetricType]
    threshold_value: float
    current_value: float
    # Defaulting to None lets callers omit created_at; __post_init__ fills it
    # in. Without the default, every construction site would have to pass it.
    created_at: Optional[datetime] = None
    acknowledged: bool = False

    def __post_init__(self):
        if not self.alert_id:
            self.alert_id = f"alert_{int(time.time())}"
        if not self.created_at:
            self.created_at = datetime.utcnow()

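
# ---------------------------------------------------------------------------
# Minimal illustrative sketch (not used by the classes below): constructing a
# MetricValue and flattening it for transport. All field values here are
# invented for the example. Note that dataclasses.asdict() leaves enum and
# datetime members unconverted, so JSON-bound callers convert them explicitly.
def _example_metric_value_payload() -> Dict[str, Any]:
    mv = MetricValue(
        metric_type=MetricType.PROCESSING_LATENCY,
        value=0.92,
        timestamp=datetime.utcnow(),
        confidence=0.8,
        sample_size=25,
        metadata={"source": "example"},
    )
    payload = asdict(mv)
    payload["metric_type"] = mv.metric_type.value    # Enum -> plain string
    payload["timestamp"] = mv.timestamp.isoformat()  # datetime -> ISO string
    return payload
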
class MetricsCollector:
    """Advanced metrics collection and computation engine.

    All metric values are normalized to the [0, 1] range, where higher means
    better (latency, error rate, and memory usage are inverted into scores).
    """

    def __init__(self, max_series_length: int = 1000):
        self.max_series_length = max_series_length
        self.metric_series: Dict[MetricType, MetricSeries] = {}
        self.real_time_metrics: Dict[MetricType, MetricValue] = {}
        self.computation_functions = {
            MetricType.CONTEXT_RETENTION_ACCURACY: self._compute_context_retention_accuracy,
            MetricType.CONTEXT_RELEVANCE_PRECISION: self._compute_context_relevance_precision,
            MetricType.CONTEXT_ADAPTATION_SPEED: self._compute_context_adaptation_speed,
            MetricType.CONTEXTUAL_REASONING_QUALITY: self._compute_contextual_reasoning_quality,
            MetricType.USER_SATISFACTION: self._compute_user_satisfaction,
            MetricType.CONTEXT_UTILIZATION_EFFICIENCY: self._compute_context_utilization_efficiency,
            MetricType.PROCESSING_LATENCY: self._compute_processing_latency,
            MetricType.MEMORY_USAGE: self._compute_memory_usage,
            MetricType.ERROR_RATE: self._compute_error_rate,
            MetricType.SYSTEM_THROUGHPUT: self._compute_system_throughput,
        }

        self.performance_history = deque(maxlen=100)
        self.last_computation_time: Dict[MetricType, float] = {}

        # Guards metric_series against concurrent writers.
        self._lock = threading.RLock()

    async def compute_metric(
        self,
        metric_type: MetricType,
        context_data: Dict[str, Any],
        agent_data: Optional[Dict[str, Any]] = None,
    ) -> MetricValue:
        """Compute a specific metric."""
        if agent_data is None:
            agent_data = {}

        try:
            start_time = time.time()

            computation_func = self.computation_functions.get(metric_type)
            if not computation_func:
                raise ValueError(f"No computation function for {metric_type}")

            result = await computation_func(context_data, agent_data)

            metric_value = MetricValue(
                metric_type=metric_type,
                value=result["value"],
                timestamp=datetime.utcnow(),
                confidence=result.get("confidence", 0.8),
                sample_size=result.get("sample_size", 1),
                metadata=result.get("metadata", {}),
            )

            await self._store_metric_value(metric_value)
            self.real_time_metrics[metric_type] = metric_value

            computation_time = time.time() - start_time
            self.last_computation_time[metric_type] = computation_time

            return metric_value

        except Exception as e:
            logger.error(f"Failed to compute metric {metric_type}: {e}")
            # Return a zero-confidence placeholder rather than raising, so one
            # failing metric does not take down a whole collection pass.
            return MetricValue(
                metric_type=metric_type,
                value=0.0,
                timestamp=datetime.utcnow(),
                confidence=0.0,
                sample_size=0,
                metadata={"error": str(e)},
            )

    async def compute_all_metrics(
        self,
        context_data: Dict[str, Any],
        agent_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[MetricType, MetricValue]:
        """Compute all available metrics concurrently."""
        if agent_data is None:
            agent_data = {}

        results: Dict[MetricType, MetricValue] = {}

        tasks = [
            self.compute_metric(metric_type, context_data, agent_data)
            for metric_type in MetricType
        ]
        computed_metrics = await asyncio.gather(*tasks, return_exceptions=True)

        for metric_type, result in zip(MetricType, computed_metrics):
            if isinstance(result, Exception):
                logger.error(f"Error computing {metric_type}: {result}")
                continue
            results[metric_type] = result

        return results

    async def get_metric_series(
        self,
        metric_type: MetricType,
        time_window: Optional[timedelta] = None,
        aggregation: str = "mean",
    ) -> List[MetricValue]:
        """Get a metric time series, optionally restricted to a time window."""
        if metric_type not in self.metric_series:
            return []

        series = self.metric_series[metric_type]

        if time_window:
            cutoff_time = datetime.utcnow() - time_window
            return [mv for mv in series.values if mv.timestamp >= cutoff_time]

        return series.values

    async def get_real_time_metrics(self) -> Dict[MetricType, MetricValue]:
        """Get current real-time metrics."""
        return self.real_time_metrics.copy()

    async def get_metric_statistics(
        self,
        metric_type: MetricType,
        time_window: timedelta = timedelta(hours=24),
    ) -> Dict[str, float]:
        """Get a statistical summary of a metric."""
        series = await self.get_metric_series(metric_type, time_window)

        if not series:
            return {}

        values = [mv.value for mv in series]

        # Cast numpy scalars to plain floats so the summary serializes cleanly.
        return {
            "count": len(values),
            "mean": float(np.mean(values)),
            "median": float(np.median(values)),
            "std": float(np.std(values)),
            "min": float(np.min(values)),
            "max": float(np.max(values)),
            "p25": float(np.percentile(values, 25)),
            "p75": float(np.percentile(values, 75)),
            "p95": float(np.percentile(values, 95)),
        }

    # --- Metric computation functions --------------------------------------

    async def _compute_context_retention_accuracy(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute context retention accuracy."""
        contexts = context_data.get("contexts", [])
        if not contexts:
            return {"value": 0.0, "confidence": 0.0, "sample_size": 0}

        total_contexts = len(contexts)
        retained_contexts = sum(1 for ctx in contexts if ctx.get("retained", True))

        accuracy = retained_contexts / max(total_contexts, 1)

        return {
            "value": accuracy,
            "confidence": min(1.0, total_contexts / 10),
            "sample_size": total_contexts,
            "metadata": {
                "total_contexts": total_contexts,
                "retained_contexts": retained_contexts,
            },
        }

    async def _compute_context_relevance_precision(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute context relevance precision."""
        contexts = context_data.get("contexts", [])
        if not contexts:
            return {"value": 0.0, "confidence": 0.0, "sample_size": 0}

        relevance_scores = [ctx.get("relevance_score", 0) for ctx in contexts]
        precision_scores = [ctx.get("precision_score", 0) for ctx in contexts]

        if not precision_scores:
            return {"value": 0.0, "confidence": 0.0, "sample_size": len(contexts)}

        total_precision = sum(precision_scores)
        max_possible_precision = len(contexts)

        precision = total_precision / max_possible_precision

        relevance_factor = np.mean(relevance_scores) if relevance_scores else 0

        # Weighted blend: precision dominates, relevance acts as a modifier.
        combined_precision = (precision * 0.7) + (relevance_factor * 0.3)

        return {
            "value": combined_precision,
            "confidence": min(1.0, len(contexts) / 5),
            "sample_size": len(contexts),
            "metadata": {
                "avg_relevance": relevance_factor,
                "raw_precision": precision,
            },
        }

    async def _compute_context_adaptation_speed(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute context adaptation speed."""
        adaptations = context_data.get("adaptations", [])
        if not adaptations:
            return {"value": 0.5, "confidence": 0.0, "sample_size": 0}

        adaptation_times = []
        for adaptation in adaptations:
            start_time = adaptation.get("start_time")
            end_time = adaptation.get("end_time")
            if start_time and end_time:
                duration = (end_time - start_time).total_seconds()
                adaptation_times.append(duration)

        if not adaptation_times:
            return {"value": 0.5, "confidence": 0.0, "sample_size": len(adaptations)}

        # Map average duration to a score: 0 s -> 1.0, 60 s -> 0.5, and so on.
        avg_duration = np.mean(adaptation_times)
        speed_score = 1.0 / (1.0 + avg_duration / 60)

        return {
            "value": speed_score,
            "confidence": min(1.0, len(adaptations) / 5),
            "sample_size": len(adaptations),
            "metadata": {
                "avg_duration_seconds": avg_duration,
                "min_duration": min(adaptation_times),
                "max_duration": max(adaptation_times),
            },
        }

    async def _compute_contextual_reasoning_quality(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute contextual reasoning quality."""
        reasoning_decisions = context_data.get("reasoning_decisions", [])
        if not reasoning_decisions:
            return {"value": 0.5, "confidence": 0.0, "sample_size": 0}

        successful_reasoning = 0
        context_aware_reasoning = 0

        for decision in reasoning_decisions:
            if decision.get("successful", False):
                successful_reasoning += 1
            if decision.get("context_aware", False):
                context_aware_reasoning += 1

        success_rate = successful_reasoning / len(reasoning_decisions)
        context_awareness_rate = context_aware_reasoning / len(reasoning_decisions)

        quality_score = (success_rate * 0.6) + (context_awareness_rate * 0.4)

        return {
            "value": quality_score,
            "confidence": min(1.0, len(reasoning_decisions) / 10),
            "sample_size": len(reasoning_decisions),
            "metadata": {
                "success_rate": success_rate,
                "context_awareness_rate": context_awareness_rate,
            },
        }

    async def _compute_user_satisfaction(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute user satisfaction metrics."""
        user_interactions = context_data.get("user_interactions", [])
        if not user_interactions:
            return {"value": 0.5, "confidence": 0.0, "sample_size": 0}

        satisfaction_scores = []
        completion_rates = []

        for interaction in user_interactions:
            # Explicit signal: a user-reported satisfaction score.
            if "satisfaction_score" in interaction:
                satisfaction_scores.append(interaction["satisfaction_score"])
            # Implicit signal: task completion rate.
            completion_rate = interaction.get("completion_rate", 0.5)
            completion_rates.append(completion_rate)

        all_scores = satisfaction_scores + completion_rates

        if not all_scores:
            return {"value": 0.5, "confidence": 0.0, "sample_size": len(user_interactions)}

        avg_satisfaction = np.mean(all_scores)

        return {
            "value": avg_satisfaction,
            "confidence": min(1.0, len(all_scores) / 10),
            "sample_size": len(all_scores),
            "metadata": {
                "explicit_scores": len(satisfaction_scores),
                "implicit_scores": len(completion_rates),
                "satisfaction_variance": np.var(all_scores) if len(all_scores) > 1 else 0,
            },
        }

    async def _compute_context_utilization_efficiency(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute context utilization efficiency."""
        contexts = context_data.get("contexts", [])
        if not contexts:
            return {"value": 0.0, "confidence": 0.0, "sample_size": 0}

        utilization_scores = []

        for ctx in contexts:
            usage_count = ctx.get("usage_count", 0)
            available_count = ctx.get("available_count", 1)

            utilization = usage_count / max(available_count, 1)
            utilization_scores.append(min(1.0, utilization))

        if not utilization_scores:
            return {"value": 0.0, "confidence": 0.0, "sample_size": len(contexts)}

        avg_utilization = np.mean(utilization_scores)

        return {
            "value": avg_utilization,
            "confidence": min(1.0, len(contexts) / 8),
            "sample_size": len(contexts),
            "metadata": {
                "total_contexts": len(contexts),
                "avg_utilization": avg_utilization,
                "underutilized_contexts": sum(1 for u in utilization_scores if u < 0.3),
            },
        }

    async def _compute_processing_latency(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute processing latency metrics."""
        processing_times = context_data.get("processing_times", [])
        if not processing_times:
            return {"value": 0.5, "confidence": 0.0, "sample_size": 0}

        avg_processing_time = np.mean(processing_times)

        # Invert latency into a score: 0 ms -> 1.0, 1000 ms or more -> 0.0.
        max_acceptable_latency = 1000  # milliseconds
        latency_score = max(0.0, 1.0 - (avg_processing_time / max_acceptable_latency))

        return {
            "value": latency_score,
            "confidence": min(1.0, len(processing_times) / 20),
            "sample_size": len(processing_times),
            "metadata": {
                "avg_processing_time_ms": avg_processing_time,
                "min_processing_time": min(processing_times),
                "max_processing_time": max(processing_times),
            },
        }

    async def _compute_memory_usage(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute memory usage efficiency."""
        memory_usage = context_data.get("memory_usage", {})
        if not memory_usage:
            return {"value": 0.5, "confidence": 0.0, "sample_size": 0}

        current_usage = memory_usage.get("current_mb", 0)
        max_usage = memory_usage.get("max_mb", 1000)

        # Lower usage relative to the budget yields a higher efficiency score.
        usage_ratio = current_usage / max(max_usage, 1)
        efficiency_score = max(0.0, 1.0 - usage_ratio)

        return {
            "value": efficiency_score,
            "confidence": 0.8,
            "sample_size": 1,
            "metadata": {
                "current_usage_mb": current_usage,
                "max_usage_mb": max_usage,
                "usage_ratio": usage_ratio,
            },
        }

    async def _compute_error_rate(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute system error rate (expressed as a reliability score)."""
        total_operations = context_data.get("total_operations", 0)
        error_count = context_data.get("error_count", 0)

        if total_operations == 0:
            return {"value": 1.0, "confidence": 0.0, "sample_size": 0}

        error_rate = error_count / total_operations
        reliability_score = max(0.0, 1.0 - error_rate)

        return {
            "value": reliability_score,
            "confidence": min(1.0, total_operations / 50),
            "sample_size": total_operations,
            "metadata": {
                "total_operations": total_operations,
                "error_count": error_count,
                "raw_error_rate": error_rate,
            },
        }

    async def _compute_system_throughput(
        self,
        context_data: Dict[str, Any],
        agent_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Compute system throughput."""
        operations_per_minute = context_data.get("operations_per_minute", 0)
        target_throughput = context_data.get("target_throughput", 100)

        throughput_ratio = operations_per_minute / max(target_throughput, 1)
        efficiency_score = min(1.0, throughput_ratio)

        return {
            "value": efficiency_score,
            "confidence": 0.7,
            "sample_size": 1,
            "metadata": {
                "operations_per_minute": operations_per_minute,
                "target_throughput": target_throughput,
                "throughput_ratio": throughput_ratio,
            },
        }

    # --- Storage ------------------------------------------------------------

    async def _store_metric_value(self, metric_value: MetricValue) -> None:
        """Store a metric value in its time series."""
        with self._lock:
            metric_type = metric_value.metric_type

            if metric_type not in self.metric_series:
                self.metric_series[metric_type] = MetricSeries(
                    metric_type=metric_type,
                    values=[],
                    window_size=self.max_series_length,
                    last_updated=datetime.utcnow(),
                    aggregation_method="mean",
                )

            series = self.metric_series[metric_type]
            series.values.append(metric_value)
            series.last_updated = datetime.utcnow()

            # Keep only the most recent window of values.
            if len(series.values) > self.max_series_length:
                series.values = series.values[-self.max_series_length:]

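
# ---------------------------------------------------------------------------
# Illustrative usage sketch for MetricsCollector. The keys in the payload
# below (contexts, processing_times, total_operations, ...) mirror what the
# private _compute_* methods above read; every concrete value is invented for
# the example and carries no real meaning.
_EXAMPLE_CONTEXT_DATA: Dict[str, Any] = {
    "contexts": [
        {"retained": True, "relevance_score": 0.9, "precision_score": 0.8,
         "usage_count": 3, "available_count": 4},
        {"retained": False, "relevance_score": 0.4, "precision_score": 0.5,
         "usage_count": 1, "available_count": 5},
    ],
    "processing_times": [120.0, 95.0, 210.0],  # milliseconds
    "total_operations": 50,
    "error_count": 2,
    "operations_per_minute": 80,
    "target_throughput": 100,
    "memory_usage": {"current_mb": 256, "max_mb": 1024},
}


async def _example_collect_metrics() -> Dict[MetricType, MetricValue]:
    """Sketch: compute one round of metrics over the synthetic payload."""
    collector = MetricsCollector()
    return await collector.compute_all_metrics(_EXAMPLE_CONTEXT_DATA)
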
class OptimizationEngine:
    """Advanced optimization engine for context engineering systems."""

    def __init__(self):
        self.optimization_strategies = {
            OptimizationTarget.PERFORMANCE: self._optimize_performance,
            OptimizationTarget.ACCURACY: self._optimize_accuracy,
            OptimizationTarget.EFFICIENCY: self._optimize_efficiency,
            OptimizationTarget.USER_EXPERIENCE: self._optimize_user_experience,
            OptimizationTarget.RESOURCE_USAGE: self._optimize_resource_usage,
            OptimizationTarget.ADAPTABILITY: self._optimize_adaptability,
        }

        self.current_recommendations: List[OptimizationRecommendation] = []
        self.optimization_history = deque(maxlen=50)

    async def generate_optimization_recommendations(
        self,
        metrics: Dict[MetricType, MetricValue],
        context_data: Optional[Dict[str, Any]] = None,
    ) -> List[OptimizationRecommendation]:
        """Generate optimization recommendations based on current metrics."""
        if context_data is None:
            context_data = {}

        recommendations: List[OptimizationRecommendation] = []

        # Per-metric analysis.
        for metric_type, metric_value in metrics.items():
            metric_recommendations = await self._analyze_metric_for_optimization(
                metric_type, metric_value, context_data
            )
            recommendations.extend(metric_recommendations)

        # Cross-metric analysis.
        cross_recommendations = await self._analyze_cross_metric_optimizations(metrics)
        recommendations.extend(cross_recommendations)

        ranked_recommendations = await self._rank_recommendations(recommendations)
        self.current_recommendations = ranked_recommendations

        return ranked_recommendations

    async def apply_optimization(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Apply an optimization recommendation."""
        optimization_target = recommendation.target

        if optimization_target in self.optimization_strategies:
            optimization_func = self.optimization_strategies[optimization_target]
            result = await optimization_func(recommendation, agent_system)

            self.optimization_history.append({
                "recommendation_id": recommendation.recommendation_id,
                "timestamp": datetime.utcnow(),
                "result": result,
                "target": optimization_target.value,
            })

            return result

        return {"success": False, "reason": "unknown_optimization_target"}

    async def get_optimization_status(self) -> Dict[str, Any]:
        """Get current optimization status and history."""
        recent_optimizations = list(self.optimization_history)[-10:]

        return {
            "current_recommendations": len(self.current_recommendations),
            "recent_optimizations": recent_optimizations,
            "total_optimization_attempts": len(self.optimization_history),
            "optimization_success_rate": self._calculate_success_rate(),
        }

    def _calculate_success_rate(self) -> float:
        """Calculate optimization success rate."""
        if not self.optimization_history:
            return 0.0

        successful_attempts = sum(
            1 for opt in self.optimization_history
            if opt.get("result", {}).get("success", False)
        )

        return successful_attempts / len(self.optimization_history)

    # --- Target-specific strategy dispatchers -------------------------------

    async def _optimize_performance(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for performance."""
        strategies = {
            "cache_frequently_used_context": self._optimize_cache_strategy,
            "parallel_processing": self._optimize_parallel_processing,
            "reduce_computation_overhead": self._optimize_computation_overhead,
            "memory_pool_optimization": self._optimize_memory_pools,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_performance_strategy"}

    async def _optimize_accuracy(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for accuracy."""
        strategies = {
            "enhance_context_validation": self._optimize_context_validation,
            "improve_relevance_scoring": self._optimize_relevance_scoring,
            "refine_reasoning_algorithms": self._optimize_reasoning_algorithms,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_accuracy_strategy"}

    async def _optimize_efficiency(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for efficiency."""
        strategies = {
            "context_compression": self._optimize_context_compression,
            "resource_pooling": self._optimize_resource_pooling,
            "batch_processing": self._optimize_batch_processing,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_efficiency_strategy"}

    async def _optimize_user_experience(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for user experience."""
        strategies = {
            "reduce_response_latency": self._optimize_response_latency,
            "improve_interaction_flow": self._optimize_interaction_flow,
            "enhance_feedback_mechanisms": self._optimize_feedback_mechanisms,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_ux_strategy"}

    async def _optimize_resource_usage(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for resource usage."""
        strategies = {
            "memory_optimization": self._optimize_memory_usage,
            "cpu_optimization": self._optimize_cpu_usage,
            "storage_optimization": self._optimize_storage_usage,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_resource_strategy"}

    async def _optimize_adaptability(
        self,
        recommendation: OptimizationRecommendation,
        agent_system: Any,
    ) -> Dict[str, Any]:
        """Optimize for adaptability."""
        strategies = {
            "improve_learning_rate": self._optimize_learning_rate,
            "enhance_pattern_recognition": self._optimize_pattern_recognition,
            "adaptive_parameter_tuning": self._optimize_parameter_tuning,
        }

        strategy_func = strategies.get(recommendation.description)
        if strategy_func:
            return await strategy_func(agent_system)

        return {"success": False, "reason": "unknown_adaptability_strategy"}

    # --- Analysis helpers ----------------------------------------------------

    async def _analyze_metric_for_optimization(
        self,
        metric_type: MetricType,
        metric_value: MetricValue,
        context_data: Dict[str, Any],
    ) -> List[OptimizationRecommendation]:
        """Analyze an individual metric for optimization opportunities."""
        recommendations: List[OptimizationRecommendation] = []

        # Score bands per metric: below "poor" triggers a recommendation,
        # "good" is the target level used to size the expected impact.
        thresholds = {
            MetricType.CONTEXT_RETENTION_ACCURACY: {"poor": 0.6, "good": 0.8},
            MetricType.CONTEXT_RELEVANCE_PRECISION: {"poor": 0.7, "good": 0.9},
            MetricType.CONTEXT_ADAPTATION_SPEED: {"poor": 0.5, "good": 0.8},
            MetricType.CONTEXTUAL_REASONING_QUALITY: {"poor": 0.6, "good": 0.85},
            MetricType.USER_SATISFACTION: {"poor": 0.7, "good": 0.9},
            MetricType.PROCESSING_LATENCY: {"poor": 0.4, "good": 0.7},
            MetricType.ERROR_RATE: {"poor": 0.8, "good": 0.95},
        }

        threshold_data = thresholds.get(metric_type)
        if not threshold_data:
            return recommendations

        if metric_value.value < threshold_data["poor"]:
            recommendation = await self._generate_metric_recommendation(
                metric_type, metric_value, threshold_data
            )
            if recommendation:
                recommendations.append(recommendation)

        return recommendations

    async def _generate_metric_recommendation(
        self,
        metric_type: MetricType,
        metric_value: MetricValue,
        threshold_data: Dict[str, float],
    ) -> Optional[OptimizationRecommendation]:
        """Generate an optimization recommendation for a metric."""
        metric_mappings = {
            MetricType.CONTEXT_RETENTION_ACCURACY: {
                "target": OptimizationTarget.ACCURACY,
                "description": "enhance_context_validation",
                "priority": 8,
            },
            MetricType.CONTEXT_RELEVANCE_PRECISION: {
                "target": OptimizationTarget.ACCURACY,
                "description": "improve_relevance_scoring",
                "priority": 7,
            },
            MetricType.PROCESSING_LATENCY: {
                "target": OptimizationTarget.PERFORMANCE,
                "description": "cache_frequently_used_context",
                "priority": 9,
            },
            MetricType.USER_SATISFACTION: {
                "target": OptimizationTarget.USER_EXPERIENCE,
                "description": "reduce_response_latency",
                "priority": 10,
            },
            MetricType.ERROR_RATE: {
                "target": OptimizationTarget.ACCURACY,
                "description": "enhance_error_handling",
                "priority": 6,
            },
        }

        mapping = metric_mappings.get(metric_type)
        if not mapping:
            return None

        # Expected impact is the gap between the current score and "good".
        impact = max(0.1, threshold_data["good"] - metric_value.value)

        return OptimizationRecommendation(
            recommendation_id=f"opt_{metric_type.value}_{int(time.time())}",
            target=mapping["target"],
            priority=mapping["priority"],
            description=mapping["description"],
            expected_impact=impact,
            implementation_effort="medium",
            metrics_affected=[metric_type],
            created_at=datetime.utcnow(),
        )

    async def _analyze_cross_metric_optimizations(
        self,
        metrics: Dict[MetricType, MetricValue],
    ) -> List[OptimizationRecommendation]:
        """Analyze cross-metric optimization opportunities."""
        recommendations: List[OptimizationRecommendation] = []

        # Slow but accurate: headroom exists to trade accuracy effort for
        # speed, e.g. by parallelizing context processing.
        latency = metrics.get(MetricType.PROCESSING_LATENCY)
        accuracy = metrics.get(MetricType.CONTEXT_RETENTION_ACCURACY)

        if latency and accuracy:
            if latency.value < 0.5 and accuracy.value > 0.8:
                recommendations.append(OptimizationRecommendation(
                    recommendation_id=f"perf_acc_opt_{int(time.time())}",
                    target=OptimizationTarget.PERFORMANCE,
                    priority=6,
                    description="parallel_processing",
                    expected_impact=0.3,
                    implementation_effort="high",
                    metrics_affected=[MetricType.PROCESSING_LATENCY],
                    created_at=datetime.utcnow(),
                ))

        # High context utilization with low satisfaction suggests the
        # interaction flow, not the context machinery, is the bottleneck.
        utilization = metrics.get(MetricType.CONTEXT_UTILIZATION_EFFICIENCY)
        satisfaction = metrics.get(MetricType.USER_SATISFACTION)

        if utilization and satisfaction:
            if utilization.value > 0.9 and satisfaction.value < 0.7:
                recommendations.append(OptimizationRecommendation(
                    recommendation_id=f"eff_ux_opt_{int(time.time())}",
                    target=OptimizationTarget.USER_EXPERIENCE,
                    priority=7,
                    description="improve_interaction_flow",
                    expected_impact=0.4,
                    implementation_effort="medium",
                    metrics_affected=[MetricType.USER_SATISFACTION],
                    created_at=datetime.utcnow(),
                ))

        return recommendations

    async def _rank_recommendations(
        self,
        recommendations: List[OptimizationRecommendation],
    ) -> List[OptimizationRecommendation]:
        """Rank recommendations by priority, impact, and implementation effort."""
        scored_recommendations = []
        for rec in recommendations:
            priority_score = rec.priority / 10.0
            impact_score = rec.expected_impact
            effort_penalty = {"low": 0.0, "medium": -0.1, "high": -0.2}.get(
                rec.implementation_effort, -0.1
            )

            composite_score = priority_score * 0.4 + impact_score * 0.4 + effort_penalty
            scored_recommendations.append((composite_score, rec))

        # Sort on the score alone; recommendations themselves are not orderable.
        scored_recommendations.sort(key=lambda x: x[0], reverse=True)

        return [rec for _, rec in scored_recommendations]

    # --- Placeholder strategy implementations -------------------------------
    # Each strategy currently reports success without touching agent_system;
    # real implementations would hook into the agent's subsystems.

    async def _optimize_cache_strategy(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize caching strategy."""
        return {"success": True, "improvement": "cache_hit_rate_increased"}

    async def _optimize_parallel_processing(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize parallel processing."""
        return {"success": True, "improvement": "processing_speed_increased"}

    async def _optimize_computation_overhead(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize computation overhead."""
        return {"success": True, "improvement": "computation_overhead_reduced"}

    async def _optimize_memory_pools(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize memory pools."""
        return {"success": True, "improvement": "memory_efficiency_improved"}

    async def _optimize_context_validation(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize context validation."""
        return {"success": True, "improvement": "validation_accuracy_increased"}

    async def _optimize_relevance_scoring(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize relevance scoring."""
        return {"success": True, "improvement": "relevance_precision_increased"}

    async def _optimize_reasoning_algorithms(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize reasoning algorithms."""
        return {"success": True, "improvement": "reasoning_quality_increased"}

    async def _optimize_context_compression(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize context compression."""
        return {"success": True, "improvement": "compression_efficiency_increased"}

    async def _optimize_resource_pooling(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize resource pooling."""
        return {"success": True, "improvement": "resource_utilization_improved"}

    async def _optimize_batch_processing(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize batch processing."""
        return {"success": True, "improvement": "batch_efficiency_increased"}

    async def _optimize_response_latency(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize response latency."""
        return {"success": True, "improvement": "response_time_reduced"}

    async def _optimize_interaction_flow(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize interaction flow."""
        return {"success": True, "improvement": "interaction_ux_improved"}

    async def _optimize_feedback_mechanisms(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize feedback mechanisms."""
        return {"success": True, "improvement": "feedback_quality_increased"}

    async def _optimize_memory_usage(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize memory usage."""
        return {"success": True, "improvement": "memory_usage_optimized"}

    async def _optimize_cpu_usage(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize CPU usage."""
        return {"success": True, "improvement": "cpu_efficiency_improved"}

    async def _optimize_storage_usage(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize storage usage."""
        return {"success": True, "improvement": "storage_efficiency_increased"}

    async def _optimize_learning_rate(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize learning rate."""
        return {"success": True, "improvement": "learning_speed_increased"}

    async def _optimize_pattern_recognition(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize pattern recognition."""
        return {"success": True, "improvement": "pattern_recognition_improved"}

    async def _optimize_parameter_tuning(self, agent_system: Any) -> Dict[str, Any]:
        """Optimize parameter tuning."""
        return {"success": True, "improvement": "parameter_optimization_increased"}

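
# ---------------------------------------------------------------------------
# Illustrative sketch: feeding freshly computed metrics into the
# OptimizationEngine. Builds on the hypothetical _example_collect_metrics()
# helper above; in a real deployment the metrics would come from a live
# collector and agent_system would be the running agent.
async def _example_recommend_and_apply(agent_system: Any = None) -> None:
    metrics = await _example_collect_metrics()
    engine = OptimizationEngine()
    recommendations = await engine.generate_optimization_recommendations(metrics)
    for rec in recommendations:
        print(f"[{rec.priority}] {rec.target.value}: {rec.description} "
              f"(expected impact ~{rec.expected_impact:.2f})")
    if recommendations:
        # Apply only the top-ranked recommendation in this sketch.
        result = await engine.apply_optimization(recommendations[0], agent_system)
        print("optimization result:", result)
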
class MetricsDashboard:
    """Comprehensive metrics dashboard and monitoring system."""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.optimization_engine = OptimizationEngine()
        self.alerts: Dict[str, Alert] = {}
        self.dashboard_config = {
            "refresh_interval": 30,
            # Thresholds apply to the normalized scores (higher is better),
            # so "critical" is always below "warning". The ERROR_RATE entries
            # express the intended 10%/20% raw error-rate limits as
            # reliability scores.
            "alert_thresholds": {
                MetricType.ERROR_RATE: {"warning": 0.9, "critical": 0.8},
                MetricType.PROCESSING_LATENCY: {"warning": 0.3, "critical": 0.2},
                MetricType.USER_SATISFACTION: {"warning": 0.6, "critical": 0.4},
            },
            "display_preferences": {
                "show_real_time_metrics": True,
                "show_historical_trends": True,
                "show_optimization_recommendations": True,
            },
        }

        self._lock = threading.RLock()

    async def get_dashboard_data(
        self,
        include_recommendations: bool = True,
        include_alerts: bool = True,
        time_window: timedelta = timedelta(hours=24),
    ) -> Dict[str, Any]:
        """Get comprehensive dashboard data."""
        real_time_metrics = await self.metrics_collector.get_real_time_metrics()

        metric_statistics = {}
        for metric_type in MetricType:
            stats = await self.metrics_collector.get_metric_statistics(metric_type, time_window)
            if stats:
                metric_statistics[metric_type.value] = stats

        recommendations: List[OptimizationRecommendation] = []
        if include_recommendations:
            recommendations = await self.optimization_engine.generate_optimization_recommendations(
                real_time_metrics
            )

        current_alerts: List[Alert] = []
        if include_alerts:
            current_alerts = await self._check_metric_alerts(real_time_metrics)

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "real_time_metrics": {mt.value: mv.value for mt, mv in real_time_metrics.items()},
            "metric_statistics": metric_statistics,
            "optimization_recommendations": [asdict(rec) for rec in recommendations],
            "current_alerts": [asdict(alert) for alert in current_alerts],
            "dashboard_status": "healthy" if len(current_alerts) == 0 else "degraded",
            "total_metrics_tracked": len(real_time_metrics),
            # get_optimization_status is a coroutine and must be awaited here.
            "optimization_status": await self.optimization_engine.get_optimization_status(),
        }

    async def get_metric_trend(
        self,
        metric_type: MetricType,
        time_window: timedelta = timedelta(hours=24),
        aggregation: str = "hourly",
    ) -> Dict[str, Any]:
        """Get metric trend data."""
        series = await self.metrics_collector.get_metric_series(metric_type, time_window)

        if not series:
            return {"metric_type": metric_type.value, "data": [], "trend": "insufficient_data"}

        aggregated_data = await self._aggregate_metric_data(series, aggregation)
        trend = await self._calculate_trend(series)

        return {
            "metric_type": metric_type.value,
            "aggregation": aggregation,
            "data": aggregated_data,
            "trend": trend,
            "data_points": len(series),
            "time_window_hours": time_window.total_seconds() / 3600,
        }

    async def acknowledge_alert(self, alert_id: str) -> Dict[str, Any]:
        """Acknowledge an alert."""
        if alert_id in self.alerts:
            self.alerts[alert_id].acknowledged = True

            return {
                "success": True,
                "alert_id": alert_id,
                "acknowledged_at": datetime.utcnow().isoformat(),
            }
        else:
            return {
                "success": False,
                "reason": "alert_not_found",
            }

    async def trigger_optimization(
        self,
        recommendation_id: str,
        agent_system: Any = None,
    ) -> Dict[str, Any]:
        """Trigger an optimization recommendation."""
        recommendation = None
        for rec in self.optimization_engine.current_recommendations:
            if rec.recommendation_id == recommendation_id:
                recommendation = rec
                break

        if not recommendation:
            return {
                "success": False,
                "reason": "recommendation_not_found",
            }

        result = await self.optimization_engine.apply_optimization(recommendation, agent_system)

        return {
            "success": result.get("success", False),
            "recommendation_id": recommendation_id,
            "optimization_result": result,
            "applied_at": datetime.utcnow().isoformat(),
        }

    async def _check_metric_alerts(
        self,
        real_time_metrics: Dict[MetricType, MetricValue],
    ) -> List[Alert]:
        """Check metrics against alert thresholds."""
        alerts: List[Alert] = []
        thresholds = self.dashboard_config["alert_thresholds"]

        for metric_type, metric_value in real_time_metrics.items():
            metric_thresholds = thresholds.get(metric_type)
            if not metric_thresholds:
                continue

            current_value = metric_value.value

            # Check the critical threshold first so a severely degraded metric
            # raises a single critical alert rather than both levels at once.
            if current_value < metric_thresholds.get("critical", 0):
                alerts.append(Alert(
                    alert_id=f"alert_critical_{metric_type.value}_{int(time.time())}",
                    level=AlertLevel.CRITICAL,
                    title=f"Critical: {metric_type.value} severely below threshold",
                    description=(
                        f"Metric {metric_type.value} is {current_value:.3f}, below "
                        f"critical threshold {metric_thresholds['critical']}"
                    ),
                    affected_metrics=[metric_type],
                    threshold_value=metric_thresholds["critical"],
                    current_value=current_value,
                ))
            elif current_value < metric_thresholds.get("warning", 0):
                alerts.append(Alert(
                    alert_id=f"alert_{metric_type.value}_{int(time.time())}",
                    level=AlertLevel.WARNING,
                    title=f"Warning: {metric_type.value} below threshold",
                    description=(
                        f"Metric {metric_type.value} is {current_value:.3f}, below "
                        f"warning threshold {metric_thresholds['warning']}"
                    ),
                    affected_metrics=[metric_type],
                    threshold_value=metric_thresholds["warning"],
                    current_value=current_value,
                ))

        for alert in alerts:
            self.alerts[alert.alert_id] = alert

        return [alert for alert in alerts if not alert.acknowledged]

    async def _aggregate_metric_data(
        self,
        series: List[MetricValue],
        aggregation: str,
    ) -> List[Dict[str, Any]]:
        """Aggregate metric data based on time period."""
        if aggregation == "hourly":
            hourly_groups = defaultdict(list)
            for mv in series:
                hour_key = mv.timestamp.replace(minute=0, second=0, microsecond=0)
                hourly_groups[hour_key].append(mv.value)

            return [
                {
                    "timestamp": hour.isoformat(),
                    "value": float(np.mean(values)),
                    "count": len(values),
                }
                for hour, values in sorted(hourly_groups.items())
            ]

        elif aggregation == "daily":
            daily_groups = defaultdict(list)
            for mv in series:
                day_key = mv.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
                daily_groups[day_key].append(mv.value)

            return [
                {
                    "timestamp": day.isoformat(),
                    "value": float(np.mean(values)),
                    "count": len(values),
                }
                for day, values in sorted(daily_groups.items())
            ]

        else:
            # Fall back to the raw, unaggregated series.
            return [
                {
                    "timestamp": mv.timestamp.isoformat(),
                    "value": mv.value,
                    "confidence": mv.confidence,
                }
                for mv in series
            ]

    async def _calculate_trend(self, series: List[MetricValue]) -> str:
        """Calculate trend direction for a metric series (higher is better)."""
        if len(series) < 3:
            return "insufficient_data"

        # Compare the average of the first third against the last third.
        first_third = series[:len(series) // 3]
        last_third = series[-(len(series) // 3):]

        first_avg = np.mean([mv.value for mv in first_third])
        last_avg = np.mean([mv.value for mv in last_third])

        difference = last_avg - first_avg
        relative_change = abs(difference) / max(first_avg, 0.1)

        if relative_change < 0.05:
            return "stable"
        elif difference > 0:
            return "improving"
        else:
            return "declining"

    # --- Configuration -------------------------------------------------------

    async def update_alert_threshold(
        self,
        metric_type: MetricType,
        warning_threshold: float,
        critical_threshold: float,
    ) -> Dict[str, Any]:
        """Update alert thresholds for a metric."""
        with self._lock:
            self.dashboard_config["alert_thresholds"][metric_type] = {
                "warning": warning_threshold,
                "critical": critical_threshold,
            }

        return {
            "success": True,
            "metric_type": metric_type.value,
            "new_thresholds": {
                "warning": warning_threshold,
                "critical": critical_threshold,
            },
        }

    async def update_display_preferences(
        self,
        preferences: Dict[str, bool],
    ) -> Dict[str, Any]:
        """Update dashboard display preferences."""
        with self._lock:
            self.dashboard_config["display_preferences"].update(preferences)

        return {
            "success": True,
            "new_preferences": self.dashboard_config["display_preferences"],
        }

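
# ---------------------------------------------------------------------------
# Hedged end-to-end demo: run one metrics pass through the dashboard's own
# collector, then render a snapshot. Reuses the synthetic
# _EXAMPLE_CONTEXT_DATA assumptions above; nothing here touches a real agent.
async def _demo_dashboard() -> None:
    dashboard = MetricsDashboard()
    await dashboard.metrics_collector.compute_all_metrics(_EXAMPLE_CONTEXT_DATA)
    data = await dashboard.get_dashboard_data()
    print(f"dashboard status: {data['dashboard_status']}")
    print(f"metrics tracked: {data['total_metrics_tracked']}")
    print(f"open recommendations: {len(data['optimization_recommendations'])}")
    print(f"unacknowledged alerts: {len(data['current_alerts'])}")
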
if __name__ == "__main__":
    print("Metrics Dashboard & Optimization System Initialized")
    print("=" * 60)
    print("Ready for comprehensive metrics monitoring and optimization!")
    # Illustrative only: exercise the hedged demo sketch defined above.
    asyncio.run(_demo_dashboard())