Spaces:
Configuration error
Configuration error
import asyncio
import inspect
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
@dataclass
class ErrorContext:
    """Snapshot of a failure, as passed to ``ErrorHandler.handle_error``.

    The ``@dataclass`` decorator is what generates the keyword/positional
    constructor used by callers (e.g. ``ErrorContext(error=..., agent_id=...)``);
    without it the class only declares annotations and cannot be built that way.
    """

    error: Exception  # the exception that was raised
    agent_id: str  # agent in which the error occurred; also keys retry policies
    task_id: str  # task that was being processed
    pipeline_id: str  # pipeline the task belongs to
    timestamp: str  # supplied by the caller (main uses datetime.now().isoformat())
    # Free-form metadata; the retry logic reads "retry_count" from here.
    # default_factory gives each instance its own dict (no shared mutable default).
    additional_data: Dict[str, Any] = field(default_factory=dict)
class AetheroError(Exception):
    """Root of the Aethero exception hierarchy.

    Besides the human-readable message, each instance carries a
    machine-readable ``error_code``, the caller-supplied ``context`` mapping,
    and ``timestamp``: the local (naive) time at which the exception object
    was constructed, as an ISO-8601 string.
    """

    def __init__(self, message: str, error_code: str, context: Dict[str, Any]):
        super().__init__(message)
        created_at = datetime.now()
        self.error_code, self.context = error_code, context
        self.timestamp = created_at.isoformat()
class AgentError(AetheroError):
    """Raised when an agent operation fails.

    Adds no state or behaviour beyond ``AetheroError``; it is a distinct type
    so agent-level failures can be caught separately.
    """
class TaskError(AetheroError):
    """Raised when processing a task fails.

    A marker subclass of ``AetheroError`` with no extra state; it is a
    distinct type so task-level failures can be caught separately.
    """
