# Source: commit 46f737d by xvadur
# "Add complete Aethero_App and aethero_protocol directories"
import asyncio
import inspect
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import psutil
@dataclass
class SystemMetrics:
    """One host-level resource sample.

    ``timestamp`` defaults to the local time (ISO 8601 string) at which the
    instance was created when the caller does not supply one.
    """

    cpu_percent: float
    memory_percent: float
    # The monitor's collector fills this with "total", "used", "free" and
    # "percent" taken from psutil.disk_usage('/').
    disk_usage: Dict[str, float]
    # Optional: None means "stamp at construction time" (see __post_init__).
    timestamp: Optional[str] = None

    def __post_init__(self):
        # Stamp the sample when it is created if no explicit timestamp was given.
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Return the sample's fields as a plain dict."""
        return {
            "cpu_percent": self.cpu_percent,
            "memory_percent": self.memory_percent,
            "disk_usage": self.disk_usage,
            "timestamp": self.timestamp,
        }
@dataclass
class AgentMetrics:
    """Most recently reported counters and resource usage for a single agent."""

    agent_id: str
    status: str
    tasks_processed: int
    errors_count: int
    avg_processing_time: float
    last_active: str
    memory_usage: float
    cpu_usage: float

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a plain dict, keyed in declaration order."""
        exported = (
            "agent_id",
            "status",
            "tasks_processed",
            "errors_count",
            "avg_processing_time",
            "last_active",
            "memory_usage",
            "cpu_usage",
        )
        return {field_name: getattr(self, field_name) for field_name in exported}
class AetheroMonitor:
    """Periodically sample host metrics, track per-agent metrics, and raise alerts.

    Each collection appends a ``SystemMetrics`` sample to ``system_metrics``
    (trimmed to ``max_history``). Agent metrics are stored per agent id in
    ``agent_metrics``. When a sample exceeds any threshold in
    ``alert_thresholds``, every registered alert callback is invoked.
    """

    def __init__(
        self,
        logger: Optional[logging.Logger] = None,
        alert_thresholds: Optional[Dict[str, float]] = None,
        max_history: Optional[int] = 10000,
    ):
        """Create a monitor.

        Args:
            logger: Logger to use; defaults to the ``aethero_monitor`` logger.
            alert_thresholds: Overrides for the default thresholds
                (``cpu_percent``, ``memory_percent``, ``disk_percent``). Keys
                not supplied keep their defaults.
            max_history: Maximum number of system samples retained; the oldest
                are dropped first. ``None`` keeps an unbounded history.

        Raises:
            ValueError: If ``max_history`` is negative.
        """
        if max_history is not None and max_history < 0:
            raise ValueError("max_history must be non-negative or None")
        self.logger = logger or logging.getLogger('aethero_monitor')
        self.agent_metrics: Dict[str, "AgentMetrics"] = {}
        self.system_metrics: List["SystemMetrics"] = []
        self.alert_thresholds = {
            "cpu_percent": 80.0,
            "memory_percent": 80.0,
            "disk_percent": 90.0,
        }
        if alert_thresholds:
            # Merge onto the defaults so _check_alerts always finds every key.
            self.alert_thresholds.update(alert_thresholds)
        self.max_history = max_history
        # Callbacks receive the alert dict; they may be sync or async.
        self.alert_callbacks: List[Callable[[Dict[str, Any]], Any]] = []
        self.running = True

    async def start_monitoring(self, interval: int = 60):
        """Collect metrics every ``interval`` seconds while ``self.running`` is true.

        Any error is logged with its traceback, and the loop retries after a
        5 second pause. Clearing ``running`` takes effect only after the
        current sleep finishes; cancel the task for an immediate stop.
        """
        self.logger.info("Starting Aethero monitoring system")
        while self.running:
            try:
                await self.collect_metrics()
                await asyncio.sleep(interval)
            except Exception as e:
                # logger.exception keeps the traceback that str(e) alone dropped.
                self.logger.exception("Error in monitoring loop: %s", e)
                await asyncio.sleep(5)  # Brief pause before retry

    async def collect_metrics(self):
        """Sample system metrics, record the sample, and check alert thresholds."""
        loop = asyncio.get_running_loop()
        # psutil.cpu_percent(interval=1) blocks for a full second; run it in a
        # worker thread so the event loop and other tasks keep running.
        metrics = await loop.run_in_executor(None, self._sample_system_metrics)
        self._record_system_metrics(metrics)
        self.logger.info("Collected system metrics: %s", json.dumps(metrics.to_dict()))
        await self._check_alerts(metrics)

    def _sample_system_metrics(self) -> "SystemMetrics":
        """Take one blocking psutil sample: CPU over 1 s, memory, and '/' disk."""
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        return SystemMetrics(
            cpu_percent=cpu_percent,
            memory_percent=memory.percent,
            disk_usage={
                "total": disk.total,
                "used": disk.used,
                "free": disk.free,
                "percent": disk.percent,
            },
        )

    def _record_system_metrics(self, metrics: "SystemMetrics") -> None:
        """Append a sample, dropping the oldest ones beyond ``max_history``."""
        self.system_metrics.append(metrics)
        if self.max_history is not None:
            overflow = len(self.system_metrics) - self.max_history
            if overflow > 0:
                del self.system_metrics[:overflow]

    def update_agent_metrics(self, agent_id: str, metrics: Dict[str, Any]):
        """Replace the stored metrics for ``agent_id``.

        Missing keys fall back to defaults: status "unknown", zero counters and
        usages, and ``last_active`` set to the current time.
        """
        self.agent_metrics[agent_id] = AgentMetrics(
            agent_id=agent_id,
            status=metrics.get("status", "unknown"),
            tasks_processed=metrics.get("tasks_processed", 0),
            errors_count=metrics.get("errors_count", 0),
            avg_processing_time=metrics.get("avg_processing_time", 0.0),
            last_active=metrics.get("last_active", datetime.now().isoformat()),
            memory_usage=metrics.get("memory_usage", 0.0),
            cpu_usage=metrics.get("cpu_usage", 0.0),
        )
        self.logger.info("Updated metrics for agent %s", agent_id)

    def get_system_metrics(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """Return recorded system samples as dicts, oldest first.

        Args:
            limit: If given, return only the most recent ``limit`` samples.
                ``0`` or a negative value returns an empty list.
        """
        metrics = self.system_metrics
        if limit is not None:
            metrics = metrics[-limit:] if limit > 0 else []
        return [m.to_dict() for m in metrics]

    def get_agent_metrics(self, agent_id: Optional[str] = None) -> Dict[str, Any]:
        """Return one agent's metrics dict, or a mapping of every agent id to its dict.

        An unknown ``agent_id`` yields an empty dict instead of raising.
        """
        if agent_id:
            metrics = self.agent_metrics.get(agent_id)
            return metrics.to_dict() if metrics is not None else {}
        return {aid: metrics.to_dict() for aid, metrics in self.agent_metrics.items()}

    def add_alert_callback(self, callback: Callable[[Dict[str, Any]], Any]):
        """Register a callback (sync or async) that receives each alert dict."""
        self.alert_callbacks.append(callback)

    async def _check_alerts(self, metrics: "SystemMetrics"):
        """Compare a sample against the thresholds and notify callbacks on breach.

        Each callback gets a dict with "timestamp", "alerts" (messages) and
        "metrics". A failing callback is logged and does not stop the others.
        """
        checks = (
            ("cpu_percent", metrics.cpu_percent, "High CPU usage"),
            ("memory_percent", metrics.memory_percent, "High memory usage"),
            ("disk_percent", metrics.disk_usage["percent"], "High disk usage"),
        )
        alerts = [
            f"{label}: {value}%"
            for key, value, label in checks
            if value > self.alert_thresholds[key]
        ]
        if not alerts:
            return
        alert_data = {
            "timestamp": datetime.now().isoformat(),
            "alerts": alerts,
            "metrics": metrics.to_dict(),
        }
        # Iterate a snapshot so a callback that registers another cannot
        # mutate the list mid-loop.
        for callback in list(self.alert_callbacks):
            try:
                result = callback(alert_data)
                # Support both plain functions and coroutine functions.
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                self.logger.exception("Alert callback failed: %s", e)
# Example usage
async def example_alert_callback(alert_data: Dict[str, Any]):
    """Demo alert handler: print the alert payload as indented JSON."""
    rendered = json.dumps(alert_data, indent=2)
    print("ALERT: " + rendered)
async def main():
    """Demo: run the monitor briefly, report one agent, then print collected metrics."""
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    # Create monitor
    monitor = AetheroMonitor()
    # Add alert callback
    monitor.add_alert_callback(example_alert_callback)
    # Start monitoring in background
    monitoring_task = asyncio.create_task(monitor.start_monitoring(interval=5))
    # Simulate some agent metrics updates
    monitor.update_agent_metrics("agent1", {
        "status": "active",
        "tasks_processed": 10,
        "errors_count": 0,
        "avg_processing_time": 0.5,
        "memory_usage": 45.2,
        "cpu_usage": 12.3,
    })
    # Wait for some metrics collection
    await asyncio.sleep(10)
    # Get metrics
    system_metrics = monitor.get_system_metrics(limit=5)
    agent_metrics = monitor.get_agent_metrics()
    print("\nSystem Metrics:")
    print(json.dumps(system_metrics, indent=2))
    print("\nAgent Metrics:")
    print(json.dumps(agent_metrics, indent=2))
    # Stop monitoring. Clearing ``running`` alone ends the loop only after its
    # current sleep (up to ``interval`` seconds), so also cancel the task.
    monitor.running = False
    monitoring_task.cancel()
    try:
        await monitoring_task
    except asyncio.CancelledError:
        pass  # Expected: the task was cancelled deliberately above.
if __name__ == "__main__":
    # Run the demo only when executed as a script, never on import.
    asyncio.run(main())