autoseo-engine / main.py
Ahmed766's picture
Upload main.py with huggingface_hub
3d51c46 verified
import asyncio
from typing import Dict, List, Optional
from core.agent import BaseAgent
from core.models import AgentConfig, Task, AgentMessage, SEOData
from core.monetization import MonetizationEngine
import logging
import json
import os
from datetime import datetime
logger = logging.getLogger(__name__)
class MessageBroker:
"""Central message broker for agent communication"""
def __init__(self):
self.agents = {}
self.message_queues = {}
def register_agent(self, agent_name: str, agent_instance):
"""Register an agent with the message broker"""
self.agents[agent_name] = agent_instance
self.message_queues[agent_name] = asyncio.Queue()
async def send_message(self, sender: str, recipient: str, content: str, message_type: str = "info"):
"""Send a message from one agent to another"""
if recipient in self.message_queues:
message = AgentMessage(
sender=sender,
recipient=recipient,
content=content,
message_type=message_type
)
await self.message_queues[recipient].put(message)
logger.info(f"Message sent from {sender} to {recipient}: {content[:50]}...")
else:
logger.warning(f"Recipient {recipient} not found in message queues")
async def get_message(self, agent_name: str) -> Optional[AgentMessage]:
"""Get a message for an agent"""
if agent_name in self.message_queues:
if not self.message_queues[agent_name].empty():
return await self.message_queues[agent_name].get()
return None
class AutoSEOOrchestrator:
"""Main orchestrator for the AutoSEO Engine system"""
def __init__(self):
self.agents: Dict[str, BaseAgent] = {}
self.message_broker = MessageBroker()
self.monetization_engine = MonetizationEngine()
self.running = False
self.stats = {
"start_time": None,
"tasks_completed": 0,
"revenue_generated": 0,
"customers_served": 0
}
def initialize_agents(self):
"""Initialize all agents in the system"""
logger.info("Initializing AutoSEO Engine agents...")
# Import agents here to avoid circular dependencies
from agents.ceo_agent import CEOStrategyAgent
from agents.seo_director_agent import SEODirectorAgent
from agents.tech_seo_agent import TechnicalSEOAgent
from agents.content_seo_agent import ContentSEOSemanticAgent
from agents.programmatic_seo_agent import ProgrammaticSEOAgent
from agents.link_authority_agent import LinkAuthorityOutreachAgent
from agents.conversion_cro_agent import ConversionCROAgent
from agents.client_management_agent import ClientManagementAgent
from agents.automation_ops_agent import AutomationOpsAgent
from agents.self_improvement_agent import SelfImprovementAgent
# Define configurations for each agent
configs = {
"ceo_strategy": AgentConfig(name="ceo_strategy", enabled=True, max_iterations=5),
"seo_director": AgentConfig(name="seo_director", enabled=True, max_iterations=5),
"tech_seo": AgentConfig(name="tech_seo", enabled=True, max_iterations=5),
"content_seo": AgentConfig(name="content_seo", enabled=True, max_iterations=5),
"programmatic_seo": AgentConfig(name="programmatic_seo", enabled=True, max_iterations=5),
"link_authority": AgentConfig(name="link_authority", enabled=True, max_iterations=5),
"conversion_cro": AgentConfig(name="conversion_cro", enabled=True, max_iterations=5),
"client_management": AgentConfig(name="client_management", enabled=True, max_iterations=5),
"automation_ops": AgentConfig(name="automation_ops", enabled=True, max_iterations=5),
"self_improvement": AgentConfig(name="self_improvement", enabled=True, max_iterations=5),
}
# Create agent instances
agent_classes = {
"ceo_strategy": CEOStrategyAgent,
"seo_director": SEODirectorAgent,
"tech_seo": TechnicalSEOAgent,
"content_seo": ContentSEOSemanticAgent,
"programmatic_seo": ProgrammaticSEOAgent,
"link_authority": LinkAuthorityOutreachAgent,
"conversion_cro": ConversionCROAgent,
"client_management": ClientManagementAgent,
"automation_ops": AutomationOpsAgent,
"self_improvement": SelfImprovementAgent,
}
for name, agent_class in agent_classes.items():
agent = agent_class(configs[name])
self.agents[name] = agent
self.message_broker.register_agent(name, agent)
logger.info(f"Initialized {len(self.agents)} agents")
async def start(self):
"""Start the AutoSEO Engine system"""
logger.info("Starting AutoSEO Engine...")
self.running = True
self.stats["start_time"] = datetime.now()
# Initialize agents
self.initialize_agents()
# Add sample customers to monetization engine
await self.monetization_engine.add_customer({"name": "Acme Corp", "tier": "professional"})
await self.monetization_engine.add_customer({"name": "Globex Inc", "tier": "enterprise"})
await self.monetization_engine.add_customer({"name": "Wayne Enterprises", "tier": "starter"})
# Create tasks for all agents
agent_tasks = []
for agent_name, agent in self.agents.items():
task = asyncio.create_task(self.run_agent(agent_name, agent))
agent_tasks.append(task)
# Create task for message processing
message_task = asyncio.create_task(self.process_messages())
# Create task for monetization processing
monetization_task = asyncio.create_task(self.process_monetization())
logger.info("All agents started. AutoSEO Engine is running.")
try:
# Wait for all tasks to complete
all_tasks = agent_tasks + [message_task, monetization_task]
await asyncio.gather(*all_tasks)
except KeyboardInterrupt:
logger.info("Received interrupt signal. Shutting down...")
finally:
await self.shutdown()
async def run_agent(self, agent_name: str, agent: BaseAgent):
"""Run a single agent"""
try:
# Replace the agent's send_message method with the message broker's
original_send = agent.send_message
async def new_send(recipient: str, content: str, message_type: str = "info"):
await self.message_broker.send_message(agent_name, recipient, content, message_type)
agent.send_message = new_send
# Replace the agent's message queue with the broker's
agent.message_queue = self.message_broker.message_queues[agent_name]
await agent.run()
except Exception as e:
logger.error(f"Error running agent {agent_name}: {str(e)}")
async def process_messages(self):
"""Process messages between agents"""
while self.running:
for agent_name in self.agents.keys():
message = await self.message_broker.get_message(agent_name)
if message:
# Handle special message types
if message.message_type == "task_completed":
self.stats["tasks_completed"] += 1
elif message.content.startswith("Revenue attribution:"):
# Extract revenue from message
try:
revenue_str = message.content.split("$")[1].split(",")[0]
revenue = float(revenue_str.replace(",", ""))
self.stats["revenue_generated"] += revenue
except:
pass
await asyncio.sleep(0.1) # Small delay to prevent busy waiting
async def process_monetization(self):
"""Process monetization tasks"""
while self.running:
# Process billing cycle every hour (simulated)
await self.monetization_engine.process_billing_cycle()
logger.info(f"Monetization stats: {self.monetization_engine.revenue_data}")
await asyncio.sleep(3600) # 1 hour
async def get_system_stats(self) -> Dict:
"""Get system statistics"""
mrr = await self.monetization_engine.calculate_mrr()
return {
**self.stats,
"mrr": mrr,
"uptime_minutes": (datetime.now() - self.stats["start_time"]).total_seconds() / 60 if self.stats["start_time"] else 0,
"active_agents": len([a for a in self.agents.values() if a.enabled]),
"total_customers": len(self.monetization_engine.customers)
}
async def shutdown(self):
"""Shut down the AutoSEO Engine system"""
logger.info("Shutting down AutoSEO Engine...")
self.running = False
# Wait a moment for graceful shutdown
await asyncio.sleep(1)
# Print final stats
final_stats = await self.get_system_stats()
logger.info(f"Final system stats: {final_stats}")
logger.info("AutoSEO Engine has shut down.")
async def main():
orchestrator = AutoSEOOrchestrator()
await orchestrator.start()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application interrupted by user")
import sys
sys.exit(0)