"""
GPU-Aware Job Scheduler with Least-Load Assignment and Tenant Fairness

Provides intelligent job scheduling with:
- GPU affinity management
- Least-load worker selection
- Tenant fairness (preventing starvation)
- Atomic job claiming
- Fault tolerance
"""

import uuid
from datetime import datetime
from typing import Dict, List, Optional

from sqlalchemy import select, update, func

from backend.db.models import Worker, EvaluationRun
from backend.db.session import get_db_context
from backend.logging.logger import get_logger

from .job_schema import (
    GPURequirement,
    JobStatus,
    EvaluationJob,
)
from .worker_registry import get_worker_registry, DEFAULT_HEARTBEAT_TIMEOUT
from .worker_schema import WorkerStatus

logger = get_logger("queue.scheduler", component="queue")

# Minimum free GPU memory: required per job by the least-load assignment and
# per requested GPU by check_gpu_capacity. The original comments describe 4000
# as "4GB", which suggests the Worker memory columns are in MB -- confirm
# against the Worker model.
_MIN_FREE_GPU_MEMORY_MB = 4000

# EvaluationRun.status values that count as "active" for tenant fairness.
_ACTIVE_RUN_STATUSES = ("pending", "running")


class JobScheduler:
    """
    GPU-aware job scheduler with least-load assignment and tenant fairness.

    Responsibilities:
    - GPU affinity management
    - Least-load worker selection
    - Tenant fairness (weighted scheduling)
    - Atomic job claiming
    - Job assignment with capacity checking
    """

    def __init__(self):
        self._worker_registry = get_worker_registry()

    async def get_tenant_active_job_count(self, tenant_id: uuid.UUID) -> int:
        """
        Get the number of active jobs for a tenant.

        Args:
            tenant_id: The tenant ID

        Returns:
            Number of EvaluationRun rows for the tenant whose status is
            "pending" or "running". Returns 0 (and logs) on any error.
        """
        try:
            async with get_db_context() as session:
                query = select(func.count(EvaluationRun.id)).where(
                    EvaluationRun.tenant_id == tenant_id,
                    EvaluationRun.status.in_(_ACTIVE_RUN_STATUSES),
                )
                result = await session.execute(query)
                return result.scalar() or 0
        except Exception as e:
            logger.error(
                "Failed to get tenant active job count",
                tenant_id=str(tenant_id),
                error=str(e),
            )
            return 0

    async def get_all_tenant_job_counts(self) -> Dict[uuid.UUID, int]:
        """
        Get active job counts for all tenants.

        Returns:
            Dictionary mapping tenant_id to the number of "pending"/"running"
            EvaluationRun rows. Tenants with no active runs are absent.
            Returns an empty dict (and logs) on any error.
        """
        try:
            async with get_db_context() as session:
                query = (
                    select(EvaluationRun.tenant_id, func.count(EvaluationRun.id))
                    .where(EvaluationRun.status.in_(_ACTIVE_RUN_STATUSES))
                    .group_by(EvaluationRun.tenant_id)
                )
                result = await session.execute(query)
                return {tenant_id: count for tenant_id, count in result.all()}
        except Exception as e:
            logger.error(
                "Failed to get all tenant job counts",
                error=str(e),
            )
            return {}

    def calculate_tenant_priority(
        self, tenant_id: uuid.UUID, tenant_job_counts: Dict[uuid.UUID, int]
    ) -> float:
        """
        Calculate priority for a tenant based on its active job count.

        Priority = 1 / (active_jobs_per_tenant + 1)

        Tenants with fewer active jobs get a higher priority, which prevents
        busy tenants from starving quieter ones.

        Args:
            tenant_id: The tenant ID
            tenant_job_counts: Mapping of tenant ID to active job count

        Returns:
            Priority score in (0, 1]; higher is better. A tenant missing from
            the mapping is treated as having 0 jobs and gets the maximum of 1.0.
        """
        job_count = tenant_job_counts.get(tenant_id, 0)
        # +1 avoids division by zero and gives idle tenants the top score.
        return 1.0 / (job_count + 1)

    async def get_pending_jobs_with_tenant_fairness(
        self,
        jobs: List[EvaluationJob],
    ) -> List[EvaluationJob]:
        """
        Sort pending jobs by tenant fairness priority.

        Jobs from tenants with fewer active jobs come first. Jobs without a
        tenant_id get the lowest priority (0.0).

        Args:
            jobs: List of pending jobs

        Returns:
            A new list sorted by descending priority. The input is returned
            unchanged if it is empty.
        """
        if not jobs:
            return jobs

        tenant_job_counts = await self.get_all_tenant_job_counts()

        def get_priority(job: EvaluationJob) -> float:
            tenant_id = getattr(job, "tenant_id", None)
            if tenant_id:
                return self.calculate_tenant_priority(tenant_id, tenant_job_counts)
            return 0.0  # Jobs without a tenant get lowest priority

        # sorted() is stable even with reverse=True, so jobs with equal
        # priority keep their incoming order.
        return sorted(jobs, key=get_priority, reverse=True)

    @staticmethod
    def _load_factor(worker) -> float:
        """Return active_jobs / max_concurrent_jobs, or inf if the worker has no slots."""
        if worker.max_concurrent_jobs > 0:
            return worker.active_jobs / worker.max_concurrent_jobs
        return float("inf")

    @staticmethod
    def _free_gpu_memory(worker) -> Optional[float]:
        """Return total minus used GPU memory, or None if either figure is unknown."""
        total = worker.gpu_memory_total
        used = worker.gpu_memory_used
        if total is None or used is None:
            return None
        return total - used

    async def assign_job_to_worker(
        self,
        job: EvaluationJob,
    ) -> Optional[str]:
        """
        Assign a job to the best available worker using least-load strategy.

        The algorithm:
        1. Ask the worker registry for available workers matching the GPU
           requirement (status filtering, e.g. ACTIVE/DEGRADED, is presumably
           done by the registry -- confirm in worker_registry)
        2. Skip workers with no free slot (active_jobs >= max_concurrent_jobs)
        3. For GPU jobs, skip workers with less than _MIN_FREE_GPU_MEMORY_MB of
           free GPU memory, or whose memory figures are unknown
        4. Pick the worker with the lowest load factor
           (active_jobs / max_concurrent_jobs)
        5. Atomically claim the job for that worker

        Args:
            job: The job to assign

        Returns:
            Worker ID if assigned, None if no suitable worker was found, the
            claim failed, or an error occurred (errors are logged).
        """
        try:
            gpu_required = self._get_gpu_requirement(job)

            available_workers = await self._worker_registry.get_available_workers(
                gpu_required=gpu_required
            )

            if not available_workers:
                logger.warning(
                    "No available workers for job",
                    job_id=str(job.job_id),
                    gpu_required=gpu_required,
                )
                return None

            selected_worker = None
            min_load = float("inf")

            for worker in available_workers:
                # Enforce the capacity rule locally rather than relying on the
                # registry's filtering; this also excludes zero-slot workers.
                if worker.active_jobs >= worker.max_concurrent_jobs:
                    continue

                if gpu_required > 0:
                    free_gpu_memory = self._free_gpu_memory(worker)
                    # Unknown memory is treated as insufficient, so one worker
                    # with incomplete data does not abort the whole assignment.
                    if free_gpu_memory is None or free_gpu_memory < _MIN_FREE_GPU_MEMORY_MB:
                        continue

                load_factor = self._load_factor(worker)
                if load_factor < min_load:
                    min_load = load_factor
                    selected_worker = worker

            if selected_worker is None:
                logger.warning(
                    "No worker with sufficient capacity",
                    job_id=str(job.job_id),
                )
                return None

            worker_id = await self._claim_job_for_worker(
                job_id=job.job_id,
                worker_id=selected_worker.worker_id,
            )

            if worker_id:
                logger.info(
                    "Job assigned to worker",
                    job_id=str(job.job_id),
                    worker_id=worker_id,
                    load_factor=min_load,
                )

            return worker_id

        except Exception as e:
            logger.error(
                "Failed to assign job to worker",
                job_id=str(job.job_id),
                error=str(e),
            )
            return None

    async def _claim_job_for_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> Optional[str]:
        """
        Claim a QUEUED job in the in-memory producer queue for a worker.

        The claim also increments the worker's active_jobs counter in the
        database. If that DB update fails, the in-memory claim is rolled back
        so the job stays QUEUED and can be assigned again.

        Args:
            job_id: Job ID
            worker_id: Worker ID

        Returns:
            Worker ID if claimed successfully. None if the job is not in the
            queue, is no longer QUEUED, or an error occurred (logged).
        """
        try:
            # Imported lazily, presumably to avoid a circular import with the
            # producer module -- confirm before hoisting.
            from backend.queue.producer import _job_queue

            job = next((j for j in _job_queue if j.job_id == job_id), None)

            if job is None:
                logger.warning(
                    "Job not found in queue",
                    job_id=str(job_id),
                )
                return None

            if job.status != JobStatus.QUEUED:
                logger.warning(
                    "Job not in QUEUED status",
                    job_id=str(job_id),
                    status=job.status,
                )
                return None

            previous_worker_id = getattr(job, "worker_id", None)
            previous_started_at = getattr(job, "started_at", None)

            # There is no await between the status check above and these
            # assignments, so within a single asyncio event loop no other
            # coroutine can claim the same job in between.
            job.status = JobStatus.RUNNING
            job.worker_id = worker_id
            # NOTE(review): naive UTC timestamp (utcnow is deprecated in 3.12);
            # kept naive in case other code compares it with naive datetimes.
            job.started_at = datetime.utcnow()

            try:
                async with get_db_context() as session:
                    stmt = (
                        update(Worker)
                        .where(Worker.worker_id == worker_id)
                        .values(active_jobs=Worker.active_jobs + 1)
                    )
                    await session.execute(stmt)
                    await session.commit()
            except Exception:
                # Undo the in-memory claim. Otherwise the job would stay RUNNING
                # and this method (which only claims QUEUED jobs) could never
                # claim it again.
                if job.status == JobStatus.RUNNING and job.worker_id == worker_id:
                    job.status = JobStatus.QUEUED
                    job.worker_id = previous_worker_id
                    job.started_at = previous_started_at
                raise

            logger.debug(
                "Job claimed atomically",
                job_id=str(job_id),
                worker_id=worker_id,
            )

            return worker_id

        except Exception as e:
            logger.error(
                "Failed to claim job",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return None

    def _get_gpu_requirement(self, job: EvaluationJob) -> int:
        """
        Determine the number of GPUs a job needs.

        Resolution order:
        1. metadata["gpu_requirement"], if present and convertible to int
           (negative values are clamped to 0; invalid values are logged and
           ignored)
        2. metadata["job_type"]: "benchmark" -> 1, "single_eval" -> 0
        3. job.job_type attribute: "benchmark" -> 1
        4. Otherwise 0

        Args:
            job: The job

        Returns:
            Number of GPUs required (0 for CPU-only)
        """
        metadata = job.metadata
        if metadata:
            gpu_req = metadata.get("gpu_requirement")
            if gpu_req is not None:
                try:
                    return max(0, int(gpu_req))
                except (TypeError, ValueError):
                    logger.warning(
                        "Invalid gpu_requirement in job metadata; inferring from job type",
                        job_id=str(job.job_id),
                        gpu_requirement=str(gpu_req),
                    )

            job_type = metadata.get("job_type")
            if job_type == "benchmark":
                return 1  # Benchmark jobs typically need GPU
            if job_type == "single_eval":
                return 0  # Single eval can run on CPU

        if getattr(job, "job_type", None) == "benchmark":
            return 1

        return 0

    async def release_job_from_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> bool:
        """
        Release a job from a worker (job completed or failed).

        Decrements the worker's active_jobs counter in the database. The
        `active_jobs > 0` guard keeps the counter from going negative.

        Args:
            job_id: Job ID (used only for logging)
            worker_id: Worker ID

        Returns:
            True if the update statement committed, False on error (logged).
            Also True when no row matched (unknown worker or counter already 0).
        """
        try:
            async with get_db_context() as session:
                stmt = (
                    update(Worker)
                    .where(Worker.worker_id == worker_id)
                    .where(Worker.active_jobs > 0)
                    .values(active_jobs=Worker.active_jobs - 1)
                )
                await session.execute(stmt)
                await session.commit()

            logger.debug(
                "Job released from worker",
                job_id=str(job_id),
                worker_id=worker_id,
            )
            return True

        except Exception as e:
            logger.error(
                "Failed to release job from worker",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return False

    async def get_worker_for_job(
        self,
        gpu_required: int = 0,
    ) -> Optional[Worker]:
        """
        Get the first available worker for the given GPU requirement.

        Unlike assign_job_to_worker, this applies no local capacity or GPU
        memory filtering and does not claim anything.

        Args:
            gpu_required: Number of GPUs required

        Returns:
            The first worker returned by the registry, or None if there is none
        """
        available_workers = await self._worker_registry.get_available_workers(
            gpu_required=gpu_required
        )
        if not available_workers:
            return None
        # Presumably the registry returns workers sorted by load factor --
        # confirm in worker_registry.get_available_workers.
        return available_workers[0]

    async def check_gpu_capacity(
        self,
        worker_id: str,
        gpu_required: int,
    ) -> bool:
        """
        Check if a worker has sufficient GPU capacity for a job.

        Requires gpu_count >= gpu_required and at least
        gpu_required * _MIN_FREE_GPU_MEMORY_MB of free GPU memory.

        Args:
            worker_id: Worker ID
            gpu_required: GPUs required

        Returns:
            True if the worker has sufficient capacity. False if it does not,
            if the worker is unknown, if its memory figures are missing, or on
            error (logged).
        """
        try:
            async with get_db_context() as session:
                stmt = select(Worker).where(Worker.worker_id == worker_id)
                result = await session.execute(stmt)
                worker = result.scalar_one_or_none()

                if worker is None:
                    return False

                if worker.gpu_count < gpu_required:
                    return False

                free_memory = self._free_gpu_memory(worker)
                if free_memory is None:
                    return False

                return free_memory >= gpu_required * _MIN_FREE_GPU_MEMORY_MB

        except Exception as e:
            logger.error(
                "Failed to check GPU capacity",
                worker_id=worker_id,
                error=str(e),
            )
            return False


# Global instance
_scheduler: Optional[JobScheduler] = None


def get_job_scheduler() -> JobScheduler:
    """Get the global job scheduler instance, creating it on first use."""
    global _scheduler
    if _scheduler is None:
        _scheduler = JobScheduler()
    return _scheduler


__all__ = [
    "JobScheduler",
    "get_job_scheduler",
]