"""
Metrics Dashboard & Optimization System
========================================
Advanced metrics computation and dashboard system for context engineering,
including real-time metrics, optimization recommendations, and performance monitoring.
"""
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple, Union, Callable
from dataclasses import dataclass, field, asdict
from enum import Enum
import numpy as np
from collections import defaultdict, deque
import threading
from concurrent.futures import ThreadPoolExecutor
import time
import statistics
from ai_agent_framework.core.context_engineering_agent import (
ContextElement, ContextModality, ContextDimension, ContextEngineeringAgent
)
logger = logging.getLogger(__name__)
class MetricType(Enum):
"""Types of metrics to compute."""
CONTEXT_RETENTION_ACCURACY = "context_retention_accuracy"
CONTEXT_RELEVANCE_PRECISION = "context_relevance_precision"
CONTEXT_ADAPTATION_SPEED = "context_adaptation_speed"
CONTEXTUAL_REASONING_QUALITY = "contextual_reasoning_quality"
USER_SATISFACTION = "user_satisfaction"
CONTEXT_UTILIZATION_EFFICIENCY = "context_utilization_efficiency"
PROCESSING_LATENCY = "processing_latency"
MEMORY_USAGE = "memory_usage"
ERROR_RATE = "error_rate"
SYSTEM_THROUGHPUT = "system_throughput"
class OptimizationTarget(Enum):
"""Optimization targets."""
PERFORMANCE = "performance"
ACCURACY = "accuracy"
EFFICIENCY = "efficiency"
USER_EXPERIENCE = "user_experience"
RESOURCE_USAGE = "resource_usage"
ADAPTABILITY = "adaptability"
class AlertLevel(Enum):
"""Alert severity levels."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass
class MetricValue:
"""Individual metric value with metadata."""
metric_type: MetricType
value: float
timestamp: datetime
confidence: float
sample_size: int
metadata: Dict[str, Any]
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.utcnow()
if not self.metadata:
self.metadata = {}
@dataclass
class MetricSeries:
"""Time series of metric values."""
metric_type: MetricType
values: List[MetricValue]
window_size: int
last_updated: datetime
aggregation_method: str
def __post_init__(self):
if not self.values:
self.values = []
if not self.last_updated:
self.last_updated = datetime.utcnow()
if not self.aggregation_method:
self.aggregation_method = "mean"
@dataclass
class OptimizationRecommendation:
"""System optimization recommendation."""
recommendation_id: str
target: OptimizationTarget
priority: int
description: str
expected_impact: float
implementation_effort: str
metrics_affected: List[MetricType]
created_at: datetime
def __post_init__(self):
if not self.recommendation_id:
self.recommendation_id = f"opt_rec_{int(time.time())}"
if not self.created_at:
self.created_at = datetime.utcnow()
@dataclass
class Alert:
"""System alert."""
alert_id: str
level: AlertLevel
title: str
description: str
affected_metrics: List[MetricType]
threshold_value: float
current_value: float
created_at: Optional[datetime] = None
acknowledged: bool = False
def __post_init__(self):
if not self.alert_id:
self.alert_id = f"alert_{int(time.time())}"
if not self.created_at:
self.created_at = datetime.utcnow()
class MetricsCollector:
"""Advanced metrics collection and computation engine."""
def __init__(self, max_series_length: int = 1000):
self.max_series_length = max_series_length
self.metric_series = {} # MetricType -> MetricSeries
self.real_time_metrics = {} # current metric values
self.computation_functions = {
MetricType.CONTEXT_RETENTION_ACCURACY: self._compute_context_retention_accuracy,
MetricType.CONTEXT_RELEVANCE_PRECISION: self._compute_context_relevance_precision,
MetricType.CONTEXT_ADAPTATION_SPEED: self._compute_context_adaptation_speed,
MetricType.CONTEXTUAL_REASONING_QUALITY: self._compute_contextual_reasoning_quality,
MetricType.USER_SATISFACTION: self._compute_user_satisfaction,
MetricType.CONTEXT_UTILIZATION_EFFICIENCY: self._compute_context_utilization_efficiency,
MetricType.PROCESSING_LATENCY: self._compute_processing_latency,
MetricType.MEMORY_USAGE: self._compute_memory_usage,
MetricType.ERROR_RATE: self._compute_error_rate,
MetricType.SYSTEM_THROUGHPUT: self._compute_system_throughput
}
# Performance tracking
self.performance_history = deque(maxlen=100)
self.last_computation_time = {}
# Thread safety
self._lock = threading.RLock()
async def compute_metric(
self,
metric_type: MetricType,
context_data: Dict[str, Any],
agent_data: Dict[str, Any] = None
) -> MetricValue:
"""Compute a specific metric."""
if agent_data is None:
agent_data = {}
try:
start_time = time.time()
# Get computation function
computation_func = self.computation_functions.get(metric_type)
if not computation_func:
raise ValueError(f"No computation function for {metric_type}")
# Compute metric
result = await computation_func(context_data, agent_data)
# Create metric value
metric_value = MetricValue(
metric_type=metric_type,
value=result["value"],
timestamp=datetime.utcnow(),
confidence=result.get("confidence", 0.8),
sample_size=result.get("sample_size", 1),
metadata=result.get("metadata", {})
)
# Store in series
await self._store_metric_value(metric_value)
# Update real-time metrics
self.real_time_metrics[metric_type] = metric_value
# Record computation time
computation_time = time.time() - start_time
self.last_computation_time[metric_type] = computation_time
return metric_value
except Exception as e:
logger.error(f"Failed to compute metric {metric_type}: {e}")
# Return default metric value
return MetricValue(
metric_type=metric_type,
value=0.0,
timestamp=datetime.utcnow(),
confidence=0.0,
sample_size=0,
metadata={"error": str(e)}
)
async def compute_all_metrics(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any] = None
) -> Dict[MetricType, MetricValue]:
"""Compute all available metrics."""
if agent_data is None:
agent_data = {}
results = {}
# Compute metrics concurrently
tasks = []
for metric_type in MetricType:
task = self.compute_metric(metric_type, context_data, agent_data)
tasks.append(task)
computed_metrics = await asyncio.gather(*tasks, return_exceptions=True)
for metric_type, result in zip(MetricType, computed_metrics):
if isinstance(result, Exception):
logger.error(f"Error computing {metric_type}: {result}")
continue
results[metric_type] = result
return results
async def get_metric_series(
self,
metric_type: MetricType,
time_window: Optional[timedelta] = None,
aggregation: str = "mean"
) -> List[MetricValue]:
"""Get metric time series."""
if metric_type not in self.metric_series:
return []
series = self.metric_series[metric_type]
if time_window:
cutoff_time = datetime.utcnow() - time_window
filtered_values = [
mv for mv in series.values
if mv.timestamp >= cutoff_time
]
return filtered_values
return series.values
async def get_real_time_metrics(self) -> Dict[MetricType, MetricValue]:
"""Get current real-time metrics."""
return self.real_time_metrics.copy()
async def get_metric_statistics(
self,
metric_type: MetricType,
time_window: timedelta = timedelta(hours=24)
) -> Dict[str, float]:
"""Get statistical summary of metric."""
series = await self.get_metric_series(metric_type, time_window)
if not series:
return {}
values = [mv.value for mv in series]
return {
"count": len(values),
"mean": np.mean(values),
"median": np.median(values),
"std": np.std(values),
"min": np.min(values),
"max": np.max(values),
"p25": np.percentile(values, 25),
"p75": np.percentile(values, 75),
"p95": np.percentile(values, 95)
}
# Individual metric computation functions
async def _compute_context_retention_accuracy(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute context retention accuracy."""
# Context retention measures how well important context is preserved
contexts = context_data.get("contexts", [])
if not contexts:
return {"value": 0.0, "confidence": 0.0, "sample_size": 0}
# Simplified retention calculation
total_contexts = len(contexts)
retained_contexts = sum(1 for ctx in contexts if ctx.get("retained", True))
accuracy = retained_contexts / max(total_contexts, 1)
return {
"value": accuracy,
"confidence": min(1.0, total_contexts / 10), # Higher confidence with more data
"sample_size": total_contexts,
"metadata": {
"total_contexts": total_contexts,
"retained_contexts": retained_contexts
}
}
async def _compute_context_relevance_precision(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute context relevance precision."""
contexts = context_data.get("contexts", [])
if not contexts:
return {"value": 0.0, "confidence": 0.0, "sample_size": 0}
# Calculate precision based on relevance scores
relevance_scores = [ctx.get("relevance_score", 0) for ctx in contexts]
precision_scores = [ctx.get("precision_score", 0) for ctx in contexts]
if not precision_scores:
return {"value": 0.0, "confidence": 0.0, "sample_size": len(contexts)}
# Weighted precision calculation
total_precision = sum(precision_scores)
max_possible_precision = len(contexts)
precision = total_precision / max_possible_precision
# Additional relevance factor
relevance_factor = np.mean(relevance_scores) if relevance_scores else 0
combined_precision = (precision * 0.7) + (relevance_factor * 0.3)
return {
"value": combined_precision,
"confidence": min(1.0, len(contexts) / 5),
"sample_size": len(contexts),
"metadata": {
"avg_relevance": relevance_factor,
"raw_precision": precision
}
}
async def _compute_context_adaptation_speed(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute context adaptation speed."""
# Adaptation speed measures how quickly the system adapts to new contexts
adaptations = context_data.get("adaptations", [])
if not adaptations:
return {"value": 0.5, "confidence": 0.0, "sample_size": 0}
# Calculate adaptation times
adaptation_times = []
for adaptation in adaptations:
start_time = adaptation.get("start_time")
end_time = adaptation.get("end_time")
if start_time and end_time:
duration = (end_time - start_time).total_seconds()
adaptation_times.append(duration)
if not adaptation_times:
return {"value": 0.5, "confidence": 0.0, "sample_size": len(adaptations)}
# Convert to speed score (lower time = higher speed)
avg_duration = np.mean(adaptation_times)
speed_score = 1.0 / (1.0 + avg_duration / 60) # Normalize by 60 seconds
return {
"value": speed_score,
"confidence": min(1.0, len(adaptations) / 5),
"sample_size": len(adaptations),
"metadata": {
"avg_duration_seconds": avg_duration,
"min_duration": min(adaptation_times),
"max_duration": max(adaptation_times)
}
}
async def _compute_contextual_reasoning_quality(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute contextual reasoning quality."""
# Reasoning quality based on decision outcomes and context usage
reasoning_decisions = context_data.get("reasoning_decisions", [])
if not reasoning_decisions:
return {"value": 0.5, "confidence": 0.0, "sample_size": 0}
# Analyze reasoning quality factors
successful_reasoning = 0
context_aware_reasoning = 0
for decision in reasoning_decisions:
# Success factor
if decision.get("successful", False):
successful_reasoning += 1
# Context awareness factor
if decision.get("context_aware", False):
context_aware_reasoning += 1
# Calculate quality score
success_rate = successful_reasoning / len(reasoning_decisions)
context_awareness_rate = context_aware_reasoning / len(reasoning_decisions)
quality_score = (success_rate * 0.6) + (context_awareness_rate * 0.4)
return {
"value": quality_score,
"confidence": min(1.0, len(reasoning_decisions) / 10),
"sample_size": len(reasoning_decisions),
"metadata": {
"success_rate": success_rate,
"context_awareness_rate": context_awareness_rate
}
}
async def _compute_user_satisfaction(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute user satisfaction metrics."""
# User satisfaction based on interaction outcomes
user_interactions = context_data.get("user_interactions", [])
if not user_interactions:
return {"value": 0.5, "confidence": 0.0, "sample_size": 0}
satisfaction_scores = []
completion_rates = []
for interaction in user_interactions:
# Explicit satisfaction scores
if "satisfaction_score" in interaction:
satisfaction_scores.append(interaction["satisfaction_score"])
# Implicit satisfaction indicators
completion_rate = interaction.get("completion_rate", 0.5)
completion_rates.append(completion_rate)
# Calculate overall satisfaction
all_scores = satisfaction_scores + completion_rates
if not all_scores:
return {"value": 0.5, "confidence": 0.0, "sample_size": len(user_interactions)}
avg_satisfaction = np.mean(all_scores)
return {
"value": avg_satisfaction,
"confidence": min(1.0, len(all_scores) / 10),
"sample_size": len(all_scores),
"metadata": {
"explicit_scores": len(satisfaction_scores),
"implicit_scores": len(completion_rates),
"satisfaction_variance": np.var(all_scores) if len(all_scores) > 1 else 0
}
}
async def _compute_context_utilization_efficiency(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute context utilization efficiency."""
# Efficiency measures how well available context is utilized
contexts = context_data.get("contexts", [])
if not contexts:
return {"value": 0.0, "confidence": 0.0, "sample_size": 0}
utilization_scores = []
for ctx in contexts:
# Context usage frequency
usage_count = ctx.get("usage_count", 0)
available_count = ctx.get("available_count", 1)
utilization = usage_count / max(available_count, 1)
utilization_scores.append(min(1.0, utilization))
if not utilization_scores:
return {"value": 0.0, "confidence": 0.0, "sample_size": len(contexts)}
avg_utilization = np.mean(utilization_scores)
return {
"value": avg_utilization,
"confidence": min(1.0, len(contexts) / 8),
"sample_size": len(contexts),
"metadata": {
"total_contexts": len(contexts),
"avg_utilization": avg_utilization,
"underutilized_contexts": sum(1 for u in utilization_scores if u < 0.3)
}
}
async def _compute_processing_latency(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute processing latency metrics."""
# Processing time analysis
processing_times = context_data.get("processing_times", [])
if not processing_times:
return {"value": 0.5, "confidence": 0.0, "sample_size": 0}
avg_processing_time = np.mean(processing_times)
# Convert to performance score (lower latency = higher score)
max_acceptable_latency = 1000 # 1 second
latency_score = max(0.0, 1.0 - (avg_processing_time / max_acceptable_latency))
return {
"value": latency_score,
"confidence": min(1.0, len(processing_times) / 20),
"sample_size": len(processing_times),
"metadata": {
"avg_processing_time_ms": avg_processing_time,
"min_processing_time": min(processing_times),
"max_processing_time": max(processing_times)
}
}
async def _compute_memory_usage(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute memory usage efficiency."""
# Memory usage analysis
memory_usage = context_data.get("memory_usage", {})
if not memory_usage:
return {"value": 0.5, "confidence": 0.0, "sample_size": 0}
current_usage = memory_usage.get("current_mb", 0)
max_usage = memory_usage.get("max_mb", 1000)
# Usage efficiency (lower usage = higher efficiency)
usage_ratio = current_usage / max(max_usage, 1)
efficiency_score = max(0.0, 1.0 - usage_ratio)
return {
"value": efficiency_score,
"confidence": 0.8, # Memory metrics are generally reliable
"sample_size": 1,
"metadata": {
"current_usage_mb": current_usage,
"max_usage_mb": max_usage,
"usage_ratio": usage_ratio
}
}
async def _compute_error_rate(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute system error rate."""
# Error analysis
total_operations = context_data.get("total_operations", 0)
error_count = context_data.get("error_count", 0)
if total_operations == 0:
return {"value": 1.0, "confidence": 0.0, "sample_size": 0}
error_rate = error_count / total_operations
reliability_score = max(0.0, 1.0 - error_rate)
return {
"value": reliability_score,
"confidence": min(1.0, total_operations / 50),
"sample_size": total_operations,
"metadata": {
"total_operations": total_operations,
"error_count": error_count,
"raw_error_rate": error_rate
}
}
async def _compute_system_throughput(
self,
context_data: Dict[str, Any],
agent_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Compute system throughput."""
# Throughput analysis
operations_per_minute = context_data.get("operations_per_minute", 0)
target_throughput = context_data.get("target_throughput", 100)
# Throughput efficiency
throughput_ratio = operations_per_minute / max(target_throughput, 1)
efficiency_score = min(1.0, throughput_ratio)
return {
"value": efficiency_score,
"confidence": 0.7,
"sample_size": 1,
"metadata": {
"operations_per_minute": operations_per_minute,
"target_throughput": target_throughput,
"throughput_ratio": throughput_ratio
}
}
# Helper methods
async def _store_metric_value(self, metric_value: MetricValue) -> None:
"""Store metric value in time series."""
with self._lock:
metric_type = metric_value.metric_type
if metric_type not in self.metric_series:
self.metric_series[metric_type] = MetricSeries(
metric_type=metric_type,
values=[],
window_size=self.max_series_length,
last_updated=datetime.utcnow(),
aggregation_method="mean"
)
series = self.metric_series[metric_type]
series.values.append(metric_value)
series.last_updated = datetime.utcnow()
# Maintain window size
if len(series.values) > self.max_series_length:
series.values = series.values[-self.max_series_length:]
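
# Illustrative-only sketch of computing a single metric with the collector above;
# the "contexts" entries are hypothetical sample inputs, not data this module produces.
async def _example_retention_metric() -> MetricValue:
    collector = MetricsCollector()
    sample_context_data = {
        "contexts": [{"retained": True}, {"retained": False}, {"retained": True}]
    }
    # Expected: accuracy of 2/3, with confidence scaled down by the small sample size.
    return await collector.compute_metric(
        MetricType.CONTEXT_RETENTION_ACCURACY, sample_context_data
    )
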
class OptimizationEngine:
"""Advanced optimization engine for context engineering systems."""
def __init__(self):
self.optimization_strategies = {
OptimizationTarget.PERFORMANCE: self._optimize_performance,
OptimizationTarget.ACCURACY: self._optimize_accuracy,
OptimizationTarget.EFFICIENCY: self._optimize_efficiency,
OptimizationTarget.USER_EXPERIENCE: self._optimize_user_experience,
OptimizationTarget.RESOURCE_USAGE: self._optimize_resource_usage,
OptimizationTarget.ADAPTABILITY: self._optimize_adaptability
}
self.current_recommendations = []
self.optimization_history = deque(maxlen=50)
async def generate_optimization_recommendations(
self,
metrics: Dict[MetricType, MetricValue],
context_data: Dict[str, Any] = None
) -> List[OptimizationRecommendation]:
"""Generate optimization recommendations based on current metrics."""
if context_data is None:
context_data = {}
recommendations = []
# Analyze each metric for optimization opportunities
for metric_type, metric_value in metrics.items():
metric_recommendations = await self._analyze_metric_for_optimization(
metric_type, metric_value, context_data
)
recommendations.extend(metric_recommendations)
# Cross-metric analysis
cross_recommendations = await self._analyze_cross_metric_optimizations(metrics)
recommendations.extend(cross_recommendations)
# Rank and filter recommendations
ranked_recommendations = await self._rank_recommendations(recommendations)
self.current_recommendations = ranked_recommendations
return ranked_recommendations
async def apply_optimization(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Apply an optimization recommendation."""
optimization_target = recommendation.target
if optimization_target in self.optimization_strategies:
optimization_func = self.optimization_strategies[optimization_target]
result = await optimization_func(recommendation, agent_system)
# Record optimization attempt
self.optimization_history.append({
"recommendation_id": recommendation.recommendation_id,
"timestamp": datetime.utcnow(),
"result": result,
"target": optimization_target.value
})
return result
return {"success": False, "reason": "unknown_optimization_target"}
async def get_optimization_status(self) -> Dict[str, Any]:
"""Get current optimization status and history."""
recent_optimizations = list(self.optimization_history)[-10:] # Last 10
return {
"current_recommendations": len(self.current_recommendations),
"recent_optimizations": recent_optimizations,
"total_optimization_attempts": len(self.optimization_history),
"optimization_success_rate": self._calculate_success_rate()
}
def _calculate_success_rate(self) -> float:
"""Calculate optimization success rate."""
if not self.optimization_history:
return 0.0
successful_attempts = sum(
1 for opt in self.optimization_history
if opt.get("result", {}).get("success", False)
)
return successful_attempts / len(self.optimization_history)
# Individual optimization strategies
async def _optimize_performance(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for performance."""
# Performance optimization strategies
strategies = {
"cache_frequently_used_context": self._optimize_cache_strategy,
"parallel_processing": self._optimize_parallel_processing,
"reduce_computation_overhead": self._optimize_computation_overhead,
"memory_pool_optimization": self._optimize_memory_pools
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_performance_strategy"}
async def _optimize_accuracy(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for accuracy."""
# Accuracy optimization strategies
strategies = {
"enhance_context_validation": self._optimize_context_validation,
"improve_relevance_scoring": self._optimize_relevance_scoring,
"refine_reasoning_algorithms": self._optimize_reasoning_algorithms
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_accuracy_strategy"}
async def _optimize_efficiency(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for efficiency."""
# Efficiency optimization strategies
strategies = {
"context_compression": self._optimize_context_compression,
"resource_pooling": self._optimize_resource_pooling,
"batch_processing": self._optimize_batch_processing
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_efficiency_strategy"}
async def _optimize_user_experience(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for user experience."""
# User experience optimization strategies
strategies = {
"reduce_response_latency": self._optimize_response_latency,
"improve_interaction_flow": self._optimize_interaction_flow,
"enhance_feedback_mechanisms": self._optimize_feedback_mechanisms
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_ux_strategy"}
async def _optimize_resource_usage(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for resource usage."""
# Resource optimization strategies
strategies = {
"memory_optimization": self._optimize_memory_usage,
"cpu_optimization": self._optimize_cpu_usage,
"storage_optimization": self._optimize_storage_usage
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_resource_strategy"}
async def _optimize_adaptability(
self,
recommendation: OptimizationRecommendation,
agent_system: Any
) -> Dict[str, Any]:
"""Optimize for adaptability."""
# Adaptability optimization strategies
strategies = {
"improve_learning_rate": self._optimize_learning_rate,
"enhance_pattern_recognition": self._optimize_pattern_recognition,
"adaptive_parameter_tuning": self._optimize_parameter_tuning
}
strategy_func = strategies.get(recommendation.description)
if strategy_func:
return await strategy_func(agent_system)
return {"success": False, "reason": "unknown_adaptability_strategy"}
# Analysis methods
async def _analyze_metric_for_optimization(
self,
metric_type: MetricType,
metric_value: MetricValue,
context_data: Dict[str, Any]
) -> List[OptimizationRecommendation]:
"""Analyze individual metric for optimization opportunities."""
recommendations = []
# Thresholds for optimization
thresholds = {
MetricType.CONTEXT_RETENTION_ACCURACY: {"poor": 0.6, "good": 0.8},
MetricType.CONTEXT_RELEVANCE_PRECISION: {"poor": 0.7, "good": 0.9},
MetricType.CONTEXT_ADAPTATION_SPEED: {"poor": 0.5, "good": 0.8},
MetricType.CONTEXTUAL_REASONING_QUALITY: {"poor": 0.6, "good": 0.85},
MetricType.USER_SATISFACTION: {"poor": 0.7, "good": 0.9},
MetricType.PROCESSING_LATENCY: {"poor": 0.4, "good": 0.7},
MetricType.ERROR_RATE: {"poor": 0.8, "good": 0.95}
}
threshold_data = thresholds.get(metric_type)
if not threshold_data:
return recommendations
# Check if optimization is needed
if metric_value.value < threshold_data["poor"]:
# Generate optimization recommendation
recommendation = await self._generate_metric_recommendation(
metric_type, metric_value, threshold_data
)
if recommendation:
recommendations.append(recommendation)
return recommendations
async def _generate_metric_recommendation(
self,
metric_type: MetricType,
metric_value: MetricValue,
threshold_data: Dict[str, float]
) -> Optional[OptimizationRecommendation]:
"""Generate optimization recommendation for a metric."""
# Map metrics to optimization targets and strategies
metric_mappings = {
MetricType.CONTEXT_RETENTION_ACCURACY: {
"target": OptimizationTarget.ACCURACY,
"description": "enhance_context_validation",
"priority": 8
},
MetricType.CONTEXT_RELEVANCE_PRECISION: {
"target": OptimizationTarget.ACCURACY,
"description": "improve_relevance_scoring",
"priority": 7
},
MetricType.PROCESSING_LATENCY: {
"target": OptimizationTarget.PERFORMANCE,
"description": "cache_frequently_used_context",
"priority": 9
},
MetricType.USER_SATISFACTION: {
"target": OptimizationTarget.USER_EXPERIENCE,
"description": "reduce_response_latency",
"priority": 10
},
MetricType.ERROR_RATE: {
"target": OptimizationTarget.ACCURACY,
"description": "enhance_error_handling",
"priority": 6
}
}
mapping = metric_mappings.get(metric_type)
if not mapping:
return None
# Calculate expected impact
impact = max(0.1, threshold_data["good"] - metric_value.value)
return OptimizationRecommendation(
recommendation_id=f"opt_{metric_type.value}_{int(time.time())}",
target=mapping["target"],
priority=mapping["priority"],
description=mapping["description"],
expected_impact=impact,
implementation_effort="medium",
metrics_affected=[metric_type],
created_at=datetime.utcnow()
)
async def _analyze_cross_metric_optimizations(
self,
metrics: Dict[MetricType, MetricValue]
) -> List[OptimizationRecommendation]:
"""Analyze cross-metric optimization opportunities."""
recommendations = []
# Performance-Accuracy trade-offs
latency = metrics.get(MetricType.PROCESSING_LATENCY)
accuracy = metrics.get(MetricType.CONTEXT_RETENTION_ACCURACY)
if latency and accuracy:
if latency.value < 0.5 and accuracy.value > 0.8:
# Good accuracy but poor performance - optimize for performance
recommendation = OptimizationRecommendation(
recommendation_id=f"perf_acc_opt_{int(time.time())}",
target=OptimizationTarget.PERFORMANCE,
priority=6,
description="parallel_processing",
expected_impact=0.3,
implementation_effort="high",
metrics_affected=[MetricType.PROCESSING_LATENCY],
created_at=datetime.utcnow()
)
recommendations.append(recommendation)
# Efficiency-User Experience balance
utilization = metrics.get(MetricType.CONTEXT_UTILIZATION_EFFICIENCY)
satisfaction = metrics.get(MetricType.USER_SATISFACTION)
if utilization and satisfaction:
if utilization.value > 0.9 and satisfaction.value < 0.7:
# High efficiency but low satisfaction - improve UX
recommendation = OptimizationRecommendation(
recommendation_id=f"eff_ux_opt_{int(time.time())}",
target=OptimizationTarget.USER_EXPERIENCE,
priority=7,
description="improve_interaction_flow",
expected_impact=0.4,
implementation_effort="medium",
metrics_affected=[MetricType.USER_SATISFACTION],
created_at=datetime.utcnow()
)
recommendations.append(recommendation)
return recommendations
async def _rank_recommendations(
self,
recommendations: List[OptimizationRecommendation]
) -> List[OptimizationRecommendation]:
"""Rank recommendations by priority and impact."""
# Score each recommendation
scored_recommendations = []
for rec in recommendations:
# Calculate composite score
priority_score = rec.priority / 10.0
impact_score = rec.expected_impact
effort_penalty = {"low": 0.0, "medium": -0.1, "high": -0.2}.get(
rec.implementation_effort, -0.1
)
composite_score = priority_score * 0.4 + impact_score * 0.4 + effort_penalty
scored_recommendations.append((composite_score, rec))
# Sort by score (highest first)
scored_recommendations.sort(key=lambda x: x[0], reverse=True)
return [rec for _, rec in scored_recommendations]
# Placeholder optimization implementations
async def _optimize_cache_strategy(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize caching strategy."""
return {"success": True, "improvement": "cache_hit_rate_increased"}
async def _optimize_parallel_processing(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize parallel processing."""
return {"success": True, "improvement": "processing_speed_increased"}
async def _optimize_computation_overhead(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize computation overhead."""
return {"success": True, "improvement": "computation_overhead_reduced"}
async def _optimize_memory_pools(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize memory pools."""
return {"success": True, "improvement": "memory_efficiency_improved"}
async def _optimize_context_validation(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize context validation."""
return {"success": True, "improvement": "validation_accuracy_increased"}
async def _optimize_relevance_scoring(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize relevance scoring."""
return {"success": True, "improvement": "relevance_precision_increased"}
async def _optimize_reasoning_algorithms(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize reasoning algorithms."""
return {"success": True, "improvement": "reasoning_quality_increased"}
async def _optimize_context_compression(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize context compression."""
return {"success": True, "improvement": "compression_efficiency_increased"}
async def _optimize_resource_pooling(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize resource pooling."""
return {"success": True, "improvement": "resource_utilization_improved"}
async def _optimize_batch_processing(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize batch processing."""
return {"success": True, "improvement": "batch_efficiency_increased"}
async def _optimize_response_latency(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize response latency."""
return {"success": True, "improvement": "response_time_reduced"}
async def _optimize_interaction_flow(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize interaction flow."""
return {"success": True, "improvement": "interaction_ux_improved"}
async def _optimize_feedback_mechanisms(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize feedback mechanisms."""
return {"success": True, "improvement": "feedback_quality_increased"}
async def _optimize_memory_usage(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize memory usage."""
return {"success": True, "improvement": "memory_usage_optimized"}
async def _optimize_cpu_usage(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize CPU usage."""
return {"success": True, "improvement": "cpu_efficiency_improved"}
async def _optimize_storage_usage(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize storage usage."""
return {"success": True, "improvement": "storage_efficiency_increased"}
async def _optimize_learning_rate(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize learning rate."""
return {"success": True, "improvement": "learning_speed_increased"}
async def _optimize_pattern_recognition(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize pattern recognition."""
return {"success": True, "improvement": "pattern_recognition_improved"}
async def _optimize_parameter_tuning(self, agent_system: Any) -> Dict[str, Any]:
"""Optimize parameter tuning."""
return {"success": True, "improvement": "parameter_optimization_increased"}
class MetricsDashboard:
"""Comprehensive metrics dashboard and monitoring system."""
def __init__(self):
self.metrics_collector = MetricsCollector()
self.optimization_engine = OptimizationEngine()
self.alerts = {} # Alert ID -> Alert
self.dashboard_config = {
"refresh_interval": 30, # seconds
"alert_thresholds": {
# Thresholds apply to the 0-1 scores each metric stores (higher is better); alerts fire when a score drops below a threshold.
MetricType.ERROR_RATE: {"warning": 0.9, "critical": 0.8},
MetricType.PROCESSING_LATENCY: {"warning": 0.3, "critical": 0.2},
MetricType.USER_SATISFACTION: {"warning": 0.6, "critical": 0.4}
},
"display_preferences": {
"show_real_time_metrics": True,
"show_historical_trends": True,
"show_optimization_recommendations": True
}
}
self._lock = threading.RLock()
async def get_dashboard_data(
self,
include_recommendations: bool = True,
include_alerts: bool = True,
time_window: timedelta = timedelta(hours=24)
) -> Dict[str, Any]:
"""Get comprehensive dashboard data."""
# Get real-time metrics
real_time_metrics = await self.metrics_collector.get_real_time_metrics()
# Get metric statistics
metric_statistics = {}
for metric_type in MetricType:
stats = await self.metrics_collector.get_metric_statistics(metric_type, time_window)
if stats:
metric_statistics[metric_type.value] = stats
# Generate optimization recommendations
recommendations = []
if include_recommendations:
recommendations = await self.optimization_engine.generate_optimization_recommendations(
real_time_metrics
)
# Check for alerts
current_alerts = []
if include_alerts:
current_alerts = await self._check_metric_alerts(real_time_metrics)
return {
"timestamp": datetime.utcnow().isoformat(),
"real_time_metrics": {mt.value: mv.value for mt, mv in real_time_metrics.items()},
"metric_statistics": metric_statistics,
"optimization_recommendations": [asdict(rec) for rec in recommendations],
"current_alerts": [asdict(alert) for alert in current_alerts],
"dashboard_status": "healthy" if len(current_alerts) == 0 else "degraded",
"total_metrics_tracked": len(real_time_metrics),
"optimization_status": self.optimization_engine.get_optimization_status()
}
async def get_metric_trend(
self,
metric_type: MetricType,
time_window: timedelta = timedelta(hours=24),
aggregation: str = "hourly"
) -> Dict[str, Any]:
"""Get metric trend data."""
series = await self.metrics_collector.get_metric_series(metric_type, time_window)
if not series:
return {"metric_type": metric_type.value, "data": [], "trend": "insufficient_data"}
# Aggregate data based on requested aggregation
aggregated_data = await self._aggregate_metric_data(series, aggregation)
# Calculate trend
trend = await self._calculate_trend(series)
return {
"metric_type": metric_type.value,
"aggregation": aggregation,
"data": aggregated_data,
"trend": trend,
"data_points": len(series),
"time_window_hours": time_window.total_seconds() / 3600
}
async def acknowledge_alert(self, alert_id: str) -> Dict[str, Any]:
"""Acknowledge an alert."""
if alert_id in self.alerts:
self.alerts[alert_id].acknowledged = True
return {
"success": True,
"alert_id": alert_id,
"acknowledged_at": datetime.utcnow().isoformat()
}
else:
return {
"success": False,
"reason": "alert_not_found"
}
async def trigger_optimization(
self,
recommendation_id: str,
agent_system: Any = None
) -> Dict[str, Any]:
"""Trigger an optimization recommendation."""
# Find the recommendation
recommendation = None
for rec in self.optimization_engine.current_recommendations:
if rec.recommendation_id == recommendation_id:
recommendation = rec
break
if not recommendation:
return {
"success": False,
"reason": "recommendation_not_found"
}
# Apply optimization
result = await self.optimization_engine.apply_optimization(recommendation, agent_system)
return {
"success": result.get("success", False),
"recommendation_id": recommendation_id,
"optimization_result": result,
"applied_at": datetime.utcnow().isoformat()
}
async def _check_metric_alerts(
self,
real_time_metrics: Dict[MetricType, MetricValue]
) -> List[Alert]:
"""Check metrics against alert thresholds."""
alerts = []
thresholds = self.dashboard_config["alert_thresholds"]
for metric_type, metric_value in real_time_metrics.items():
metric_thresholds = thresholds.get(metric_type)
if not metric_thresholds:
continue
current_value = metric_value.value
# Check warning threshold
if current_value < metric_thresholds.get("warning", 0):
alert_level = AlertLevel.WARNING
title = f"Warning: {metric_type.value} below threshold"
description = f"Metric {metric_type.value} is {current_value:.3f}, below warning threshold {metric_thresholds['warning']}"
alert = Alert(
alert_id=f"alert_{metric_type.value}_{int(time.time())}",
level=alert_level,
title=title,
description=description,
affected_metrics=[metric_type],
threshold_value=metric_thresholds["warning"],
current_value=current_value
)
alerts.append(alert)
# Check critical threshold
if current_value < metric_thresholds.get("critical", 0):
alert_level = AlertLevel.CRITICAL
title = f"Critical: {metric_type.value} severely below threshold"
description = f"Metric {metric_type.value} is {current_value:.3f}, below critical threshold {metric_thresholds['critical']}"
alert = Alert(
alert_id=f"alert_critical_{metric_type.value}_{int(time.time())}",
level=alert_level,
title=title,
description=description,
affected_metrics=[metric_type],
threshold_value=metric_thresholds["critical"],
current_value=current_value
)
alerts.append(alert)
# Store alerts
for alert in alerts:
self.alerts[alert.alert_id] = alert
return [alert for alert in alerts if not alert.acknowledged]
async def _aggregate_metric_data(
self,
series: List[MetricValue],
aggregation: str
) -> List[Dict[str, Any]]:
"""Aggregate metric data based on time period."""
if aggregation == "hourly":
# Group by hour
hourly_groups = defaultdict(list)
for mv in series:
hour_key = mv.timestamp.replace(minute=0, second=0, microsecond=0)
hourly_groups[hour_key].append(mv.value)
aggregated = []
for hour, values in sorted(hourly_groups.items()):
aggregated.append({
"timestamp": hour.isoformat(),
"value": np.mean(values),
"count": len(values)
})
return aggregated
elif aggregation == "daily":
# Group by day
daily_groups = defaultdict(list)
for mv in series:
day_key = mv.timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
daily_groups[day_key].append(mv.value)
aggregated = []
for day, values in sorted(daily_groups.items()):
aggregated.append({
"timestamp": day.isoformat(),
"value": np.mean(values),
"count": len(values)
})
return aggregated
else:
# Return raw data points
return [
{
"timestamp": mv.timestamp.isoformat(),
"value": mv.value,
"confidence": mv.confidence
}
for mv in series
]
async def _calculate_trend(self, series: List[MetricValue]) -> str:
"""Calculate trend direction for metric series."""
if len(series) < 3:
return "insufficient_data"
# Simple trend calculation using first and last thirds
first_third = series[:len(series)//3]
last_third = series[-len(series)//3:]
first_avg = np.mean([mv.value for mv in first_third])
last_avg = np.mean([mv.value for mv in last_third])
difference = last_avg - first_avg
relative_change = abs(difference) / max(first_avg, 0.1)
if relative_change < 0.05: # Less than 5% change
return "stable"
elif difference > 0:
return "improving"
else:
return "declining"
# Dashboard configuration methods
async def update_alert_threshold(
self,
metric_type: MetricType,
warning_threshold: float,
critical_threshold: float
) -> Dict[str, Any]:
"""Update alert thresholds for a metric."""
with self._lock:
self.dashboard_config["alert_thresholds"][metric_type] = {
"warning": warning_threshold,
"critical": critical_threshold
}
return {
"success": True,
"metric_type": metric_type.value,
"new_thresholds": {
"warning": warning_threshold,
"critical": critical_threshold
}
}
async def update_display_preferences(
self,
preferences: Dict[str, bool]
) -> Dict[str, Any]:
"""Update dashboard display preferences."""
with self._lock:
self.dashboard_config["display_preferences"].update(preferences)
return {
"success": True,
"new_preferences": self.dashboard_config["display_preferences"]
}
if __name__ == "__main__":
print("Metrics Dashboard & Optimization System Initialized")
print("=" * 60)
dashboard = MetricsDashboard()
print("Ready for comprehensive metrics monitoring and optimization!")