""" Multi-Agent Platform API Server and Deployment Configuration Complete FastAPI implementation with WebSocket support for real-time monitoring """ from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from typing import Dict, List, Optional, Any, Set import asyncio import json import uuid import os from datetime import datetime import logging # Import from the unified architecture and Enhanced FSM from src.enhanced_fsm import HierarchicalFSM, AtomicState, ProbabilisticTransition from src.migrated_enhanced_fsm_agent import MigratedEnhancedFSMAgent logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============================= # Platform Core Classes # ============================= class AgentCapability: """Agent capabilities enumeration""" REASONING = "REASONING" COLLABORATION = "COLLABORATION" EXECUTION = "EXECUTION" ANALYSIS = "ANALYSIS" SYNTHESIS = "SYNTHESIS" ERROR_HANDLING = "ERROR_HANDLING" class AgentStatus: """Agent status enumeration""" AVAILABLE = "AVAILABLE" BUSY = "BUSY" IDLE = "IDLE" OFFLINE = "OFFLINE" ERROR = "ERROR" class AgentMetadata: """Agent metadata""" def __init__(self, agent_id: str, name: str, version: str, capabilities: List[str], tags: List[str] = None): self.agent_id = agent_id self.name = name self.version = version self.capabilities = capabilities self.tags = tags or [] self.status = AgentStatus.AVAILABLE self.reliability_score = 1.0 self.last_seen = datetime.now() class UnifiedTask: """Unified task representation""" def __init__(self, task_id: str, task_type: str, priority: int, payload: Dict[str, Any], required_capabilities: List[str], deadline: Optional[datetime] = None, dependencies: List[str] = None): self.task_id = task_id self.task_type = task_type self.priority = priority self.payload = payload self.required_capabilities = required_capabilities self.deadline = deadline self.dependencies = dependencies or [] self.status = "PENDING" self.created_at = datetime.now() self.completed_at = None self.result = None class ConflictType: """Conflict types""" RESOURCE_CONFLICT = "RESOURCE_CONFLICT" TASK_CONFLICT = "TASK_CONFLICT" AGENT_CONFLICT = "AGENT_CONFLICT" DATA_CONFLICT = "DATA_CONFLICT" class Conflict: """Conflict representation""" def __init__(self, conflict_id: str, conflict_type: str, involved_agents: List[str], description: str, context: Dict[str, Any]): self.conflict_id = conflict_id self.conflict_type = conflict_type self.involved_agents = involved_agents self.description = description self.context = context self.reported_at = datetime.now() self.resolved = False self.resolution = None class MarketplaceListing: """Marketplace listing""" def __init__(self, agent_id: str, metadata: AgentMetadata, description: str, pricing: Dict[str, float], keywords: List[str]): self.agent_id = agent_id self.metadata = metadata self.description = description self.pricing = pricing self.keywords = keywords self.ratings = [] self.total_usage = 0 self.average_rating = 0.0 def add_rating(self, rating: float, review: str, user_id: str): """Add a rating to the listing""" self.ratings.append({ 'rating': rating, 'review': review, 'user_id': user_id, 'timestamp': datetime.now() }) self.average_rating = sum(r['rating'] for r in self.ratings) / len(self.ratings) class AgentRegistry: """Agent registry for managing agent registrations""" def __init__(self): self.agents: Dict[str, AgentMetadata] = {} self.agent_instances: Dict[str, MigratedEnhancedFSMAgent] = {} async def register(self, agent_id: str, metadata: AgentMetadata, agent_instance: MigratedEnhancedFSMAgent) -> bool: """Register an agent""" self.agents[agent_id] = metadata self.agent_instances[agent_id] = agent_instance return True async def unregister(self, agent_id: str) -> bool: """Unregister an agent""" if agent_id in self.agents: del self.agents[agent_id] if agent_id in self.agent_instances: del self.agent_instances[agent_id] return True return False async def discover(self, capabilities: List[str] = None, tags: List[str] = None, status: str = None) -> List[AgentMetadata]: """Discover agents based on criteria""" agents = list(self.agents.values()) if capabilities: agents = [a for a in agents if any(cap in a.capabilities for cap in capabilities)] if tags: agents = [a for a in agents if any(tag in a.tags for tag in tags)] if status: agents = [a for a in agents if a.status == status] return agents class TaskManager: """Task manager for handling task execution""" def __init__(self): self.tasks: Dict[str, UnifiedTask] = {} self.task_queue: List[str] = [] async def submit_task(self, task: UnifiedTask) -> str: """Submit a task for execution""" self.tasks[task.task_id] = task self.task_queue.append(task.task_id) return task.task_id async def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]: """Get task status""" if task_id not in self.tasks: return None task = self.tasks[task_id] return { "task_id": task.task_id, "status": task.status, "created_at": task.created_at.isoformat(), "completed_at": task.completed_at.isoformat() if task.completed_at else None, "result": task.result } class ResourceManager: """Resource manager for tracking resource utilization""" def __init__(self): self.allocated_resources: Dict[str, Dict[str, float]] = {} self.total_resources = { "cpu_cores": 100.0, "memory_mb": 102400.0, "gpu_memory_mb": 51200.0 } async def allocate_resources(self, agent_id: str, resources: Dict[str, float]) -> bool: """Allocate resources to an agent""" # Check if resources are available for resource, amount in resources.items(): if resource in self.total_resources: allocated = sum( agent_resources.get(resource, 0) for agent_resources in self.allocated_resources.values() ) if allocated + amount > self.total_resources[resource]: return False self.allocated_resources[agent_id] = resources return True async def release_resources(self, agent_id: str): """Release resources from an agent""" if agent_id in self.allocated_resources: del self.allocated_resources[agent_id] def get_resource_utilization(self) -> Dict[str, float]: """Get current resource utilization""" utilization = {} for resource, total in self.total_resources.items(): allocated = sum( agent_resources.get(resource, 0) for agent_resources in self.allocated_resources.values() ) utilization[resource] = (allocated / total) * 100 return utilization class Marketplace: """Marketplace for agent discovery and rating""" def __init__(self): self.listings: Dict[str, MarketplaceListing] = {} async def publish_agent(self, agent_id: str, description: str, pricing: Dict[str, float], keywords: List[str]) -> bool: """Publish an agent to marketplace""" # This would typically fetch agent metadata from registry metadata = AgentMetadata(agent_id, f"Agent_{agent_id}", "1.0.0", [AgentCapability.REASONING], ["marketplace"]) listing = MarketplaceListing(agent_id, metadata, description, pricing, keywords) self.listings[agent_id] = listing return True async def search_agents(self, query: str, min_rating: float = 0.0, max_price: Optional[float] = None) -> List[MarketplaceListing]: """Search for agents in marketplace""" results = [] for listing in self.listings.values(): # Simple search implementation if (query.lower() in listing.description.lower() or query.lower() in listing.metadata.name.lower() or any(query.lower() in keyword.lower() for keyword in listing.keywords)): if listing.average_rating >= min_rating: if max_price is None or listing.pricing.get("per_task", 0) <= max_price: results.append(listing) return results class ConflictResolver: """Conflict resolution system""" def __init__(self): self.conflicts: Dict[str, Conflict] = {} async def report_conflict(self, conflict: Conflict) -> str: """Report a conflict for resolution""" self.conflicts[conflict.conflict_id] = conflict # Simple auto-resolution logic if conflict.conflict_type == ConflictType.RESOURCE_CONFLICT: conflict.resolved = True conflict.resolution = "Resources reallocated automatically" elif conflict.conflict_type == ConflictType.TASK_CONFLICT: conflict.resolved = True conflict.resolution = "Task priority adjusted" return conflict.conflict_id class Dashboard: """Dashboard for system monitoring""" def __init__(self, agent_registry: AgentRegistry, task_manager: TaskManager): self.agent_registry = agent_registry self.task_manager = task_manager async def get_system_overview(self) -> Dict[str, Any]: """Get system-wide overview""" agents = list(self.agent_registry.agents.values()) # Calculate agent breakdown by status status_breakdown = {} for agent in agents: status = agent.status status_breakdown[status] = status_breakdown.get(status, 0) + 1 # Calculate performance metrics completed_tasks = sum(1 for task in self.task_manager.tasks.values() if task.status == "COMPLETED") total_tasks = len(self.task_manager.tasks) success_rate = completed_tasks / total_tasks if total_tasks > 0 else 0.0 return { "total_agents": len(agents), "active_agents": sum(1 for a in agents if a.status == AgentStatus.AVAILABLE), "agent_breakdown": { "by_status": status_breakdown }, "performance_summary": { "total_tasks_completed": completed_tasks, "total_tasks": total_tasks, "overall_success_rate": success_rate } } async def get_agent_details(self, agent_id: str) -> Optional[Dict[str, Any]]: """Get detailed agent information""" if agent_id not in self.agent_registry.agents: return None agent = self.agent_registry.agents[agent_id] agent_instance = self.agent_registry.agent_instances.get(agent_id) details = { "agent_id": agent.agent_id, "name": agent.name, "version": agent.version, "capabilities": agent.capabilities, "tags": agent.tags, "status": agent.status, "reliability_score": agent.reliability_score, "last_seen": agent.last_seen.isoformat() } if agent_instance: # Add Enhanced FSM metrics fsm_metrics = agent_instance.get_metrics() details["fsm_metrics"] = fsm_metrics return details class MultiAgentPlatform: """Main platform orchestrator""" def __init__(self, redis_url: str = None): self.redis_url = redis_url self.agent_registry = AgentRegistry() self.task_manager = TaskManager() self.resource_manager = ResourceManager() self.marketplace = Marketplace() self.conflict_resolver = ConflictResolver() self.dashboard = Dashboard(self.agent_registry, self.task_manager) async def initialize(self): """Initialize the platform""" logger.info("Initializing Multi-Agent Platform") # Initialize Redis connection if needed # Initialize other components async def shutdown(self): """Shutdown the platform""" logger.info("Shutting down Multi-Agent Platform") # Cleanup resources async def register_agent(self, agent_instance: MigratedEnhancedFSMAgent, metadata: AgentMetadata, resources: Dict[str, float]) -> bool: """Register an agent with the platform""" # Allocate resources if not await self.resource_manager.allocate_resources(metadata.agent_id, resources): return False # Register agent return await self.agent_registry.register(metadata.agent_id, metadata, agent_instance) async def submit_task(self, task: UnifiedTask) -> str: """Submit a task to the platform""" return await self.task_manager.submit_task(task) async def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]: """Get task status""" return await self.task_manager.get_task_status(task_id) # ============================= # API Models # ============================= class AgentRegistrationRequest(BaseModel): """Request model for agent registration""" name: str version: str capabilities: List[str] tags: List[str] = Field(default_factory=list) resources: Dict[str, float] = Field(default_factory=dict) description: Optional[str] = None class TaskSubmissionRequest(BaseModel): """Request model for task submission""" task_type: str priority: int = Field(default=5, ge=1, le=10) payload: Dict[str, Any] required_capabilities: List[str] deadline: Optional[datetime] = None dependencies: List[str] = Field(default_factory=list) class MarketplaceListingRequest(BaseModel): """Request model for marketplace listing""" agent_id: str description: str pricing: Dict[str, float] keywords: List[str] class ConflictReportRequest(BaseModel): """Request model for conflict reporting""" conflict_type: str involved_agents: List[str] description: str context: Dict[str, Any] class AgentSearchRequest(BaseModel): """Request model for agent search""" query: str min_rating: float = Field(default=0.0, ge=0.0, le=5.0) max_price: Optional[float] = None capabilities: Optional[List[str]] = None tags: Optional[List[str]] = None # ============================= # API Server # ============================= app = FastAPI( title="Multi-Agent Collaboration Platform API", description="API for managing and orchestrating AI agents with Enhanced FSM", version="2.0.0" ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Security security = HTTPBearer() # Global platform instance platform: Optional[MultiAgentPlatform] = None # WebSocket connections for real-time updates websocket_connections: Set[WebSocket] = set() # ============================= # Lifecycle Events # ============================= @app.on_event("startup") async def startup_event(): """Initialize the platform on startup""" global platform redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") platform = MultiAgentPlatform(redis_url=redis_url) await platform.initialize() logger.info("Multi-Agent Platform API started") @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" global platform if platform: await platform.shutdown() # Close all WebSocket connections for websocket in websocket_connections.copy(): await websocket.close() logger.info("Multi-Agent Platform API shut down") # ============================= # Authentication # ============================= async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): """Verify API token""" # Simplified - implement proper authentication if credentials.credentials != "valid_token": raise HTTPException(status_code=401, detail="Invalid token") return credentials.credentials # ============================= # Agent Management Endpoints # ============================= @app.post("/api/v2/agents/register", response_model=Dict[str, str]) async def register_agent( request: AgentRegistrationRequest, token: str = Depends(verify_token) ): """Register a new agent with the platform""" try: # Create Enhanced FSM agent instance agent_id = f"agent_{uuid.uuid4().hex[:8]}" # Create mock tools for the agent class MockTool: def __init__(self, name): self.name = name self.tool_name = name mock_tools = [MockTool("search"), MockTool("calculator"), MockTool("database")] agent = MigratedEnhancedFSMAgent( tools=mock_tools, enable_hierarchical=True, enable_probabilistic=True, enable_discovery=True, enable_metrics=True, fsm_name=f"Agent_{agent_id}" ) # Create metadata metadata = AgentMetadata( agent_id=agent_id, name=request.name, version=request.version, capabilities=request.capabilities, tags=request.tags ) # Register with platform success = await platform.register_agent(agent, metadata, request.resources) if not success: raise HTTPException(status_code=400, detail="Failed to register agent") # Broadcast update await broadcast_update({ "event": "agent_registered", "agent_id": agent_id, "name": request.name }) return { "agent_id": agent_id, "status": "registered", "message": f"Agent {request.name} successfully registered" } except Exception as e: logger.error(f"Agent registration failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v2/agents", response_model=List[Dict[str, Any]]) async def list_agents( status: Optional[str] = None, capability: Optional[str] = None, tag: Optional[str] = None, token: str = Depends(verify_token) ): """List all registered agents with optional filters""" try: # Parse filters status_filter = status if status else None capability_filter = [capability] if capability else None tag_filter = [tag] if tag else None # Discover agents agents = await platform.agent_registry.discover( capabilities=capability_filter, tags=tag_filter, status=status_filter ) # Format response return [ { "agent_id": agent.agent_id, "name": agent.name, "version": agent.version, "capabilities": agent.capabilities, "tags": agent.tags, "status": agent.status, "reliability_score": agent.reliability_score, "last_seen": agent.last_seen.isoformat() } for agent in agents ] except Exception as e: logger.error(f"Failed to list agents: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v2/agents/{agent_id}", response_model=Dict[str, Any]) async def get_agent_details( agent_id: str, token: str = Depends(verify_token) ): """Get detailed information about a specific agent""" try: details = await platform.dashboard.get_agent_details(agent_id) if not details: raise HTTPException(status_code=404, detail="Agent not found") return details except HTTPException: raise except Exception as e: logger.error(f"Failed to get agent details: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/v2/agents/{agent_id}") async def unregister_agent( agent_id: str, token: str = Depends(verify_token) ): """Unregister an agent from the platform""" try: success = await platform.agent_registry.unregister(agent_id) if not success: raise HTTPException(status_code=404, detail="Agent not found") # Release resources await platform.resource_manager.release_resources(agent_id) # Broadcast update await broadcast_update({ "event": "agent_unregistered", "agent_id": agent_id }) return {"status": "unregistered", "agent_id": agent_id} except HTTPException: raise except Exception as e: logger.error(f"Failed to unregister agent: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ============================= # Task Management Endpoints # ============================= @app.post("/api/v2/tasks/submit", response_model=Dict[str, str]) async def submit_task( request: TaskSubmissionRequest, token: str = Depends(verify_token) ): """Submit a task to the platform""" try: # Create task task = UnifiedTask( task_id=str(uuid.uuid4()), task_type=request.task_type, priority=request.priority, payload=request.payload, required_capabilities=request.required_capabilities, deadline=request.deadline, dependencies=request.dependencies ) # Submit to platform task_id = await platform.submit_task(task) # Broadcast update await broadcast_update({ "event": "task_submitted", "task_id": task_id, "task_type": request.task_type }) return { "task_id": task_id, "status": "submitted", "message": "Task successfully submitted for execution" } except Exception as e: logger.error(f"Task submission failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v2/tasks/{task_id}", response_model=Dict[str, Any]) async def get_task_status( task_id: str, token: str = Depends(verify_token) ): """Get the status of a submitted task""" try: status = await platform.get_task_status(task_id) if not status: raise HTTPException(status_code=404, detail="Task not found") return status except HTTPException: raise except Exception as e: logger.error(f"Failed to get task status: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v2/tasks/batch", response_model=List[Dict[str, str]]) async def submit_batch_tasks( tasks: List[TaskSubmissionRequest], token: str = Depends(verify_token) ): """Submit multiple tasks as a batch""" try: results = [] for task_request in tasks: task = UnifiedTask( task_id=str(uuid.uuid4()), task_type=task_request.task_type, priority=task_request.priority, payload=task_request.payload, required_capabilities=task_request.required_capabilities, deadline=task_request.deadline, dependencies=task_request.dependencies ) task_id = await platform.submit_task(task) results.append({ "task_id": task_id, "status": "submitted" }) return results except Exception as e: logger.error(f"Batch task submission failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ============================= # Marketplace Endpoints # ============================= @app.post("/api/v2/marketplace/publish", response_model=Dict[str, str]) async def publish_to_marketplace( request: MarketplaceListingRequest, token: str = Depends(verify_token) ): """Publish an agent to the marketplace""" try: success = await platform.marketplace.publish_agent( request.agent_id, request.description, request.pricing, request.keywords ) if not success: raise HTTPException(status_code=400, detail="Failed to publish agent") return { "status": "published", "agent_id": request.agent_id, "message": "Agent successfully published to marketplace" } except HTTPException: raise except Exception as e: logger.error(f"Marketplace publishing failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v2/marketplace/search", response_model=List[Dict[str, Any]]) async def search_marketplace( request: AgentSearchRequest, token: str = Depends(verify_token) ): """Search for agents in the marketplace""" try: listings = await platform.marketplace.search_agents( request.query, request.min_rating, request.max_price ) return [ { "agent_id": listing.agent_id, "name": listing.metadata.name, "description": listing.description, "capabilities": listing.metadata.capabilities, "pricing": listing.pricing, "average_rating": listing.average_rating, "total_usage": listing.total_usage } for listing in listings ] except Exception as e: logger.error(f"Marketplace search failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v2/marketplace/rate/{agent_id}") async def rate_agent( agent_id: str, rating: float = Field(..., ge=1.0, le=5.0), review: Optional[str] = None, token: str = Depends(verify_token) ): """Rate an agent in the marketplace""" try: listing = platform.marketplace.listings.get(agent_id) if not listing: raise HTTPException(status_code=404, detail="Agent not found in marketplace") listing.add_rating(rating, review, "anonymous") # Would use actual user ID return { "status": "rated", "agent_id": agent_id, "new_average_rating": listing.average_rating } except HTTPException: raise except Exception as e: logger.error(f"Agent rating failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ============================= # Monitoring & Analytics Endpoints # ============================= @app.get("/api/v2/dashboard/overview", response_model=Dict[str, Any]) async def get_system_overview(token: str = Depends(verify_token)): """Get system-wide metrics and overview""" try: overview = await platform.dashboard.get_system_overview() return overview except Exception as e: logger.error(f"Failed to get system overview: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/v2/resources/utilization", response_model=Dict[str, float]) async def get_resource_utilization(token: str = Depends(verify_token)): """Get current resource utilization""" try: utilization = platform.resource_manager.get_resource_utilization() return { resource: percentage for resource, percentage in utilization.items() } except Exception as e: logger.error(f"Failed to get resource utilization: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v2/conflicts/report", response_model=Dict[str, str]) async def report_conflict( request: ConflictReportRequest, token: str = Depends(verify_token) ): """Report a conflict for resolution""" try: conflict = Conflict( conflict_id=str(uuid.uuid4()), conflict_type=request.conflict_type, involved_agents=request.involved_agents, description=request.description, context=request.context ) conflict_id = await platform.conflict_resolver.report_conflict(conflict) return { "conflict_id": conflict_id, "status": "reported", "resolution": conflict.resolution if conflict.resolved else None } except Exception as e: logger.error(f"Conflict reporting failed: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) # ============================= # WebSocket Endpoints # ============================= @app.websocket("/ws/dashboard") async def dashboard_websocket(websocket: WebSocket): """WebSocket endpoint for real-time dashboard updates""" await websocket.accept() websocket_connections.add(websocket) try: # Send initial system overview overview = await platform.dashboard.get_system_overview() await websocket.send_json({"type": "overview", "data": overview}) # Keep connection alive and send periodic updates while True: # Wait for messages or send periodic updates try: message = await asyncio.wait_for(websocket.receive_text(), timeout=30.0) # Handle client messages if needed except asyncio.TimeoutError: # Send periodic update overview = await platform.dashboard.get_system_overview() await websocket.send_json({"type": "overview", "data": overview}) except WebSocketDisconnect: websocket_connections.remove(websocket) except Exception as e: logger.error(f"WebSocket error: {str(e)}") websocket_connections.discard(websocket) async def broadcast_update(update: Dict[str, Any]): """Broadcast update to all connected WebSocket clients""" disconnected = set() for websocket in websocket_connections: try: await websocket.send_json({"type": "update", "data": update}) except: disconnected.add(websocket) # Remove disconnected clients websocket_connections.difference_update(disconnected) # ============================= # HTML Dashboard # ============================= @app.get("/dashboard", response_class=HTMLResponse) async def dashboard(): """Serve the web dashboard""" return """