class ErrorHandler:
    """Route errors to registered handlers, retry policies, and notifiers.

    Resolution order in ``handle_error``:

    1. A handler registered under the exception's class name. This is an exact
       ``type(err).__name__`` match, so subclasses are not matched. If the
       handler returns normally, its result is used.
    2. Otherwise, if the failing agent has a retry policy, the retry decision
       is returned. Callbacks are notified when retries are exhausted.
    3. Otherwise, all notification callbacks are invoked and a generic error
       dict is returned.

    Handlers and callbacks may be plain functions or coroutine functions.
    """

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger('aethero_error_handler')
        # exception class name -> handler(error_context)
        self.error_handlers: Dict[str, Callable] = {}
        # agent_id -> policy dict; "max_retries" and "delay" are read below
        self.retry_policies: Dict[str, Dict[str, Any]] = {}
        # Ordered list of callbacks (previously mis-annotated as a Dict).
        self.notification_callbacks: List[Callable] = []

    def register_error_handler(self, error_type: str, handler: Callable) -> None:
        """Register a handler for errors whose class name is ``error_type``."""
        self.error_handlers[error_type] = handler

    def set_retry_policy(self, agent_id: str, policy: Dict[str, Any]) -> None:
        """Set the retry policy for an agent.

        Recognised keys are ``max_retries`` (default 3) and ``delay`` (base
        delay in seconds, default 1).
        """
        self.retry_policies[agent_id] = policy

    def register_notification_callback(self, callback: Callable) -> None:
        """Register a callback that receives a notification dict on errors."""
        self.notification_callbacks.append(callback)

    @staticmethod
    async def _maybe_await(func: Callable, *args: Any) -> Any:
        """Call ``func(*args)``, awaiting the result only if it is awaitable."""
        result = func(*args)
        if inspect.isawaitable(result):
            result = await result
        return result

    async def handle_error(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Handle an error with the appropriate strategy.

        Args:
            error_context: the failure to handle. The class name of its
                ``error`` selects a registered handler, and its ``agent_id``
                selects a retry policy.

        Returns:
            The registered handler's result if one succeeds. Otherwise the
            retry decision from the agent's retry policy. Otherwise a generic
            ``{"status": "error", ...}`` dict.
        """
        error_type = type(error_context.error).__name__

        self.logger.error(
            "Error in agent %s: %s",
            error_context.agent_id,
            error_context.error,
            extra={
                "error_context": vars(error_context),
                "error_type": error_type,
            },
        )

        handler = self.error_handlers.get(error_type)
        if handler is not None:
            try:
                return await self._maybe_await(handler, error_context)
            except Exception as e:
                # A broken handler must not mask the original error; fall
                # through to the retry / notification path instead.
                self.logger.exception("Error handler failed: %s", e)

        if error_context.agent_id in self.retry_policies:
            result = await self._handle_retry(error_context)
            if result["status"] == "error":
                # Retries exhausted: this is now a terminal failure, so the
                # registered callbacks must hear about it.
                await self._send_notifications(error_context)
            return result

        await self._send_notifications(error_context)
        return {
            "status": "error",
            "error_type": error_type,
            "message": str(error_context.error),
            "timestamp": error_context.timestamp,
            "task_id": error_context.task_id,
            "agent_id": error_context.agent_id,
        }

    async def _handle_retry(self, error_context: "ErrorContext") -> Dict[str, Any]:
        """Decide whether to retry, sleeping with exponential backoff if so.

        The current attempt number is read from
        ``additional_data["retry_count"]`` (0 if absent). When another attempt
        is allowed, this waits ``delay * 2**retry_count`` seconds and returns
        ``status == "retry"`` with the incremented count. Presumably the caller
        re-runs the task and passes that count back (TODO confirm with
        callers). Otherwise it returns a "Max retries exceeded" error dict.
        """
        policy = self.retry_policies[error_context.agent_id]
        max_retries = policy.get("max_retries", 3)
        delay = policy.get("delay", 1)
        # additional_data may be None when the caller has no retry state yet.
        current_retry = (error_context.additional_data or {}).get("retry_count", 0)

        if current_retry >= max_retries:
            return {
                "status": "error",
                "message": "Max retries exceeded",
                "task_id": error_context.task_id,
                "agent_id": error_context.agent_id,
            }

        self.logger.info(
            "Retrying task %s for agent %s. Attempt %s/%s",
            error_context.task_id,
            error_context.agent_id,
            current_retry + 1,
            max_retries,
        )
        # Exponential backoff: delay, 2*delay, 4*delay, ...
        retry_delay = delay * (2 ** current_retry)
        await asyncio.sleep(retry_delay)
        return {
            "status": "retry",
            "retry_count": current_retry + 1,
            "next_retry_delay": retry_delay * 2,
            "task_id": error_context.task_id,
        }

    async def _send_notifications(self, error_context: "ErrorContext") -> None:
        """Send an error notification dict to every registered callback.

        Delivery is best-effort: a failing callback is logged and does not
        prevent the remaining callbacks from running.
        """
        notification = {
            "type": "error",
            "agent_id": error_context.agent_id,
            "task_id": error_context.task_id,
            "error": str(error_context.error),
            "timestamp": error_context.timestamp,
        }
        for callback in self.notification_callbacks:
            try:
                await self._maybe_await(callback, notification)
            except Exception as e:
                self.logger.exception("Notification callback failed: %s", e)
# Example usage
async def example_error_handler(error_context: ErrorContext) -> Dict[str, Any]:
    """Demo handler that acknowledges an error by naming its exception class."""
    error_name = type(error_context.error).__name__
    outcome: Dict[str, Any] = {"status": "handled"}
    outcome["message"] = f"Handled {error_name}"
    outcome["task_id"] = error_context.task_id
    return outcome
async def example_notification(notification: Dict[str, Any]) -> None:
    """Demo notification sink: echo the payload to stdout."""
    line = "Error notification: {}".format(notification)
    print(line)
async def main():
    """Wire up an ErrorHandler with the demo callbacks and handle one error.

    The printed result comes from the registered ValueError handler.
    """
    logging.basicConfig(level=logging.INFO)

    handler = ErrorHandler()
    handler.register_error_handler("ValueError", example_error_handler)
    handler.register_notification_callback(example_notification)
    demo_policy = dict(max_retries=3, delay=1)
    handler.set_retry_policy("example_agent", demo_policy)

    error_context = ErrorContext(
        error=ValueError("Example error"),
        agent_id="example_agent",
        task_id="test_task_1",
        pipeline_id="test_pipeline",
        timestamp=datetime.now().isoformat(),
        additional_data=dict(retry_count=0),
    )

    outcome = await handler.handle_error(error_context)
    print("Error handling result:", outcome)
if __name__ == "__main__":
    # Script entry point: drive the async demo on a fresh event loop.
    asyncio.run(main())