xvadur's picture
Add complete Aethero_App and aethero_protocol directories
46f737d
from typing import Dict, Any, Optional, Callable
import logging
from datetime import datetime
import asyncio
from dataclasses import dataclass
@dataclass
class ErrorContext:
error: Exception
agent_id: str
task_id: str
pipeline_id: str
timestamp: str
additional_data: Dict[str, Any]
class AetheroError(Exception):
"""Base exception class for Aethero system errors."""
def __init__(self, message: str, error_code: str, context: Dict[str, Any]):
super().__init__(message)
self.error_code = error_code
self.context = context
self.timestamp = datetime.now().isoformat()
class AgentError(AetheroError):
"""Exception raised for errors in agent operations."""
pass
class TaskError(AetheroError):
"""Exception raised for errors in task processing."""
pass
class ErrorHandler:
def __init__(self, logger: Optional[logging.Logger] = None):
self.logger = logger or logging.getLogger('aethero_error_handler')
self.error_handlers: Dict[str, Callable] = {}
self.retry_policies: Dict[str, Dict[str, Any]] = {}
self.notification_callbacks: Dict[str, Callable] = []
def register_error_handler(self, error_type: str, handler: Callable) -> None:
"""Register a handler for a specific error type."""
self.error_handlers[error_type] = handler
def set_retry_policy(self, agent_id: str, policy: Dict[str, Any]) -> None:
"""Set retry policy for an agent."""
self.retry_policies[agent_id] = policy
def register_notification_callback(self, callback: Callable) -> None:
"""Register a callback for error notifications."""
self.notification_callbacks.append(callback)
async def handle_error(self, error_context: ErrorContext) -> Dict[str, Any]:
"""Handle an error with the appropriate strategy."""
error_type = type(error_context.error).__name__
# Log the error
self.logger.error(
f"Error in agent {error_context.agent_id}: {str(error_context.error)}",
extra={
"error_context": error_context.__dict__,
"error_type": error_type
}
)
# Check for specific handler
if error_type in self.error_handlers:
try:
return await self.error_handlers[error_type](error_context)
except Exception as e:
self.logger.error(f"Error handler failed: {str(e)}")
# Check retry policy
if error_context.agent_id in self.retry_policies:
return await self._handle_retry(error_context)
# Send notifications
await self._send_notifications(error_context)
# Return error response
return {
"status": "error",
"error_type": error_type,
"message": str(error_context.error),
"timestamp": error_context.timestamp,
"task_id": error_context.task_id,
"agent_id": error_context.agent_id
}
async def _handle_retry(self, error_context: ErrorContext) -> Dict[str, Any]:
"""Handle error retry based on policy."""
policy = self.retry_policies[error_context.agent_id]
max_retries = policy.get("max_retries", 3)
delay = policy.get("delay", 1)
current_retry = error_context.additional_data.get("retry_count", 0)
if current_retry < max_retries:
self.logger.info(
f"Retrying task {error_context.task_id} for agent {error_context.agent_id}. "
f"Attempt {current_retry + 1}/{max_retries}"
)
# Exponential backoff
retry_delay = delay * (2 ** current_retry)
await asyncio.sleep(retry_delay)
return {
"status": "retry",
"retry_count": current_retry + 1,
"next_retry_delay": retry_delay * 2,
"task_id": error_context.task_id
}
return {
"status": "error",
"message": "Max retries exceeded",
"task_id": error_context.task_id,
"agent_id": error_context.agent_id
}
async def _send_notifications(self, error_context: ErrorContext) -> None:
"""Send error notifications to registered callbacks."""
notification = {
"type": "error",
"agent_id": error_context.agent_id,
"task_id": error_context.task_id,
"error": str(error_context.error),
"timestamp": error_context.timestamp
}
for callback in self.notification_callbacks:
try:
await callback(notification)
except Exception as e:
self.logger.error(f"Notification callback failed: {str(e)}")
# Example usage
async def example_error_handler(error_context: ErrorContext) -> Dict[str, Any]:
"""Example error handler for demonstration."""
return {
"status": "handled",
"message": f"Handled {type(error_context.error).__name__}",
"task_id": error_context.task_id
}
async def example_notification(notification: Dict[str, Any]) -> None:
"""Example notification callback."""
print(f"Error notification: {notification}")
async def main():
# Configure logging
logging.basicConfig(level=logging.INFO)
# Create error handler
handler = ErrorHandler()
# Register handlers
handler.register_error_handler("ValueError", example_error_handler)
handler.register_notification_callback(example_notification)
# Set retry policy
handler.set_retry_policy("example_agent", {
"max_retries": 3,
"delay": 1
})
# Create error context
error_context = ErrorContext(
error=ValueError("Example error"),
agent_id="example_agent",
task_id="test_task_1",
pipeline_id="test_pipeline",
timestamp=datetime.now().isoformat(),
additional_data={"retry_count": 0}
)
# Handle error
result = await handler.handle_error(error_context)
print(f"Error handling result: {result}")
if __name__ == "__main__":
asyncio.run(main())