import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any, Optional

from core.models import AgentConfig, Task, AgentMessage
|
|
| |
# Configure the root logger at INFO level when this module is imported, then
# create the module-level logger used by every BaseAgent method below.
# NOTE(review): basicConfig() at import time is a process-wide side effect and
# is a no-op if the root logger already has handlers — confirm that this module
# (rather than the application entry point) is meant to own logging setup.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    Subclasses must implement :meth:`execute` (the work done once per loop
    iteration) and :meth:`_execute_task_logic` (the work done for a single
    task). The base class provides the run loop, an inbound message queue
    and simple task bookkeeping.
    """

    def __init__(self, config: AgentConfig):
        """Initialise the agent from its configuration.

        Args:
            config: Agent settings. ``name``, ``enabled``, ``max_iterations``,
                ``timeout_seconds`` and ``model_name`` are copied onto the
                instance.
        """
        self.config = config
        self.name = config.name
        self.enabled = config.enabled
        self.max_iterations = config.max_iterations
        # timeout_seconds and model_name are stored for subclasses; the base
        # class itself never reads them.
        self.timeout_seconds = config.timeout_seconds
        self.model_name = config.model_name
        # Inbound messages, drained by process_messages() on every iteration.
        self.message_queue: asyncio.Queue = asyncio.Queue()
        # Tasks registered through add_task(); scanned by check_tasks().
        self.tasks: List[Task] = []

    async def run(self) -> None:
        """Run the agent's main loop.

        Returns immediately when the agent is disabled. Otherwise performs up
        to ``max_iterations`` iterations; each one drains the message queue,
        calls :meth:`execute`, processes pending tasks and then sleeps for one
        second. Any exception raised during an iteration is logged and ends
        the loop; it is not re-raised.
        """
        if not self.enabled:
            logger.info(f"Agent {self.name} is disabled, skipping execution")
            return

        logger.info(f"Starting agent: {self.name}")
        iteration = 0

        while iteration < self.max_iterations:
            try:
                await self.process_messages()
                await self.execute()
                await self.check_tasks()

                iteration += 1
                # Fixed one-second pause between iterations.
                await asyncio.sleep(1)
            except Exception as e:
                # The first failing iteration stops the agent.
                logger.error(f"Error in agent {self.name}: {str(e)}")
                break

        logger.info(f"Agent {self.name} finished after {iteration} iterations")

    async def process_messages(self) -> None:
        """Drain the message queue, passing each message to handle_message()."""
        while not self.message_queue.empty():
            message: AgentMessage = await self.message_queue.get()
            await self.handle_message(message)

    async def handle_message(self, message: AgentMessage) -> None:
        """Handle an incoming message.

        The base implementation only logs the sender and content.

        Args:
            message: The message taken from this agent's queue.
        """
        logger.info(f"Agent {self.name} received message from {message.sender}: {message.content}")

    async def send_message(self, recipient: str, content: str, message_type: str = "info") -> None:
        """Build and log a message addressed to another agent.

        Args:
            recipient: Name of the agent the message is addressed to.
            content: Message body.
            message_type: Message category; defaults to ``"info"``.
        """
        # NOTE(review): the message is constructed but never delivered — no
        # transport or recipient lookup is visible in this class. Confirm
        # whether delivery is expected to be wired in here or by a subclass.
        message = AgentMessage(
            sender=self.name,
            recipient=recipient,
            content=content,
            message_type=message_type,
        )

        logger.info(f"Agent {self.name} sending message to {recipient}: {content}")

    async def add_task(self, task: Task) -> None:
        """Register a task with this agent.

        Args:
            task: The task to append to ``self.tasks``.
        """
        self.tasks.append(task)
        logger.info(f"Agent {self.name} added task: {task.description}")

    async def check_tasks(self) -> None:
        """Execute every registered task that is pending and assigned to this agent."""
        for task in self.tasks:
            if task.status == "pending" and task.assigned_agent == self.name:
                await self.execute_task(task)

    async def execute_task(self, task: Task) -> None:
        """Run a single task and record its outcome on the task object.

        The status is set to ``"running"`` before the task logic starts. On
        success the status becomes ``"completed"`` and ``completed_at`` and
        ``result`` are filled in. If anything raises, the status becomes
        ``"failed"`` and the error is logged; the exception is not re-raised.

        Args:
            task: The task to execute; it is mutated in place.
        """
        task.status = "running"
        logger.info(f"Agent {self.name} starting task: {task.description}")

        try:
            result = await self._execute_task_logic(task)
            task.status = "completed"
            # Requires the module-level `from datetime import datetime`.
            task.completed_at = datetime.now()
            task.result = result
            logger.info(f"Agent {self.name} completed task: {task.description}")
        except Exception as e:
            task.status = "failed"
            logger.error(f"Agent {self.name} failed task {task.description}: {str(e)}")

    @abstractmethod
    async def execute(self):
        """Execute the agent's primary function - must be implemented by subclasses"""
        pass

    @abstractmethod
    async def _execute_task_logic(self, task: Task) -> Dict[str, Any]:
        """Execute the specific logic for a task - must be implemented by subclasses"""
        pass