Spaces:
Sleeping
Sleeping
| import ast | |
| import os | |
| import random | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from dataclasses import dataclass, field | |
| from typing import Optional, List | |
@dataclass
class BugScenario:
    """A single generated debugging scenario handed to the agent.

    The class must be a dataclass: every scenario builder in this module
    constructs it with keyword arguments, which requires the generated
    ``__init__``.
    """

    # Task identifier (one of the TASK_* constants).
    task_id: str
    # Natural-language statement of what the agent has to fix.
    task_description: str
    # Full source of the script containing the injected bug.
    buggy_code: str
    # Description / excerpt of the failure the buggy script produces.
    error_output: str
    # Label of the bug category the agent is expected to identify.
    correct_bug_type: str
    # Short description of the intended fix.
    solution_hint: str
    # Number of bugs injected into the script.
    num_bugs: int = 1
    # Minimal failure notice associated with the task.
    alert: str = ""
# Identifiers of the single-bug tasks.
TASK_SHAPE_MISMATCH = "shape_mismatch"
TASK_TRAINING_COLLAPSE = "training_collapse"
TASK_DATA_LEAKAGE = "data_leakage"
TASK_WRONG_DEVICE = "wrong_device"
TASK_GRADIENT_NOT_ZEROED = "gradient_not_zeroed"
TASK_MISSING_EVAL_MODE = "missing_eval_mode"

# Identifiers of the tasks that combine two bugs.
TASK_COMPOUND_SHAPE_DEVICE = "compound_shape_device"
TASK_COMPOUND_LEAKAGE_EVAL = "compound_leakage_eval"

SINGLE_TASKS = [
    TASK_SHAPE_MISMATCH,
    TASK_TRAINING_COLLAPSE,
    TASK_DATA_LEAKAGE,
    TASK_WRONG_DEVICE,
    TASK_GRADIENT_NOT_ZEROED,
    TASK_MISSING_EVAL_MODE,
]

COMPOUND_TASKS = [
    TASK_COMPOUND_SHAPE_DEVICE,
    TASK_COMPOUND_LEAKAGE_EVAL,
]

# Every task: single-bug tasks first, then the compound ones.
ALL_TASKS = [*SINGLE_TASKS, *COMPOUND_TASKS]

# Names accepted by execute_tool().
AVAILABLE_TOOLS = [
    "run_code",
    "get_traceback",
    "inspect_gradients",
    "print_shapes",
    "view_source",
]
def get_scenario(task_id: str, seed: Optional[int] = None) -> BugScenario:
    """Build the scenario for ``task_id`` using a generator seeded with ``seed``.

    Raises:
        ValueError: if ``task_id`` is not one of the known task identifiers.
    """
    rng = random.Random(seed)
    builders = (
        (TASK_SHAPE_MISMATCH, _shape_mismatch_scenario),
        (TASK_TRAINING_COLLAPSE, _training_collapse_scenario),
        (TASK_DATA_LEAKAGE, _data_leakage_scenario),
        (TASK_WRONG_DEVICE, _wrong_device_scenario),
        (TASK_GRADIENT_NOT_ZEROED, _gradient_not_zeroed_scenario),
        (TASK_MISSING_EVAL_MODE, _missing_eval_mode_scenario),
        (TASK_COMPOUND_SHAPE_DEVICE, _compound_shape_device_scenario),
        (TASK_COMPOUND_LEAKAGE_EVAL, _compound_leakage_eval_scenario),
    )
    # Equality matching in declaration order; the first match builds the scenario.
    for known_id, build in builders:
        if task_id == known_id:
            return build(rng)
    raise ValueError(f"Unknown task_id: {task_id}")
def get_random_task(seed: Optional[int] = None) -> str:
    """Return one entry of ALL_TASKS, picked by a generator seeded with ``seed``."""
    return random.Random(seed).choice(ALL_TASKS)
# ──────────────────────────────────────────────────────────────
# TOOL EXECUTION ENGINE
# These functions implement the 5 diagnostic tools available
# to the agent in partial observability mode.
# ──────────────────────────────────────────────────────────────
def _get_python_exe() -> str:
    """Pick the interpreter used to run scenario scripts.

    Order of preference: a non-empty ``PYTHON_EXEC`` environment variable,
    a ``.venv`` interpreter in the directory above this file's directory
    (Windows layout first, then POSIX), and finally ``sys.executable``.
    """
    override = os.environ.get("PYTHON_EXEC")
    if override:
        return override

    here = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(here)
    venv_interpreters = (
        os.path.join(project_root, ".venv", "Scripts", "python.exe"),
        os.path.join(project_root, ".venv", "bin", "python"),
    )
    for interpreter in venv_interpreters:
        if os.path.exists(interpreter):
            return interpreter
    return sys.executable
