| """
|
| GPU-Aware Job Scheduler with Least-Load Assignment and Tenant Fairness
|
|
|
| Provides intelligent job scheduling with:
|
| - GPU affinity management
|
| - Least-load worker selection
|
| - Tenant fairness (preventing starvation)
|
| - Atomic job claiming
|
| - Fault tolerance
|
| """
|
|
|
| import uuid
|
| from datetime import datetime
|
| from typing import Dict, List, Optional
|
|
|
| from sqlalchemy import select, update, func
|
|
|
| from backend.db.models import Worker, EvaluationRun
|
| from backend.db.session import get_db_context
|
| from backend.logging.logger import get_logger
|
|
|
| from .job_schema import (
|
| GPURequirement,
|
| JobStatus,
|
| EvaluationJob,
|
| )
|
| from .worker_registry import get_worker_registry, DEFAULT_HEARTBEAT_TIMEOUT
|
| from .worker_schema import WorkerStatus
|
|
|
# Module logger. Calls below attach structured keyword fields
# (job_id=..., worker_id=..., error=...) to each message.
logger = get_logger(
    "queue.scheduler",
    component="queue",
)
|
|
|
|
|
class JobScheduler:
    """
    GPU-aware job scheduler with least-load assignment and tenant fairness.

    Responsibilities:
    - GPU affinity: a job's GPU requirement is derived from its metadata or
      job type and used to query the worker registry.
    - Least-load worker selection: among eligible workers, pick the one with
      the lowest ``active_jobs / max_concurrent_jobs`` ratio.
    - Tenant fairness: pending jobs are ordered so tenants with fewer
      pending/running evaluation runs are served first.
    - Job claiming: a QUEUED job is flipped to RUNNING and the chosen
      worker's ``active_jobs`` counter is incremented in the database.
    - Capacity checking: workers with no free slot, or (for GPU jobs) too
      little free GPU memory, are skipped.
    """

    # Free GPU memory required per requested GPU before a worker is eligible
    # for a GPU job. Same units as Worker.gpu_memory_total / gpu_memory_used
    # (NOTE(review): presumably MB -- confirm against the worker schema).
    MIN_FREE_GPU_MEMORY_PER_GPU = 4000

    def __init__(self):
        self._worker_registry = get_worker_registry()

    async def get_tenant_active_job_count(self, tenant_id: uuid.UUID) -> int:
        """
        Count a tenant's evaluation runs whose status is "pending" or "running".

        Args:
            tenant_id: The tenant ID.

        Returns:
            Number of active runs. Returns 0 if the query fails (the failure
            is logged), so callers treat lookup errors as "no active jobs".
        """
        try:
            async with get_db_context() as session:
                query = select(func.count(EvaluationRun.id)).where(
                    EvaluationRun.tenant_id == tenant_id,
                    EvaluationRun.status.in_(["pending", "running"]),
                )
                result = await session.execute(query)
                return result.scalar() or 0
        except Exception as e:
            logger.error(
                "Failed to get tenant active job count",
                tenant_id=str(tenant_id),
                error=str(e),
            )
            return 0

    async def get_all_tenant_job_counts(self) -> Dict[uuid.UUID, int]:
        """
        Count "pending"/"running" evaluation runs for every tenant in one query.

        Returns:
            Mapping of tenant_id -> active run count. Tenants with no active
            runs are absent. Returns ``{}`` if the query fails (logged).
        """
        try:
            async with get_db_context() as session:
                query = (
                    select(EvaluationRun.tenant_id, func.count(EvaluationRun.id))
                    .where(EvaluationRun.status.in_(["pending", "running"]))
                    .group_by(EvaluationRun.tenant_id)
                )
                result = await session.execute(query)
                return {row[0]: row[1] for row in result.all()}
        except Exception as e:
            logger.error(
                "Failed to get all tenant job counts",
                error=str(e),
            )
            return {}

    def calculate_tenant_priority(
        self, tenant_id: uuid.UUID, tenant_job_counts: Dict[uuid.UUID, int]
    ) -> float:
        """
        Compute a fairness priority for a tenant.

        Priority = 1 / (active_jobs_for_tenant + 1), so a tenant with no
        active jobs scores 1.0 and busier tenants score progressively lower.
        This keeps light tenants from being starved by heavy ones.

        Args:
            tenant_id: The tenant ID.
            tenant_job_counts: Mapping of tenant_id -> active job count; a
                missing tenant counts as 0.

        Returns:
            Priority score in (0, 1] for non-negative counts (higher is better).
        """
        job_count = tenant_job_counts.get(tenant_id, 0)
        return 1.0 / (job_count + 1)

    async def get_pending_jobs_with_tenant_fairness(
        self,
        jobs: List[EvaluationJob],
    ) -> List[EvaluationJob]:
        """
        Order pending jobs so that tenants with fewer active jobs come first.

        Jobs without a (truthy) ``tenant_id`` get priority 0.0 and therefore
        sort after every tenant-owned job. The sort is stable, so jobs with
        equal priority keep their input order.

        Args:
            jobs: Pending jobs to order.

        Returns:
            A new sorted list, or ``jobs`` itself when it is empty.
        """
        if not jobs:
            return jobs

        tenant_job_counts = await self.get_all_tenant_job_counts()

        def get_priority(job: EvaluationJob) -> float:
            tenant_id = getattr(job, "tenant_id", None)
            if tenant_id:
                return self.calculate_tenant_priority(tenant_id, tenant_job_counts)
            return 0.0

        return sorted(jobs, key=get_priority, reverse=True)

    async def assign_job_to_worker(
        self,
        job: EvaluationJob,
    ) -> Optional[str]:
        """
        Assign a job to the best available worker using a least-load strategy.

        The algorithm:
        1. Derive the job's GPU requirement and ask the worker registry for
           available workers with that requirement. Status filtering
           (ACTIVE/DEGRADED) is presumably done by the registry -- confirm in
           worker_registry.
        2. Drop workers without a free slot (active_jobs >= max_concurrent_jobs).
        3. For GPU jobs, drop workers with less than
           ``gpu_required * MIN_FREE_GPU_MEMORY_PER_GPU`` free GPU memory.
        4. Pick the worker with the lowest active_jobs / max_concurrent_jobs.
        5. Claim the job for that worker.

        Args:
            job: The job to assign.

        Returns:
            Worker ID if assigned, None if no suitable worker was found, the
            claim failed, or an unexpected error occurred (logged).
        """
        try:
            gpu_required = self._get_gpu_requirement(job)

            available_workers = await self._worker_registry.get_available_workers(
                gpu_required=gpu_required
            )

            if not available_workers:
                logger.warning(
                    "No available workers for job",
                    job_id=str(job.job_id),
                    gpu_required=gpu_required,
                )
                return None

            selected_worker, min_load = self._select_least_loaded_worker(
                available_workers, gpu_required
            )

            if selected_worker is None:
                logger.warning(
                    "No worker with sufficient capacity",
                    job_id=str(job.job_id),
                )
                return None

            worker_id = await self._claim_job_for_worker(
                job_id=job.job_id,
                worker_id=selected_worker.worker_id,
            )

            if worker_id:
                logger.info(
                    "Job assigned to worker",
                    job_id=str(job.job_id),
                    worker_id=worker_id,
                    load_factor=min_load,
                )

            return worker_id

        except Exception as e:
            logger.error(
                "Failed to assign job to worker",
                job_id=str(job.job_id),
                error=str(e),
            )
            return None

    def _has_free_gpu_memory(self, worker, gpu_required: int) -> bool:
        """Return True if the worker's free GPU memory covers ``gpu_required`` GPUs."""
        free_memory = worker.gpu_memory_total - worker.gpu_memory_used
        return free_memory >= gpu_required * self.MIN_FREE_GPU_MEMORY_PER_GPU

    def _select_least_loaded_worker(self, workers, gpu_required: int) -> tuple:
        """
        Pick the eligible worker with the lowest load factor.

        A worker is eligible when ``max_concurrent_jobs > 0``, it still has a
        free slot (``active_jobs < max_concurrent_jobs``) and, for GPU jobs
        (``gpu_required > 0``), enough free GPU memory. On ties, the first
        worker in ``workers`` order wins.

        Returns:
            ``(worker, load_factor)``, or ``(None, inf)`` when none qualifies.
        """
        best_worker = None
        best_load = float("inf")

        for worker in workers:
            if worker.max_concurrent_jobs <= 0:
                continue
            # A full worker still has a finite load ratio (>= 1.0), so it must
            # be excluded explicitly or it could be picked and overcommitted.
            if worker.active_jobs >= worker.max_concurrent_jobs:
                continue
            # The memory check applies only to GPU jobs; CPU-only workers may
            # not report meaningful GPU memory figures.
            if gpu_required > 0 and not self._has_free_gpu_memory(worker, gpu_required):
                continue

            load_factor = worker.active_jobs / worker.max_concurrent_jobs
            if load_factor < best_load:
                best_worker, best_load = worker, load_factor

        return best_worker, best_load

    async def _claim_job_for_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> Optional[str]:
        """
        Claim a QUEUED job for a worker and increment the worker's job counter.

        The status check and the flip to RUNNING run with no ``await`` in
        between. Two coroutines on the same event loop therefore cannot both
        claim the same job. The queue consulted is the module-level
        ``_job_queue`` of ``backend.queue.producer``, which is per-process.
        This does not guard against other processes.

        If the database update fails, the in-memory changes are rolled back.
        The job returns to QUEUED instead of being stranded as RUNNING on a
        worker whose counter was never incremented.

        Args:
            job_id: Job ID.
            worker_id: Worker ID.

        Returns:
            Worker ID if claimed successfully. None if the job is missing, is
            not QUEUED, or the claim failed (logged).
        """
        try:
            # Imported lazily; NOTE(review): presumably to avoid a circular
            # import with the producer module -- confirm.
            from backend.queue.producer import _job_queue

            job = next((j for j in _job_queue if j.job_id == job_id), None)

            if job is None:
                logger.warning(
                    "Job not found in queue",
                    job_id=str(job_id),
                )
                return None

            if job.status != JobStatus.QUEUED:
                logger.warning(
                    "Job not in QUEUED status",
                    job_id=str(job_id),
                    status=job.status,
                )
                return None

            # Reserve the job before the first await (see docstring).
            previous_worker_id = getattr(job, "worker_id", None)
            previous_started_at = getattr(job, "started_at", None)
            job.status = JobStatus.RUNNING
            job.worker_id = worker_id
            job.started_at = datetime.utcnow()

            try:
                async with get_db_context() as session:
                    stmt = (
                        update(Worker)
                        .where(Worker.worker_id == worker_id)
                        .values(active_jobs=Worker.active_jobs + 1)
                    )
                    await session.execute(stmt)
                    await session.commit()
            except Exception:
                # Undo the reservation so the job can be scheduled again.
                job.status = JobStatus.QUEUED
                job.worker_id = previous_worker_id
                job.started_at = previous_started_at
                raise

            logger.debug(
                "Job claimed atomically",
                job_id=str(job_id),
                worker_id=worker_id,
            )

            return worker_id

        except Exception as e:
            logger.error(
                "Failed to claim job",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return None

    def _get_gpu_requirement(self, job: EvaluationJob) -> int:
        """
        Determine how many GPUs a job needs.

        Resolution order:
        1. ``job.metadata["gpu_requirement"]`` if present (converted with int()).
        2. ``job.metadata["job_type"]``: "benchmark" -> 1, "single_eval" -> 0.
        3. ``job.job_type`` attribute: "benchmark" -> 1.
        4. Otherwise 0 (CPU-only).

        Args:
            job: The job.

        Returns:
            Number of GPUs required (0 for CPU-only).

        Raises:
            ValueError/TypeError: if metadata "gpu_requirement" cannot be
            converted with int().
        """
        metadata = job.metadata
        if metadata:
            gpu_req = metadata.get("gpu_requirement")
            if gpu_req is not None:
                return int(gpu_req)

            job_type = metadata.get("job_type")
            if job_type == "benchmark":
                return 1
            if job_type == "single_eval":
                return 0

        if getattr(job, "job_type", None) == "benchmark":
            return 1

        return 0

    async def release_job_from_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> bool:
        """
        Decrement a worker's ``active_jobs`` counter after a job finishes or fails.

        The update only applies while ``active_jobs > 0``, so the counter
        never goes negative. The in-memory job record is not modified here.

        Args:
            job_id: Job ID (used for logging only).
            worker_id: Worker ID.

        Returns:
            True if the update ran without error, False otherwise (logged).
        """
        try:
            async with get_db_context() as session:
                stmt = (
                    update(Worker)
                    .where(Worker.worker_id == worker_id)
                    .where(Worker.active_jobs > 0)
                    .values(active_jobs=Worker.active_jobs - 1)
                )
                await session.execute(stmt)
                await session.commit()

            logger.debug(
                "Job released from worker",
                job_id=str(job_id),
                worker_id=worker_id,
            )

            return True

        except Exception as e:
            logger.error(
                "Failed to release job from worker",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return False

    async def get_worker_for_job(
        self,
        gpu_required: int = 0,
    ) -> Optional[Worker]:
        """
        Get the best worker for a job with the given GPU requirement.

        Uses the same eligibility and least-load rules as
        ``assign_job_to_worker``, but does not claim anything.

        Args:
            gpu_required: Number of GPUs required.

        Returns:
            The least-loaded eligible worker, or None.
        """
        available_workers = await self._worker_registry.get_available_workers(
            gpu_required=gpu_required
        )

        if not available_workers:
            return None

        selected_worker, _ = self._select_least_loaded_worker(
            available_workers, gpu_required
        )
        return selected_worker

    async def check_gpu_capacity(
        self,
        worker_id: str,
        gpu_required: int,
    ) -> bool:
        """
        Check whether a worker (as stored in the database) can host a GPU job.

        The worker must exist and have ``gpu_count >= gpu_required``. Its free
        GPU memory must be at least
        ``gpu_required * MIN_FREE_GPU_MEMORY_PER_GPU``.

        Args:
            worker_id: Worker ID.
            gpu_required: GPUs required.

        Returns:
            True if the worker has sufficient capacity. False if it does not,
            if it is unknown, or if the lookup failed (logged).
        """
        try:
            async with get_db_context() as session:
                stmt = select(Worker).where(Worker.worker_id == worker_id)
                result = await session.execute(stmt)
                worker = result.scalar_one_or_none()

                if worker is None:
                    return False

                if worker.gpu_count < gpu_required:
                    return False

                return self._has_free_gpu_memory(worker, gpu_required)

        except Exception as e:
            logger.error(
                "Failed to check GPU capacity",
                worker_id=worker_id,
                error=str(e),
            )
            return False
|
|
|
|
|
|
|
# Lazily created process-wide scheduler; populated on the first call to
# get_job_scheduler().
_scheduler: Optional[JobScheduler] = None


def get_job_scheduler() -> JobScheduler:
    """Return the shared JobScheduler, constructing it on first use."""
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    _scheduler = JobScheduler()
    return _scheduler
|
|
|
|
|
# Public API of this module.
__all__ = ["JobScheduler", "get_job_scheduler"]
|
|
|