Spaces:
Build error
Build error
| """ | |
| Multi-Agent Platform API Server | |
| FastAPI server providing REST APIs and WebSocket support for the unified architecture | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import time | |
| import uuid | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Any, Dict, List, Optional, Union | |
| from contextlib import asynccontextmanager | |
| import aioredis | |
| from fastapi import ( | |
| APIRouter, BackgroundTasks, Depends, FastAPI, HTTPException, | |
| Query, WebSocket, WebSocketDisconnect, WebSocketException, status | |
| ) | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel, Field, validator | |
| import uvicorn | |
| from src.unified_architecture import ( | |
| MultiAgentPlatform, Agent, Task, Resource, Conflict, | |
| PerformanceMetrics, WorkflowDefinition, WorkflowExecution | |
| ) | |
| from src.unified_architecture.platform import PlatformConfig | |
| from src.unified_architecture.enhanced_platform import AgentMetadata | |
| from src.core.entities.agent import AgentType | |
| from src.application.agents.agent_factory import AgentFactory | |
| from src.infrastructure.database.supabase_repositories import ( | |
| SupabaseAgentRepository, SupabaseTaskRepository, | |
| SupabaseSessionRepository, SupabaseToolRepository | |
| ) | |
| from src.infrastructure.monitoring.metrics import MetricsCollector | |
| from src.infrastructure.circuit_breaker.circuit_breaker import CircuitBreakerRegistry | |
| from src.infrastructure.agents.concrete_agents import ( | |
| FSMReactAgentImpl, NextGenAgentImpl, CrewAgentImpl, SpecializedAgentImpl | |
| ) | |
| from src.infrastructure.di.container import get_container | |
# Logging setup for the whole module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Bearer-token scheme consumed by verify_token
security = HTTPBearer()

# Shared runtime state; both stay None until the lifespan handler assigns them
redis_client: Optional[aioredis.Redis] = None
platform: Optional[MultiAgentPlatform] = None

# Process-wide metrics collector and circuit breaker registry
metrics_collector = MetricsCollector()
circuit_breaker_registry = CircuitBreakerRegistry()
| # Pydantic models for API requests/responses | |
class AgentRegistrationRequest(BaseModel):
    # Request body accepted by register_agent.
    # name and capabilities are required; resources and metadata default to {}.
    name: str = Field(
        ...,
        description="Agent name",
    )
    capabilities: List[str] = Field(
        ...,
        description="List of agent capabilities",
    )
    resources: Dict[str, Any] = Field(
        default_factory=dict,
        description="Agent resources",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class AgentResponse(BaseModel):
    # Response payload describing one agent (returned by the agent endpoints).

    # Identity
    id: str
    name: str

    # Declared abilities and attached data
    capabilities: List[str]
    resources: Dict[str, Any]
    metadata: Dict[str, Any]

    # Lifecycle information; last_heartbeat is optional and defaults to None
    status: str
    created_at: datetime
    last_heartbeat: Optional[datetime] = None
class TaskSubmissionRequest(BaseModel):
    # Request body accepted by submit_task.
    # Only title and description are required; priority is constrained to 1..10.
    title: str = Field(
        ...,
        description="Task title",
    )
    description: str = Field(
        ...,
        description="Task description",
    )
    priority: int = Field(
        default=1,
        ge=1,
        le=10,
        description="Task priority (1-10)",
    )
    required_capabilities: List[str] = Field(
        default_factory=list,
        description="Required capabilities",
    )
    resources: Dict[str, Any] = Field(
        default_factory=dict,
        description="Required resources",
    )
    deadline: Optional[datetime] = Field(
        None,
        description="Task deadline",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class TaskResponse(BaseModel):
    # Response payload describing one task (returned by the task endpoints).

    # What was submitted
    id: str
    title: str
    description: str
    priority: int
    required_capabilities: List[str]
    resources: Dict[str, Any]
    metadata: Dict[str, Any]

    # Current state; assigned_agent is optional and defaults to None
    status: str
    assigned_agent: Optional[str] = None

    # Timestamps; deadline and completed_at are optional
    created_at: datetime
    deadline: Optional[datetime] = None
    completed_at: Optional[datetime] = None
class ResourceRequest(BaseModel):
    # Request body accepted by register_resource.
    # name, type and capacity are required; metadata defaults to {}.
    name: str = Field(
        ...,
        description="Resource name",
    )
    type: str = Field(
        ...,
        description="Resource type",
    )
    capacity: Dict[str, Any] = Field(
        ...,
        description="Resource capacity",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class ResourceResponse(BaseModel):
    # Response payload describing one resource (returned by the resource endpoints).

    # Identity and classification
    id: str
    name: str
    type: str

    # Capacity description and free-form extras
    capacity: Dict[str, Any]
    metadata: Dict[str, Any]

    # Lifecycle information
    status: str
    created_at: datetime
class ConflictReportRequest(BaseModel):
    # Request body accepted by report_conflict.
    # severity is a free-form string here; the listed values are not validated.
    agent_id: str = Field(
        ...,
        description="Agent ID involved in conflict",
    )
    task_id: str = Field(
        ...,
        description="Task ID involved in conflict",
    )
    conflict_type: str = Field(
        ...,
        description="Type of conflict",
    )
    description: str = Field(
        ...,
        description="Conflict description",
    )
    severity: str = Field(
        ...,
        description="Conflict severity (low, medium, high, critical)",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class ConflictResponse(BaseModel):
    # Response payload describing one conflict (returned by the conflict endpoints).

    # Which conflict, and who/what is involved
    id: str
    agent_id: str
    task_id: str

    # Classification and details
    conflict_type: str
    description: str
    severity: str
    metadata: Dict[str, Any]

    # Lifecycle information; resolved_at is optional and defaults to None
    status: str
    created_at: datetime
    resolved_at: Optional[datetime] = None
class WorkflowDefinitionRequest(BaseModel):
    # Request body accepted by create_workflow.
    # steps is a required list of free-form dictionaries; metadata defaults to {}.
    name: str = Field(
        ...,
        description="Workflow name",
    )
    description: str = Field(
        ...,
        description="Workflow description",
    )
    steps: List[Dict[str, Any]] = Field(
        ...,
        description="Workflow steps",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class WorkflowExecutionRequest(BaseModel):
    # Request body accepted by execute_workflow.
    # workflow_id is required; parameters and metadata default to {}.
    workflow_id: str = Field(
        ...,
        description="Workflow ID to execute",
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Execution parameters",
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )
class PerformanceMetricsResponse(BaseModel):
    # One entry of the list returned by get_performance_metrics.

    agent_id: str  # agent the metrics belong to
    metrics: Dict[str, Any]  # metrics mapping as returned by the platform
    timestamp: datetime  # when this response entry was built
class SystemHealthResponse(BaseModel):
    # Payload returned by health_check.

    status: str  # overall state: "healthy" or "degraded"
    components: Dict[str, str]  # per-component state strings
    metrics: Dict[str, Any]  # counters gathered at check time
    timestamp: datetime  # when the check ran
class WebSocketMessage(BaseModel):
    # Envelope for events broadcast to WebSocket clients.

    type: str  # event name, e.g. "agent_registered"
    data: Dict[str, Any]  # event-specific payload
    # Filled in with the current UTC time when the caller does not supply one
    timestamp: datetime = Field(
        default_factory=datetime.utcnow,
    )
| # Dependency injection | |
async def get_platform() -> MultiAgentPlatform:
    """Dependency that hands out the module-level platform instance.

    Returns:
        The platform stored in the module-level ``platform`` variable.

    Raises:
        HTTPException: 503 while the platform has not been initialized.
    """
    if platform is not None:
        return platform
    raise HTTPException(status_code=503, detail="Platform not initialized")
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Return the bearer token of the request, rejecting an empty one.

    This is a placeholder check: any non-empty token is accepted and no
    signature or expiry validation is performed here.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The raw token string.

    Raises:
        HTTPException: 401 when the token string is empty.
    """
    bearer_token = credentials.credentials
    if bearer_token:
        return bearer_token
    # Empty token: refuse the request
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication token",
    )
