import asyncio
import json
import logging
import os
import re
import sys
from datetime import datetime
from typing import Dict, List, Optional

from core.agent import BaseAgent
from core.models import AgentConfig, Task, AgentMessage, SEOData
from core.monetization import MonetizationEngine

logger = logging.getLogger(__name__)

# First dollar amount in a message, e.g. "$500", "$1234.56" or "$1,234.56".
# Commas are accepted only as thousands separators (groups of three digits),
# so a trailing list comma such as "$500, client X" is not swallowed.
_REVENUE_PATTERN = re.compile(
    r"\$\s*(-?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d+)?)"
)


class MessageBroker:
    """Central message broker for agent communication.

    Keeps one ``asyncio.Queue`` per registered agent name; messages are
    delivered by putting an ``AgentMessage`` on the recipient's queue.
    """

    def __init__(self):
        # agent name -> agent instance
        self.agents = {}
        # agent name -> asyncio.Queue of AgentMessage
        self.message_queues = {}

    def register_agent(self, agent_name: str, agent_instance):
        """Register an agent and create a fresh, empty queue for it.

        Registering an existing name replaces both the instance and the
        queue.
        """
        self.agents[agent_name] = agent_instance
        self.message_queues[agent_name] = asyncio.Queue()

    async def send_message(self, sender: str, recipient: str, content: str,
                           message_type: str = "info"):
        """Queue a message from ``sender`` for ``recipient``.

        If ``recipient`` has no queue, the message is dropped and a warning
        is logged; no exception is raised.
        """
        queue = self.message_queues.get(recipient)
        if queue is None:
            logger.warning(f"Recipient {recipient} not found in message queues")
            return

        message = AgentMessage(
            sender=sender,
            recipient=recipient,
            content=content,
            message_type=message_type,
        )
        await queue.put(message)
        logger.info(f"Message sent from {sender} to {recipient}: {content[:50]}...")

    async def get_message(self, agent_name: str) -> Optional[AgentMessage]:
        """Return the next queued message for ``agent_name`` without waiting.

        Returns ``None`` when the agent is unknown or its queue is empty.
        """
        queue = self.message_queues.get(agent_name)
        if queue is None:
            return None
        try:
            return queue.get_nowait()
        except asyncio.QueueEmpty:
            return None


