Spaces:
Sleeping
Sleeping
File size: 9,041 Bytes
"""
Task registry and unified grader.
`TASK_REGISTRY` maps task_name → scenario class. Pools (A/B/C/D) live
in `pools.py` and reuse the same registry — there's no duplication.
The unified grader is oracle-INDEPENDENT: it consumes only step records
plus terminal artefacts (declared patch, declared no-change), so it can
score a saved trajectory file long after the episode ended.
"""
from __future__ import annotations
from typing import Dict, List, Optional, Type
from .models import StepRecord
from .scenarios.base import BaseScenario
from .scenarios.easy_memory_leak import MemoryLeakScenario
from .scenarios.medium_cascading_failure import CascadingFailureScenario
from .scenarios.hard_distributed_deadlock import DistributedDeadlockScenario
from .scenarios.grader_p2 import (
grade_patch_quality,
grade_no_change,
grade_p2_efficiency,
)
# ──────────────────────────────────────────────────────────────────────
# Registry
# ──────────────────────────────────────────────────────────────────────
# Map from task name to the scenario class implementing it. `get_scenario()`
# looks names up here; `_lazy_register_phase_b()` adds further entries at
# import time for any optional scenario module that can be imported.
TASK_REGISTRY: Dict[str, Type[BaseScenario]] = {
    "memory_leak": MemoryLeakScenario,
    "cascading_failure": CascadingFailureScenario,
    "distributed_deadlock": DistributedDeadlockScenario,
}
# Phase B scenarios are registered lazily to avoid import cycles
# during the initial Phase A bring-up. See `_lazy_register_phase_b()`.
# Guard flag: flipped to True on the first registration attempt so the
# optional imports below are tried at most once per process.
_PHASE_B_REGISTERED = False


def _lazy_register_phase_b() -> None:
    """Add every importable Phase B / Pool D scenario to ``TASK_REGISTRY``.

    Each scenario module is optional: one that fails with ``ImportError``
    is skipped silently and the remaining modules are still attempted.
    Calls after the first return immediately without re-importing.
    """
    global _PHASE_B_REGISTERED
    if _PHASE_B_REGISTERED:
        return
    # Set before the imports run, so any call made while they execute
    # hits the early return above.
    _PHASE_B_REGISTERED = True

    try:
        from .scenarios.aliased_fault import AliasedFaultScenario
    except ImportError:
        pass
    else:
        TASK_REGISTRY["aliased_fault"] = AliasedFaultScenario

    try:
        from .scenarios.severity_inversion import SeverityInversionScenario
    except ImportError:
        pass
    else:
        TASK_REGISTRY["severity_inversion"] = SeverityInversionScenario

    try:
        from .scenarios.confidence_inversion import ConfidenceInversionScenario
    except ImportError:
        pass
    else:
        TASK_REGISTRY["confidence_inversion"] = ConfidenceInversionScenario

    try:
        from .scenarios.info_ordering import InfoOrderingScenario
    except ImportError:
        pass
    else:
        TASK_REGISTRY["info_ordering"] = InfoOrderingScenario

    try:
        from .scenarios.circuit_breaker_noop import CircuitBreakerNoopScenario
    except ImportError:
        pass
    else:
        TASK_REGISTRY["circuit_breaker_noop"] = CircuitBreakerNoopScenario

    # Pool D held-out compounds: both classes come from one module, so
    # they are registered together or not at all.
    try:
        from .scenarios.heldout import (
            HeldoutAliasedSeverityScenario,
            HeldoutConfidenceOrderingScenario,
        )
    except ImportError:
        pass
    else:
        TASK_REGISTRY["heldout_aliased_severity"] = HeldoutAliasedSeverityScenario
        TASK_REGISTRY["heldout_confidence_ordering"] = HeldoutConfidenceOrderingScenario