| # WebSocket connection manager | |
class ConnectionManager:
    """Registry of accepted WebSocket connections and their metadata.

    ``active_connections`` keeps the sockets in connection order;
    ``connection_metadata`` maps each socket to a dict holding its
    ``client_id``, ``connected_at`` timestamp and ``subscriptions`` set.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept the socket and start tracking it under ``client_id``."""
        await websocket.accept()
        self.active_connections.append(websocket)
        self.connection_metadata[websocket] = {
            "client_id": client_id,
            "connected_at": datetime.utcnow(),
            "subscriptions": set()
        }
        logger.info(f"WebSocket client {client_id} connected")

    def disconnect(self, websocket: WebSocket):
        """Stop tracking the socket; safe to call more than once.

        A socket can be dropped by a failed send and then again by the
        endpoint's own cleanup, so a repeated call must be a no-op instead
        of raising KeyError on the metadata map.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        metadata = self.connection_metadata.pop(websocket, None)
        if metadata is None:
            # Already removed earlier; nothing left to clean up or log
            return
        client_id = metadata.get("client_id", "unknown")
        logger.info(f"WebSocket client {client_id} disconnected")

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to one socket; drop the socket if sending fails."""
        try:
            await websocket.send_text(message)
        except Exception as e:
            logger.error(f"Error sending message to WebSocket: {e}")
            self.disconnect(websocket)

    async def broadcast(self, message: str, exclude: Optional[WebSocket] = None):
        """Send ``message`` to every tracked socket except ``exclude``.

        Sockets whose send fails are removed after the loop.
        """
        disconnected = []
        # Iterate over a snapshot: other coroutines may connect or disconnect
        # while this one is suspended in send_text, and mutating the list
        # being iterated would skip connections.
        for connection in list(self.active_connections):
            if connection != exclude:
                try:
                    await connection.send_text(message)
                except Exception as e:
                    logger.error(f"Error broadcasting message: {e}")
                    disconnected.append(connection)
        # Clean up disconnected connections
        for connection in disconnected:
            self.disconnect(connection)

    def get_connection_info(self) -> Dict[str, Any]:
        """Return the connection count plus a summary dict per connection."""
        return {
            "active_connections": len(self.active_connections),
            "connections": [
                {
                    "client_id": metadata.get("client_id"),
                    "connected_at": metadata.get("connected_at"),
                    "subscriptions": list(metadata.get("subscriptions", set()))
                }
                for metadata in self.connection_metadata.values()
            ]
        }
