import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any, Optional

from core.models import AgentConfig, Task, AgentMessage

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Subclasses must implement ``execute`` (the per-iteration work) and
    ``_execute_task_logic`` (the work performed for a single task).
    """

    def __init__(self, config: AgentConfig) -> None:
        """Copy the settings from *config* and create empty message/task stores.

        Args:
            config: Agent configuration; its ``name``, ``enabled``,
                ``max_iterations``, ``timeout_seconds`` and ``model_name``
                attributes are copied onto the instance.
        """
        self.config = config
        self.name = config.name
        self.enabled = config.enabled
        self.max_iterations = config.max_iterations
        # NOTE(review): timeout_seconds is stored but not enforced anywhere
        # in this class - confirm whether subclasses are expected to use it.
        self.timeout_seconds = config.timeout_seconds
        self.model_name = config.model_name
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.tasks: List[Task] = []

    async def run(self) -> None:
        """Run the agent's main loop.

        Returns immediately if the agent is disabled. Otherwise performs up
        to ``max_iterations`` iterations; each one drains the message queue,
        calls ``execute()``, checks the task list and then sleeps for one
        second. The first exception raised inside an iteration is logged
        and ends the loop.
        """
        if not self.enabled:
            logger.info(f"Agent {self.name} is disabled, skipping execution")
            return

        logger.info(f"Starting agent: {self.name}")

        iteration = 0
        while iteration < self.max_iterations:
            try:
                # Process any incoming messages
                await self.process_messages()

                # Execute agent-specific logic
                await self.execute()

                # Check for new tasks
                await self.check_tasks()

                iteration += 1
                await asyncio.sleep(1)  # Small delay to prevent busy waiting
            except Exception as e:
                # logger.exception keeps the traceback, which a bare
                # str(e) message would discard.
                logger.exception(f"Error in agent {self.name}: {str(e)}")
                break

        logger.info(f"Agent {self.name} finished after {iteration} iterations")

    async def process_messages(self) -> None:
        """Handle every message currently waiting in ``message_queue``."""
        while not self.message_queue.empty():
            message: AgentMessage = await self.message_queue.get()
            await self.handle_message(message)

    async def handle_message(self, message: AgentMessage) -> None:
        """Handle an incoming message.

        The default implementation only logs the sender and content;
        override in subclasses as needed.
        """
        logger.info(
            f"Agent {self.name} received message from {message.sender}: {message.content}"
        )

    async def send_message(
        self, recipient: str, content: str, message_type: str = "info"
    ) -> None:
        """Send a message to another agent.

        Args:
            recipient: Name of the agent the message is addressed to.
            content: Message text.
            message_type: Message category; defaults to ``"info"``.
        """
        # The message object is built but not delivered anywhere yet.
        message = AgentMessage(
            sender=self.name,
            recipient=recipient,
            content=content,
            message_type=message_type,
        )
        # In a real implementation, this would send to a message broker
        # For now, we'll just log it
        logger.info(f"Agent {self.name} sending message to {recipient}: {content}")

    async def add_task(self, task: Task) -> None:
        """Append *task* to this agent's task list and log it."""
        self.tasks.append(task)
        logger.info(f"Agent {self.name} added task: {task.description}")

    async def check_tasks(self) -> None:
        """Execute every pending task that is assigned to this agent."""
        for task in self.tasks:
            if task.status == "pending" and task.assigned_agent == self.name:
                await self.execute_task(task)

    async def execute_task(self, task: Task) -> None:
        """Run *task* and record the outcome on the task object.

        Sets ``task.status`` to ``"running"``, then to ``"completed"`` (and
        fills ``completed_at`` and ``result``) on success, or to ``"failed"``
        if ``_execute_task_logic`` raises. Failures are logged, not re-raised.
        """
        task.status = "running"
        logger.info(f"Agent {self.name} starting task: {task.description}")

        try:
            result = await self._execute_task_logic(task)
        except Exception as e:
            task.status = "failed"
            logger.exception(
                f"Agent {self.name} failed task {task.description}: {str(e)}"
            )
        else:
            # Bookkeeping sits outside the try so that an error here is not
            # misreported as a task failure.
            task.status = "completed"
            # NOTE(review): naive local timestamp, as in the original call;
            # confirm whether the Task model expects timezone-aware values.
            task.completed_at = datetime.now()
            task.result = result
            logger.info(f"Agent {self.name} completed task: {task.description}")

    @abstractmethod
    async def execute(self) -> None:
        """Execute the agent's primary function - must be implemented by subclasses"""
        pass

    @abstractmethod
    async def _execute_task_logic(self, task: Task) -> Dict[str, Any]:
        """Execute the specific logic for a task - must be implemented by subclasses"""
        pass