# Attempt the optional registrations once at import time, then snapshot
# the registered names (later changes to the registry are not reflected).
_lazy_register_phase_b()
TASK_NAMES = list(TASK_REGISTRY)
def get_scenario(task_name: str) -> BaseScenario:
    """Instantiate the scenario class registered under ``task_name``.

    Returns:
        A fresh instance, built by calling the registered class with no
        arguments.

    Raises:
        ValueError: if ``task_name`` is not a key of ``TASK_REGISTRY``;
            the message lists the names that are registered.
    """
    if task_name not in TASK_REGISTRY:
        raise ValueError(
            f"Unknown task: {task_name}. Available: {list(TASK_REGISTRY)}")
    scenario_cls = TASK_REGISTRY[task_name]
    return scenario_cls()
# ──────────────────────────────────────────────────────────────────────
# P1-only grader (legacy)
# ──────────────────────────────────────────────────────────────────────
def grade_trajectory(task_name: str, trajectory: List[StepRecord]) -> float:
    """Legacy P1-only scoring: delegate to the scenario's own ``grade``.

    The value returned by ``scenario.grade(trajectory)`` is coerced to
    ``float``. Scores are documented as lying in [0.01, 0.99]; that bound
    is not enforced here -- presumably the scenario's ``grade`` does so
    (verify against the scenario classes).

    Raises:
        ValueError: propagated from ``get_scenario`` for an unknown task.
    """
    return float(get_scenario(task_name).grade(trajectory))