# Single shared WebSocket connection registry for this process
manager = ConnectionManager()

# Router for the versioned REST API (prefix /api/v1)
router = APIRouter(prefix="/api/v1")
async def health_check():
    """Report overall system health.

    The platform and Redis count as healthy when their module-level handles
    are truthy; the database entry is a fixed placeholder. The overall status
    is "healthy" only when every component is, otherwise "degraded".

    Returns:
        SystemHealthResponse with component states and basic counters.

    Raises:
        HTTPException: 503 if building the report fails.
    """
    def _state(component) -> str:
        # Truthiness of the handle decides the reported state
        return "healthy" if component else "unhealthy"

    try:
        components = {
            "platform": _state(platform),
            "redis": _state(redis_client),
            "database": "healthy",  # placeholder: no real database probe yet
        }

        agent_count = len(platform.agents) if platform else 0
        task_count = len(platform.tasks) if platform else 0
        connection_count = manager.get_connection_info()["active_connections"]
        metrics = {
            "active_agents": agent_count,
            "active_tasks": task_count,
            "active_connections": connection_count,
        }

        overall = "degraded"
        if all(state == "healthy" for state in components.values()):
            overall = "healthy"

        return SystemHealthResponse(
            status=overall,
            components=components,
            metrics=metrics,
            timestamp=datetime.utcnow(),
        )
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Health check failed")
# Global variables
agent_factory = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with proper platform initialization.

    Startup: initializes the DI container, connects to Redis (optional; on
    failure the platform is configured with the "memory" backend), builds
    and starts the platform, creates the agent factory and pre-creates the
    default agents. Shutdown: stops the platform, shuts down the agents and
    closes the Redis connection.

    NOTE(review): a second ``lifespan`` is defined further down in this
    module and rebinds the name before the FastAPI app is created, so this
    definition appears to be shadowed — confirm which one is intended.

    Raises:
        Exception: whatever platform initialization raises; it is logged
            and re-raised so startup fails visibly.
    """
    global platform, redis_client, agent_factory

    # Initialize DI container. The returned container is not used here; the
    # call is kept in case it performs one-time setup — TODO confirm.
    get_container()

    # Initialize Redis connection
    try:
        redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
        # from_url() is called without await, matching the other lifespan
        # definition in this module; awaiting its result raised an error that
        # the except below swallowed, silently forcing the memory backend.
        redis_client = aioredis.from_url(redis_url, encoding="utf-8", decode_responses=True)
        await redis_client.ping()
        logger.info("Redis connection established")
    except Exception as e:
        logger.warning(f"Redis connection failed: {e}")
        redis_client = None

    # Initialize platform with configuration
    try:
        # Create platform configuration
        platform_config = PlatformConfig(
            max_concurrent_tasks=100,
            task_timeout=300,
            heartbeat_interval=30,
            cleanup_interval=3600,
            enable_marketplace=True,
            enable_dashboard=True,
            enable_performance_tracking=True,
            enable_conflict_resolution=True,
            storage_backend="redis" if redis_client else "memory",
            log_level="INFO"
        )

        # Create and initialize platform
        platform = MultiAgentPlatform(platform_config)

        # Start platform services
        await platform.start()

        # Initialize agent factory
        agent_factory = AgentFactory()

        # Pre-create some default agents.
        # NOTE(review): _initialize_default_agents is not defined in the
        # visible part of this module — confirm it exists.
        await _initialize_default_agents(platform, agent_factory)

        logger.info("Multi-Agent Platform initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize platform: {e}")
        raise

    yield

    # Cleanup on shutdown
    try:
        # Shutdown platform
        if platform:
            await platform.stop()
            logger.info("Platform stopped")

        # Shutdown all agents
        if agent_factory:
            await agent_factory.shutdown_all()
            logger.info("All agents shut down")

        # Close Redis connection
        if redis_client:
            await redis_client.close()
            logger.info("Redis connection closed")
    except Exception as e:
        logger.error(f"Error during shutdown: {e}")
| # Agent Management | |
async def register_agent(
    request: AgentRegistrationRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Register a new agent and announce it to WebSocket clients.

    Args:
        request: Name, capabilities, resources and metadata of the agent.
        platform: Platform instance supplied by ``get_platform``.
        token: Verified bearer token (only its presence matters here).

    Returns:
        AgentResponse for the new agent; ``created_at`` is the current UTC
        time and ``last_heartbeat`` is None.

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        new_agent = Agent(
            id=str(uuid.uuid4()),
            name=request.name,
            capabilities=request.capabilities,
            resources=request.resources,
            metadata=request.metadata,
        )
        # NOTE(review): called without await here, while
        # _register_default_agents awaits platform.register_agent(agent,
        # metadata) — confirm which form the platform expects.
        platform.register_agent(new_agent)

        # Tell connected clients about the new agent
        event = WebSocketMessage(
            type="agent_registered",
            data={"agent_id": new_agent.id, "name": new_agent.name},
        )
        await manager.broadcast(event.json())

        return AgentResponse(
            id=new_agent.id,
            name=new_agent.name,
            capabilities=new_agent.capabilities,
            resources=new_agent.resources,
            metadata=new_agent.metadata,
            status=new_agent.status,
            created_at=datetime.utcnow(),
            last_heartbeat=None,
        )
    except Exception as e:
        logger.error(f"Agent registration failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def list_agents(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    status_filter: Optional[str] = Query(None, description="Filter by agent status"),
    capability_filter: Optional[str] = Query(None, description="Filter by capability")
):
    """List registered agents, optionally narrowed by status and capability.

    A filter that is None or empty is ignored. ``created_at`` in each entry
    is the current UTC time, not a stored creation time.

    Returns:
        List of AgentResponse objects.

    Raises:
        HTTPException: 500 carrying the error text if listing fails.
    """
    try:
        # Both filters are applied in one pass; an unset filter matches all
        selected = [
            candidate
            for candidate in platform.agents.values()
            if (not status_filter or candidate.status == status_filter)
            and (not capability_filter or capability_filter in candidate.capabilities)
        ]

        responses = []
        for agent in selected:
            responses.append(
                AgentResponse(
                    id=agent.id,
                    name=agent.name,
                    capabilities=agent.capabilities,
                    resources=agent.resources,
                    metadata=agent.metadata,
                    status=agent.status,
                    created_at=datetime.utcnow(),  # placeholder until real creation time is stored
                    last_heartbeat=None,
                )
            )
        return responses
    except Exception as e:
        logger.error(f"Failed to list agents: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_agent(
    agent_id: str,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Return the details of one agent.

    Args:
        agent_id: Key of the agent in ``platform.agents``.

    Returns:
        AgentResponse; ``created_at`` is the current UTC time.

    Raises:
        HTTPException: 404 when no agent is stored under ``agent_id``,
            500 carrying the error text on any other failure.
    """
    try:
        found = platform.agents.get(agent_id)
        if found:
            return AgentResponse(
                id=found.id,
                name=found.name,
                capabilities=found.capabilities,
                resources=found.resources,
                metadata=found.metadata,
                status=found.status,
                created_at=datetime.utcnow(),
                last_heartbeat=None,
            )
        raise HTTPException(status_code=404, detail="Agent not found")
    except HTTPException:
        # Let the 404 through unchanged
        raise
    except Exception as e:
        logger.error(f"Failed to get agent {agent_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def deregister_agent(
    agent_id: str,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Remove an agent from the platform and announce the removal.

    Args:
        agent_id: Key of the agent in ``platform.agents``.

    Returns:
        Dict with a confirmation message.

    Raises:
        HTTPException: 404 when the agent is unknown, 500 carrying the
            error text on any other failure.
    """
    try:
        known = agent_id in platform.agents
        if not known:
            raise HTTPException(status_code=404, detail="Agent not found")

        platform.deregister_agent(agent_id)

        # Tell connected clients the agent is gone
        event = WebSocketMessage(
            type="agent_deregistered",
            data={"agent_id": agent_id},
        )
        await manager.broadcast(event.json())

        return {"message": "Agent deregistered successfully"}
    except HTTPException:
        # Let the 404 through unchanged
        raise
    except Exception as e:
        logger.error(f"Failed to deregister agent {agent_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # Task Management | |
async def submit_task(
    request: TaskSubmissionRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Submit a new task and announce it to WebSocket clients.

    Args:
        request: Title, description, priority and optional details.
        platform: Platform instance supplied by ``get_platform``.
        token: Verified bearer token (only its presence matters here).

    Returns:
        TaskResponse for the new task; ``created_at`` is the current UTC time.

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        new_task = Task(
            id=str(uuid.uuid4()),
            title=request.title,
            description=request.description,
            priority=request.priority,
            required_capabilities=request.required_capabilities,
            resources=request.resources,
            metadata=request.metadata,
            deadline=request.deadline,
        )
        platform.submit_task(new_task)

        # Tell connected clients about the new task
        event = WebSocketMessage(
            type="task_submitted",
            data={"task_id": new_task.id, "title": new_task.title},
        )
        await manager.broadcast(event.json())

        return TaskResponse(
            id=new_task.id,
            title=new_task.title,
            description=new_task.description,
            priority=new_task.priority,
            required_capabilities=new_task.required_capabilities,
            resources=new_task.resources,
            metadata=new_task.metadata,
            status=new_task.status,
            assigned_agent=new_task.assigned_agent,
            created_at=datetime.utcnow(),
            deadline=new_task.deadline,
            completed_at=new_task.completed_at,
        )
    except Exception as e:
        logger.error(f"Task submission failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def list_tasks(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    status_filter: Optional[str] = Query(None, description="Filter by task status"),
    priority_filter: Optional[int] = Query(None, description="Filter by priority")
):
    """List all tasks with optional filtering.

    ``status_filter`` is ignored when None or empty. ``priority_filter`` is
    applied whenever it is supplied, including ``0``. ``created_at`` in each
    entry is the current UTC time, not a stored creation time.

    Returns:
        List of TaskResponse objects.

    Raises:
        HTTPException: 500 carrying the error text if listing fails.
    """
    try:
        tasks = platform.tasks.values()
        if status_filter:
            tasks = [t for t in tasks if t.status == status_filter]
        # Compare against None rather than truthiness: a supplied priority of
        # 0 must filter (matching nothing) instead of returning every task.
        if priority_filter is not None:
            tasks = [t for t in tasks if t.priority == priority_filter]
        return [
            TaskResponse(
                id=task.id,
                title=task.title,
                description=task.description,
                priority=task.priority,
                required_capabilities=task.required_capabilities,
                resources=task.resources,
                metadata=task.metadata,
                status=task.status,
                assigned_agent=task.assigned_agent,
                created_at=datetime.utcnow(),
                deadline=task.deadline,
                completed_at=task.completed_at
            )
            for task in tasks
        ]
    except Exception as e:
        logger.error(f"Failed to list tasks: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_task(
    task_id: str,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Return the details of one task.

    Args:
        task_id: Key of the task in ``platform.tasks``.

    Returns:
        TaskResponse; ``created_at`` is the current UTC time.

    Raises:
        HTTPException: 404 when no task is stored under ``task_id``,
            500 carrying the error text on any other failure.
    """
    try:
        found = platform.tasks.get(task_id)
        if found:
            return TaskResponse(
                id=found.id,
                title=found.title,
                description=found.description,
                priority=found.priority,
                required_capabilities=found.required_capabilities,
                resources=found.resources,
                metadata=found.metadata,
                status=found.status,
                assigned_agent=found.assigned_agent,
                created_at=datetime.utcnow(),
                deadline=found.deadline,
                completed_at=found.completed_at,
            )
        raise HTTPException(status_code=404, detail="Task not found")
    except HTTPException:
        # Let the 404 through unchanged
        raise
    except Exception as e:
        logger.error(f"Failed to get task {task_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # Resource Management | |
async def register_resource(
    request: ResourceRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Register a new resource with the platform.

    Args:
        request: Name, type, capacity and metadata of the resource.
        platform: Platform instance supplied by ``get_platform``.
        token: Verified bearer token (only its presence matters here).

    Returns:
        ResourceResponse; ``created_at`` is the current UTC time.

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        new_resource = Resource(
            id=str(uuid.uuid4()),
            name=request.name,
            type=request.type,
            capacity=request.capacity,
            metadata=request.metadata,
        )
        platform.register_resource(new_resource)

        return ResourceResponse(
            id=new_resource.id,
            name=new_resource.name,
            type=new_resource.type,
            capacity=new_resource.capacity,
            metadata=new_resource.metadata,
            status=new_resource.status,
            created_at=datetime.utcnow(),
        )
    except Exception as e:
        logger.error(f"Resource registration failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def list_resources(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    type_filter: Optional[str] = Query(None, description="Filter by resource type")
):
    """List registered resources, optionally narrowed by type.

    A ``type_filter`` that is None or empty is ignored. ``created_at`` in
    each entry is the current UTC time, not a stored creation time.

    Returns:
        List of ResourceResponse objects.

    Raises:
        HTTPException: 500 carrying the error text if listing fails.
    """
    try:
        selected = [
            candidate
            for candidate in platform.resources.values()
            if not type_filter or candidate.type == type_filter
        ]

        responses = []
        for resource in selected:
            responses.append(
                ResourceResponse(
                    id=resource.id,
                    name=resource.name,
                    type=resource.type,
                    capacity=resource.capacity,
                    metadata=resource.metadata,
                    status=resource.status,
                    created_at=datetime.utcnow(),
                )
            )
        return responses
    except Exception as e:
        logger.error(f"Failed to list resources: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # Conflict Management | |
async def report_conflict(
    request: ConflictReportRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Record a conflict and announce it to WebSocket clients.

    Args:
        request: Agent, task, type, description and severity of the conflict.
        platform: Platform instance supplied by ``get_platform``.
        token: Verified bearer token (only its presence matters here).

    Returns:
        ConflictResponse; ``created_at`` is the current UTC time.

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        new_conflict = Conflict(
            id=str(uuid.uuid4()),
            agent_id=request.agent_id,
            task_id=request.task_id,
            conflict_type=request.conflict_type,
            description=request.description,
            severity=request.severity,
            metadata=request.metadata,
        )
        platform.report_conflict(new_conflict)

        # Tell connected clients about the conflict
        event_data = {
            "conflict_id": new_conflict.id,
            "agent_id": new_conflict.agent_id,
            "task_id": new_conflict.task_id,
            "severity": new_conflict.severity,
        }
        event = WebSocketMessage(type="conflict_reported", data=event_data)
        await manager.broadcast(event.json())

        return ConflictResponse(
            id=new_conflict.id,
            agent_id=new_conflict.agent_id,
            task_id=new_conflict.task_id,
            conflict_type=new_conflict.conflict_type,
            description=new_conflict.description,
            severity=new_conflict.severity,
            metadata=new_conflict.metadata,
            status=new_conflict.status,
            created_at=datetime.utcnow(),
            resolved_at=new_conflict.resolved_at,
        )
    except Exception as e:
        logger.error(f"Conflict reporting failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def list_conflicts(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    status_filter: Optional[str] = Query(None, description="Filter by conflict status"),
    severity_filter: Optional[str] = Query(None, description="Filter by severity")
):
    """List recorded conflicts, optionally narrowed by status and severity.

    A filter that is None or empty is ignored. ``created_at`` in each entry
    is the current UTC time, not a stored creation time.

    Returns:
        List of ConflictResponse objects.

    Raises:
        HTTPException: 500 carrying the error text if listing fails.
    """
    try:
        # Both filters are applied in one pass; an unset filter matches all
        selected = [
            candidate
            for candidate in platform.conflicts.values()
            if (not status_filter or candidate.status == status_filter)
            and (not severity_filter or candidate.severity == severity_filter)
        ]

        responses = []
        for conflict in selected:
            responses.append(
                ConflictResponse(
                    id=conflict.id,
                    agent_id=conflict.agent_id,
                    task_id=conflict.task_id,
                    conflict_type=conflict.conflict_type,
                    description=conflict.description,
                    severity=conflict.severity,
                    metadata=conflict.metadata,
                    status=conflict.status,
                    created_at=datetime.utcnow(),
                    resolved_at=conflict.resolved_at,
                )
            )
        return responses
    except Exception as e:
        logger.error(f"Failed to list conflicts: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # Workflow Management | |
async def create_workflow(
    request: WorkflowDefinitionRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Create a workflow definition on the platform.

    Args:
        request: Name, description, steps and metadata of the workflow.
        platform: Platform instance supplied by ``get_platform``.
        token: Verified bearer token (only its presence matters here).

    Returns:
        Dict with the workflow's id, name, description and the fixed
        status "created".

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        definition = WorkflowDefinition(
            id=str(uuid.uuid4()),
            name=request.name,
            description=request.description,
            steps=request.steps,
            metadata=request.metadata,
        )
        platform.create_workflow(definition)

        summary = {
            "id": definition.id,
            "name": definition.name,
            "description": definition.description,
            "status": "created",
        }
        return summary
    except Exception as e:
        logger.error(f"Workflow creation failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def execute_workflow(
    workflow_id: str,
    request: WorkflowExecutionRequest,
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Start an execution of a workflow.

    The ``workflow_id`` argument identifies the workflow; the
    ``workflow_id`` field inside ``request`` is not read here.

    Args:
        workflow_id: Identifier of the workflow to run.
        request: Execution parameters and metadata.

    Returns:
        Dict with the execution id, the workflow id and the execution status.

    Raises:
        HTTPException: 400 carrying the error text if any step fails.
    """
    try:
        run = WorkflowExecution(
            id=str(uuid.uuid4()),
            workflow_id=workflow_id,
            parameters=request.parameters,
            metadata=request.metadata,
        )
        platform.execute_workflow(run)

        outcome = {
            "execution_id": run.id,
            "workflow_id": workflow_id,
            "status": run.status,
        }
        return outcome
    except Exception as e:
        logger.error(f"Workflow execution failed: {e}")
        raise HTTPException(status_code=400, detail=str(e))
| # Performance Monitoring | |
async def get_performance_metrics(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
    time_range: Optional[str] = Query("1h", description="Time range for metrics")
):
    """Collect performance metrics for registered agents.

    Agents for which the platform returns no (falsy) metrics are left out.
    ``time_range`` is accepted but not used by this function.

    Args:
        agent_id: When set, only this agent is reported.
        time_range: Currently unused.

    Returns:
        List of PerformanceMetricsResponse, one per agent with metrics.

    Raises:
        HTTPException: 500 carrying the error text if collection fails.
    """
    try:
        collected = []
        for agent in platform.agents.values():
            wanted = not agent_id or agent.id == agent_id
            if not wanted:
                continue

            agent_metrics = platform.get_agent_metrics(agent.id)
            if not agent_metrics:
                continue

            entry = PerformanceMetricsResponse(
                agent_id=agent.id,
                metrics=agent_metrics,
                timestamp=datetime.utcnow(),
            )
            collected.append(entry)
        return collected
    except Exception as e:
        logger.error(f"Failed to get performance metrics: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # WebSocket endpoint for real-time updates | |
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """WebSocket endpoint for real-time monitoring.

    Registers the socket with the connection manager, then processes JSON
    messages until the client disconnects or an error occurs:

    * ``{"type": "subscribe", "subscriptions": [...]}`` adds the listed
      names to the connection's subscription set and confirms them.
    * ``{"type": "unsubscribe", "subscriptions": [...]}`` removes them and
      confirms.

    Other message types are ignored. Any other failure (for example a
    payload that is not valid JSON) is logged and ends the connection
    handling.

    Args:
        websocket: The incoming connection.
        client_id: Identifier the client connects under.
    """
    await manager.connect(websocket, client_id)
    try:
        while True:
            # Keep connection alive and handle incoming messages
            data = await websocket.receive_text()
            message = json.loads(data)
            # Handle subscription requests
            if message.get("type") == "subscribe":
                subscriptions = message.get("subscriptions", [])
                manager.connection_metadata[websocket]["subscriptions"].update(subscriptions)
                await manager.send_personal_message(
                    json.dumps({
                        "type": "subscription_confirmed",
                        "subscriptions": list(subscriptions)
                    }),
                    websocket
                )
            # Handle unsubscribe requests
            elif message.get("type") == "unsubscribe":
                subscriptions = message.get("subscriptions", [])
                manager.connection_metadata[websocket]["subscriptions"].difference_update(subscriptions)
                await manager.send_personal_message(
                    json.dumps({
                        "type": "unsubscription_confirmed",
                        "subscriptions": list(subscriptions)
                    }),
                    websocket
                )
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in finally
        pass
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        # A failed send_personal_message already removes the socket from the
        # manager, so only disconnect when it is still registered; this keeps
        # cleanup from disconnecting the same socket twice.
        if websocket in manager.connection_metadata:
            manager.disconnect(websocket)
| # Dashboard endpoints | |
async def get_dashboard_summary(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token)
):
    """Summarize agents, tasks, resources and conflicts by status.

    Returns:
        Dict with a ``total`` and per-status counts for each category, plus
        the number of active WebSocket connections.

    Raises:
        HTTPException: 500 carrying the error text if counting fails.
    """
    def _tally(items, state: str) -> int:
        # Number of items whose status equals the given state
        return sum(1 for item in items if item.status == state)

    try:
        agent_counts = {
            "total": len(platform.agents),
            "active": _tally(platform.agents.values(), "active"),
            "idle": _tally(platform.agents.values(), "idle"),
            "busy": _tally(platform.agents.values(), "busy"),
        }
        task_counts = {
            "total": len(platform.tasks),
            "pending": _tally(platform.tasks.values(), "pending"),
            "in_progress": _tally(platform.tasks.values(), "in_progress"),
            "completed": _tally(platform.tasks.values(), "completed"),
            "failed": _tally(platform.tasks.values(), "failed"),
        }
        resource_counts = {
            "total": len(platform.resources),
            "available": _tally(platform.resources.values(), "available"),
            "in_use": _tally(platform.resources.values(), "in_use"),
        }
        conflict_counts = {
            "total": len(platform.conflicts),
            "open": _tally(platform.conflicts.values(), "open"),
            "resolved": _tally(platform.conflicts.values(), "resolved"),
        }
        connection_count = manager.get_connection_info()["active_connections"]

        return {
            "agents": agent_counts,
            "tasks": task_counts,
            "resources": resource_counts,
            "conflicts": conflict_counts,
            "websocket_connections": connection_count,
        }
    except Exception as e:
        logger.error(f"Failed to get dashboard summary: {e}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_dashboard_activity(
    platform: MultiAgentPlatform = Depends(get_platform),
    token: str = Depends(verify_token),
    limit: int = Query(50, description="Number of recent activities to return")
):
    """Get recent activity for dashboard.

    This is a mock feed: it reports the current status of up to 10 agents
    and up to 10 tasks, each stamped with the current UTC time, rather than
    reading stored activity logs.

    Args:
        limit: Maximum number of entries to return; values below zero are
            treated as zero.

    Returns:
        List of activity dicts with ``type``, ``timestamp`` and ``data``.

    Raises:
        HTTPException: 500 carrying the error text if building the feed fails.
    """
    try:
        # In a real implementation, you would store and retrieve activity logs
        # For now, return a mock activity feed
        activities = []
        # Add recent agent activities
        for agent in list(platform.agents.values())[:10]:
            activities.append({
                "type": "agent_activity",
                "timestamp": datetime.utcnow(),
                "data": {
                    "agent_id": agent.id,
                    "agent_name": agent.name,
                    "status": agent.status
                }
            })
        # Add recent task activities
        for task in list(platform.tasks.values())[:10]:
            activities.append({
                "type": "task_activity",
                "timestamp": datetime.utcnow(),
                "data": {
                    "task_id": task.id,
                    "task_title": task.title,
                    "status": task.status
                }
            })
        # Sort by timestamp and return limited results
        activities.sort(key=lambda x: x["timestamp"], reverse=True)
        # Clamp at zero: a negative limit would otherwise slice from the end
        # (activities[:-n]) and return all but the last n entries.
        return activities[:max(limit, 0)]
    except Exception as e:
        logger.error(f"Failed to get dashboard activity: {e}")
        raise HTTPException(status_code=500, detail=str(e))
| # Application lifecycle | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup: connects to Redis (optional; on failure the platform is
    configured with the "memory" backend), builds and starts the platform
    and registers the default agents. Shutdown: stops the platform and
    closes the Redis connection.

    The Redis URL is read from the ``REDIS_URL`` environment variable and
    falls back to ``redis://localhost:6379``.

    Raises:
        Exception: whatever platform initialization raises; it is logged
            and re-raised so startup fails visibly.
    """
    global platform, redis_client

    # Initialize Redis connection
    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
    try:
        redis_client = aioredis.from_url(redis_url, encoding="utf-8", decode_responses=True)
        await redis_client.ping()
        logger.info("Redis connection established")
    except Exception as e:
        logger.warning(f"Redis connection failed: {e}")
        redis_client = None

    # Initialize platform with proper configuration
    try:
        # Configure platform
        platform_config = PlatformConfig(
            max_concurrent_tasks=100,
            enable_marketplace=True,
            enable_dashboard=True,
            enable_performance_tracking=True,
            enable_conflict_resolution=True,
            storage_backend="redis" if redis_client else "memory",
            log_level="INFO"
        )
        # Create and initialize platform
        platform = MultiAgentPlatform(platform_config)
        await platform.start()
        # Register default agents
        await _register_default_agents(platform)
        logger.info("Multi-Agent Platform initialized and started successfully")
    except Exception as e:
        logger.error(f"Failed to initialize platform: {e}")
        raise

    try:
        yield
    finally:
        # Cleanup. The nested finally guarantees the Redis connection is
        # closed even when stopping the platform raises.
        try:
            if platform:
                await platform.stop()
                logger.info("Platform stopped")
        finally:
            if redis_client:
                await redis_client.close()
                logger.info("Redis connection closed")
async def _register_default_agents(platform: MultiAgentPlatform):
    """Create, initialize and register the built-in agents.

    Registers one FSM React agent, one Next Gen agent, one Crew agent and
    one specialized agent per domain, in that order, then logs the total
    number of agents on the platform.

    Args:
        platform: Platform the agents are registered with.

    Raises:
        Exception: any failure is logged and re-raised.
    """
    try:
        # (log message, agent class) for the agents built without arguments
        general_agents = (
            ("FSM React Agent registered", FSMReactAgentImpl),
            ("Next Gen Agent registered", NextGenAgentImpl),
            ("Crew Agent registered", CrewAgentImpl),
        )
        for registered_message, agent_cls in general_agents:
            instance = agent_cls()
            await instance.initialize()
            await platform.register_agent(instance, instance.metadata)
            logger.info(registered_message)

        # One specialized agent per common domain
        for domain in ("data_analysis", "code_generation", "research", "creative"):
            specialist = SpecializedAgentImpl(domain)
            await specialist.initialize()
            await platform.register_agent(specialist, specialist.metadata)
            logger.info(f"Specialized {domain} Agent registered")

        logger.info(f"Total agents registered: {len(platform.agents)}")
    except Exception as e:
        logger.error(f"Failed to register default agents: {e}")
        raise
| # Create FastAPI application | |
app = FastAPI(
    title="Multi-Agent Platform API",
    description="REST API and WebSocket server for the unified multi-agent architecture",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS: every origin, method and header is currently allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# development-style setting — restrict allow_origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the /api/v1 router on the application
app.include_router(router)
| # Root endpoint | |
async def root():
    """Describe the API: name, version and where to find docs and health."""
    api_info = {
        "message": "Multi-Agent Platform API Server",
        "version": "1.0.0",
        "docs": "/docs",
        "health": "/api/v1/health",
    }
    return api_info
if __name__ == "__main__":
    # Direct-run entry point: serve the app on all interfaces, port 8000,
    # with auto-reload enabled.
    uvicorn.run("src.api_server:app", host="0.0.0.0", port=8000, reload=True, log_level="info")