# File: aegislm/backend/queue/worker_schema.py
# Repository listing metadata: ACA050, "Upload 50 files", commit 1a4aa87 (verified)
"""
Worker Schemas for Distributed Worker Coordination
Defines Pydantic schemas for worker registration, heartbeat, and status management.
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class WorkerStatus(str, Enum):
    """Worker status enumeration."""

    # The ``str`` mixin makes every member compare equal to its plain string
    # value (e.g. ``WorkerStatus.ACTIVE == "active"``).
    REGISTERED = "registered"
    ACTIVE = "active"
    DEGRADED = "degraded"
    OFFLINE = "offline"
class GPUInfo(BaseModel):
    """GPU information for a worker."""

    # NOTE: pydantic uses the class docstring as the model's JSON-schema
    # description, so rewording it changes the generated schema.

    # Required fields (``...`` marks "no default").
    gpu_index: int = Field(..., description="GPU index")
    gpu_name: str = Field(..., description="GPU name")
    memory_total_mb: int = Field(..., description="Total memory in MB")
    memory_used_mb: int = Field(..., description="Used memory in MB")

    # Optional; falls back to 0.0 when the caller omits it.
    utilization_percent: float = Field(0.0, description="GPU utilization %")
class WorkerRegistrationRequest(BaseModel):
    """Request model for worker registration."""

    # ``hostname`` is the only required field; everything else has a default.
    hostname: str = Field(..., description="Worker hostname")

    # GPU inventory: count and total memory default to 0, per-GPU detail to None.
    gpu_count: int = Field(0, description="Number of GPUs")
    gpu_info: Optional[List[GPUInfo]] = Field(None, description="GPU information")
    gpu_memory_total: int = Field(0, description="Total GPU memory in MB")

    # Concurrency setting; defaults to a single job.
    max_concurrent_jobs: int = Field(1, description="Max concurrent jobs")

    # Free-form dictionaries: this schema places no constraint on their keys
    # or values beyond ``Dict[str, Any]``.
    capabilities: Optional[Dict[str, Any]] = Field(
        None,
        description="Worker capabilities (supported models, etc.)",
    )
    worker_metadata: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional worker metadata",
    )
class WorkerRegistrationResponse(BaseModel):
    """Response model for worker registration."""

    # Required: identity, status and registration time of the worker.
    worker_id: str = Field(..., description="Assigned worker ID")
    status: WorkerStatus = Field(..., description="Worker status")
    registered_at: datetime = Field(..., description="Registration timestamp")

    # Heartbeat timing, both expressed in seconds; defaults are 30 and 120.
    heartbeat_interval: int = Field(30, description="Heartbeat interval in seconds")
    heartbeat_timeout: int = Field(
        120,
        description="Heartbeat timeout in seconds (worker marked offline after this)",
    )
class HeartbeatRequest(BaseModel):
    """Request model for worker heartbeat."""

    # Only the worker ID is mandatory.
    worker_id: str = Field(..., description="Worker ID")

    # Every remaining field is optional and defaults to None, so a heartbeat
    # may carry any subset of these readings.
    gpu_usage: Optional[float] = Field(None, description="Current GPU usage percentage")
    gpu_memory_used: Optional[int] = Field(
        None, description="Current GPU memory used in MB"
    )
    active_jobs: Optional[int] = Field(None, description="Number of active jobs")
    gpu_info: Optional[List[GPUInfo]] = Field(
        None, description="Updated GPU information"
    )
    status: Optional[WorkerStatus] = Field(
        None, description="Worker status (optional override)"
    )
class HeartbeatResponse(BaseModel):
    """Response model for worker heartbeat."""

    # All four fields are required; none declares a default.
    worker_id: str = Field(..., description="Worker ID")
    status: WorkerStatus = Field(..., description="Worker status")
    timestamp: datetime = Field(..., description="Heartbeat timestamp")
    assigned_jobs: int = Field(..., description="Number of assigned jobs")
class WorkerStatusResponse(BaseModel):
    """Response model for worker status."""

    # Every field except ``capabilities`` is required.

    # Identity and state.
    worker_id: str = Field(..., description="Worker ID")
    hostname: str = Field(..., description="Worker hostname")
    status: WorkerStatus = Field(..., description="Worker status")

    # GPU figures.
    gpu_count: int = Field(..., description="Number of GPUs")
    gpu_memory_total: int = Field(..., description="Total GPU memory in MB")
    gpu_memory_used: int = Field(..., description="Used GPU memory in MB")
    gpu_usage_percent: float = Field(..., description="GPU usage percentage")

    # Job load. ``load_factor`` is a plain field: this model does not derive
    # it from the two job counts, so the producer must supply the value.
    active_jobs: int = Field(..., description="Number of active jobs")
    max_concurrent_jobs: int = Field(..., description="Max concurrent jobs")
    load_factor: float = Field(
        ...,
        description="Load factor (active_jobs / max_concurrent_jobs)",
    )

    # Timestamps.
    last_heartbeat: datetime = Field(..., description="Last heartbeat timestamp")
    registered_at: datetime = Field(..., description="Registration timestamp")

    # Optional free-form dictionary; defaults to None.
    capabilities: Optional[Dict[str, Any]] = Field(
        None,
        description="Worker capabilities",
    )

    uptime_seconds: int = Field(..., description="Worker uptime in seconds")
class WorkerListResponse(BaseModel):
    """Response model for listing workers."""

    # The worker entries themselves.
    workers: List[WorkerStatusResponse] = Field(..., description="List of workers")

    # Summary counters. They are independent required fields; this model does
    # not compute them from ``workers``.
    total: int = Field(..., description="Total number of workers")
    active: int = Field(..., description="Number of active workers")
    offline: int = Field(..., description="Number of offline workers")
class WorkerMetricsResponse(BaseModel):
    """Response model for worker cluster metrics."""

    # All fields are required; none declares a default.

    # Worker counts.
    total_workers: int = Field(..., description="Total number of workers")
    active_workers: int = Field(..., description="Number of active workers")
    offline_workers: int = Field(..., description="Number of offline workers")
    degraded_workers: int = Field(..., description="Number of degraded workers")

    # GPU totals (memory figures in MB).
    total_gpus: int = Field(..., description="Total number of GPUs")
    total_gpu_memory_mb: int = Field(..., description="Total GPU memory in MB")
    used_gpu_memory_mb: int = Field(..., description="Used GPU memory in MB")

    # Job load and queue depth.
    total_active_jobs: int = Field(
        ...,
        description="Total number of active jobs across all workers",
    )
    average_load_factor: float = Field(
        ...,
        description="Average load factor across workers",
    )
    queue_length: int = Field(..., description="Number of pending jobs in queue")
# Names exported by ``from <this module> import *``.
__all__ = [
    # Status enum and shared GPU sub-model.
    "WorkerStatus",
    "GPUInfo",
    # Registration request/response.
    "WorkerRegistrationRequest",
    "WorkerRegistrationResponse",
    # Heartbeat request/response.
    "HeartbeatRequest",
    "HeartbeatResponse",
    # Status, listing and cluster metrics responses.
    "WorkerStatusResponse",
    "WorkerListResponse",
    "WorkerMetricsResponse",
]