# autoseo-engine/core/agent.py
# Uploaded by Ahmed766 with huggingface_hub: "Upload core/agent.py with huggingface_hub"
# Commit 2feedea (verified)
import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict, List, Any, Optional

from core.models import AgentConfig, Task, AgentMessage
# Set up logging.
# NOTE: basicConfig is called at import time, so importing this module
# requests INFO-level logging configuration as a side effect.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BaseAgent(ABC):
    """Abstract base class for all agents in the system.

    An agent runs a bounded loop (see :meth:`run`) that, on every pass,
    drains its message queue, calls the subclass-provided :meth:`execute`
    hook and then executes any pending tasks assigned to it.

    Subclasses must implement :meth:`execute` and
    :meth:`_execute_task_logic`.

    Attributes:
        config: The ``AgentConfig`` this agent was built from.
        name: Copied from ``config.name``; used in log lines, as the
            sender of outgoing messages and to match ``task.assigned_agent``.
        enabled: Copied from ``config.enabled``; when falsy, :meth:`run`
            returns immediately.
        max_iterations: Copied from ``config.max_iterations``; upper bound
            on the number of loop passes in :meth:`run`.
        timeout_seconds: Copied from ``config.timeout_seconds``; stored
            only, not enforced anywhere in this base class.
        model_name: Copied from ``config.model_name``; stored only, not
            used in this base class.
        message_queue: ``asyncio.Queue`` of incoming messages.
        tasks: List of tasks added through :meth:`add_task`.
    """

    def __init__(self, config: AgentConfig) -> None:
        """Initialise the agent from its configuration.

        Args:
            config: Configuration object providing ``name``, ``enabled``,
                ``max_iterations``, ``timeout_seconds`` and ``model_name``.
        """
        self.config = config
        self.name = config.name
        self.enabled = config.enabled
        self.max_iterations = config.max_iterations
        self.timeout_seconds = config.timeout_seconds
        self.model_name = config.model_name
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.tasks: List[Task] = []

    async def run(self) -> None:
        """Run the agent's main loop.

        Does nothing (apart from logging) when the agent is disabled.
        Otherwise performs up to ``max_iterations`` passes; each pass
        processes queued messages, calls :meth:`execute`, checks tasks and
        then sleeps for one second. Any ``Exception`` raised during a pass
        is logged with its traceback and ends the loop; it is not
        re-raised.
        """
        if not self.enabled:
            logger.info(f"Agent {self.name} is disabled, skipping execution")
            return

        logger.info(f"Starting agent: {self.name}")
        iteration = 0
        while iteration < self.max_iterations:
            try:
                # Process any incoming messages
                await self.process_messages()
                # Execute agent-specific logic
                await self.execute()
                # Check for new tasks
                await self.check_tasks()
                iteration += 1
                # Small delay to prevent busy waiting
                await asyncio.sleep(1)
            except Exception as e:
                # logger.exception keeps the traceback, which a plain
                # error message would discard.
                logger.exception(f"Error in agent {self.name}: {e}")
                break
        logger.info(f"Agent {self.name} finished after {iteration} iterations")

    async def process_messages(self) -> None:
        """Drain the message queue, passing each message to :meth:`handle_message`."""
        while not self.message_queue.empty():
            message: AgentMessage = await self.message_queue.get()
            await self.handle_message(message)

    async def handle_message(self, message: AgentMessage) -> None:
        """Handle an incoming message.

        The default implementation only logs the sender and content;
        override in subclasses as needed.

        Args:
            message: The message taken from ``message_queue``.
        """
        logger.info(
            f"Agent {self.name} received message from {message.sender}: {message.content}"
        )

    async def send_message(
        self, recipient: str, content: str, message_type: str = "info"
    ) -> None:
        """Send a message to another agent.

        Currently this builds the ``AgentMessage`` and logs it; nothing is
        actually delivered.

        Args:
            recipient: Name of the agent the message is addressed to.
            content: Message body.
            message_type: Message category passed to ``AgentMessage``;
                defaults to ``"info"``.
        """
        # TODO: hand `message` to a message broker once one exists; for now
        # it is constructed but only the log line below is emitted.
        message = AgentMessage(
            sender=self.name,
            recipient=recipient,
            content=content,
            message_type=message_type,
        )
        logger.info(f"Agent {self.name} sending message to {recipient}: {content}")

    async def add_task(self, task: Task) -> None:
        """Append a task to this agent's task list.

        Args:
            task: The task to store; it is run by :meth:`check_tasks` if it
                is pending and assigned to this agent.
        """
        self.tasks.append(task)
        logger.info(f"Agent {self.name} added task: {task.description}")

    async def check_tasks(self) -> None:
        """Execute every stored task that is pending and assigned to this agent."""
        for task in self.tasks:
            if task.status == "pending" and task.assigned_agent == self.name:
                await self.execute_task(task)

    async def execute_task(self, task: Task) -> None:
        """Execute a single task and record its outcome on the task object.

        Sets ``task.status`` to ``"running"``, awaits
        :meth:`_execute_task_logic`, and on success sets ``status`` to
        ``"completed"``, ``completed_at`` to the current local time and
        ``result`` to the returned value. If an ``Exception`` is raised,
        ``status`` becomes ``"failed"`` and the error is logged with its
        traceback; the exception is not re-raised.

        Args:
            task: The task to execute; it is mutated in place.
        """
        task.status = "running"
        logger.info(f"Agent {self.name} starting task: {task.description}")
        try:
            result = await self._execute_task_logic(task)
            task.status = "completed"
            # `datetime` must be imported at module level; without it this
            # line raised NameError and successful tasks were marked failed.
            task.completed_at = datetime.now()
            task.result = result
            logger.info(f"Agent {self.name} completed task: {task.description}")
        except Exception as e:
            task.status = "failed"
            logger.exception(
                f"Agent {self.name} failed task {task.description}: {e}"
            )

    @abstractmethod
    async def execute(self) -> None:
        """Execute the agent's primary function - must be implemented by subclasses."""
        pass

    @abstractmethod
    async def _execute_task_logic(self, task: Task) -> Dict[str, Any]:
        """Execute the specific logic for a task - must be implemented by subclasses.

        Args:
            task: The task being executed.

        Returns:
            The value that :meth:`execute_task` stores in ``task.result``.
        """
        pass