File size: 6,044 Bytes
"""
Worker Schemas for Distributed Worker Coordination

Defines Pydantic schemas for worker registration, heartbeat, and status management.
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class WorkerStatus(str, Enum):
    """Worker status enumeration."""

    # The ``str`` mix-in makes each member compare equal to its plain string
    # value (e.g. ``WorkerStatus.ACTIVE == "active"``).
    REGISTERED = "registered"
    ACTIVE = "active"
    DEGRADED = "degraded"
    OFFLINE = "offline"
class GPUInfo(BaseModel):
    """GPU information for a worker."""

    # --- Device identity (required) ---
    gpu_index: int = Field(description="GPU index")
    gpu_name: str = Field(description="GPU name")

    # --- Memory figures, in MB (required) ---
    memory_total_mb: int = Field(description="Total memory in MB")
    memory_used_mb: int = Field(description="Used memory in MB")

    # --- Utilization: the only field with a default (0.0) ---
    utilization_percent: float = Field(
        default=0.0,
        description="GPU utilization %",
    )
class WorkerRegistrationRequest(BaseModel):
    """Request model for worker registration."""

    # ``hostname`` is the only required field; everything below has a default.
    hostname: str = Field(description="Worker hostname")

    # --- GPU inventory ---
    gpu_count: int = Field(
        default=0,
        description="Number of GPUs",
    )
    gpu_info: Optional[List[GPUInfo]] = Field(
        default=None,
        description="GPU information",
    )
    gpu_memory_total: int = Field(
        default=0,
        description="Total GPU memory in MB",
    )

    # --- Scheduling capacity ---
    max_concurrent_jobs: int = Field(
        default=1,
        description="Max concurrent jobs",
    )

    # --- Free-form dictionaries (string keys, arbitrary values) ---
    capabilities: Optional[Dict[str, Any]] = Field(
        default=None, description="Worker capabilities (supported models, etc.)"
    )
    worker_metadata: Optional[Dict[str, Any]] = Field(
        default=None, description="Additional worker metadata"
    )
class WorkerRegistrationResponse(BaseModel):
    """Response model for worker registration."""

    # --- Registration result (required) ---
    worker_id: str = Field(description="Assigned worker ID")
    status: WorkerStatus = Field(description="Worker status")
    registered_at: datetime = Field(description="Registration timestamp")

    # --- Heartbeat timing, in seconds (defaults: 30 s interval, 120 s timeout) ---
    heartbeat_interval: int = Field(default=30, description="Heartbeat interval in seconds")
    heartbeat_timeout: int = Field(
        default=120,
        description="Heartbeat timeout in seconds (worker marked offline after this)",
    )
class HeartbeatRequest(BaseModel):
    """Request model for worker heartbeat."""

    # Only ``worker_id`` is required; every other field defaults to ``None``.
    worker_id: str = Field(description="Worker ID")

    # --- Aggregate GPU readings ---
    gpu_usage: Optional[float] = Field(default=None, description="Current GPU usage percentage")
    gpu_memory_used: Optional[int] = Field(default=None, description="Current GPU memory used in MB")

    # --- Job load ---
    active_jobs: Optional[int] = Field(default=None, description="Number of active jobs")

    # --- Per-GPU details ---
    gpu_info: Optional[List[GPUInfo]] = Field(default=None, description="Updated GPU information")

    # --- Status reported by the worker itself ---
    status: Optional[WorkerStatus] = Field(default=None, description="Worker status (optional override)")
class HeartbeatResponse(BaseModel):
    """Response model for worker heartbeat."""

    # All four fields are required (none declares a default).
    worker_id: str = Field(
        description="Worker ID",
    )
    status: WorkerStatus = Field(
        description="Worker status",
    )
    timestamp: datetime = Field(
        description="Heartbeat timestamp",
    )
    assigned_jobs: int = Field(
        description="Number of assigned jobs",
    )
class WorkerStatusResponse(BaseModel):
    """Response model for worker status."""

    # Every field is required except ``capabilities``, which defaults to None.

    # --- Identity and state ---
    worker_id: str = Field(description="Worker ID")
    hostname: str = Field(description="Worker hostname")
    status: WorkerStatus = Field(description="Worker status")

    # --- GPU resources (memory in MB) ---
    gpu_count: int = Field(description="Number of GPUs")
    gpu_memory_total: int = Field(description="Total GPU memory in MB")
    gpu_memory_used: int = Field(description="Used GPU memory in MB")
    gpu_usage_percent: float = Field(description="GPU usage percentage")

    # --- Job load ---
    active_jobs: int = Field(description="Number of active jobs")
    max_concurrent_jobs: int = Field(description="Max concurrent jobs")
    load_factor: float = Field(description="Load factor (active_jobs / max_concurrent_jobs)")

    # --- Timestamps ---
    last_heartbeat: datetime = Field(description="Last heartbeat timestamp")
    registered_at: datetime = Field(description="Registration timestamp")

    # --- Optional free-form capabilities dictionary ---
    capabilities: Optional[Dict[str, Any]] = Field(default=None, description="Worker capabilities")

    # --- Uptime ---
    uptime_seconds: int = Field(description="Worker uptime in seconds")
class WorkerListResponse(BaseModel):
    """Response model for listing workers."""

    # --- The worker entries themselves ---
    workers: List[WorkerStatusResponse] = Field(
        description="List of workers",
    )

    # --- Summary counters (all required) ---
    total: int = Field(description="Total number of workers")
    active: int = Field(description="Number of active workers")
    offline: int = Field(description="Number of offline workers")
class WorkerMetricsResponse(BaseModel):
    """Response model for worker cluster metrics."""

    # All fields are required (none declares a default).

    # --- Worker counts, overall and per status ---
    total_workers: int = Field(description="Total number of workers")
    active_workers: int = Field(description="Number of active workers")
    offline_workers: int = Field(description="Number of offline workers")
    degraded_workers: int = Field(description="Number of degraded workers")

    # --- GPU totals (memory in MB) ---
    total_gpus: int = Field(description="Total number of GPUs")
    total_gpu_memory_mb: int = Field(description="Total GPU memory in MB")
    used_gpu_memory_mb: int = Field(description="Used GPU memory in MB")

    # --- Job load ---
    total_active_jobs: int = Field(description="Total number of active jobs across all workers")
    average_load_factor: float = Field(description="Average load factor across workers")
    queue_length: int = Field(description="Number of pending jobs in queue")
# Names exported by ``from <module> import *``, listed in definition order.
__all__ = [
    # Shared building blocks
    "WorkerStatus",
    "GPUInfo",
    # Registration
    "WorkerRegistrationRequest",
    "WorkerRegistrationResponse",
    # Heartbeat
    "HeartbeatRequest",
    "HeartbeatResponse",
    # Status, listing and cluster metrics
    "WorkerStatusResponse",
    "WorkerListResponse",
    "WorkerMetricsResponse",
]
|