# Source: aegislm repository, backend/queue/scheduler.py
# (uploaded by ACA050 in "Upload 50 files", commit 1a4aa87)
"""
GPU-Aware Job Scheduler with Least-Load Assignment and Tenant Fairness
Provides intelligent job scheduling with:
- GPU affinity management
- Least-load worker selection
- Tenant fairness (preventing starvation)
- Atomic job claiming
- Fault tolerance
"""
import uuid
from datetime import datetime
from typing import Dict, List, Optional
from sqlalchemy import select, update, func
from backend.db.models import Worker, EvaluationRun
from backend.db.session import get_db_context
from backend.logging.logger import get_logger
from .job_schema import (
GPURequirement,
JobStatus,
EvaluationJob,
)
from .worker_registry import get_worker_registry, DEFAULT_HEARTBEAT_TIMEOUT
from .worker_schema import WorkerStatus
# Module logger for the scheduler; call sites below attach context such as
# job_id / worker_id / error as keyword arguments.
logger = get_logger("queue.scheduler", component="queue")
class JobScheduler:
    """
    GPU-aware job scheduler with least-load assignment and tenant fairness.

    Responsibilities:
    - GPU affinity management
    - Least-load worker selection
    - Tenant fairness (weighted scheduling)
    - Claiming jobs from the in-process job queue
    - Job assignment with capacity checking
    """

    # Minimum free GPU memory required per requested GPU. The original code
    # described 4000 as "4GB", which suggests worker GPU memory is reported
    # in MB. NOTE(review): confirm the unit with the worker registry.
    _GPU_MEMORY_PER_GPU = 4000

    def __init__(self):
        self._worker_registry = get_worker_registry()

    async def get_tenant_active_job_count(self, tenant_id: uuid.UUID) -> int:
        """
        Count a tenant's active evaluation runs.

        Args:
            tenant_id: The tenant ID

        Returns:
            Number of runs with status "pending" or "running". Returns 0
            (after logging) if the database query fails.
        """
        try:
            async with get_db_context() as session:
                query = select(func.count(EvaluationRun.id)).where(
                    EvaluationRun.tenant_id == tenant_id,
                    EvaluationRun.status.in_(["pending", "running"]),
                )
                result = await session.execute(query)
                return result.scalar() or 0
        except Exception as e:
            logger.error(
                "Failed to get tenant active job count",
                tenant_id=str(tenant_id),
                error=str(e),
            )
            return 0

    async def get_all_tenant_job_counts(self) -> Dict[uuid.UUID, int]:
        """
        Count active evaluation runs for every tenant in one grouped query.

        Returns:
            Mapping of tenant_id -> number of "pending"/"running" runs.
            Tenants with no active runs are absent. Returns an empty dict
            (after logging) if the database query fails.
        """
        try:
            async with get_db_context() as session:
                query = (
                    select(EvaluationRun.tenant_id, func.count(EvaluationRun.id))
                    .where(EvaluationRun.status.in_(["pending", "running"]))
                    .group_by(EvaluationRun.tenant_id)
                )
                result = await session.execute(query)
                return {row[0]: row[1] for row in result.all()}
        except Exception as e:
            logger.error(
                "Failed to get all tenant job counts",
                error=str(e),
            )
            return {}

    def calculate_tenant_priority(
        self, tenant_id: uuid.UUID, tenant_job_counts: Dict[uuid.UUID, int]
    ) -> float:
        """
        Compute a fairness priority for a tenant.

        Priority = 1 / (active_jobs_for_tenant + 1), so tenants with fewer
        active jobs rank higher. A tenant with no entry counts as 0 active
        jobs and gets the maximum priority of 1.0.

        Args:
            tenant_id: The tenant ID
            tenant_job_counts: Mapping of tenant_id -> active job count

        Returns:
            Priority score in (0, 1]; higher is better.
        """
        job_count = tenant_job_counts.get(tenant_id, 0)
        # +1 avoids division by zero and gives idle tenants the top priority.
        return 1.0 / (job_count + 1)

    async def get_pending_jobs_with_tenant_fairness(
        self,
        jobs: List[EvaluationJob],
    ) -> List[EvaluationJob]:
        """
        Order pending jobs by tenant fairness priority (highest first).

        Jobs from tenants with fewer active jobs come first. Jobs without a
        tenant_id get priority 0.0 and sort last. The sort is stable, so
        jobs with equal priority keep their input order.

        Args:
            jobs: Pending jobs to order

        Returns:
            A new sorted list (or the input itself when it is empty).
        """
        if not jobs:
            return jobs

        # One snapshot of active counts is used for the whole batch.
        tenant_job_counts = await self.get_all_tenant_job_counts()

        def get_priority(job: EvaluationJob) -> float:
            tenant_id = getattr(job, "tenant_id", None)
            if tenant_id:
                return self.calculate_tenant_priority(tenant_id, tenant_job_counts)
            return 0.0  # Jobs without tenant get lowest priority

        return sorted(jobs, key=get_priority, reverse=True)

    async def assign_job_to_worker(
        self,
        job: EvaluationJob,
    ) -> Optional[str]:
        """
        Assign a job to the best available worker using least-load strategy.

        The algorithm:
        1. Ask the worker registry for workers matching the GPU requirement
           (status filtering is presumably done by the registry; verify
           against get_available_workers).
        2. Drop workers without a free job slot
           (active_jobs >= max_concurrent_jobs, or max_concurrent_jobs <= 0).
        3. For GPU jobs, drop workers with less than
           gpu_required * _GPU_MEMORY_PER_GPU free GPU memory.
        4. Pick the worker with the lowest load factor
           (active_jobs / max_concurrent_jobs).
        5. Claim the job for that worker.

        Args:
            job: The job to assign

        Returns:
            Worker ID if assigned, None if no suitable worker was found, the
            claim failed, or an unexpected error occurred (logged).
        """
        try:
            gpu_required = self._get_gpu_requirement(job)

            available_workers = await self._worker_registry.get_available_workers(
                gpu_required=gpu_required
            )
            if not available_workers:
                logger.warning(
                    "No available workers for job",
                    job_id=str(job.job_id),
                    gpu_required=gpu_required,
                )
                return None

            selected_worker, min_load = self._select_least_loaded_worker(
                available_workers, gpu_required
            )
            if selected_worker is None:
                logger.warning(
                    "No worker with sufficient capacity",
                    job_id=str(job.job_id),
                )
                return None

            worker_id = await self._claim_job_for_worker(
                job_id=job.job_id,
                worker_id=selected_worker.worker_id,
            )
            if worker_id:
                logger.info(
                    "Job assigned to worker",
                    job_id=str(job.job_id),
                    worker_id=worker_id,
                    load_factor=min_load,
                )
            return worker_id
        except Exception as e:
            logger.error(
                "Failed to assign job to worker",
                job_id=str(job.job_id),
                error=str(e),
            )
            return None

    def _select_least_loaded_worker(self, workers: List, gpu_required: int) -> tuple:
        """
        Pick the eligible worker with the lowest load factor.

        Args:
            workers: Candidate worker records (need active_jobs,
                max_concurrent_jobs, and GPU memory fields for GPU jobs)
            gpu_required: Number of GPUs the job needs (0 for CPU-only)

        Returns:
            (worker, load_factor) for the chosen worker, or
            (None, float('inf')) when no worker is eligible. On ties the
            earliest worker in `workers` wins.
        """
        best_worker = None
        best_load = float("inf")
        for worker in workers:
            max_jobs = worker.max_concurrent_jobs or 0
            active = worker.active_jobs or 0
            # Skip workers that cannot accept another job at all.
            if max_jobs <= 0 or active >= max_jobs:
                continue
            if gpu_required > 0 and not self._has_free_gpu_memory(worker, gpu_required):
                continue
            load_factor = active / max_jobs
            if load_factor < best_load:
                best_worker, best_load = worker, load_factor
        return best_worker, best_load

    def _has_free_gpu_memory(self, worker, gpu_required: int) -> bool:
        """
        Return True if the worker has enough free GPU memory for the job.

        Free memory is gpu_memory_total - gpu_memory_used (missing values
        count as 0). The requirement is gpu_required * _GPU_MEMORY_PER_GPU.
        """
        total = worker.gpu_memory_total or 0
        used = worker.gpu_memory_used or 0
        return total - used >= gpu_required * self._GPU_MEMORY_PER_GPU

    async def _claim_job_for_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> Optional[str]:
        """
        Claim a queued job for a worker.

        Finds the job in the in-process queue
        (``backend.queue.producer._job_queue``). If the job is still QUEUED,
        it is marked RUNNING for `worker_id`, and then the worker's
        ``active_jobs`` counter is incremented in the database.

        There is no ``await`` between the QUEUED check and the status change.
        So within one event loop, no other coroutine can claim the same job
        at the same time. NOTE(review): this does not guard against other
        processes. If the database update fails or is cancelled, the
        in-memory claim is reverted so the job stays schedulable.

        Args:
            job_id: Job ID
            worker_id: Worker ID

        Returns:
            Worker ID if claimed successfully. None if the job is missing,
            already claimed, or the claim failed (logged).
        """
        try:
            from backend.queue.producer import _job_queue

            job = next((j for j in _job_queue if j.job_id == job_id), None)
            if job is None:
                logger.warning(
                    "Job not found in queue",
                    job_id=str(job_id),
                )
                return None

            # Check if job is still queued (not already claimed)
            if job.status != JobStatus.QUEUED:
                logger.warning(
                    "Job not in QUEUED status",
                    job_id=str(job_id),
                    status=job.status,
                )
                return None

            # No await between the check above and these assignments.
            previous_state = (
                job.status,
                getattr(job, "worker_id", None),
                getattr(job, "started_at", None),
            )
            job.status = JobStatus.RUNNING
            job.worker_id = worker_id
            job.started_at = datetime.utcnow()

            try:
                async with get_db_context() as session:
                    stmt = (
                        update(Worker)
                        .where(Worker.worker_id == worker_id)
                        .values(active_jobs=Worker.active_jobs + 1)
                    )
                    await session.execute(stmt)
                    await session.commit()
            except BaseException:
                # Undo our claim (unless something else changed the job
                # meanwhile). Otherwise the job would stay RUNNING while the
                # caller is told it was not assigned.
                if job.status == JobStatus.RUNNING and job.worker_id == worker_id:
                    job.status, job.worker_id, job.started_at = previous_state
                raise

            logger.debug(
                "Job claimed atomically",
                job_id=str(job_id),
                worker_id=worker_id,
            )
            return worker_id
        except Exception as e:
            logger.error(
                "Failed to claim job",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return None

    def _get_gpu_requirement(self, job: EvaluationJob) -> int:
        """
        Determine how many GPUs a job needs.

        Resolution order:
        1. metadata["gpu_requirement"] (coerced with int()).
        2. metadata["job_type"]: "benchmark" -> 1, "single_eval" -> 0.
        3. job.job_type == "benchmark" -> 1.
        4. Otherwise 0.

        Args:
            job: The job

        Returns:
            Number of GPUs required (0 for CPU-only)

        Raises:
            ValueError / TypeError: if metadata["gpu_requirement"] is not
                int-convertible.
        """
        metadata = getattr(job, "metadata", None)
        if metadata:
            gpu_req = metadata.get("gpu_requirement")
            if gpu_req is not None:
                return int(gpu_req)

            job_type = metadata.get("job_type")
            if job_type == "benchmark":
                return 1  # Benchmark jobs typically need GPU
            if job_type == "single_eval":
                return 0  # Single eval can run on CPU

        if getattr(job, "job_type", None) == "benchmark":
            return 1
        return 0

    async def release_job_from_worker(
        self,
        job_id: uuid.UUID,
        worker_id: str,
    ) -> bool:
        """
        Release a job from a worker (job completed or failed).

        Decrements the worker's ``active_jobs`` in the database. The
        ``active_jobs > 0`` guard keeps the counter from going negative. It
        returns True even when no row matched (unknown worker or counter
        already 0).

        Args:
            job_id: Job ID (used for logging only)
            worker_id: Worker ID

        Returns:
            True if the update ran, False if it raised (logged).
        """
        try:
            async with get_db_context() as session:
                stmt = (
                    update(Worker)
                    .where(Worker.worker_id == worker_id)
                    .where(Worker.active_jobs > 0)
                    .values(active_jobs=Worker.active_jobs - 1)
                )
                await session.execute(stmt)
                await session.commit()

            logger.debug(
                "Job released from worker",
                job_id=str(job_id),
                worker_id=worker_id,
            )
            return True
        except Exception as e:
            logger.error(
                "Failed to release job from worker",
                job_id=str(job_id),
                worker_id=worker_id,
                error=str(e),
            )
            return False

    async def get_worker_for_job(
        self,
        gpu_required: int = 0,
    ) -> Optional[Worker]:
        """
        Return the first worker the registry offers for a GPU requirement.

        NOTE(review): this assumes get_available_workers() returns workers
        already ordered by load factor; this method does no ordering or
        capacity checks of its own.

        Args:
            gpu_required: Number of GPUs required

        Returns:
            The first available worker, or None if there are none.
        """
        available_workers = await self._worker_registry.get_available_workers(
            gpu_required=gpu_required
        )
        if not available_workers:
            return None
        return available_workers[0]

    async def check_gpu_capacity(
        self,
        worker_id: str,
        gpu_required: int,
    ) -> bool:
        """
        Check if a worker has sufficient GPU capacity for a job.

        A worker qualifies when it exists and has at least `gpu_required`
        GPUs. It must also have at least gpu_required * _GPU_MEMORY_PER_GPU
        free GPU memory (missing counts/memory values count as 0).

        Args:
            worker_id: Worker ID
            gpu_required: GPUs required

        Returns:
            True if the worker has sufficient capacity. False otherwise, or
            on a database error (logged).
        """
        try:
            async with get_db_context() as session:
                stmt = select(Worker).where(Worker.worker_id == worker_id)
                result = await session.execute(stmt)
                worker = result.scalar_one_or_none()
                if worker is None:
                    return False
                if (worker.gpu_count or 0) < gpu_required:
                    return False
                return self._has_free_gpu_memory(worker, gpu_required)
        except Exception as e:
            logger.error(
                "Failed to check GPU capacity",
                worker_id=worker_id,
                error=str(e),
            )
            return False
# Global instance
# Process-wide scheduler instance, created lazily by get_job_scheduler().
_scheduler: Optional[JobScheduler] = None


def get_job_scheduler() -> JobScheduler:
    """
    Return the shared JobScheduler, constructing it on the first call.

    Every later call returns that same instance.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler
    _scheduler = JobScheduler()
    return _scheduler


# Names exported by ``from ... import *``.
__all__ = [
    "JobScheduler",
    "get_job_scheduler",
]