# ──────────────────────────────────────────────────────────────────────
# Unified grader
# ──────────────────────────────────────────────────────────────────────
# Component weights. The first four (P1 RCA, P1 efficiency, patch quality,
# no-change detection) must sum to 1.0.
W_P1_RCA = 0.25
W_P1_EFFICIENCY = 0.15
W_PATCH_QUALITY = 0.35
W_NO_CHANGE = 0.25
# For is_valid_issue scenarios the "no_change" slot is reallocated to P2
# efficiency, so W_P2_EFFICIENCY takes the place of W_NO_CHANGE in the sum
# and has to carry the same value for the weights to still total 1.0.
W_P2_EFFICIENCY = 0.25
def grade_trajectory_unified(
    task_name: str,
    p1_trajectory: List[StepRecord],
    p2_trajectory: List[StepRecord],
    declared_patch: Optional[str],
    declared_no_change: bool,
    p1_belief_history: Optional[List[dict]] = None,
) -> Dict[str, float]:
    """Score a unified P1 + P2 trajectory.

    Args:
        task_name: Key into ``TASK_REGISTRY`` selecting the scenario.
        p1_trajectory: Step records passed to the scenario's P1 graders.
        p2_trajectory: Step records; those with ``phase == 2`` are counted
            as the P2 step total for the efficiency grader.
        declared_patch: Patch text declared by the agent; ``None`` is
            graded as the empty string.
        declared_no_change: Whether the agent declared "no change needed".
        p1_belief_history: Accepted for interface compatibility; it is
            not used in scoring.

    Returns:
        A dict with the keys ``final``, ``p1_rca``, ``p1_efficiency``,
        ``patch_quality``, ``no_change_detection`` and ``p2_efficiency``,
        each rounded to 4 decimals. The five component entries are the
        raw (unweighted) grader outputs; ``final`` is their weighted sum:

        * no ``code_context``: ``0.5 * p1_rca + 0.5 * p1_efficiency``;
        * valid-issue scenario: P1 RCA, P1 efficiency, patch quality and
          P2 efficiency, weighted by the ``W_*`` constants;
        * no-change scenario: P1 RCA, P1 efficiency and no-change
          detection.

        The raw components are expected to lie in [0, 1]; that is up to
        the individual graders and is not checked here.

    Raises:
        ValueError: propagated from ``get_scenario`` for an unknown task.
    """
    scenario = get_scenario(task_name)
    ctx = scenario.code_context

    # ---- P1 components (always evaluated) ----
    p1_rca_raw = scenario.grade_p1_rca(p1_trajectory)
    p1_eff_raw = scenario.grade_p1_efficiency(p1_trajectory)

    # ---- P2 components (only if scenario has a code_context) ----
    if ctx is None:
        # P1-only scenario: the whole score is split evenly between
        # P1 RCA and P1 efficiency.
        return {
            "final": round(p1_rca_raw * 0.5 + p1_eff_raw * 0.5, 4),
            "p1_rca": round(p1_rca_raw, 4),
            "p1_efficiency": round(p1_eff_raw, 4),
            "patch_quality": 0.0,
            "no_change_detection": 0.0,
            "p2_efficiency": 0.0,
        }

    valid_issue = ctx.is_valid_issue
    if valid_issue:
        patch_raw = grade_patch_quality(declared_patch or "", ctx)
        no_change_raw = 0.0
    else:
        # No-change scenario: declaring "no change" is the right answer,
        # so any patch earns nothing.
        patch_raw = 0.0
        no_change_raw = grade_no_change(declared_no_change, ctx)

    # P2 efficiency is computed (and reported) for both scenario kinds.
    p2_eff_raw = grade_p2_efficiency(
        p2_steps=sum(1 for r in p2_trajectory if r.phase == 2),
        expected_steps=ctx.expected_p2_steps,
    )

    # Weighted sum. Terms are accumulated one at a time, left to right,
    # to keep the floating-point result stable.
    final = W_P1_RCA * p1_rca_raw + W_P1_EFFICIENCY * p1_eff_raw
    if valid_issue:
        # The no-change slot is not applicable here; its share of the
        # weight goes to P2 efficiency so the weights still sum to 1.0.
        final += W_PATCH_QUALITY * patch_raw
        final += W_P2_EFFICIENCY * p2_eff_raw
    else:
        # NOTE(review): in this branch the patch-quality weight is
        # unreachable and P2 efficiency is reported but not added, so
        # `final` tops out at W_P1_RCA + W_P1_EFFICIENCY + W_NO_CHANGE
        # (0.65 with the current weights). Left unchanged so existing
        # scores stay comparable -- confirm whether this is intended.
        final += W_NO_CHANGE * no_change_raw

    return {
        "final": round(final, 4),
        "p1_rca": round(p1_rca_raw, 4),
        "p1_efficiency": round(p1_eff_raw, 4),
        "patch_quality": round(patch_raw, 4),
        "no_change_detection": round(no_change_raw, 4),
        "p2_efficiency": round(p2_eff_raw, 4),
    }
# ──────────────────────────────────────────────────────────────────────
# Counterfactual r_cross
# ──────────────────────────────────────────────────────────────────────
def compute_r_cross(
    task_name: str,
    declared_patch: Optional[str],
    declared_no_change: bool,
    p2_trajectory: List[StepRecord],
) -> float:
    """Counterfactual cross-phase reward.

        r_cross = max(0, r_code(τ_2 | context(τ_1)) - r_code(τ_2 | context(∅)))

    (τ_1 / τ_2 presumably denote the Phase 1 / Phase 2 trajectories.)

    The with-context term is the patch-quality grade for valid-issue
    scenarios and the no-change grade otherwise. The null-context
    baseline lives on `CodeContext.null_context_p2_score` (filled in by
    `training/run_pool_b_baseline.py`). The difference is clamped to ≥0
    so Phase 1 is never punished for inherently hard bugs that no context
    could have helped.

    Returns 0.0 when the scenario has no code context. ``p2_trajectory``
    is accepted but not consulted.
    """
    ctx = get_scenario(task_name).code_context
    if ctx is None:
        return 0.0
    with_ctx = (
        grade_patch_quality(declared_patch or "", ctx)
        if ctx.is_valid_issue
        else grade_no_change(declared_no_change, ctx)
    )
    return max(0.0, with_ctx - ctx.null_context_p2_score)
|