# (removed page-scrape artifact: "Spaces / Sleeping / Sleeping" — not part of the module)
"""
Metrics Collector module for the Monitoring & Analytics components.

This module implements metrics collection, aggregation, and reporting
to track the agent's performance and operation.
"""
import asyncio
import json
import logging
import os
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

from prometheus_client import Counter, Gauge, Histogram, Summary, push_to_gateway
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MetricsCollector:
    """
    Collects, aggregates, and reports metrics on agent performance.

    This class enables monitoring of task execution, action performance,
    resource usage, and other operational metrics. Counters and running
    averages are kept in an in-process dict (``metrics_data``), per-event
    records with timestamps in ``time_series``, and — when enabled —
    everything is mirrored to Prometheus via a Push Gateway.
    """

    @staticmethod
    def _default_metrics() -> Dict[str, Any]:
        """Return a fresh, zeroed metrics_data structure.

        Shared by __init__ and reset_metrics so the two cannot drift apart.
        """
        return {
            "tasks": {
                "total": 0,
                "successful": 0,
                "failed": 0
            },
            "actions": {
                "total": 0,
                "successful": 0,
                "failed": 0,
                "types": {}
            },
            "performance": {
                "avg_task_time": 0,
                "avg_action_time": 0
            },
            "errors": {
                "total": 0,
                "types": {}
            },
            "resources": {
                "memory_usage": 0,
                "cpu_usage": 0
            }
        }

    @staticmethod
    def _default_time_series() -> Dict[str, List]:
        """Return a fresh, empty time-series store (tasks/actions/errors)."""
        return {
            "tasks": [],
            "actions": [],
            "errors": []
        }

    def __init__(self):
        """Initialize the MetricsCollector.

        Reads Prometheus settings from the environment; does not create any
        Prometheus metric objects yet (see initialize()).
        """
        # Configuration
        self.metrics_enabled = True
        self.prometheus_enabled = os.environ.get("PROMETHEUS_ENABLED", "true").lower() == "true"
        self.prometheus_push_gateway = os.environ.get("PROMETHEUS_PUSH_GATEWAY", "localhost:9091")
        self.prometheus_job_name = os.environ.get("PROMETHEUS_JOB_NAME", "agentic_browser")
        self.agent_id = str(uuid.uuid4())[:8]  # Short ID for labeling

        # Prometheus metric objects; created lazily in initialize()
        self.task_counter = None
        self.action_histogram = None
        self.error_counter = None
        self.memory_gauge = None
        self.performance_summary = None

        # Internal metrics storage
        self.metrics_data = self._default_metrics()
        # Time-series data
        self.time_series = self._default_time_series()
        # Session tracking
        self.session_start_time = time.time()
        logger.info("MetricsCollector instance created")

    async def initialize(self) -> bool:
        """Initialize resources.

        Creates the Prometheus collectors in the default registry when
        Prometheus support is enabled. On any failure, Prometheus support is
        disabled and collection continues with in-process metrics only.

        Returns:
            bool: Always True (failures only disable the Prometheus mirror).
        """
        if self.prometheus_enabled:
            try:
                # Initialize Prometheus metrics.
                # NOTE(review): metrics register in the default REGISTRY, so
                # calling initialize() twice (or creating two collectors in
                # one process) raises a duplicate-registration error — assumed
                # to be a singleton; confirm with callers.
                self.task_counter = Counter(
                    'agentic_browser_tasks_total',
                    'Total number of tasks processed',
                    ['status', 'agent_id']
                )
                self.action_histogram = Histogram(
                    'agentic_browser_action_duration_seconds',
                    'Action execution time in seconds',
                    ['action_type', 'agent_id'],
                    buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0)
                )
                self.error_counter = Counter(
                    'agentic_browser_errors_total',
                    'Total number of errors',
                    ['error_type', 'agent_id']
                )
                self.memory_gauge = Gauge(
                    'agentic_browser_memory_usage_bytes',
                    'Memory usage in bytes',
                    ['agent_id']
                )
                self.performance_summary = Summary(
                    'agentic_browser_task_performance_seconds',
                    'Task performance summary in seconds',
                    ['agent_id']
                )
                logger.info("Prometheus metrics initialized")
            except Exception as e:
                logger.error(f"Error initializing Prometheus metrics: {str(e)}")
                self.prometheus_enabled = False
        logger.info("MetricsCollector initialized successfully")
        return True

    def record_task_created(self):
        """Record a task creation event (increments tasks.total)."""
        if not self.metrics_enabled:
            return
        self.metrics_data["tasks"]["total"] += 1
        if self.prometheus_enabled:
            self.task_counter.labels(status="created", agent_id=self.agent_id).inc()

    def record_task_completed(self, duration: float):
        """
        Record a task completion event.

        Args:
            duration: Task duration in seconds
        """
        if not self.metrics_enabled:
            return
        self.metrics_data["tasks"]["successful"] += 1
        # Update the running average task time incrementally:
        # new_avg = (old_avg * (n - 1) + duration) / n
        total_successful = self.metrics_data["tasks"]["successful"]
        current_avg = self.metrics_data["performance"]["avg_task_time"]
        new_avg = ((current_avg * (total_successful - 1)) + duration) / total_successful
        self.metrics_data["performance"]["avg_task_time"] = new_avg
        # Add to time series
        self.time_series["tasks"].append({
            "timestamp": time.time(),
            "status": "completed",
            "duration": duration
        })
        if self.prometheus_enabled:
            self.task_counter.labels(status="completed", agent_id=self.agent_id).inc()
            self.performance_summary.labels(agent_id=self.agent_id).observe(duration)

    def record_task_failed(self):
        """Record a task failure event."""
        if not self.metrics_enabled:
            return
        self.metrics_data["tasks"]["failed"] += 1
        # Add to time series
        self.time_series["tasks"].append({
            "timestamp": time.time(),
            "status": "failed"
        })
        if self.prometheus_enabled:
            self.task_counter.labels(status="failed", agent_id=self.agent_id).inc()

    def record_action_executed(self, action_type: str, duration: float, success: bool):
        """
        Record an action execution event.

        Args:
            action_type: Type of action executed
            duration: Action duration in seconds
            success: Whether the action was successful
        """
        if not self.metrics_enabled:
            return
        self.metrics_data["actions"]["total"] += 1
        if success:
            self.metrics_data["actions"]["successful"] += 1
        else:
            self.metrics_data["actions"]["failed"] += 1
        # Track per-action-type stats, creating the bucket on first sight.
        if action_type not in self.metrics_data["actions"]["types"]:
            self.metrics_data["actions"]["types"][action_type] = {
                "total": 0,
                "successful": 0,
                "failed": 0,
                "avg_duration": 0
            }
        action_stats = self.metrics_data["actions"]["types"][action_type]
        action_stats["total"] += 1
        if success:
            action_stats["successful"] += 1
            # Running average of successful durations for this action type.
            current_avg = action_stats["avg_duration"]
            new_avg = ((current_avg * (action_stats["successful"] - 1)) + duration) / action_stats["successful"]
            action_stats["avg_duration"] = new_avg
        else:
            action_stats["failed"] += 1
        # Add to time series
        self.time_series["actions"].append({
            "timestamp": time.time(),
            "type": action_type,
            "duration": duration,
            "success": success
        })
        # Update the overall average action time — only for successful
        # actions, mirroring the per-type average above. (Bug fix: this
        # previously ran for failed actions too, folding their durations
        # into an average divided by the successful count.)
        if success:
            total_successful_actions = self.metrics_data["actions"]["successful"]
            current_avg = self.metrics_data["performance"]["avg_action_time"]
            new_avg = ((current_avg * (total_successful_actions - 1)) + duration) / total_successful_actions
            self.metrics_data["performance"]["avg_action_time"] = new_avg
        if self.prometheus_enabled:
            # The histogram intentionally records all durations, success or not.
            self.action_histogram.labels(action_type=action_type, agent_id=self.agent_id).observe(duration)

    def record_error(self, error_type: str, error_details: str):
        """
        Record an error event.

        Args:
            error_type: Type of error
            error_details: Error details
        """
        if not self.metrics_enabled:
            return
        self.metrics_data["errors"]["total"] += 1
        # Track by error type
        if error_type not in self.metrics_data["errors"]["types"]:
            self.metrics_data["errors"]["types"][error_type] = 0
        self.metrics_data["errors"]["types"][error_type] += 1
        # Add to time series
        self.time_series["errors"].append({
            "timestamp": time.time(),
            "type": error_type,
            "details": error_details
        })
        if self.prometheus_enabled:
            self.error_counter.labels(error_type=error_type, agent_id=self.agent_id).inc()

    def record_resource_usage(self, memory_bytes: int, cpu_percent: float):
        """
        Record resource usage.

        Args:
            memory_bytes: Memory usage in bytes
            cpu_percent: CPU usage as percentage
        """
        if not self.metrics_enabled:
            return
        self.metrics_data["resources"]["memory_usage"] = memory_bytes
        self.metrics_data["resources"]["cpu_usage"] = cpu_percent
        if self.prometheus_enabled:
            self.memory_gauge.labels(agent_id=self.agent_id).set(memory_bytes)

    async def push_metrics_to_prometheus(self):
        """Push current metrics to Prometheus Push Gateway.

        No-op when Prometheus support is disabled or initialize() was never
        called. Errors are logged, never raised.
        """
        # Guard: if initialize() never ran, there is nothing to push.
        if not self.prometheus_enabled or self.task_counter is None:
            return
        try:
            # Metrics were registered in the default registry, so push that
            # directly instead of reaching into the private Counter._registry.
            from prometheus_client import REGISTRY, push_to_gateway
            # Group by instance so multiple agents don't overwrite each other;
            # push_to_gateway already groups by the job= argument, so 'job'
            # must not be repeated in grouping_key.
            grouping_keys = {'instance': f'agent_{self.agent_id}'}
            push_to_gateway(self.prometheus_push_gateway, job=self.prometheus_job_name, registry=REGISTRY, grouping_key=grouping_keys)
            logger.info(f"Metrics pushed to Prometheus Push Gateway: {self.prometheus_push_gateway}")
        except Exception as e:
            logger.error(f"Error pushing metrics to Prometheus: {str(e)}")

    def get_metrics_summary(self) -> Dict:
        """
        Get a summary of current metrics.

        Returns:
            Dict: Metrics summary with success rates, averages, error rate,
                session duration, and per-type action breakdowns.
        """
        # Calculate success rates (0 when no data, to avoid division by zero).
        task_success_rate = 0
        if self.metrics_data["tasks"]["total"] > 0:
            task_success_rate = self.metrics_data["tasks"]["successful"] / self.metrics_data["tasks"]["total"]
        action_success_rate = 0
        if self.metrics_data["actions"]["total"] > 0:
            action_success_rate = self.metrics_data["actions"]["successful"] / self.metrics_data["actions"]["total"]
        # Calculate session duration
        session_duration = time.time() - self.session_start_time
        # Prepare summary
        return {
            "summary": {
                "task_success_rate": task_success_rate,
                "action_success_rate": action_success_rate,
                "avg_task_time": self.metrics_data["performance"]["avg_task_time"],
                "avg_action_time": self.metrics_data["performance"]["avg_action_time"],
                # max(1, ...) keeps the rate finite when no tasks ran yet.
                "error_rate": self.metrics_data["errors"]["total"] / max(1, self.metrics_data["tasks"]["total"]),
                "session_duration": session_duration
            },
            "tasks": {
                "total": self.metrics_data["tasks"]["total"],
                "successful": self.metrics_data["tasks"]["successful"],
                "failed": self.metrics_data["tasks"]["failed"]
            },
            "actions": {
                "total": self.metrics_data["actions"]["total"],
                "successful": self.metrics_data["actions"]["successful"],
                "failed": self.metrics_data["actions"]["failed"],
                "by_type": {
                    action_type: {
                        "success_rate": stats["successful"] / max(1, stats["total"]),
                        "avg_duration": stats["avg_duration"],
                        "count": stats["total"]
                    }
                    for action_type, stats in self.metrics_data["actions"]["types"].items()
                }
            },
            "errors": {
                "total": self.metrics_data["errors"]["total"],
                "by_type": self.metrics_data["errors"]["types"]
            },
            "resources": self.metrics_data["resources"]
        }

    def get_time_series(self, metric_type: str, time_range: int = 3600) -> List[Dict]:
        """
        Get time series data for a specific metric.

        Args:
            metric_type: Type of metric to retrieve (tasks, actions, errors)
            time_range: Time range in seconds (default 1 hour)

        Returns:
            List[Dict]: Time series data points within the range; empty list
                for an unknown metric_type.
        """
        if metric_type not in self.time_series:
            return []
        # Filter by time range
        start_time = time.time() - time_range
        return [point for point in self.time_series[metric_type] if point["timestamp"] >= start_time]

    def reset_metrics(self):
        """Reset all metrics and restart the session clock."""
        self.metrics_data = self._default_metrics()
        self.time_series = self._default_time_series()
        self.session_start_time = time.time()
        logger.info("Metrics reset")

    async def start_periodic_reporting(self, interval: int = 300):
        """
        Start periodic reporting of metrics.

        Runs forever; cancel the task to stop it.

        Args:
            interval: Reporting interval in seconds (default 5 minutes)
        """
        while True:
            try:
                # Get metrics summary
                summary = self.get_metrics_summary()
                # Log summary
                logger.info(f"Metrics summary: Task success rate: {summary['summary']['task_success_rate']:.2f}, " +
                            f"Action success rate: {summary['summary']['action_success_rate']:.2f}, " +
                            f"Errors: {summary['errors']['total']}")
                # Push to Prometheus if enabled
                if self.prometheus_enabled:
                    await self.push_metrics_to_prometheus()
            except Exception as e:
                logger.error(f"Error in periodic metrics reporting: {str(e)}")
            # Wait for next interval
            await asyncio.sleep(interval)

    def enable_metrics(self, enabled: bool = True):
        """
        Enable or disable metrics collection.

        Args:
            enabled: Whether metrics should be enabled
        """
        self.metrics_enabled = enabled
        logger.info(f"Metrics collection {'enabled' if enabled else 'disabled'}")

    async def shutdown(self):
        """Clean up resources, pushing final metrics if Prometheus is enabled."""
        # Push final metrics if enabled
        if self.prometheus_enabled:
            try:
                await self.push_metrics_to_prometheus()
            except Exception as e:
                logger.error(f"Error pushing final metrics: {str(e)}")
        logger.info("MetricsCollector resources cleaned up")