Spaces:
Build error
Build error
| """ | |
| Unified Architecture Example | |
| This example demonstrates the comprehensive Phase 3 Unified Architecture | |
| for Hybrid Agent System and Multi-Agent Collaboration Platform. | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any | |
| from uuid import uuid4 | |
| from src.unified_architecture import ( | |
| # Core interfaces | |
| AgentCapability, AgentStatus, AgentMetadata, | |
| UnifiedTask, TaskResult, IUnifiedAgent, | |
| # Platform components | |
| MultiAgentPlatform, PlatformConfig, | |
| # Communication | |
| AgentMessage, MessageType, | |
| # Memory and collaboration | |
| MemoryEntry, MemoryType, | |
| # Marketplace | |
| AgentListing, ListingStatus, | |
| # Performance and monitoring | |
| PlatformStats | |
| ) | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ExampleAgent(IUnifiedAgent): | |
| """Example agent implementation for demonstration""" | |
| def __init__(self, agent_id: str, name: str, capabilities: List[AgentCapability]): | |
| self.agent_id = agent_id | |
| self.name = name | |
| self.capabilities = capabilities | |
| self.status = AgentStatus.IDLE | |
| self.logger = logging.getLogger(f"Agent-{name}") | |
| async def execute_task(self, task: UnifiedTask) -> TaskResult: | |
| """Execute a task""" | |
| try: | |
| self.status = AgentStatus.BUSY | |
| self.logger.info(f"Executing task: {task.description}") | |
| # Simulate task execution | |
| await asyncio.sleep(2) | |
| # Generate result based on task type | |
| if "analysis" in task.description.lower(): | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={"analysis": f"Analysis completed for {task.description}"}, | |
| metadata={"execution_time": 2.0, "agent": self.name} | |
| ) | |
| elif "processing" in task.description.lower(): | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={"processed_data": f"Processed: {task.description}"}, | |
| metadata={"execution_time": 2.0, "agent": self.name} | |
| ) | |
| else: | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={"result": f"Task completed: {task.description}"}, | |
| metadata={"execution_time": 2.0, "agent": self.name} | |
| ) | |
| self.status = AgentStatus.IDLE | |
| return result | |
| except Exception as e: | |
| self.logger.error(f"Task execution failed: {e}") | |
| self.status = AgentStatus.IDLE | |
| return TaskResult( | |
| task_id=task.id, | |
| success=False, | |
| error=str(e), | |
| metadata={"agent": self.name} | |
| ) | |
| async def get_status(self) -> AgentStatus: | |
| """Get current agent status""" | |
| return self.status | |
| async def get_capabilities(self) -> List[AgentCapability]: | |
| """Get agent capabilities""" | |
| return self.capabilities | |
| class DataAnalysisAgent(ExampleAgent): | |
| """Specialized agent for data analysis tasks""" | |
| def __init__(self, agent_id: str): | |
| super().__init__( | |
| agent_id=agent_id, | |
| name="DataAnalysisAgent", | |
| capabilities=[AgentCapability.DATA_ANALYSIS, AgentCapability.MACHINE_LEARNING] | |
| ) | |
| async def execute_task(self, task: UnifiedTask) -> TaskResult: | |
| """Execute data analysis task""" | |
| try: | |
| self.status = AgentStatus.BUSY | |
| self.logger.info(f"Performing data analysis: {task.description}") | |
| # Simulate data analysis | |
| await asyncio.sleep(3) | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={ | |
| "analysis_type": "statistical", | |
| "insights": ["Trend identified", "Anomaly detected", "Correlation found"], | |
| "recommendations": ["Increase monitoring", "Optimize parameters"] | |
| }, | |
| metadata={"execution_time": 3.0, "agent": self.name} | |
| ) | |
| self.status = AgentStatus.IDLE | |
| return result | |
| except Exception as e: | |
| self.logger.error(f"Data analysis failed: {e}") | |
| self.status = AgentStatus.IDLE | |
| return TaskResult( | |
| task_id=task.id, | |
| success=False, | |
| error=str(e), | |
| metadata={"agent": self.name} | |
| ) | |
| class ProcessingAgent(ExampleAgent): | |
| """Specialized agent for data processing tasks""" | |
| def __init__(self, agent_id: str): | |
| super().__init__( | |
| agent_id=agent_id, | |
| name="ProcessingAgent", | |
| capabilities=[AgentCapability.DATA_PROCESSING, AgentCapability.FILE_OPERATIONS] | |
| ) | |
| async def execute_task(self, task: UnifiedTask) -> TaskResult: | |
| """Execute data processing task""" | |
| try: | |
| self.status = AgentStatus.BUSY | |
| self.logger.info(f"Processing data: {task.description}") | |
| # Simulate data processing | |
| await asyncio.sleep(2) | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={ | |
| "processed_files": 5, | |
| "data_volume": "2.5GB", | |
| "format": "parquet", | |
| "quality_score": 0.95 | |
| }, | |
| metadata={"execution_time": 2.0, "agent": self.name} | |
| ) | |
| self.status = AgentStatus.IDLE | |
| return result | |
| except Exception as e: | |
| self.logger.error(f"Data processing failed: {e}") | |
| self.status = AgentStatus.IDLE | |
| return TaskResult( | |
| task_id=task.id, | |
| success=False, | |
| error=str(e), | |
| metadata={"agent": self.name} | |
| ) | |
| class CollaborationAgent(ExampleAgent): | |
| """Specialized agent for collaboration and coordination""" | |
| def __init__(self, agent_id: str): | |
| super().__init__( | |
| agent_id=agent_id, | |
| name="CollaborationAgent", | |
| capabilities=[AgentCapability.COLLABORATION, AgentCapability.COORDINATION] | |
| ) | |
| async def execute_task(self, task: UnifiedTask) -> TaskResult: | |
| """Execute collaboration task""" | |
| try: | |
| self.status = AgentStatus.BUSY | |
| self.logger.info(f"Coordinating collaboration: {task.description}") | |
| # Simulate collaboration coordination | |
| await asyncio.sleep(1) | |
| result = TaskResult( | |
| task_id=task.id, | |
| success=True, | |
| data={ | |
| "collaboration_type": "multi_agent", | |
| "participants": 3, | |
| "coordination_method": "distributed", | |
| "efficiency_gain": 0.25 | |
| }, | |
| metadata={"execution_time": 1.0, "agent": self.name} | |
| ) | |
| self.status = AgentStatus.IDLE | |
| return result | |
| except Exception as e: | |
| self.logger.error(f"Collaboration failed: {e}") | |
| self.status = AgentStatus.IDLE | |
| return TaskResult( | |
| task_id=task.id, | |
| success=False, | |
| error=str(e), | |
| metadata={"agent": self.name} | |
| ) | |
| async def create_sample_agents() -> List[ExampleAgent]: | |
| """Create sample agents for demonstration""" | |
| agents = [ | |
| DataAnalysisAgent("analysis-agent-001"), | |
| ProcessingAgent("processing-agent-001"), | |
| CollaborationAgent("collaboration-agent-001"), | |
| ExampleAgent("general-agent-001", "GeneralAgent", [AgentCapability.GENERAL_PURPOSE]) | |
| ] | |
| return agents | |
| async def create_sample_tasks() -> List[UnifiedTask]: | |
| """Create sample tasks for demonstration""" | |
| from src.unified_architecture.core import TaskPriority, TaskType | |
| tasks = [ | |
| UnifiedTask( | |
| id=uuid4(), | |
| description="Analyze customer behavior patterns in sales data", | |
| task_type=TaskType.ANALYSIS, | |
| priority=TaskPriority.HIGH, | |
| requirements={ | |
| "capabilities": [AgentCapability.DATA_ANALYSIS], | |
| "resources": {"cpu": 2.0, "memory": 4.0} | |
| }, | |
| dependencies=[], | |
| metadata={"domain": "sales", "data_source": "customer_db"} | |
| ), | |
| UnifiedTask( | |
| id=uuid4(), | |
| description="Process and clean raw sensor data", | |
| task_type=TaskType.PROCESSING, | |
| priority=TaskPriority.MEDIUM, | |
| requirements={ | |
| "capabilities": [AgentCapability.DATA_PROCESSING], | |
| "resources": {"cpu": 1.0, "memory": 2.0} | |
| }, | |
| dependencies=[], | |
| metadata={"domain": "iot", "data_format": "json"} | |
| ), | |
| UnifiedTask( | |
| id=uuid4(), | |
| description="Coordinate multi-agent workflow for report generation", | |
| task_type=TaskType.COLLABORATION, | |
| priority=TaskPriority.HIGH, | |
| requirements={ | |
| "capabilities": [AgentCapability.COLLABORATION], | |
| "resources": {"cpu": 0.5, "memory": 1.0} | |
| }, | |
| dependencies=[], | |
| metadata={"workflow_type": "report_generation"} | |
| ) | |
| ] | |
| return tasks | |
| async def demonstrate_platform_features(platform: MultiAgentPlatform): | |
| """Demonstrate various platform features""" | |
| # 1. Register agents | |
| logger.info("=== Registering Agents ===") | |
| agents = await create_sample_agents() | |
| for agent in agents: | |
| metadata = AgentMetadata( | |
| agent_id=agent.agent_id, | |
| name=agent.name, | |
| capabilities=agent.capabilities, | |
| status=agent.status, | |
| version="1.0.0", | |
| description=f"Example {agent.name} for demonstration", | |
| tags=["example", "demo"], | |
| created_at=datetime.utcnow() | |
| ) | |
| success = await platform.register_agent(agent, metadata) | |
| logger.info(f"Registered {agent.name}: {success}") | |
| # 2. Submit tasks | |
| logger.info("\n=== Submitting Tasks ===") | |
| tasks = await create_sample_tasks() | |
| task_ids = [] | |
| for task in tasks: | |
| task_id = await platform.submit_task(task) | |
| task_ids.append(task_id) | |
| logger.info(f"Submitted task: {task.description} -> {task_id}") | |
| # 3. Monitor task execution | |
| logger.info("\n=== Monitoring Task Execution ===") | |
| for i in range(10): # Monitor for 10 seconds | |
| for task_id in task_ids: | |
| status = await platform.get_task_status(task_id) | |
| if status: | |
| logger.info(f"Task {task_id}: {status.get('status', 'unknown')}") | |
| await asyncio.sleep(1) | |
| # 4. Demonstrate communication | |
| logger.info("\n=== Agent Communication ===") | |
| message = AgentMessage( | |
| id=uuid4(), | |
| from_agent=agents[0].agent_id, | |
| to_agent=agents[1].agent_id, | |
| type=MessageType.COLLABORATION, | |
| content="Let's collaborate on the data analysis task", | |
| timestamp=datetime.utcnow(), | |
| metadata={"priority": "high"} | |
| ) | |
| success = await platform.send_message(message) | |
| logger.info(f"Message sent: {success}") | |
| # 5. Share memory | |
| logger.info("\n=== Memory Sharing ===") | |
| memory_entry = MemoryEntry( | |
| id=uuid4(), | |
| type=MemoryType.EXPERIENCE, | |
| content="Successfully analyzed customer behavior patterns", | |
| source_agent=agents[0].agent_id, | |
| tags=["analysis", "customer", "patterns"], | |
| created_at=datetime.utcnow(), | |
| metadata={"task_id": str(task_ids[0])} | |
| ) | |
| success = await platform.share_memory(memory_entry) | |
| logger.info(f"Memory shared: {success}") | |
| # 6. Search memory | |
| logger.info("\n=== Memory Search ===") | |
| search_results = await platform.search_memory("customer behavior") | |
| logger.info(f"Found {len(search_results)} memory entries") | |
| # 7. Get platform statistics | |
| logger.info("\n=== Platform Statistics ===") | |
| stats = await platform.get_platform_stats() | |
| logger.info(f"Total agents: {stats.total_agents}") | |
| logger.info(f"Active agents: {stats.active_agents}") | |
| logger.info(f"Total tasks: {stats.total_tasks}") | |
| logger.info(f"Completed tasks: {stats.completed_tasks}") | |
| logger.info(f"Platform uptime: {stats.platform_uptime:.2f} seconds") | |
| # 8. Get collaboration network | |
| logger.info("\n=== Collaboration Network ===") | |
| network = await platform.get_collaboration_network() | |
| logger.info(f"Collaboration network nodes: {len(network.get('nodes', []))}") | |
| logger.info(f"Collaboration network edges: {len(network.get('edges', []))}") | |
| # 9. Demonstrate marketplace (if enabled) | |
| if platform.config.enable_marketplace: | |
| logger.info("\n=== Marketplace Demo ===") | |
| # Create a sample listing | |
| listing = AgentListing( | |
| agent_id=agents[0].agent_id, | |
| name="Advanced Data Analysis Agent", | |
| description="Specialized agent for complex data analysis tasks", | |
| version="2.0.0", | |
| author="Demo User", | |
| capabilities=agents[0].capabilities, | |
| tags=["analysis", "advanced", "demo"], | |
| status=ListingStatus.ACTIVE, | |
| pricing_model="usage_based", | |
| pricing_details={"per_task": 0.10, "per_hour": 1.00} | |
| ) | |
| # Note: In a real implementation, you would add this to the marketplace | |
| logger.info(f"Created sample listing: {listing.name}") | |
| # 10. Health check | |
| logger.info("\n=== Platform Health Check ===") | |
| health = await platform.health_check() | |
| logger.info(f"Platform status: {health.get('platform_status')}") | |
| # Log component health | |
| for component, status in health.get('components', {}).items(): | |
| logger.info(f" {component}: {status.get('status', 'unknown')}") | |
| async def demonstrate_advanced_features(platform: MultiAgentPlatform): | |
| """Demonstrate advanced platform features""" | |
| logger.info("\n=== Advanced Features Demo ===") | |
| # 1. Resource allocation | |
| logger.info("--- Resource Allocation ---") | |
| requirements = { | |
| "cpu": 2.0, | |
| "memory": 4.0, | |
| "gpu": 1.0 | |
| } | |
| allocation = await platform.allocate_resources(requirements) | |
| if allocation: | |
| logger.info(f"Allocated resources: {allocation.resources}") | |
| logger.info(f"Allocation ID: {allocation.id}") | |
| # Release resources | |
| success = await platform.release_resources(allocation.id) | |
| logger.info(f"Released resources: {success}") | |
| # 2. Get available agents with filtering | |
| logger.info("\n--- Agent Discovery ---") | |
| analysis_agents = await platform.get_available_agents([AgentCapability.DATA_ANALYSIS]) | |
| logger.info(f"Found {len(analysis_agents)} data analysis agents") | |
| for agent in analysis_agents: | |
| logger.info(f" - {agent.name}: {[cap.value for cap in agent.capabilities]}") | |
| # 3. Performance tracking | |
| logger.info("\n--- Performance Tracking ---") | |
| if platform.config.enable_performance_tracking: | |
| # Get performance for first agent | |
| agents = await platform.get_available_agents() | |
| if agents: | |
| performance = await platform.get_agent_performance(agents[0].agent_id) | |
| if performance: | |
| logger.info(f"Agent {agents[0].name} performance:") | |
| logger.info(f" Success rate: {performance.get('success_rate', 0):.2%}") | |
| logger.info(f" Average execution time: {performance.get('avg_execution_time', 0):.2f}s") | |
| logger.info(f" Total tasks: {performance.get('total_tasks', 0)}") | |
| # 4. Broadcast messaging | |
| logger.info("\n--- Broadcast Messaging ---") | |
| broadcast_message = AgentMessage( | |
| id=uuid4(), | |
| from_agent="platform", | |
| to_agent="all", | |
| type=MessageType.SYSTEM, | |
| content="System maintenance scheduled for tomorrow at 2 AM UTC", | |
| timestamp=datetime.utcnow(), | |
| metadata={"priority": "medium", "maintenance": True} | |
| ) | |
| success = await platform.broadcast_message(broadcast_message) | |
| logger.info(f"Broadcast message sent: {success}") | |
| async def main(): | |
| """Main demonstration function""" | |
| logger.info("Starting Unified Architecture Demonstration") | |
| # Configure platform | |
| config = PlatformConfig( | |
| max_concurrent_tasks=50, | |
| task_timeout=300, | |
| heartbeat_interval=30, | |
| cleanup_interval=3600, | |
| enable_marketplace=True, | |
| enable_dashboard=True, | |
| enable_performance_tracking=True, | |
| enable_conflict_resolution=True, | |
| storage_backend="memory", | |
| log_level="INFO" | |
| ) | |
| # Create and start platform | |
| platform = MultiAgentPlatform(config) | |
| try: | |
| async with platform.platform_context(): | |
| logger.info("Platform started successfully") | |
| # Demonstrate basic features | |
| await demonstrate_platform_features(platform) | |
| # Demonstrate advanced features | |
| await demonstrate_advanced_features(platform) | |
| # Final statistics | |
| logger.info("\n=== Final Platform Statistics ===") | |
| final_stats = await platform.get_platform_stats() | |
| logger.info(f"Platform uptime: {final_stats.platform_uptime:.2f} seconds") | |
| logger.info(f"Total tasks processed: {final_stats.total_tasks}") | |
| logger.info(f"Success rate: {final_stats.completed_tasks / max(final_stats.total_tasks, 1):.2%}") | |
| except Exception as e: | |
| logger.error(f"Platform demonstration failed: {e}") | |
| raise | |
| logger.info("Unified Architecture Demonstration completed") | |
| if __name__ == "__main__": | |
| # Run the demonstration | |
| asyncio.run(main()) |