def _run_in_subprocess(code: str, timeout: int = 40) -> tuple[str, bool, str]:
    """Write ``code`` to a temporary file and run it with the selected interpreter.

    Args:
        code: Python source to execute.
        timeout: Seconds to wait before the run is abandoned.

    Returns:
        ``(output, success, stderr_only)`` where ``output`` is stdout followed
        by stderr (stripped), ``success`` is True when the exit code is 0, and
        ``stderr_only`` is the stripped stderr. On timeout or any other
        failure, ``output`` carries a short message, ``success`` is False and
        ``stderr_only`` is empty.
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as f:
            # Record the path before writing so that a failed write is still
            # cleaned up in the ``finally`` clause instead of leaking the file.
            tmp_path = f.name
            f.write(code)
        result = subprocess.run(
            [_get_python_exe(), tmp_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            # Force UTF-8 pipes so non-ASCII output decodes consistently.
            env={**os.environ, "PYTHONIOENCODING": "utf-8"},
        )
        combined = (result.stdout + result.stderr).strip()
        return combined, result.returncode == 0, result.stderr.strip()
    except subprocess.TimeoutExpired:
        return f"Execution timed out after {timeout}s.", False, ""
    except Exception as e:
        # Best effort by design: callers always receive a tuple, never an exception.
        return f"Execution error: {e}", False, ""
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # The file may already be gone; nothing useful to do about it.
                pass
def execute_tool(tool_name: str, scenario: BugScenario) -> str:
    """Run the diagnostic tool ``tool_name`` on ``scenario`` and return its report.

    An unrecognised tool name yields a message listing AVAILABLE_TOOLS; the
    scenario is not touched in that case.
    """
    handlers = (
        ("run_code", lambda: _tool_run_code(scenario.buggy_code)),
        ("get_traceback", lambda: _tool_get_traceback(scenario.buggy_code)),
        ("inspect_gradients", lambda: _tool_inspect_gradients(scenario.buggy_code, scenario.task_id)),
        ("print_shapes", lambda: _tool_print_shapes(scenario.buggy_code, scenario.task_id)),
        ("view_source", lambda: _tool_view_source(scenario.buggy_code)),
    )
    for known_name, run in handlers:
        if tool_name == known_name:
            return run()
    return f"Unknown tool: {tool_name}. Available tools: {AVAILABLE_TOOLS}"
def _tool_run_code(buggy_code: str) -> str:
    """Run the script (30 s limit) and report its status plus the first 40 output lines."""
    max_lines = 40
    output, success, _ = _run_in_subprocess(buggy_code, timeout=30)

    all_lines = output.split("\n")
    shown = "\n".join(all_lines[:max_lines])
    hidden = len(all_lines) - max_lines
    if hidden > 0:
        shown += f"\n... ({hidden} more lines truncated)"

    status = "COMPLETED" if success else "FAILED"
    return f"[run_code] Status: {status}\n\n{shown}"
def _tool_get_traceback(buggy_code: str) -> str:
    """Run the script (30 s limit) and return up to 50 lines of its failure output.

    stderr is preferred; the combined output is used when stderr is empty.
    """
    output, success, stderr = _run_in_subprocess(buggy_code, timeout=30)
    if success:
        return "[get_traceback] Code ran without error. No traceback available."

    tb_text = stderr or output
    if not tb_text:
        return "[get_traceback] No traceback captured."

    head = tb_text.split("\n")[:50]
    return "[get_traceback] Full traceback:\n\n" + "\n".join(head)
def _tool_inspect_gradients(buggy_code: str, task_id: str) -> str:
    """Run the script with gradient-norm logging injected and report the norms.

    If the run fails before any "GRAD_NORMS" marker is printed, the first 20
    output lines are returned as crash information. For scripts that run to
    completion the gradient lines are returned as-is — the agent must
    interpret them. ``task_id`` is accepted but not used here.
    """
    instrumented = _inject_gradient_logging(buggy_code)
    output, success, _ = _run_in_subprocess(instrumented, timeout=35)
    out_lines = output.split("\n")

    if not success and "GRAD_NORMS" not in output:
        return (
            "[inspect_gradients] Code crashed before gradients could be measured.\n"
            "Partial output:\n" + "\n".join(out_lines[:20])
        )

    def mentions_gradient(line: str) -> bool:
        return "GRAD_NORM" in line or "grad_norm" in line.lower() or "GRADIENT" in line

    grad_lines = [line for line in out_lines if mentions_gradient(line)]
    if grad_lines:
        header = "[inspect_gradients] Per-layer gradient norms (first batch):\n\n"
        return header + "\n".join(grad_lines[:30])

    excerpt = "\n".join(out_lines[:15])
    return (
        "[inspect_gradients] Gradient injection ran but no gradient norms captured.\n"
        "This may indicate gradients are not flowing (no backward call reached).\n"
        "Output excerpt:\n" + excerpt
    )
def _inject_gradient_logging(code: str) -> str:
    """Prepend a snippet that reports gradient norms after the first backward().

    The snippet wraps ``torch.Tensor.backward``. After the first call it
    prints a "GRAD_NORMS:" header followed by one "GRAD_NORM layer=... norm=..."
    line per parameter that has a gradient. The script itself is not
    interrupted; it keeps running after the report.

    Args:
        code: Source of the script to instrument.

    Returns:
        The snippet followed by ``code``.
    """
    injection = '''
import torch as _torch_grad_tool
_GRAD_BATCH_COUNT = 0
_GRAD_ORIG_backward = _torch_grad_tool.Tensor.backward
def _patched_backward(self, *args, **kwargs):
    global _GRAD_BATCH_COUNT
    _GRAD_ORIG_backward(self, *args, **kwargs)
    _GRAD_BATCH_COUNT += 1
    if _GRAD_BATCH_COUNT == 1:
        # Scan live modules and keep the one owning the most parameters with
        # gradients: that is the outermost model, never a parameter-less
        # module such as a loss function or a single inner layer.
        import gc
        best = []
        for obj in gc.get_objects():
            if isinstance(obj, _torch_grad_tool.nn.Module):
                found = [
                    (name, param.grad.norm().item())
                    for name, param in obj.named_parameters()
                    if param.grad is not None
                ]
                if len(found) > len(best):
                    best = found
        print("GRAD_NORMS:")
        for name, norm in best:
            print(f" GRAD_NORM layer={name} norm={norm:.6f}")
_torch_grad_tool.Tensor.backward = _patched_backward
'''
    return injection + "\n" + code
def _tool_print_shapes(buggy_code: str, task_id: str) -> str:
    """Run the script with shape-logging hooks injected and report the shape lines.

    Any output line mentioning "shape" (case-insensitive) counts as a shape
    line; up to 40 are returned. When there are none, the report says whether
    the run crashed or merely produced no shapes. ``task_id`` is accepted but
    not used here.
    """
    instrumented = _inject_shape_logging(buggy_code)
    output, success, _ = _run_in_subprocess(instrumented, timeout=35)
    out_lines = output.split("\n")

    shape_lines = [
        line for line in out_lines if "SHAPE" in line or "shape" in line.lower()
    ]
    if shape_lines:
        header = "[print_shapes] Tensor shapes at each layer (first batch):\n\n"
        return header + "\n".join(shape_lines[:40])

    if success:
        return (
            "[print_shapes] Shape hook ran but no shapes captured.\n"
            "Output:\n" + "\n".join(out_lines[:20])
        )

    error_lines = [line for line in out_lines if "Error" in line or "error" in line]
    return (
        "[print_shapes] Code crashed before shapes could be captured.\n"
        "Error:\n" + "\n".join(error_lines[:10])
        + "\nFull output excerpt:\n" + "\n".join(out_lines[:20])
    )
def _inject_shape_logging(code: str) -> str:
    """Prepend a snippet that prints input/output shapes of Linear and Conv2d layers.

    The snippet wraps ``nn.Module.__call__``. The first time a module that has
    children is called, forward hooks are registered on the Linear/Conv2d
    layers beneath it. Each layer is hooked only once, so a container nested
    inside another container does not produce duplicate "SHAPE" lines.

    Args:
        code: Source of the script to instrument.

    Returns:
        The snippet followed by ``code``.
    """
    injection = '''
import torch as _torch_shape_tool
_shape_hooks = []
_hooked_layer_ids = set()
def _register_shape_hooks(model):
    def make_hook(name):
        def hook(module, input, output):
            in_shape = tuple(input[0].shape) if input else "unknown"
            out_shape = tuple(output.shape)
            print(f"SHAPE layer={name} in={in_shape} out={out_shape}")
        return hook
    for name, module in model.named_modules():
        if not isinstance(module, (_torch_shape_tool.nn.Linear, _torch_shape_tool.nn.Conv2d)):
            continue
        # A layer reachable through several containers is hooked only once.
        if id(module) in _hooked_layer_ids:
            continue
        _hooked_layer_ids.add(id(module))
        _shape_hooks.append(module.register_forward_hook(make_hook(name)))
import atexit as _atexit_shape
def _cleanup_hooks():
    for h in _shape_hooks:
        h.remove()
_atexit_shape.register(_cleanup_hooks)
# Patch Module.__call__ to auto-register hooks on a container's first call
_orig_forward = _torch_shape_tool.nn.Module.__call__
_hooked_ids = set()
def _auto_hook_call(self, *args, **kwargs):
    mid = id(self)
    if mid not in _hooked_ids and len(list(self.children())) > 0:
        _register_shape_hooks(self)
        _hooked_ids.add(mid)
    return _orig_forward(self, *args, **kwargs)
_torch_shape_tool.nn.Module.__call__ = _auto_hook_call
'''
    return injection + "\n" + code
def _tool_view_source(buggy_code: str) -> str:
    """Return the script with 1-based, width-3 line numbers and a line-count header."""
    source_lines = buggy_code.split("\n")
    listing = [
        f"{number:3d} | {text}" for number, text in enumerate(source_lines, start=1)
    ]
    header = f"[view_source] Full buggy script ({len(source_lines)} lines):\n\n"
    return header + "\n".join(listing)
# ──────────────────────────────────────────────────────────────
# ALERT MESSAGES — minimal failure notice shown on reset()
# No code, no traceback, no hints. Just what an on-call engineer sees.
# ──────────────────────────────────────────────────────────────
# One short alert string per task identifier.
ALERTS = {
    TASK_SHAPE_MISMATCH: (
        "Training job crashed immediately. No epochs completed. Exit code 1."
    ),
    TASK_TRAINING_COLLAPSE: (
        "Training job completed 5 epochs. Final loss: nan. Model did not converge."
    ),
    TASK_DATA_LEAKAGE: (
        "Training job completed successfully. Test accuracy: 96.5%. Review requested."
    ),
    TASK_WRONG_DEVICE: (
        "Training job crashed on first forward pass. Exit code 1."
    ),
    TASK_GRADIENT_NOT_ZEROED: (
        "Training job failed. Loss exploded to nan by epoch 4. Exit code 0."
    ),
    TASK_MISSING_EVAL_MODE: (
        "Training completed. Evaluation metrics unstable across repeated runs."
    ),
    TASK_COMPOUND_SHAPE_DEVICE: (
        "Training job crashed immediately. Multiple errors detected. Exit code 1."
    ),
    TASK_COMPOUND_LEAKAGE_EVAL: (
        "Training completed. Metrics look suspicious and vary between evaluation runs."
    ),
}
# ──────────────────────────────────────────────────────────────
# TASK 1 — Shape Mismatch (Easy)
# ──────────────────────────────────────────────────────────────
def _shape_mismatch_scenario(rng: random.Random) -> BugScenario:
    """Build a scenario whose model feeds a layer an input of the wrong width.

    One of three architectures is drawn ("mlp", "deep", "autoencoder"), along
    with the layer sizes. In every variant the final stage is declared with
    ``wrong_size`` inputs while the preceding stage emits a different width.

    Args:
        rng: Random generator that drives every choice.

    Returns:
        A BugScenario for TASK_SHAPE_MISMATCH.
    """
    variant = rng.choice(["mlp", "deep", "autoencoder"])
    hidden_size = rng.choice([128, 256, 512])
    wrong_size = rng.choice([64, 32, 16])
    num_classes = rng.choice([10, 5, 20])
    if variant == "mlp":
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class SimpleClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(784, {hidden_size}),
            nn.ReLU(),
            nn.Linear({hidden_size}, {hidden_size}),
            nn.ReLU(),
        )
        self.classifier = nn.Linear({wrong_size}, {num_classes})
    def forward(self, x):
        features = self.encoder(x)
        return self.classifier(features)
X = torch.randn(200, 784)
y = torch.randint(0, {num_classes}, (200,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = SimpleClassifier()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
    for xb, yb in loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
        error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({hidden_size} cannot be broadcast to {wrong_size})"
        solution_hint = f"classifier input must be {hidden_size} not {wrong_size}"
    elif variant == "deep":
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class DeepNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Linear(512, {hidden_size}),
            nn.BatchNorm1d({hidden_size}),
            nn.ReLU(),
            nn.Linear({hidden_size}, {hidden_size}),
            nn.ReLU(),
        )
        self.head = nn.Linear({wrong_size}, {num_classes})
    def forward(self, x):
        z = self.feature_extractor(x)
        return self.head(z)
X = torch.randn(300, 512)
y = torch.randint(0, {num_classes}, (300,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = DeepNet()
optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
    for xb, yb in loader:
        optimizer.zero_grad()
        out = model(xb)
        loss = criterion(out, yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
        error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({hidden_size} cannot be broadcast to {wrong_size})"
        solution_hint = f"head input must be {hidden_size} not {wrong_size}"
    else:
        # The decoder is only broken when its input width differs from the
        # bottleneck, so the bottleneck is never allowed to equal wrong_size
        # (both are drawn from sets that share 16 and 32).
        bottleneck = rng.choice([size for size in (16, 32) if size != wrong_size])
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Autoencoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(128, {hidden_size}),
            nn.ReLU(),
            nn.Linear({hidden_size}, {bottleneck}),
        )
        self.decoder = nn.Sequential(
            nn.Linear({wrong_size}, {hidden_size}),
            nn.ReLU(),
            nn.Linear({hidden_size}, 128),
        )
    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)
X = torch.randn(200, 128)
dataset = TensorDataset(X, X)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = Autoencoder()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(3):
    for xb, _ in loader:
        optimizer.zero_grad()
        out = model(xb)
        loss = criterion(out, xb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
        error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({bottleneck} cannot be broadcast to {wrong_size})"
        solution_hint = f"decoder input must be {bottleneck} not {wrong_size}"
    return BugScenario(
        task_id=TASK_SHAPE_MISMATCH,
        task_description=(
            "This PyTorch model crashes immediately during the forward pass with a shape mismatch. "
            "The training loop never completes a single step. "
            "Find the architectural bug and fix the script so it trains for 3 epochs without error."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="shape_mismatch",
        solution_hint=solution_hint,
        alert=ALERTS[TASK_SHAPE_MISMATCH],
    )
# ──────────────────────────────────────────────────────────────
# TASK 2 — Training Collapse (Medium)
# ──────────────────────────────────────────────────────────────
def _training_collapse_scenario(rng: random.Random) -> BugScenario:
    """Build a scenario where training runs without errors but does not learn.

    Two variants exist: "lr" (SGD with a very large learning rate) and
    "loss_fn" (MSELoss applied to a sigmoid binary classifier).

    Args:
        rng: Random generator that drives every choice.

    Returns:
        A BugScenario for TASK_TRAINING_COLLAPSE.
    """
    # Drawn before the variant, and even when unused, so the sequence of
    # random draws stays the same for a given seed.
    bad_lr = rng.choice([10.0, 50.0, 100.0])
    variant = rng.choice(["lr", "loss_fn"])
    if variant == "lr":
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(20, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)
X = torch.randn(300, 20)
y = (X[:, 0] + X[:, 1] * 0.5 + torch.randn(300) * 0.1)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32)
model = MLP()
optimizer = optim.SGD(model.parameters(), lr={bad_lr})
criterion = nn.MSELoss()
for epoch in range(5):
    epoch_loss = 0.0
    for xb, yb in loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg = epoch_loss / len(loader)
    print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
        error_output = (
            "Training runs without crashing but loss diverges to NaN by epoch 2.\n"
            "Epoch 1, loss: 847.3291\nEpoch 2, loss: nan\nEpoch 3, loss: nan"
        )
        solution_hint = f"learning rate {bad_lr} causes gradient explosion; reduce to ~1e-3"
    else:
        buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class BinaryClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(15, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)
X = torch.randn(400, 15)
y = (X[:, 0] > 0).float()
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32)
model = BinaryClassifier()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(5):
    epoch_loss = 0.0
    for xb, yb in loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg = epoch_loss / len(loader)
    print(f"Epoch {epoch+1}, loss: {avg:.4f}")
print("Training finished")
'''
        error_output = (
            "Training runs without error but model fails to converge.\n"
            "Epoch 1, loss: 0.2489\nEpoch 2, loss: 0.2491\nEpoch 5, loss: 0.2491\n"
            "Loss plateaus immediately. Wrong loss function for binary classification."
        )
        solution_hint = "MSELoss used for binary classification; should be BCELoss or BCEWithLogitsLoss"
    return BugScenario(
        task_id=TASK_TRAINING_COLLAPSE,
        task_description=(
            "This PyTorch training script runs without any Python errors or crashes, "
            "but the model completely fails to learn. "
            "Diagnose why training is broken and fix the script so loss decreases "
            "consistently across 5 epochs."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="training_collapse",
        solution_hint=solution_hint,
        alert=ALERTS[TASK_TRAINING_COLLAPSE],
    )
# ──────────────────────────────────────────────────────────────
# TASK 3 — Data Leakage (Hard)
# ──────────────────────────────────────────────────────────────
def _data_leakage_scenario(rng: random.Random) -> BugScenario:
    """Build a scenario where normalization statistics include the test split.

    Variants: "normalize_before_split" (the whole dataset is normalized, then
    split) and "scaler_fit_on_all" (the split happens first, but the mean/std
    are still computed over the full dataset). Both scripts run to completion.

    Args:
        rng: Random generator that selects the variant.

    Returns:
        A BugScenario for TASK_DATA_LEAKAGE.
    """
    variant = rng.choice(["normalize_before_split", "scaler_fit_on_all"])
    if variant == "normalize_before_split":
        buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class TabularClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes),
        )
    def forward(self, x):
        return self.net(x)
torch.manual_seed(42)
N, D, C = 1000, 20, 3
X_raw = torch.randn(N, D)
true_weights = torch.randn(D, C)
y_all = (X_raw @ true_weights).argmax(dim=1)
mean = X_raw.mean(dim=0)
std = X_raw.std(dim=0) + 1e-8
X_normalized = (X_raw - mean) / std
split = int(0.8 * N)
X_train, X_test = X_normalized[:split], X_normalized[split:]
y_train, y_test = y_all[:split], y_all[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = TabularClassifier(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
    model.train()
    for xb, yb in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
model.eval()
with torch.no_grad():
    test_preds = model(X_test).argmax(dim=1)
    accuracy = (test_preds == y_test).float().mean().item()
print(f"Test accuracy: {accuracy:.4f}")
print("Training finished")
'''
        error_output = (
            "Script runs to completion. Reported test accuracy: 0.9650\n"
            "However, the evaluation is invalid — there is a data pipeline bug."
        )
        solution_hint = "normalize using only train set mean/std; compute mean and std after the split, only on X_train"
    else:
        buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Regressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(10, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)
N = 800
X_raw = torch.randn(N, 10)
y_all = X_raw[:, 0] * 2.5 + X_raw[:, 2] * 1.3 + torch.randn(N) * 0.3
split = int(0.75 * N)
X_train_raw, X_test_raw = X_raw[:split], X_raw[split:]
y_train, y_test = y_all[:split], y_all[split:]
full_mean = X_raw.mean(dim=0)
full_std = X_raw.std(dim=0) + 1e-8
X_train = (X_train_raw - full_mean) / full_std
X_test = (X_test_raw - full_mean) / full_std
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = Regressor()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(8):
    model.train()
    for xb, yb in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
model.eval()
with torch.no_grad():
    test_loss = criterion(model(X_test), y_test).item()
print(f"Test MSE: {test_loss:.4f}")
print("Training finished")
'''
        error_output = (
            "Script runs to completion. Reported test MSE: 0.1021\n"
            "The MSE is artificially low — test statistics leaked into normalization."
        )
        solution_hint = "fit normalization stats only on X_train_raw; use train_mean and train_std to normalize both train and test"
    return BugScenario(
        task_id=TASK_DATA_LEAKAGE,
        task_description=(
            "This PyTorch training script runs cleanly with no errors and reports impressive metrics. "
            "But the evaluation is fundamentally broken due to a data handling mistake. "
            "Identify the data pipeline bug and fix it so the evaluation is valid."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="data_leakage",
        solution_hint=solution_hint,
        alert=ALERTS[TASK_DATA_LEAKAGE],
    )
# ──────────────────────────────────────────────────────────────
# TASK 4 — Wrong Device (Medium)
# ──────────────────────────────────────────────────────────────
def _wrong_device_scenario(rng: random.Random) -> BugScenario:
    """Build a scenario where the model is moved to ``device`` but the batches are not.

    The hidden width and class count are drawn from ``rng``; the structure of
    the script is otherwise fixed.

    Args:
        rng: Random generator that drives the size choices.

    Returns:
        A BugScenario for TASK_WRONG_DEVICE.
    """
    hidden = rng.choice([64, 128, 256])
    num_classes = rng.choice([5, 10])
    # NOTE(review): the script selects "cuda" only when it is available, so on
    # a CPU-only host model and data share a device and the script presumably
    # runs cleanly - confirm the execution environment exposes a GPU.
    buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Classifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(784, {hidden}),
            nn.ReLU(),
            nn.Linear({hidden}, {num_classes}),
        )
    def forward(self, x):
        return self.net(x)
X = torch.randn(200, 784)
y = torch.randint(0, {num_classes}, (200,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Classifier().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss().to(device)
for epoch in range(3):
    for xb, yb in loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
    error_output = (
        "RuntimeError: Expected all tensors to be on the same device, "
        "but found at least two devices!\n\n"
        "The model was moved to the target device but data batches remain on CPU. "
        "Every forward pass crashes. Fix tensor placement so all tensors are on the same device."
    )
    return BugScenario(
        task_id=TASK_WRONG_DEVICE,
        task_description=(
            "This PyTorch training script crashes on the first forward pass. "
            "The model and data tensors are on different devices. "
            "Fix the script so training runs for 3 epochs without error on whatever device is available."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="wrong_device",
        solution_hint="move xb and yb to device inside the training loop: xb, yb = xb.to(device), yb.to(device)",
        alert=ALERTS[TASK_WRONG_DEVICE],
    )
# ──────────────────────────────────────────────────────────────
# TASK 5 — Gradient Not Zeroed (Medium-Hard)
# ──────────────────────────────────────────────────────────────
def _gradient_not_zeroed_scenario(rng: random.Random) -> BugScenario:
    """Build a scenario whose training loop never calls ``optimizer.zero_grad()``.

    Variants: "regression" (MLP trained with Adam and MSELoss) and
    "classification" (network trained with SGD+momentum and CrossEntropyLoss).
    The hidden width and learning rate are drawn from ``rng``.

    Args:
        rng: Random generator that drives every choice.

    Returns:
        A BugScenario for TASK_GRADIENT_NOT_ZEROED.
    """
    hidden = rng.choice([32, 64, 128])
    lr = rng.choice([1e-3, 5e-4])
    variant = rng.choice(["regression", "classification"])
    if variant == "regression":
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(10, {hidden}),
            nn.ReLU(),
            nn.Linear({hidden}, {hidden}),
            nn.ReLU(),
            nn.Linear({hidden}, 1),
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)
X = torch.randn(500, 10)
y = X[:, 0] * 1.5 - X[:, 2] * 0.8 + torch.randn(500) * 0.2
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = MLP()
optimizer = optim.Adam(model.parameters(), lr={lr})
criterion = nn.MSELoss()
for epoch in range(6):
    epoch_loss = 0.0
    for xb, yb in loader:
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg = epoch_loss / len(loader)
    print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
    else:
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Linear(32, {hidden}),
            nn.ReLU(),
            nn.Linear({hidden}, {hidden}),
            nn.ReLU(),
        )
        self.classifier = nn.Linear({hidden}, 4)
    def forward(self, x):
        return self.classifier(self.features(x))
X = torch.randn(400, 32)
y = torch.randint(0, 4, (400,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = Net()
optimizer = optim.SGD(model.parameters(), lr={lr}, momentum=0.9)
criterion = nn.CrossEntropyLoss()
for epoch in range(6):
    epoch_loss = 0.0
    for xb, yb in loader:
        out = model(xb)
        loss = criterion(out, yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg = epoch_loss / len(loader)
    print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
    error_output = (
        "Script runs without crashing but training is highly unstable.\n"
        "Epoch 1, loss: 12.4821\nEpoch 2, loss: 847.2341\n"
        "Epoch 3, loss: 23451.8821\nEpoch 4, loss: nan\n"
        "Loss explodes after epoch 1 and collapses to NaN. "
        "Fundamental error in training loop structure."
    )
    return BugScenario(
        task_id=TASK_GRADIENT_NOT_ZEROED,
        task_description=(
            "This PyTorch training script runs without crashing but loss explodes "
            "after the first epoch and collapses to NaN. The model never learns. "
            "Find the training loop bug and fix it so loss decreases consistently across 6 epochs."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="gradient_not_zeroed",
        solution_hint="optimizer.zero_grad() is missing before loss.backward(); gradients accumulate causing explosion",
        alert=ALERTS[TASK_GRADIENT_NOT_ZEROED],
    )
# ──────────────────────────────────────────────────────────────
# TASK 6 — Missing Eval Mode (Hard)
# ──────────────────────────────────────────────────────────────
def _missing_eval_mode_scenario(rng: random.Random) -> BugScenario:
    """Build the "missing eval mode" debugging scenario.

    The generated script trains a small network containing Dropout and
    BatchNorm layers and then evaluates it on a held-out split without ever
    leaving training mode, so the reported metric is computed with dropout
    still active.

    Args:
        rng: Random source used to pick the dropout probability, hidden
            width, class count and script variant.

    Returns:
        A ``BugScenario`` whose ``buggy_code`` is either a classifier or a
        regressor script, together with the matching description, error
        output, solution hint and alert.
    """
    # All four draws happen unconditionally, so the sequence of values
    # consumed from ``rng`` does not depend on which variant is selected.
    dropout_p = rng.choice([0.3, 0.4, 0.5])
    hidden = rng.choice([64, 128])
    # Only interpolated into the classifier template below.
    num_classes = rng.choice([3, 5])
    variant = rng.choice(["classifier", "regressor"])

    if variant == "classifier":
        # The evaluation lines at the end run right after the training loop
        # with no model.eval() / torch.no_grad(): that is the planted bug.
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class DropoutClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, {hidden}),
            nn.BatchNorm1d({hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, {hidden}),
            nn.BatchNorm1d({hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, num_classes),
        )
    def forward(self, x):
        return self.net(x)
torch.manual_seed(42)
N, D, C = 800, 20, {num_classes}
X = torch.randn(N, D)
true_w = torch.randn(D, C)
y = (X @ true_w).argmax(dim=1)
split = int(0.8 * N)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = DropoutClassifier(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
    model.train()
    for xb, yb in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
preds = model(X_test).argmax(dim=1)
accuracy = (preds == y_test).float().mean().item()
print(f"Test accuracy: {{accuracy:.4f}}")
print("Evaluation complete")
print("Training finished")
'''
    else:
        # Regressor variant: same planted bug, MSE metric instead of accuracy.
        buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class RegNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(15, {hidden}),
            nn.BatchNorm1d({hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, {hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, 1),
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)
torch.manual_seed(42)
N = 600
X = torch.randn(N, 15)
y = X[:, 0] * 2.0 + X[:, 3] * 0.5 + torch.randn(N) * 0.3
split = int(0.8 * N)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = RegNet()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(10):
    model.train()
    for xb, yb in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
test_loss = criterion(model(X_test), y_test).item()
print(f"Test MSE: {{test_loss:.4f}}")
print("Evaluation complete")
print("Training finished")
'''

    # Hand-written symptom text shown alongside the script; it is not
    # captured from an actual run.
    error_output = (
        "Script runs to completion with no errors.\n"
        f"Reported metrics vary between runs due to active Dropout(p={dropout_p}).\n"
        "Running evaluation twice gives different numbers. "
        "Model appears to be in wrong mode during evaluation."
    )
    return BugScenario(
        task_id=TASK_MISSING_EVAL_MODE,
        task_description=(
            "This PyTorch model trains successfully but produces unreliable evaluation metrics. "
            "Running evaluation multiple times gives different results each time. "
            f"The model has Dropout(p={dropout_p}) and BatchNorm layers. "
            "Fix the evaluation so it produces stable, deterministic metrics."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="missing_eval_mode",
        solution_hint=f"model.eval() and torch.no_grad() must be called before evaluation; dropout p={dropout_p} stays active in train mode",
        alert=ALERTS[TASK_MISSING_EVAL_MODE],
    )
# ──────────────────────────────────────────────────────────────
# TASK 7 — Compound: Shape Mismatch + Wrong Device (Medium-Hard)
# ──────────────────────────────────────────────────────────────
def _compound_shape_device_scenario(rng: random.Random) -> BugScenario:
    """Build the compound "shape mismatch + wrong device" scenario.

    The generated script contains two planted bugs:

    1. The classifier head is declared with ``wrong_size`` input features
       while the backbone emits ``hidden_size`` features.
    2. The model is moved to ``device`` but the batches ``xb`` / ``yb`` are
       fed to it without being moved.

    Args:
        rng: Random source used to pick the backbone width, the incorrect
            classifier input width and the number of classes.

    Returns:
        A ``BugScenario`` with ``num_bugs=2`` describing both defects.
    """
    hidden_size = rng.choice([128, 256])
    # Always differs from hidden_size: the two candidate lists are disjoint.
    wrong_size = rng.choice([32, 16])
    num_classes = rng.choice([5, 10])

    # NOTE: when CUDA is unavailable the script resolves ``device`` to "cpu",
    # so the second bug produces no runtime error on such machines.
    buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MultiLayerNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(256, {hidden_size}),
            nn.ReLU(),
            nn.Linear({hidden_size}, {hidden_size}),
            nn.ReLU(),
        )
        self.classifier = nn.Linear({wrong_size}, {num_classes})
    def forward(self, x):
        features = self.backbone(x)
        return self.classifier(features)
X = torch.randn(300, 256)
y = torch.randint(0, {num_classes}, (300,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultiLayerNet().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
    for xb, yb in loader:
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''

    # Hand-written summary of both failures.
    # NOTE(review): the parenthetical in the first RuntimeError is a
    # paraphrase; confirm whether it should mirror PyTorch's literal
    # "(AxB and CxD)" wording.
    error_output = (
        "This script has TWO bugs that must both be fixed.\n\n"
        "Bug 1 — Shape mismatch:\n"
        " RuntimeError: mat1 and mat2 shapes cannot be multiplied "
        f"({hidden_size} cannot be broadcast to {wrong_size})\n\n"
        "Bug 2 — Device mismatch:\n"
        " RuntimeError: Expected all tensors to be on the same device!\n\n"
        "Fix BOTH bugs. Script should train 3 epochs without error."
    )
    return BugScenario(
        task_id=TASK_COMPOUND_SHAPE_DEVICE,
        task_description=(
            "This PyTorch script has TWO bugs that must both be fixed. "
            "There is a shape mismatch in the model architecture AND a device placement error. "
            "Fix both bugs so the script trains for 3 epochs without any errors."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="compound_shape_device",
        solution_hint=f"fix 1: classifier input must be {hidden_size} not {wrong_size}; fix 2: move xb and yb to device in training loop",
        num_bugs=2,
        alert=ALERTS[TASK_COMPOUND_SHAPE_DEVICE],
    )
# ──────────────────────────────────────────────────────────────
# TASK 8 — Compound: Data Leakage + Missing Eval Mode (Expert)
# ──────────────────────────────────────────────────────────────
def _compound_leakage_eval_scenario(rng: random.Random) -> BugScenario:
    """Build the compound "data leakage + missing eval mode" scenario.

    The generated script runs to completion but contains two silent bugs:

    1. Normalisation statistics (``mean`` / ``std``) are computed over the
       whole dataset before the train/test split.
    2. Test predictions are made right after the training loop without
       ``model.eval()``, so Dropout is still active.

    Args:
        rng: Random source used to pick the dropout probability, hidden
            width and number of classes.

    Returns:
        A ``BugScenario`` with ``num_bugs=2`` describing both defects.
    """
    dropout_p = rng.choice([0.3, 0.4])
    hidden = rng.choice([64, 128])
    num_classes = rng.choice([3, 4])

    buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class TabularNet(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, {hidden}),
            nn.BatchNorm1d({hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, {hidden}),
            nn.ReLU(),
            nn.Dropout(p={dropout_p}),
            nn.Linear({hidden}, num_classes),
        )
    def forward(self, x):
        return self.net(x)
torch.manual_seed(42)
N, D, C = 1000, 20, {num_classes}
X_raw = torch.randn(N, D)
true_weights = torch.randn(D, C)
y_all = (X_raw @ true_weights).argmax(dim=1)
mean = X_raw.mean(dim=0)
std = X_raw.std(dim=0) + 1e-8
X_normalized = (X_raw - mean) / std
split = int(0.8 * N)
X_train, X_test = X_normalized[:split], X_normalized[split:]
y_train, y_test = y_all[:split], y_all[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = TabularNet(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
    model.train()
    for xb, yb in train_loader:
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
    print(f"Epoch {{epoch+1}} complete")
test_preds = model(X_test).argmax(dim=1)
accuracy = (test_preds == y_test).float().mean().item()
print(f"Test accuracy: {{accuracy:.4f}}")
print("Evaluation complete")
print("Training finished")
'''

    # Hand-written symptom text; the accuracy figure is a fixed literal, not
    # the result of executing the script above.
    error_output = (
        "Script runs to completion with no errors.\n"
        "Reported test accuracy: 0.9700 (varies slightly between runs)\n\n"
        "This script has TWO silent bugs:\n"
        "Bug 1 — Data leakage: normalization statistics computed from entire dataset before split.\n"
        f"Bug 2 — Missing eval mode: model.eval() not called; Dropout(p={dropout_p}) active during evaluation."
    )
    return BugScenario(
        task_id=TASK_COMPOUND_LEAKAGE_EVAL,
        task_description=(
            "This PyTorch script runs cleanly and reports impressive metrics — but contains "
            "TWO silent bugs that make the evaluation invalid. "
            "There is a data leakage bug in preprocessing AND a missing eval mode bug. "
            "Fix both so the evaluation is correct and deterministic."
        ),
        buggy_code=buggy_code,
        error_output=error_output,
        correct_bug_type="compound_leakage_eval",
        solution_hint="fix 1: compute mean/std only from X_train after split; fix 2: add model.eval() and torch.no_grad() before evaluation",
        num_bugs=2,
        alert=ALERTS[TASK_COMPOUND_LEAKAGE_EVAL],
    )