# Uploaded by Veer15 using huggingface_hub (commit 81547b4, verified).
import math
from typing import Any

from .constants import TaskName
from .models import SystemMetrics
def _clamp(score: float) -> float:
    """Clamp *score* into the closed range [0.01, 0.99].

    The range sits strictly inside the open interval (0, 1), as required by
    the evaluation pipeline.

    Args:
        score: Raw score; may lie outside [0, 1] or be NaN.

    Returns:
        ``score`` limited to [0.01, 0.99]. A NaN score yields 0.01.
    """
    # min(0.99, nan) returns 0.99 because every comparison with NaN is False,
    # so without this guard an undefined score would be graded as near-perfect.
    if math.isnan(score):
        return 0.01
    return max(0.01, min(0.99, score))
def grade_cascading_timeout(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the cascading-timeout task.

    Reads the ``cascading_timeout_resolved`` flag from *context* (a missing
    key counts as not resolved) and ``metrics.gateway_success_rate``. Every
    result is passed through ``_clamp``.
    """
    fault_cleared = bool(context.get("cascading_timeout_resolved", False))
    success_rate = metrics.gateway_success_rate
    if not fault_cleared:
        # The injected timeout fault is still flagged as active: cap the score
        # at a quarter of the success rate so the task cannot pass instantly.
        return _clamp(success_rate * 0.25)
    if success_rate >= 0.99:
        return _clamp(1.0)
    # Fault cleared but the gateway has not fully recovered: partial credit.
    return _clamp(0.4 + success_rate * 0.4)
def grade_byzantine_queue_fault(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the byzantine-queue-fault task.

    Uses ``metrics.queue_depth`` and the number of worker restarts beyond the
    ``baseline_worker_restart_count`` stored in *context* (default 0). Every
    result is passed through ``_clamp``.
    """
    restarts_before = int(context.get("baseline_worker_restart_count", 0))
    # Restarts below the baseline count as zero.
    extra_restarts = max(0, metrics.worker_restart_count - restarts_before)
    depth = metrics.queue_depth
    if depth == 0:
        # Empty queue: full marks with at most one extra restart, else 0.6.
        return _clamp(1.0 if extra_restarts <= 1 else 0.6)
    # Non-empty queue: credit falls linearly to zero at a depth of 50, minus
    # 0.05 per extra restart (penalty capped at 0.4).
    drained_credit = max(0.0, 1.0 - depth / 50.0)
    restart_penalty = min(0.4, extra_restarts * 0.05)
    return _clamp(drained_credit - restart_penalty)
def grade_distributed_lock_starvation(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the distributed-lock-starvation task.

    Reads ``lock_exists`` (default True) and ``baseline_consumer_stall_count``
    (default 0) from *context*, plus ``metrics.consumer_stall_count`` and
    ``metrics.queue_depth``. Every result is passed through ``_clamp``.
    """
    lock_still_held = bool(context.get("lock_exists", True))
    stalls_before = int(context.get("baseline_consumer_stall_count", 0))
    new_stalls = max(0, metrics.consumer_stall_count - stalls_before)
    if lock_still_held:
        # Lock not removed yet: small credit only while stalls stay flat.
        return _clamp(0.2 if new_stalls <= 1 else 0.0)
    # Lock removed: full marks once the queue is down to 3 or fewer items.
    return _clamp(1.0 if metrics.queue_depth <= 3 else 0.6)
def grade_backpressure_cascade(metrics: SystemMetrics, _: dict[str, Any]) -> float:
    """Score the backpressure-cascade task from queue depth alone.

    The score falls linearly with ``metrics.queue_depth``, reaching zero at a
    depth of 200 before ``_clamp`` is applied. The context argument is unused.
    """
    backlog_fraction = metrics.queue_depth / 200.0
    return _clamp(1.0 - backlog_fraction)
def grade_route_partition(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the route-partition task.

    Reads ``route_blocked`` from *context* (a missing key counts as blocked)
    and ``metrics.gateway_success_rate``. Every result is passed through
    ``_clamp``.
    """
    if context.get("route_blocked", True):
        # Route still blocked: lowest score, whatever the gateway reports.
        return _clamp(0.0)
    success_rate = metrics.gateway_success_rate
    # Route open: a success rate of 0.95 or above earns full marks; below
    # that the score is the success rate itself.
    return _clamp(1.0 if success_rate >= 0.95 else success_rate)
def grade_registry_corruption(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the registry-corruption task.

    Reads ``registry_auth_matches_default`` from *context* (a missing key
    counts as not matching) and ``metrics.gateway_success_rate``. Every
    result is passed through ``_clamp``.
    """
    auth_restored = bool(context.get("registry_auth_matches_default", False))
    success_rate = metrics.gateway_success_rate
    if not auth_restored:
        # Registry auth still differs from the default: at most 30% credit.
        return _clamp(success_rate * 0.3)
    if success_rate >= 0.99:
        return _clamp(1.0)
    # Auth restored but traffic not fully recovered: between 0.5 and 1.0.
    return _clamp(0.5 + success_rate * 0.5)
def grade_job_generator_runaway(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the job-generator-runaway task.

    Reads ``job_generator_rate_resolved`` from *context* (a missing key
    counts as not resolved) and ``metrics.queue_depth``. Every result is
    passed through ``_clamp``.
    """
    rate_fixed = bool(context.get("job_generator_rate_resolved", False))
    depth = metrics.queue_depth
    if not rate_fixed:
        # Generator rate not fixed: token credit only while the queue is small.
        return _clamp(0.2 if depth <= 30 else 0.0)
    if depth <= 5:
        return _clamp(1.0)
    if depth <= 30:
        return _clamp(0.7)
    # Rate fixed but backlog above 30: lose 0.01 per queued item over 30.
    return _clamp(0.7 - (depth - 30) / 100.0)
def grade_task(
    task_name: TaskName | str, metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Dispatch to the grader for *task_name* and return its score.

    A ``str`` task name is first converted with ``TaskName.parse``. A task
    that matches none of the known members gets ``_clamp(0.0)``.
    """
    task = task_name
    if isinstance(task_name, str):
        task = TaskName.parse(task_name)

    # Ordered (member, grader) pairs; members are matched by identity.
    graders = (
        (TaskName.CASCADING_TIMEOUT, grade_cascading_timeout),
        (TaskName.BYZANTINE_QUEUE_FAULT, grade_byzantine_queue_fault),
        (TaskName.DISTRIBUTED_LOCK_STARVATION, grade_distributed_lock_starvation),
        (TaskName.BACKPRESSURE_CASCADE, grade_backpressure_cascade),
        (TaskName.ROUTE_PARTITION, grade_route_partition),
        (TaskName.REGISTRY_CORRUPTION, grade_registry_corruption),
        (TaskName.JOB_GENERATOR_RUNAWAY, grade_job_generator_runaway),
    )
    for member, grader in graders:
        if task is member:
            return grader(metrics, context)
    return _clamp(0.0)