File size: 4,290 Bytes
b641d3d
 
 
 
 
 
 
81547b4
 
b641d3d
 
 
 
 
81547b4
b641d3d
 
 
 
 
 
 
 
 
 
 
 
 
81547b4
b641d3d
81547b4
b641d3d
 
 
 
 
 
 
 
 
 
 
 
 
 
81547b4
b641d3d
81547b4
b641d3d
 
81547b4
b641d3d
 
 
 
 
 
 
 
 
81547b4
b641d3d
 
81547b4
b641d3d
 
 
 
 
 
 
81547b4
b641d3d
 
 
 
 
 
 
 
 
 
81547b4
b641d3d
81547b4
b641d3d
 
81547b4
b641d3d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81547b4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from typing import Any

from .constants import TaskName
from .models import SystemMetrics


def _clamp(score: float) -> float:
    """Clamp score to the open interval (0, 1), as required by the evaluation pipeline."""
    return max(0.01, min(0.99, score))


def grade_cascading_timeout(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the cascading-timeout task from the gateway success rate.

    Args:
        metrics: Current system metrics; only ``gateway_success_rate`` is read.
        context: Task context; ``"cascading_timeout_resolved"`` (default
            ``False``) says whether the timeout fault has been resolved.

    Returns:
        The clamped score. Raw values before clamping:
        * fault unresolved: ``gateway_success_rate * 0.25``
        * fault resolved and success rate >= 0.99: ``1.0``
        * fault resolved otherwise: ``0.4 + gateway_success_rate * 0.4``
    """
    fault_cleared = bool(context.get("cascading_timeout_resolved", False))
    success_rate = metrics.gateway_success_rate

    if not fault_cleared:
        # Prevent instant pass while the injected timeout fault is still active.
        raw_score = success_rate * 0.25
    elif success_rate >= 0.99:
        raw_score = 1.0
    else:
        raw_score = 0.4 + success_rate * 0.4
    return _clamp(raw_score)


def grade_byzantine_queue_fault(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the byzantine-queue-fault task from queue depth and worker restarts.

    Args:
        metrics: Current system metrics; ``queue_depth`` and
            ``worker_restart_count`` are read.
        context: Task context; ``"baseline_worker_restart_count"`` (default 0)
            is subtracted from the current restart count.

    Returns:
        The clamped score. Raw values before clamping:
        * empty queue and at most one restart above baseline: ``1.0``
        * empty queue with more restarts: ``0.6``
        * otherwise: ``max(0, 1 - queue_depth / 50)`` minus a restart penalty
          of 0.05 per restart above baseline, capped at 0.4.
    """
    restarts_at_start = int(context.get("baseline_worker_restart_count", 0))
    extra_restarts = max(0, metrics.worker_restart_count - restarts_at_start)
    depth = metrics.queue_depth

    if depth == 0:
        # Queue is drained; the only question left is how disruptive the fix was.
        return _clamp(1.0 if extra_restarts <= 1 else 0.6)

    drained_fraction = max(0.0, 1.0 - depth / 50.0)
    restart_penalty = min(0.4, extra_restarts * 0.05)
    return _clamp(drained_fraction - restart_penalty)


def grade_distributed_lock_starvation(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the distributed-lock-starvation task.

    Args:
        metrics: Current system metrics; ``consumer_stall_count`` and
            ``queue_depth`` are read.
        context: Task context; ``"lock_exists"`` (default ``True``) and
            ``"baseline_consumer_stall_count"`` (default 0) are read.

    Returns:
        The clamped score. Raw values before clamping:
        * lock still exists: ``0.2`` if at most one stall above baseline,
          else ``0.0``
        * lock gone and ``queue_depth <= 3``: ``1.0``
        * lock gone otherwise: ``0.6``
    """
    lock_still_held = bool(context.get("lock_exists", True))
    stalls_at_start = int(context.get("baseline_consumer_stall_count", 0))
    new_stalls = max(0, metrics.consumer_stall_count - stalls_at_start)

    if lock_still_held:
        # If lock still exists, reward slight progress only when stalls don't explode.
        raw_score = 0.2 if new_stalls <= 1 else 0.0
    elif metrics.queue_depth <= 3:
        raw_score = 1.0
    else:
        raw_score = 0.6
    return _clamp(raw_score)


def grade_backpressure_cascade(metrics: SystemMetrics, _: dict[str, Any]) -> float:
    """Score the backpressure-cascade task from queue depth alone.

    Args:
        metrics: Current system metrics; only ``queue_depth`` is read.
        _: Task context; accepted for a uniform grader signature but unused.

    Returns:
        The clamped value of ``1 - queue_depth / 200``.
    """
    backlog_fraction = metrics.queue_depth / 200.0
    return _clamp(1.0 - backlog_fraction)


def grade_route_partition(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the route-partition task.

    Args:
        metrics: Current system metrics; ``gateway_success_rate`` is read only
            when the route is no longer blocked.
        context: Task context; ``"route_blocked"`` (default ``True``) is read.

    Returns:
        The clamped score. Raw values before clamping:
        * route still blocked: ``0.0``
        * route open and success rate >= 0.95: ``1.0``
        * route open otherwise: the success rate itself
    """
    if bool(context.get("route_blocked", True)):
        return _clamp(0.0)

    success_rate = metrics.gateway_success_rate
    return _clamp(1.0 if success_rate >= 0.95 else success_rate)


def grade_registry_corruption(metrics: SystemMetrics, context: dict[str, Any]) -> float:
    """Score the registry-corruption task.

    Args:
        metrics: Current system metrics; only ``gateway_success_rate`` is read.
        context: Task context; ``"registry_auth_matches_default"`` (default
            ``False``) is read.

    Returns:
        The clamped score. Raw values before clamping:
        * auth does not match default: ``gateway_success_rate * 0.3``
        * auth matches and success rate >= 0.99: ``1.0``
        * auth matches otherwise: ``0.5 + gateway_success_rate * 0.5``
    """
    auth_restored = bool(context.get("registry_auth_matches_default", False))
    success_rate = metrics.gateway_success_rate

    if not auth_restored:
        return _clamp(success_rate * 0.3)
    if success_rate >= 0.99:
        return _clamp(1.0)
    return _clamp(0.5 + success_rate * 0.5)


def grade_job_generator_runaway(
    metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Score the job-generator-runaway task.

    Args:
        metrics: Current system metrics; only ``queue_depth`` is read.
        context: Task context; ``"job_generator_rate_resolved"`` (default
            ``False``) is read.

    Returns:
        The clamped score. Raw values before clamping:
        * rate unresolved: ``0.2`` if ``queue_depth <= 30``, else ``0.0``
        * rate resolved and ``queue_depth <= 5``: ``1.0``
        * rate resolved and ``queue_depth <= 30``: ``0.7``
        * rate resolved otherwise: ``0.7 - (queue_depth - 30) / 100``
    """
    rate_fixed = bool(context.get("job_generator_rate_resolved", False))
    depth = metrics.queue_depth

    if not rate_fixed:
        raw_score = 0.2 if depth <= 30 else 0.0
    elif depth <= 5:
        raw_score = 1.0
    elif depth <= 30:
        raw_score = 0.7
    else:
        # Linear fall-off for every queued job beyond 30.
        raw_score = 0.7 - (depth - 30) / 100.0
    return _clamp(raw_score)


def grade_task(
    task_name: TaskName | str, metrics: SystemMetrics, context: dict[str, Any]
) -> float:
    """Dispatch to the grader that matches ``task_name``.

    Args:
        task_name: A ``TaskName`` member, or a string that is first converted
            with ``TaskName.parse``.
        metrics: Current system metrics, forwarded to the grader.
        context: Task context, forwarded to the grader.

    Returns:
        The selected grader's score, or ``_clamp(0.0)`` when no grader is
        registered for the task.
    """
    task = task_name
    if isinstance(task_name, str):
        task = TaskName.parse(task_name)

    # Ordered (task, grader) table; matched by identity against the enum member.
    graders = (
        (TaskName.CASCADING_TIMEOUT, grade_cascading_timeout),
        (TaskName.BYZANTINE_QUEUE_FAULT, grade_byzantine_queue_fault),
        (TaskName.DISTRIBUTED_LOCK_STARVATION, grade_distributed_lock_starvation),
        (TaskName.BACKPRESSURE_CASCADE, grade_backpressure_cascade),
        (TaskName.ROUTE_PARTITION, grade_route_partition),
        (TaskName.REGISTRY_CORRUPTION, grade_registry_corruption),
        (TaskName.JOB_GENERATOR_RUNAWAY, grade_job_generator_runaway),
    )
    for member, grader in graders:
        if task is member:
            return grader(metrics, context)
    return _clamp(0.0)