RobotPai / examples /enhanced_unified_example.py
atr0p05's picture
Upload 291 files
8a682b5 verified
"""
Enhanced Unified Architecture Example
This example demonstrates how to integrate the existing FSM agents
with the enhanced unified architecture for advanced multi-agent collaboration.
"""
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any
from src.adapters.fsm_unified_adapter import UnifiedArchitectureBridge
from src.agents.advanced_agent_fsm import FSMReActAgent
from src.tools import (
file_reader, advanced_file_reader, web_researcher,
semantic_search_tool, python_interpreter, tavily_search_backoff,
get_weather, PythonREPLTool
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def create_fsm_agents() -> Dict[str, FSMReActAgent]:
"""Create multiple FSM agents with different tool sets"""
# Agent 1: General purpose with all tools
general_tools = [
file_reader, advanced_file_reader, web_researcher,
semantic_search_tool, python_interpreter, tavily_search_backoff,
get_weather, PythonREPLTool
]
general_tools = [tool for tool in general_tools if tool is not None]
general_agent = FSMReActAgent(tools=general_tools)
# Agent 2: Specialized for web research
research_tools = [
web_researcher, tavily_search_backoff, semantic_search_tool
]
research_tools = [tool for tool in research_tools if tool is not None]
research_agent = FSMReActAgent(tools=research_tools)
# Agent 3: Specialized for computation
computation_tools = [
python_interpreter, PythonREPLTool
]
computation_tools = [tool for tool in computation_tools if tool is not None]
computation_agent = FSMReActAgent(tools=computation_tools)
return {
"general": general_agent,
"research": research_agent,
"computation": computation_agent
}
async def demonstrate_enhanced_architecture():
"""Demonstrate the enhanced unified architecture capabilities"""
logger.info("=== Enhanced Unified Architecture Demo ===")
# Create bridge
bridge = UnifiedArchitectureBridge()
await bridge.initialize_platform()
try:
# Create FSM agents
logger.info("Creating FSM agents...")
fsm_agents = await create_fsm_agents()
# Register agents with unified architecture
agent_configs = [
{
"key": "general",
"agent": fsm_agents["general"],
"id": "fsm-general-001",
"name": "General Purpose FSM Agent",
"tags": ["general", "reasoning", "tools"]
},
{
"key": "research",
"agent": fsm_agents["research"],
"id": "fsm-research-001",
"name": "Research FSM Agent",
"tags": ["research", "web", "search"]
},
{
"key": "computation",
"agent": fsm_agents["computation"],
"id": "fsm-computation-001",
"name": "Computation FSM Agent",
"tags": ["computation", "python", "math"]
}
]
registered_agents = []
for config in agent_configs:
success = await bridge.register_fsm_agent(
config["agent"],
config["id"],
config["name"],
config["tags"]
)
if success:
registered_agents.append(config["id"])
logger.info(f"Registered {config['name']}")
else:
logger.error(f"Failed to register {config['name']}")
if not registered_agents:
logger.error("No agents registered successfully")
return
# Demonstrate different types of tasks
tasks = [
{
"query": "What is the current weather in San Francisco?",
"type": "weather_query",
"priority": 3,
"expected_agent": "fsm-general-001"
},
{
"query": "Search for recent developments in artificial intelligence",
"type": "research_query",
"priority": 4,
"expected_agent": "fsm-research-001"
},
{
"query": "Calculate the factorial of 10",
"type": "computation_query",
"priority": 2,
"expected_agent": "fsm-computation-001"
},
{
"query": "What is the population of Tokyo and how does it compare to New York?",
"type": "complex_query",
"priority": 5,
"expected_agent": "fsm-general-001"
}
]
logger.info("\n=== Executing Tasks ===")
for i, task_config in enumerate(tasks, 1):
logger.info(f"\nTask {i}: {task_config['query']}")
# Create unified task
task = await bridge.create_task_from_query(
task_config["query"],
task_config["type"],
task_config["priority"]
)
# Submit task
start_time = datetime.now()
result = await bridge.submit_task(task)
execution_time = (datetime.now() - start_time).total_seconds()
# Display results
logger.info(f"Execution time: {execution_time:.2f} seconds")
logger.info(f"Success: {result.success}")
logger.info(f"Agent used: {result.agent_id}")
if result.success:
if isinstance(result.result, dict):
output = result.result.get("output", str(result.result))
final_answer = result.result.get("final_answer", "")
logger.info(f"Output: {output[:200]}...")
if final_answer:
logger.info(f"Final answer: {final_answer}")
else:
logger.info(f"Result: {str(result.result)[:200]}...")
else:
logger.error(f"Error: {result.error}")
# Get agent metrics
metrics = await bridge.get_agent_metrics(result.agent_id)
if metrics:
logger.info(f"Agent metrics - Success rate: {metrics.get('success_rate', 0):.2%}")
# Demonstrate performance tracking
logger.info("\n=== Performance Summary ===")
for agent_id in registered_agents:
metrics = await bridge.get_agent_metrics(agent_id)
if metrics:
logger.info(f"\n{agent_id}:")
logger.info(f" Total tasks: {metrics.get('total_tasks', 0)}")
logger.info(f" Success rate: {metrics.get('success_rate', 0):.2%}")
logger.info(f" Avg execution time: {metrics.get('avg_execution_time', 0):.2f}s")
task_breakdown = metrics.get('task_breakdown', {})
if task_breakdown:
logger.info(" Task breakdown:")
for task_type, stats in task_breakdown.items():
logger.info(f" {task_type}: {stats['count']} tasks, "
f"{stats['success_rate']:.2%} success rate")
logger.info("\n=== Demo Completed Successfully ===")
except Exception as e:
logger.error(f"Demo failed: {e}")
raise
finally:
await bridge.shutdown()
async def demonstrate_agent_health_monitoring():
"""Demonstrate agent health monitoring capabilities"""
logger.info("\n=== Agent Health Monitoring Demo ===")
bridge = UnifiedArchitectureBridge()
await bridge.initialize_platform()
try:
# Create and register a single agent for health monitoring
fsm_agents = await create_fsm_agents()
general_agent = fsm_agents["general"]
success = await bridge.register_fsm_agent(
general_agent,
"health-monitor-agent",
"Health Monitor Agent",
["monitoring", "health"]
)
if success:
# Get health check
adapter = bridge.adapters["health-monitor-agent"]
health = await adapter.health_check()
logger.info("Agent Health Check:")
logger.info(f" Healthy: {health['healthy']}")
logger.info(f" Status: {health['status']}")
logger.info(f" Agent Type: {health['agent_type']}")
logger.info(f" Capabilities: {health['capabilities']}")
logger.info(f" Tools Available: {health['tools_available']}")
# Simulate some work
task = await bridge.create_task_from_query(
"What is 2 + 2?",
"simple_math",
1
)
result = await bridge.submit_task(task)
logger.info(f"Simple task result: {result.success}")
# Check health again
health_after = await adapter.health_check()
logger.info(f"Health after task: {health_after['healthy']}")
finally:
await bridge.shutdown()
if __name__ == "__main__":
# Run the main demonstration
asyncio.run(demonstrate_enhanced_architecture())
# Run health monitoring demonstration
asyncio.run(demonstrate_agent_health_monitoring())