RobotPai / examples /integration /unified_architecture_example.py
atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Unified Architecture Example
This example demonstrates the comprehensive Phase 3 Unified Architecture
for Hybrid Agent System and Multi-Agent Collaboration Platform.
"""
import asyncio
import logging
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
from uuid import uuid4
from src.unified_architecture import (
# Core interfaces
AgentCapability, AgentStatus, AgentMetadata,
UnifiedTask, TaskResult, IUnifiedAgent,
# Platform components
MultiAgentPlatform, PlatformConfig,
# Communication
AgentMessage, MessageType,
# Memory and collaboration
MemoryEntry, MemoryType,
# Marketplace
AgentListing, ListingStatus,
# Performance and monitoring
PlatformStats
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ExampleAgent(IUnifiedAgent):
"""Example agent implementation for demonstration"""
def __init__(self, agent_id: str, name: str, capabilities: List[AgentCapability]):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.status = AgentStatus.IDLE
self.logger = logging.getLogger(f"Agent-{name}")
async def execute_task(self, task: UnifiedTask) -> TaskResult:
"""Execute a task"""
try:
self.status = AgentStatus.BUSY
self.logger.info(f"Executing task: {task.description}")
# Simulate task execution
await asyncio.sleep(2)
# Generate result based on task type
if "analysis" in task.description.lower():
result = TaskResult(
task_id=task.id,
success=True,
data={"analysis": f"Analysis completed for {task.description}"},
metadata={"execution_time": 2.0, "agent": self.name}
)
elif "processing" in task.description.lower():
result = TaskResult(
task_id=task.id,
success=True,
data={"processed_data": f"Processed: {task.description}"},
metadata={"execution_time": 2.0, "agent": self.name}
)
else:
result = TaskResult(
task_id=task.id,
success=True,
data={"result": f"Task completed: {task.description}"},
metadata={"execution_time": 2.0, "agent": self.name}
)
self.status = AgentStatus.IDLE
return result
except Exception as e:
self.logger.error(f"Task execution failed: {e}")
self.status = AgentStatus.IDLE
return TaskResult(
task_id=task.id,
success=False,
error=str(e),
metadata={"agent": self.name}
)
async def get_status(self) -> AgentStatus:
"""Get current agent status"""
return self.status
async def get_capabilities(self) -> List[AgentCapability]:
"""Get agent capabilities"""
return self.capabilities
class DataAnalysisAgent(ExampleAgent):
"""Specialized agent for data analysis tasks"""
def __init__(self, agent_id: str):
super().__init__(
agent_id=agent_id,
name="DataAnalysisAgent",
capabilities=[AgentCapability.DATA_ANALYSIS, AgentCapability.MACHINE_LEARNING]
)
async def execute_task(self, task: UnifiedTask) -> TaskResult:
"""Execute data analysis task"""
try:
self.status = AgentStatus.BUSY
self.logger.info(f"Performing data analysis: {task.description}")
# Simulate data analysis
await asyncio.sleep(3)
result = TaskResult(
task_id=task.id,
success=True,
data={
"analysis_type": "statistical",
"insights": ["Trend identified", "Anomaly detected", "Correlation found"],
"recommendations": ["Increase monitoring", "Optimize parameters"]
},
metadata={"execution_time": 3.0, "agent": self.name}
)
self.status = AgentStatus.IDLE
return result
except Exception as e:
self.logger.error(f"Data analysis failed: {e}")
self.status = AgentStatus.IDLE
return TaskResult(
task_id=task.id,
success=False,
error=str(e),
metadata={"agent": self.name}
)
class ProcessingAgent(ExampleAgent):
"""Specialized agent for data processing tasks"""
def __init__(self, agent_id: str):
super().__init__(
agent_id=agent_id,
name="ProcessingAgent",
capabilities=[AgentCapability.DATA_PROCESSING, AgentCapability.FILE_OPERATIONS]
)
async def execute_task(self, task: UnifiedTask) -> TaskResult:
"""Execute data processing task"""
try:
self.status = AgentStatus.BUSY
self.logger.info(f"Processing data: {task.description}")
# Simulate data processing
await asyncio.sleep(2)
result = TaskResult(
task_id=task.id,
success=True,
data={
"processed_files": 5,
"data_volume": "2.5GB",
"format": "parquet",
"quality_score": 0.95
},
metadata={"execution_time": 2.0, "agent": self.name}
)
self.status = AgentStatus.IDLE
return result
except Exception as e:
self.logger.error(f"Data processing failed: {e}")
self.status = AgentStatus.IDLE
return TaskResult(
task_id=task.id,
success=False,
error=str(e),
metadata={"agent": self.name}
)
class CollaborationAgent(ExampleAgent):
"""Specialized agent for collaboration and coordination"""
def __init__(self, agent_id: str):
super().__init__(
agent_id=agent_id,
name="CollaborationAgent",
capabilities=[AgentCapability.COLLABORATION, AgentCapability.COORDINATION]
)
async def execute_task(self, task: UnifiedTask) -> TaskResult:
"""Execute collaboration task"""
try:
self.status = AgentStatus.BUSY
self.logger.info(f"Coordinating collaboration: {task.description}")
# Simulate collaboration coordination
await asyncio.sleep(1)
result = TaskResult(
task_id=task.id,
success=True,
data={
"collaboration_type": "multi_agent",
"participants": 3,
"coordination_method": "distributed",
"efficiency_gain": 0.25
},
metadata={"execution_time": 1.0, "agent": self.name}
)
self.status = AgentStatus.IDLE
return result
except Exception as e:
self.logger.error(f"Collaboration failed: {e}")
self.status = AgentStatus.IDLE
return TaskResult(
task_id=task.id,
success=False,
error=str(e),
metadata={"agent": self.name}
)
async def create_sample_agents() -> List[ExampleAgent]:
"""Create sample agents for demonstration"""
agents = [
DataAnalysisAgent("analysis-agent-001"),
ProcessingAgent("processing-agent-001"),
CollaborationAgent("collaboration-agent-001"),
ExampleAgent("general-agent-001", "GeneralAgent", [AgentCapability.GENERAL_PURPOSE])
]
return agents
async def create_sample_tasks() -> List[UnifiedTask]:
"""Create sample tasks for demonstration"""
from src.unified_architecture.core import TaskPriority, TaskType
tasks = [
UnifiedTask(
id=uuid4(),
description="Analyze customer behavior patterns in sales data",
task_type=TaskType.ANALYSIS,
priority=TaskPriority.HIGH,
requirements={
"capabilities": [AgentCapability.DATA_ANALYSIS],
"resources": {"cpu": 2.0, "memory": 4.0}
},
dependencies=[],
metadata={"domain": "sales", "data_source": "customer_db"}
),
UnifiedTask(
id=uuid4(),
description="Process and clean raw sensor data",
task_type=TaskType.PROCESSING,
priority=TaskPriority.MEDIUM,
requirements={
"capabilities": [AgentCapability.DATA_PROCESSING],
"resources": {"cpu": 1.0, "memory": 2.0}
},
dependencies=[],
metadata={"domain": "iot", "data_format": "json"}
),
UnifiedTask(
id=uuid4(),
description="Coordinate multi-agent workflow for report generation",
task_type=TaskType.COLLABORATION,
priority=TaskPriority.HIGH,
requirements={
"capabilities": [AgentCapability.COLLABORATION],
"resources": {"cpu": 0.5, "memory": 1.0}
},
dependencies=[],
metadata={"workflow_type": "report_generation"}
)
]
return tasks
async def demonstrate_platform_features(platform: MultiAgentPlatform):
"""Demonstrate various platform features"""
# 1. Register agents
logger.info("=== Registering Agents ===")
agents = await create_sample_agents()
for agent in agents:
metadata = AgentMetadata(
agent_id=agent.agent_id,
name=agent.name,
capabilities=agent.capabilities,
status=agent.status,
version="1.0.0",
description=f"Example {agent.name} for demonstration",
tags=["example", "demo"],
created_at=datetime.utcnow()
)
success = await platform.register_agent(agent, metadata)
logger.info(f"Registered {agent.name}: {success}")
# 2. Submit tasks
logger.info("\n=== Submitting Tasks ===")
tasks = await create_sample_tasks()
task_ids = []
for task in tasks:
task_id = await platform.submit_task(task)
task_ids.append(task_id)
logger.info(f"Submitted task: {task.description} -> {task_id}")
# 3. Monitor task execution
logger.info("\n=== Monitoring Task Execution ===")
for i in range(10): # Monitor for 10 seconds
for task_id in task_ids:
status = await platform.get_task_status(task_id)
if status:
logger.info(f"Task {task_id}: {status.get('status', 'unknown')}")
await asyncio.sleep(1)
# 4. Demonstrate communication
logger.info("\n=== Agent Communication ===")
message = AgentMessage(
id=uuid4(),
from_agent=agents[0].agent_id,
to_agent=agents[1].agent_id,
type=MessageType.COLLABORATION,
content="Let's collaborate on the data analysis task",
timestamp=datetime.utcnow(),
metadata={"priority": "high"}
)
success = await platform.send_message(message)
logger.info(f"Message sent: {success}")
# 5. Share memory
logger.info("\n=== Memory Sharing ===")
memory_entry = MemoryEntry(
id=uuid4(),
type=MemoryType.EXPERIENCE,
content="Successfully analyzed customer behavior patterns",
source_agent=agents[0].agent_id,
tags=["analysis", "customer", "patterns"],
created_at=datetime.utcnow(),
metadata={"task_id": str(task_ids[0])}
)
success = await platform.share_memory(memory_entry)
logger.info(f"Memory shared: {success}")
# 6. Search memory
logger.info("\n=== Memory Search ===")
search_results = await platform.search_memory("customer behavior")
logger.info(f"Found {len(search_results)} memory entries")
# 7. Get platform statistics
logger.info("\n=== Platform Statistics ===")
stats = await platform.get_platform_stats()
logger.info(f"Total agents: {stats.total_agents}")
logger.info(f"Active agents: {stats.active_agents}")
logger.info(f"Total tasks: {stats.total_tasks}")
logger.info(f"Completed tasks: {stats.completed_tasks}")
logger.info(f"Platform uptime: {stats.platform_uptime:.2f} seconds")
# 8. Get collaboration network
logger.info("\n=== Collaboration Network ===")
network = await platform.get_collaboration_network()
logger.info(f"Collaboration network nodes: {len(network.get('nodes', []))}")
logger.info(f"Collaboration network edges: {len(network.get('edges', []))}")
# 9. Demonstrate marketplace (if enabled)
if platform.config.enable_marketplace:
logger.info("\n=== Marketplace Demo ===")
# Create a sample listing
listing = AgentListing(
agent_id=agents[0].agent_id,
name="Advanced Data Analysis Agent",
description="Specialized agent for complex data analysis tasks",
version="2.0.0",
author="Demo User",
capabilities=agents[0].capabilities,
tags=["analysis", "advanced", "demo"],
status=ListingStatus.ACTIVE,
pricing_model="usage_based",
pricing_details={"per_task": 0.10, "per_hour": 1.00}
)
# Note: In a real implementation, you would add this to the marketplace
logger.info(f"Created sample listing: {listing.name}")
# 10. Health check
logger.info("\n=== Platform Health Check ===")
health = await platform.health_check()
logger.info(f"Platform status: {health.get('platform_status')}")
# Log component health
for component, status in health.get('components', {}).items():
logger.info(f" {component}: {status.get('status', 'unknown')}")
async def demonstrate_advanced_features(platform: MultiAgentPlatform):
"""Demonstrate advanced platform features"""
logger.info("\n=== Advanced Features Demo ===")
# 1. Resource allocation
logger.info("--- Resource Allocation ---")
requirements = {
"cpu": 2.0,
"memory": 4.0,
"gpu": 1.0
}
allocation = await platform.allocate_resources(requirements)
if allocation:
logger.info(f"Allocated resources: {allocation.resources}")
logger.info(f"Allocation ID: {allocation.id}")
# Release resources
success = await platform.release_resources(allocation.id)
logger.info(f"Released resources: {success}")
# 2. Get available agents with filtering
logger.info("\n--- Agent Discovery ---")
analysis_agents = await platform.get_available_agents([AgentCapability.DATA_ANALYSIS])
logger.info(f"Found {len(analysis_agents)} data analysis agents")
for agent in analysis_agents:
logger.info(f" - {agent.name}: {[cap.value for cap in agent.capabilities]}")
# 3. Performance tracking
logger.info("\n--- Performance Tracking ---")
if platform.config.enable_performance_tracking:
# Get performance for first agent
agents = await platform.get_available_agents()
if agents:
performance = await platform.get_agent_performance(agents[0].agent_id)
if performance:
logger.info(f"Agent {agents[0].name} performance:")
logger.info(f" Success rate: {performance.get('success_rate', 0):.2%}")
logger.info(f" Average execution time: {performance.get('avg_execution_time', 0):.2f}s")
logger.info(f" Total tasks: {performance.get('total_tasks', 0)}")
# 4. Broadcast messaging
logger.info("\n--- Broadcast Messaging ---")
broadcast_message = AgentMessage(
id=uuid4(),
from_agent="platform",
to_agent="all",
type=MessageType.SYSTEM,
content="System maintenance scheduled for tomorrow at 2 AM UTC",
timestamp=datetime.utcnow(),
metadata={"priority": "medium", "maintenance": True}
)
success = await platform.broadcast_message(broadcast_message)
logger.info(f"Broadcast message sent: {success}")
async def main():
"""Main demonstration function"""
logger.info("Starting Unified Architecture Demonstration")
# Configure platform
config = PlatformConfig(
max_concurrent_tasks=50,
task_timeout=300,
heartbeat_interval=30,
cleanup_interval=3600,
enable_marketplace=True,
enable_dashboard=True,
enable_performance_tracking=True,
enable_conflict_resolution=True,
storage_backend="memory",
log_level="INFO"
)
# Create and start platform
platform = MultiAgentPlatform(config)
try:
async with platform.platform_context():
logger.info("Platform started successfully")
# Demonstrate basic features
await demonstrate_platform_features(platform)
# Demonstrate advanced features
await demonstrate_advanced_features(platform)
# Final statistics
logger.info("\n=== Final Platform Statistics ===")
final_stats = await platform.get_platform_stats()
logger.info(f"Platform uptime: {final_stats.platform_uptime:.2f} seconds")
logger.info(f"Total tasks processed: {final_stats.total_tasks}")
logger.info(f"Success rate: {final_stats.completed_tasks / max(final_stats.total_tasks, 1):.2%}")
except Exception as e:
logger.error(f"Platform demonstration failed: {e}")
raise
logger.info("Unified Architecture Demonstration completed")
if __name__ == "__main__":
# Run the demonstration
asyncio.run(main())