class AutoSEOOrchestrator:
    """Main orchestrator for the AutoSEO Engine system.

    Creates the agents, wires them to a shared ``MessageBroker``, runs them
    concurrently with a message-processing loop and a billing loop, and
    keeps simple run statistics in ``self.stats``.
    """

    def __init__(self):
        self.agents: Dict[str, BaseAgent] = {}
        self.message_broker = MessageBroker()
        self.monetization_engine = MonetizationEngine()
        # Polled by the background loops; set to False by shutdown().
        self.running = False
        self.stats = {
            "start_time": None,
            "tasks_completed": 0,
            "revenue_generated": 0,
            "customers_served": 0,
        }

    def initialize_agents(self):
        """Instantiate every agent and register it with the message broker."""
        logger.info("Initializing AutoSEO Engine agents...")

        # Import agents here to avoid circular dependencies
        from agents.ceo_agent import CEOStrategyAgent
        from agents.seo_director_agent import SEODirectorAgent
        from agents.tech_seo_agent import TechnicalSEOAgent
        from agents.content_seo_agent import ContentSEOSemanticAgent
        from agents.programmatic_seo_agent import ProgrammaticSEOAgent
        from agents.link_authority_agent import LinkAuthorityOutreachAgent
        from agents.conversion_cro_agent import ConversionCROAgent
        from agents.client_management_agent import ClientManagementAgent
        from agents.automation_ops_agent import AutomationOpsAgent
        from agents.self_improvement_agent import SelfImprovementAgent

        agent_classes = {
            "ceo_strategy": CEOStrategyAgent,
            "seo_director": SEODirectorAgent,
            "tech_seo": TechnicalSEOAgent,
            "content_seo": ContentSEOSemanticAgent,
            "programmatic_seo": ProgrammaticSEOAgent,
            "link_authority": LinkAuthorityOutreachAgent,
            "conversion_cro": ConversionCROAgent,
            "client_management": ClientManagementAgent,
            "automation_ops": AutomationOpsAgent,
            "self_improvement": SelfImprovementAgent,
        }

        # Every agent currently shares the same settings, so the configs are
        # derived from the class table instead of being listed by hand.
        configs = {
            name: AgentConfig(name=name, enabled=True, max_iterations=5)
            for name in agent_classes
        }

        for name, agent_class in agent_classes.items():
            agent = agent_class(configs[name])
            self.agents[name] = agent
            self.message_broker.register_agent(name, agent)

        logger.info(f"Initialized {len(self.agents)} agents")

    async def start(self):
        """Start the system and block until every task has finished.

        Initializes the agents, adds three sample customers, then runs one
        task per agent plus the message and monetization loops.
        ``shutdown()`` is always called on the way out, and any task still
        pending at that point is cancelled so nothing outlives this call.

        NOTE(review): the two background loops only end once ``running`` is
        False, so unless ``shutdown()`` is called from elsewhere this method
        returns only via an exception or cancellation.
        """
        logger.info("Starting AutoSEO Engine...")
        self.running = True
        self.stats["start_time"] = datetime.now()

        self.initialize_agents()

        # Add sample customers to monetization engine
        sample_customers = (
            {"name": "Acme Corp", "tier": "professional"},
            {"name": "Globex Inc", "tier": "enterprise"},
            {"name": "Wayne Enterprises", "tier": "starter"},
        )
        for customer in sample_customers:
            await self.monetization_engine.add_customer(customer)

        all_tasks: List[asyncio.Task] = [
            asyncio.create_task(self.run_agent(agent_name, agent))
            for agent_name, agent in self.agents.items()
        ]
        all_tasks.append(asyncio.create_task(self.process_messages()))
        all_tasks.append(asyncio.create_task(self.process_monetization()))

        logger.info("All agents started. AutoSEO Engine is running.")

        try:
            await asyncio.gather(*all_tasks)
        except KeyboardInterrupt:
            logger.info("Received interrupt signal. Shutting down...")
        finally:
            await self.shutdown()
            # If gather() exited early (one task raised, or we were
            # cancelled), the other tasks may still be alive; stop them
            # explicitly instead of leaking them into the event loop.
            leftovers = [task for task in all_tasks if not task.done()]
            for task in leftovers:
                task.cancel()
            if leftovers:
                await asyncio.gather(*leftovers, return_exceptions=True)

    async def run_agent(self, agent_name: str, agent: BaseAgent):
        """Wire ``agent`` to the broker and run it until ``agent.run()`` returns.

        The agent's ``send_message`` is replaced with a wrapper that routes
        through the broker (stamping ``agent_name`` as sender), and its
        ``message_queue`` is replaced with the broker's queue for that name.
        Any exception from the agent is logged and swallowed so one failing
        agent does not bring down the others.
        """
        try:
            async def new_send(recipient: str, content: str,
                               message_type: str = "info"):
                await self.message_broker.send_message(
                    agent_name, recipient, content, message_type
                )

            agent.send_message = new_send
            agent.message_queue = self.message_broker.message_queues[agent_name]

            await agent.run()
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Error running agent {agent_name}: {str(e)}")

    @staticmethod
    def _parse_revenue(content: str) -> Optional[float]:
        """Return the first dollar amount in ``content``, or None if absent."""
        match = _REVENUE_PATTERN.search(content)
        if match is None:
            return None
        return float(match.group(1).replace(",", ""))

    def _handle_message(self, message) -> None:
        """Update ``self.stats`` from a single broker message."""
        if message.message_type == "task_completed":
            self.stats["tasks_completed"] += 1
        elif message.content.startswith("Revenue attribution:"):
            revenue = self._parse_revenue(message.content)
            if revenue is None:
                logger.warning(
                    f"Could not parse revenue from message: {message.content[:50]}"
                )
            else:
                self.stats["revenue_generated"] += revenue

    async def process_messages(self):
        """Poll every agent queue and fold special messages into the stats.

        Runs while ``self.running`` is True, taking at most one message per
        agent per pass and sleeping 0.1 s between passes.

        NOTE(review): these are the same queues that ``run_agent`` assigns to
        ``agent.message_queue``, so a message taken here is no longer
        available to the agent itself - confirm that this is intended.
        """
        while self.running:
            # Snapshot the names: the loop body awaits, and the dict must not
            # change size under a live iterator.
            for agent_name in tuple(self.agents):
                message = await self.message_broker.get_message(agent_name)
                if message:
                    self._handle_message(message)

            await asyncio.sleep(0.1)  # Small delay to prevent busy waiting

    async def process_monetization(self):
        """Run a billing cycle, log revenue data, then sleep for an hour.

        Repeats while ``self.running`` is True; the flag is only re-checked
        after each sleep.
        """
        while self.running:
            await self.monetization_engine.process_billing_cycle()
            logger.info(f"Monetization stats: {self.monetization_engine.revenue_data}")
            await asyncio.sleep(3600)  # 1 hour

    async def get_system_stats(self) -> Dict:
        """Return ``self.stats`` extended with live figures.

        Adds ``mrr``, ``uptime_minutes`` (0 if the system never started),
        ``active_agents`` (agents whose ``enabled`` attribute is truthy) and
        ``total_customers``.
        """
        mrr = await self.monetization_engine.calculate_mrr()
        started_at = self.stats["start_time"]
        if started_at:
            uptime_minutes = (datetime.now() - started_at).total_seconds() / 60
        else:
            uptime_minutes = 0

        return {
            **self.stats,
            "mrr": mrr,
            "uptime_minutes": uptime_minutes,
            "active_agents": sum(1 for a in self.agents.values() if a.enabled),
            "total_customers": len(self.monetization_engine.customers),
        }

    async def shutdown(self):
        """Signal the background loops to stop and log the final statistics."""
        logger.info("Shutting down AutoSEO Engine...")
        self.running = False

        # Wait a moment for graceful shutdown
        await asyncio.sleep(1)

        final_stats = await self.get_system_stats()
        logger.info(f"Final system stats: {final_stats}")

        logger.info("AutoSEO Engine has shut down.")


async def main():
    """Create an orchestrator and run it until it stops."""
    orchestrator = AutoSEOOrchestrator()
    await orchestrator.start()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Application interrupted by user")
        sys.exit(0)