ml-debug-env / server /bug_generator.py
rak2315's picture
Block A B C: partial observability, LLM judge, adversarial scheduler
49aa3ca
Raw
History Blame Contribute Delete
44.4 kB
import ast
import os
import random
import re
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from typing import Optional, List
@dataclass
class BugScenario:
task_id: str
task_description: str
buggy_code: str
error_output: str
correct_bug_type: str
solution_hint: str
num_bugs: int = 1
alert: str = ""
TASK_SHAPE_MISMATCH = "shape_mismatch"
TASK_TRAINING_COLLAPSE = "training_collapse"
TASK_DATA_LEAKAGE = "data_leakage"
TASK_WRONG_DEVICE = "wrong_device"
TASK_GRADIENT_NOT_ZEROED = "gradient_not_zeroed"
TASK_MISSING_EVAL_MODE = "missing_eval_mode"
TASK_COMPOUND_SHAPE_DEVICE = "compound_shape_device"
TASK_COMPOUND_LEAKAGE_EVAL = "compound_leakage_eval"
ALL_TASKS = [
TASK_SHAPE_MISMATCH,
TASK_TRAINING_COLLAPSE,
TASK_DATA_LEAKAGE,
TASK_WRONG_DEVICE,
TASK_GRADIENT_NOT_ZEROED,
TASK_MISSING_EVAL_MODE,
TASK_COMPOUND_SHAPE_DEVICE,
TASK_COMPOUND_LEAKAGE_EVAL,
]
SINGLE_TASKS = [
TASK_SHAPE_MISMATCH,
TASK_TRAINING_COLLAPSE,
TASK_DATA_LEAKAGE,
TASK_WRONG_DEVICE,
TASK_GRADIENT_NOT_ZEROED,
TASK_MISSING_EVAL_MODE,
]
COMPOUND_TASKS = [
TASK_COMPOUND_SHAPE_DEVICE,
TASK_COMPOUND_LEAKAGE_EVAL,
]
AVAILABLE_TOOLS = ["run_code", "get_traceback", "inspect_gradients", "print_shapes", "view_source"]
def get_scenario(task_id: str, seed: Optional[int] = None) -> BugScenario:
rng = random.Random(seed)
if task_id == TASK_SHAPE_MISMATCH:
return _shape_mismatch_scenario(rng)
elif task_id == TASK_TRAINING_COLLAPSE:
return _training_collapse_scenario(rng)
elif task_id == TASK_DATA_LEAKAGE:
return _data_leakage_scenario(rng)
elif task_id == TASK_WRONG_DEVICE:
return _wrong_device_scenario(rng)
elif task_id == TASK_GRADIENT_NOT_ZEROED:
return _gradient_not_zeroed_scenario(rng)
elif task_id == TASK_MISSING_EVAL_MODE:
return _missing_eval_mode_scenario(rng)
elif task_id == TASK_COMPOUND_SHAPE_DEVICE:
return _compound_shape_device_scenario(rng)
elif task_id == TASK_COMPOUND_LEAKAGE_EVAL:
return _compound_leakage_eval_scenario(rng)
else:
raise ValueError(f"Unknown task_id: {task_id}")
def get_random_task(seed: Optional[int] = None) -> str:
rng = random.Random(seed)
return rng.choice(ALL_TASKS)
# ──────────────────────────────────────────────────────────────
# TOOL EXECUTION ENGINE
# These functions implement the 5 diagnostic tools available
# to the agent in partial observability mode.
# ──────────────────────────────────────────────────────────────
def _get_python_exe() -> str:
python_exe = os.environ.get("PYTHON_EXEC")
if not python_exe:
server_dir = os.path.dirname(os.path.abspath(__file__))
project_dir = os.path.dirname(server_dir)
candidate = os.path.join(project_dir, ".venv", "Scripts", "python.exe")
if not os.path.exists(candidate):
candidate = os.path.join(project_dir, ".venv", "bin", "python")
python_exe = candidate if os.path.exists(candidate) else sys.executable
return python_exe
def _run_in_subprocess(code: str, timeout: int = 40) -> tuple[str, bool, str]:
"""Run code, return (output, success, stderr_only)."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
f.write(code)
tmp_path = f.name
try:
result = subprocess.run(
[_get_python_exe(), tmp_path],
capture_output=True,
text=True,
timeout=timeout,
env={**os.environ, "PYTHONIOENCODING": "utf-8"},
)
combined = (result.stdout + result.stderr).strip()
return combined, result.returncode == 0, result.stderr.strip()
except subprocess.TimeoutExpired:
return f"Execution timed out after {timeout}s.", False, ""
except Exception as e:
return f"Execution error: {e}", False, ""
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
def execute_tool(tool_name: str, scenario: BugScenario) -> str:
"""
Execute a diagnostic tool against the buggy scenario.
Returns a string result to show the agent.
"""
if tool_name == "run_code":
return _tool_run_code(scenario.buggy_code)
elif tool_name == "get_traceback":
return _tool_get_traceback(scenario.buggy_code)
elif tool_name == "inspect_gradients":
return _tool_inspect_gradients(scenario.buggy_code, scenario.task_id)
elif tool_name == "print_shapes":
return _tool_print_shapes(scenario.buggy_code, scenario.task_id)
elif tool_name == "view_source":
return _tool_view_source(scenario.buggy_code)
else:
return f"Unknown tool: {tool_name}. Available tools: {AVAILABLE_TOOLS}"
def _tool_run_code(buggy_code: str) -> str:
output, success, _ = _run_in_subprocess(buggy_code, timeout=30)
status = "COMPLETED" if success else "FAILED"
lines = output.split("\n")
truncated = "\n".join(lines[:40])
if len(lines) > 40:
truncated += f"\n... ({len(lines) - 40} more lines truncated)"
return f"[run_code] Status: {status}\n\n{truncated}"
def _tool_get_traceback(buggy_code: str) -> str:
output, success, stderr = _run_in_subprocess(buggy_code, timeout=30)
if success:
return "[get_traceback] Code ran without error. No traceback available."
if not stderr and not output:
return "[get_traceback] No traceback captured."
tb_text = stderr if stderr else output
lines = tb_text.split("\n")
truncated = "\n".join(lines[:50])
return f"[get_traceback] Full traceback:\n\n{truncated}"
def _tool_inspect_gradients(buggy_code: str, task_id: str) -> str:
"""
Inject gradient norm logging before backward pass, run one batch, report per-layer norms.
For tasks where code crashes before backward (shape_mismatch, wrong_device), returns crash info.
For silent bugs (leakage, eval_mode), returns gradient info anyway β€” agent must interpret.
"""
instrumented = _inject_gradient_logging(buggy_code)
output, success, _ = _run_in_subprocess(instrumented, timeout=35)
if not success and "GRAD_NORMS" not in output:
lines = output.split("\n")[:20]
return (
"[inspect_gradients] Code crashed before gradients could be measured.\n"
"Partial output:\n" + "\n".join(lines)
)
lines = output.split("\n")
grad_lines = [l for l in lines if "GRAD_NORM" in l or "grad_norm" in l.lower() or "GRADIENT" in l]
if not grad_lines:
return (
"[inspect_gradients] Gradient injection ran but no gradient norms captured.\n"
"This may indicate gradients are not flowing (no backward call reached).\n"
f"Output excerpt:\n{chr(10).join(lines[:15])}"
)
return "[inspect_gradients] Per-layer gradient norms (first batch):\n\n" + "\n".join(grad_lines[:30])
def _inject_gradient_logging(code: str) -> str:
"""
Inject gradient norm reporting after the first backward() call.
Wraps the training loop to break after 1 batch and report norms.
"""
injection = '''
import torch as _torch_grad_tool
_GRAD_BATCH_COUNT = 0
_GRAD_ORIG_backward = _torch_grad_tool.Tensor.backward
def _patched_backward(self, *args, **kwargs):
global _GRAD_BATCH_COUNT
_GRAD_ORIG_backward(self, *args, **kwargs)
_GRAD_BATCH_COUNT += 1
if _GRAD_BATCH_COUNT == 1:
# find all parameters with gradients in scope
import gc
for obj in gc.get_objects():
if isinstance(obj, _torch_grad_tool.nn.Module):
print("GRAD_NORMS:")
for name, param in obj.named_parameters():
if param.grad is not None:
norm = param.grad.norm().item()
print(f" GRAD_NORM layer={name} norm={norm:.6f}")
break
_torch_grad_tool.Tensor.backward = _patched_backward
'''
return injection + "\n" + code
def _tool_print_shapes(buggy_code: str, task_id: str) -> str:
"""
Inject shape printing at each linear layer, run one forward pass, report shapes.
"""
instrumented = _inject_shape_logging(buggy_code)
output, success, _ = _run_in_subprocess(instrumented, timeout=35)
lines = output.split("\n")
shape_lines = [l for l in lines if "SHAPE" in l or "shape" in l.lower()]
if not shape_lines and not success:
error_lines = [l for l in lines if "Error" in l or "error" in l]
return (
"[print_shapes] Code crashed before shapes could be captured.\n"
"Error:\n" + "\n".join(error_lines[:10]) +
"\nFull output excerpt:\n" + "\n".join(lines[:20])
)
if not shape_lines:
return (
"[print_shapes] Shape hook ran but no shapes captured.\n"
f"Output:\n{chr(10).join(lines[:20])}"
)
return "[print_shapes] Tensor shapes at each layer (first batch):\n\n" + "\n".join(shape_lines[:40])
def _inject_shape_logging(code: str) -> str:
"""Register forward hooks on all Linear layers to print input/output shapes."""
injection = '''
import torch as _torch_shape_tool
_shape_hooks = []
def _register_shape_hooks(model):
def make_hook(name):
def hook(module, input, output):
in_shape = tuple(input[0].shape) if input else "unknown"
out_shape = tuple(output.shape)
print(f"SHAPE layer={name} in={in_shape} out={out_shape}")
return hook
for name, module in model.named_modules():
if isinstance(module, (_torch_shape_tool.nn.Linear, _torch_shape_tool.nn.Conv2d)):
h = module.register_forward_hook(make_hook(name))
_shape_hooks.append(h)
_orig_Module_init = _torch_shape_tool.nn.Module.__init__
_registered_models = set()
def _patched_init(self, *args, **kwargs):
_orig_Module_init(self, *args, **kwargs)
_torch_shape_tool.nn.Module.__init__ = _patched_init
import atexit as _atexit_shape
def _cleanup_hooks():
for h in _shape_hooks:
h.remove()
_atexit_shape.register(_cleanup_hooks)
# Patch Module.to() and Module.forward to auto-register hooks on first forward
_orig_forward = _torch_shape_tool.nn.Module.__call__
_hooked_ids = set()
def _auto_hook_call(self, *args, **kwargs):
mid = id(self)
if mid not in _hooked_ids and len(list(self.children())) > 0:
_register_shape_hooks(self)
_hooked_ids.add(mid)
return _orig_forward(self, *args, **kwargs)
_torch_shape_tool.nn.Module.__call__ = _auto_hook_call
'''
return injection + "\n" + code
def _tool_view_source(buggy_code: str) -> str:
lines = buggy_code.split("\n")
numbered = "\n".join(f"{i+1:3d} | {line}" for i, line in enumerate(lines))
return f"[view_source] Full buggy script ({len(lines)} lines):\n\n{numbered}"
# ──────────────────────────────────────────────────────────────
# ALERT MESSAGES β€” minimal failure notice shown on reset()
# No code, no traceback, no hints. Just what an on-call engineer sees.
# ──────────────────────────────────────────────────────────────
ALERTS = {
TASK_SHAPE_MISMATCH: "Training job crashed immediately. No epochs completed. Exit code 1.",
TASK_TRAINING_COLLAPSE: "Training job completed 5 epochs. Final loss: nan. Model did not converge.",
TASK_DATA_LEAKAGE: "Training job completed successfully. Test accuracy: 96.5%. Review requested.",
TASK_WRONG_DEVICE: "Training job crashed on first forward pass. Exit code 1.",
TASK_GRADIENT_NOT_ZEROED: "Training job failed. Loss exploded to nan by epoch 4. Exit code 0.",
TASK_MISSING_EVAL_MODE: "Training completed. Evaluation metrics unstable across repeated runs.",
TASK_COMPOUND_SHAPE_DEVICE: "Training job crashed immediately. Multiple errors detected. Exit code 1.",
TASK_COMPOUND_LEAKAGE_EVAL: "Training completed. Metrics look suspicious and vary between evaluation runs.",
}
# ──────────────────────────────────────────────────────────────
# TASK 1 β€” Shape Mismatch (Easy)
# ──────────────────────────────────────────────────────────────
def _shape_mismatch_scenario(rng: random.Random) -> BugScenario:
variant = rng.choice(["mlp", "deep", "autoencoder"])
hidden_size = rng.choice([128, 256, 512])
wrong_size = rng.choice([64, 32, 16])
num_classes = rng.choice([10, 5, 20])
if variant == "mlp":
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class SimpleClassifier(nn.Module):
def __init__(self):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(784, {hidden_size}),
nn.ReLU(),
nn.Linear({hidden_size}, {hidden_size}),
nn.ReLU(),
)
self.classifier = nn.Linear({wrong_size}, {num_classes})
def forward(self, x):
features = self.encoder(x)
return self.classifier(features)
X = torch.randn(200, 784)
y = torch.randint(0, {num_classes}, (200,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = SimpleClassifier()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
for xb, yb in loader:
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({hidden_size} cannot be broadcast to {wrong_size})"
solution_hint = f"classifier input must be {hidden_size} not {wrong_size}"
elif variant == "deep":
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class DeepNet(nn.Module):
def __init__(self):
super().__init__()
self.feature_extractor = nn.Sequential(
nn.Linear(512, {hidden_size}),
nn.BatchNorm1d({hidden_size}),
nn.ReLU(),
nn.Linear({hidden_size}, {hidden_size}),
nn.ReLU(),
)
self.head = nn.Linear({wrong_size}, {num_classes})
def forward(self, x):
z = self.feature_extractor(x)
return self.head(z)
X = torch.randn(300, 512)
y = torch.randint(0, {num_classes}, (300,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=64, shuffle=True)
model = DeepNet()
optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.9)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
for xb, yb in loader:
optimizer.zero_grad()
out = model(xb)
loss = criterion(out, yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({hidden_size} cannot be broadcast to {wrong_size})"
solution_hint = f"head input must be {hidden_size} not {wrong_size}"
else:
bottleneck = rng.choice([16, 32])
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Autoencoder(nn.Module):
def __init__(self):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(128, {hidden_size}),
nn.ReLU(),
nn.Linear({hidden_size}, {bottleneck}),
)
self.decoder = nn.Sequential(
nn.Linear({wrong_size}, {hidden_size}),
nn.ReLU(),
nn.Linear({hidden_size}, 128),
)
def forward(self, x):
z = self.encoder(x)
return self.decoder(z)
X = torch.randn(200, 128)
dataset = TensorDataset(X, X)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = Autoencoder()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(3):
for xb, _ in loader:
optimizer.zero_grad()
out = model(xb)
loss = criterion(out, xb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
error_output = f"RuntimeError: mat1 and mat2 shapes cannot be multiplied ({bottleneck} cannot be broadcast to {wrong_size})"
solution_hint = f"decoder input must be {bottleneck} not {wrong_size}"
return BugScenario(
task_id=TASK_SHAPE_MISMATCH,
task_description=(
"This PyTorch model crashes immediately during the forward pass with a shape mismatch. "
"The training loop never completes a single step. "
"Find the architectural bug and fix the script so it trains for 3 epochs without error."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="shape_mismatch",
solution_hint=solution_hint,
alert=ALERTS[TASK_SHAPE_MISMATCH],
)
# ──────────────────────────────────────────────────────────────
# TASK 2 β€” Training Collapse (Medium)
# ──────────────────────────────────────────────────────────────
def _training_collapse_scenario(rng: random.Random) -> BugScenario:
bad_lr = rng.choice([10.0, 50.0, 100.0])
variant = rng.choice(["lr", "loss_fn"])
if variant == "lr":
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(20, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1),
)
def forward(self, x):
return self.net(x).squeeze(-1)
X = torch.randn(300, 20)
y = (X[:, 0] + X[:, 1] * 0.5 + torch.randn(300) * 0.1)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32)
model = MLP()
optimizer = optim.SGD(model.parameters(), lr={bad_lr})
criterion = nn.MSELoss()
for epoch in range(5):
epoch_loss = 0.0
for xb, yb in loader:
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg = epoch_loss / len(loader)
print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
error_output = (
f"Training runs without crashing but loss diverges to NaN by epoch 2.\n"
f"Epoch 1, loss: 847.3291\nEpoch 2, loss: nan\nEpoch 3, loss: nan"
)
solution_hint = f"learning rate {bad_lr} causes gradient explosion; reduce to ~1e-3"
else:
buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class BinaryClassifier(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(15, 32),
nn.ReLU(),
nn.Linear(32, 1),
nn.Sigmoid(),
)
def forward(self, x):
return self.net(x).squeeze(-1)
X = torch.randn(400, 15)
y = (X[:, 0] > 0).float()
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32)
model = BinaryClassifier()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(5):
epoch_loss = 0.0
for xb, yb in loader:
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg = epoch_loss / len(loader)
print(f"Epoch {epoch+1}, loss: {avg:.4f}")
print("Training finished")
'''
error_output = (
"Training runs without error but model fails to converge.\n"
"Epoch 1, loss: 0.2489\nEpoch 2, loss: 0.2491\nEpoch 5, loss: 0.2491\n"
"Loss plateaus immediately. Wrong loss function for binary classification."
)
solution_hint = "MSELoss used for binary classification; should be BCELoss or BCEWithLogitsLoss"
return BugScenario(
task_id=TASK_TRAINING_COLLAPSE,
task_description=(
"This PyTorch training script runs without any Python errors or crashes, "
"but the model completely fails to learn. "
"Diagnose why training is broken and fix the script so loss decreases "
"consistently across 5 epochs."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="training_collapse",
solution_hint=solution_hint,
alert=ALERTS[TASK_TRAINING_COLLAPSE],
)
# ──────────────────────────────────────────────────────────────
# TASK 3 β€” Data Leakage (Hard)
# ──────────────────────────────────────────────────────────────
def _data_leakage_scenario(rng: random.Random) -> BugScenario:
variant = rng.choice(["normalize_before_split", "scaler_fit_on_all"])
if variant == "normalize_before_split":
buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class TabularClassifier(nn.Module):
def __init__(self, input_dim, num_classes):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, num_classes),
)
def forward(self, x):
return self.net(x)
torch.manual_seed(42)
N, D, C = 1000, 20, 3
X_raw = torch.randn(N, D)
true_weights = torch.randn(D, C)
y_all = (X_raw @ true_weights).argmax(dim=1)
mean = X_raw.mean(dim=0)
std = X_raw.std(dim=0) + 1e-8
X_normalized = (X_raw - mean) / std
split = int(0.8 * N)
X_train, X_test = X_normalized[:split], X_normalized[split:]
y_train, y_test = y_all[:split], y_all[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = TabularClassifier(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
model.train()
for xb, yb in train_loader:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
model.eval()
with torch.no_grad():
test_preds = model(X_test).argmax(dim=1)
accuracy = (test_preds == y_test).float().mean().item()
print(f"Test accuracy: {accuracy:.4f}")
print("Training finished")
'''
error_output = (
"Script runs to completion. Reported test accuracy: 0.9650\n"
"However, the evaluation is invalid β€” there is a data pipeline bug."
)
solution_hint = "normalize using only train set mean/std; compute mean and std after the split, only on X_train"
else:
buggy_code = '''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Regressor(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(10, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, 1),
)
def forward(self, x):
return self.net(x).squeeze(-1)
N = 800
X_raw = torch.randn(N, 10)
y_all = X_raw[:, 0] * 2.5 + X_raw[:, 2] * 1.3 + torch.randn(N) * 0.3
split = int(0.75 * N)
X_train_raw, X_test_raw = X_raw[:split], X_raw[split:]
y_train, y_test = y_all[:split], y_all[split:]
full_mean = X_raw.mean(dim=0)
full_std = X_raw.std(dim=0) + 1e-8
X_train = (X_train_raw - full_mean) / full_std
X_test = (X_test_raw - full_mean) / full_std
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = Regressor()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(8):
model.train()
for xb, yb in train_loader:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
model.eval()
with torch.no_grad():
test_loss = criterion(model(X_test), y_test).item()
print(f"Test MSE: {test_loss:.4f}")
print("Training finished")
'''
error_output = (
"Script runs to completion. Reported test MSE: 0.1021\n"
"The MSE is artificially low β€” test statistics leaked into normalization."
)
solution_hint = "fit normalization stats only on X_train_raw; use train_mean and train_std to normalize both train and test"
return BugScenario(
task_id=TASK_DATA_LEAKAGE,
task_description=(
"This PyTorch training script runs cleanly with no errors and reports impressive metrics. "
"But the evaluation is fundamentally broken due to a data handling mistake. "
"Identify the data pipeline bug and fix it so the evaluation is valid."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="data_leakage",
solution_hint=solution_hint,
alert=ALERTS[TASK_DATA_LEAKAGE],
)
# ──────────────────────────────────────────────────────────────
# TASK 4 β€” Wrong Device (Medium)
# ──────────────────────────────────────────────────────────────
def _wrong_device_scenario(rng: random.Random) -> BugScenario:
hidden = rng.choice([64, 128, 256])
num_classes = rng.choice([5, 10])
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Classifier(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(784, {hidden}),
nn.ReLU(),
nn.Linear({hidden}, {num_classes}),
)
def forward(self, x):
return self.net(x)
X = torch.randn(200, 784)
y = torch.randint(0, {num_classes}, (200,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Classifier().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss().to(device)
for epoch in range(3):
for xb, yb in loader:
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
error_output = (
"RuntimeError: Expected all tensors to be on the same device, "
"but found at least two devices!\n\n"
"The model was moved to the target device but data batches remain on CPU. "
"Every forward pass crashes. Fix tensor placement so all tensors are on the same device."
)
return BugScenario(
task_id=TASK_WRONG_DEVICE,
task_description=(
"This PyTorch training script crashes on the first forward pass. "
"The model and data tensors are on different devices. "
"Fix the script so training runs for 3 epochs without error on whatever device is available."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="wrong_device",
solution_hint="move xb and yb to device inside the training loop: xb, yb = xb.to(device), yb.to(device)",
alert=ALERTS[TASK_WRONG_DEVICE],
)
# ──────────────────────────────────────────────────────────────
# TASK 5 β€” Gradient Not Zeroed (Medium-Hard)
# ──────────────────────────────────────────────────────────────
def _gradient_not_zeroed_scenario(rng: random.Random) -> BugScenario:
hidden = rng.choice([32, 64, 128])
lr = rng.choice([1e-3, 5e-4])
variant = rng.choice(["regression", "classification"])
if variant == "regression":
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(10, {hidden}),
nn.ReLU(),
nn.Linear({hidden}, {hidden}),
nn.ReLU(),
nn.Linear({hidden}, 1),
)
def forward(self, x):
return self.net(x).squeeze(-1)
X = torch.randn(500, 10)
y = X[:, 0] * 1.5 - X[:, 2] * 0.8 + torch.randn(500) * 0.2
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = MLP()
optimizer = optim.Adam(model.parameters(), lr={lr})
criterion = nn.MSELoss()
for epoch in range(6):
epoch_loss = 0.0
for xb, yb in loader:
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg = epoch_loss / len(loader)
print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
else:
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class Net(nn.Module):
def __init__(self):
super().__init__()
self.features = nn.Sequential(
nn.Linear(32, {hidden}),
nn.ReLU(),
nn.Linear({hidden}, {hidden}),
nn.ReLU(),
)
self.classifier = nn.Linear({hidden}, 4)
def forward(self, x):
return self.classifier(self.features(x))
X = torch.randn(400, 32)
y = torch.randint(0, 4, (400,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
model = Net()
optimizer = optim.SGD(model.parameters(), lr={lr}, momentum=0.9)
criterion = nn.CrossEntropyLoss()
for epoch in range(6):
epoch_loss = 0.0
for xb, yb in loader:
out = model(xb)
loss = criterion(out, yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg = epoch_loss / len(loader)
print(f"Epoch {{epoch+1}}, loss: {{avg:.4f}}")
print("Training finished")
'''
error_output = (
"Script runs without crashing but training is highly unstable.\n"
"Epoch 1, loss: 12.4821\nEpoch 2, loss: 847.2341\n"
"Epoch 3, loss: 23451.8821\nEpoch 4, loss: nan\n"
"Loss explodes after epoch 1 and collapses to NaN. "
"Fundamental error in training loop structure."
)
return BugScenario(
task_id=TASK_GRADIENT_NOT_ZEROED,
task_description=(
"This PyTorch training script runs without crashing but loss explodes "
"after the first epoch and collapses to NaN. The model never learns. "
"Find the training loop bug and fix it so loss decreases consistently across 6 epochs."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="gradient_not_zeroed",
solution_hint="optimizer.zero_grad() is missing before loss.backward(); gradients accumulate causing explosion",
alert=ALERTS[TASK_GRADIENT_NOT_ZEROED],
)
# ──────────────────────────────────────────────────────────────
# TASK 6 β€” Missing Eval Mode (Hard)
# ──────────────────────────────────────────────────────────────
def _missing_eval_mode_scenario(rng: random.Random) -> BugScenario:
dropout_p = rng.choice([0.3, 0.4, 0.5])
hidden = rng.choice([64, 128])
num_classes = rng.choice([3, 5])
variant = rng.choice(["classifier", "regressor"])
if variant == "classifier":
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class DropoutClassifier(nn.Module):
def __init__(self, input_dim, num_classes):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, {hidden}),
nn.BatchNorm1d({hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, {hidden}),
nn.BatchNorm1d({hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, num_classes),
)
def forward(self, x):
return self.net(x)
torch.manual_seed(42)
N, D, C = 800, 20, {num_classes}
X = torch.randn(N, D)
true_w = torch.randn(D, C)
y = (X @ true_w).argmax(dim=1)
split = int(0.8 * N)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = DropoutClassifier(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
model.train()
for xb, yb in train_loader:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
preds = model(X_test).argmax(dim=1)
accuracy = (preds == y_test).float().mean().item()
print(f"Test accuracy: {{accuracy:.4f}}")
print("Evaluation complete")
print("Training finished")
'''
else:
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class RegNet(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(15, {hidden}),
nn.BatchNorm1d({hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, {hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, 1),
)
def forward(self, x):
return self.net(x).squeeze(-1)
torch.manual_seed(42)
N = 600
X = torch.randn(N, 15)
y = X[:, 0] * 2.0 + X[:, 3] * 0.5 + torch.randn(N) * 0.3
split = int(0.8 * N)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = RegNet()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
for epoch in range(10):
model.train()
for xb, yb in train_loader:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
test_loss = criterion(model(X_test), y_test).item()
print(f"Test MSE: {{test_loss:.4f}}")
print("Evaluation complete")
print("Training finished")
'''
error_output = (
"Script runs to completion with no errors.\n"
f"Reported metrics vary between runs due to active Dropout(p={dropout_p}).\n"
"Running evaluation twice gives different numbers. "
"Model appears to be in wrong mode during evaluation."
)
return BugScenario(
task_id=TASK_MISSING_EVAL_MODE,
task_description=(
"This PyTorch model trains successfully but produces unreliable evaluation metrics. "
"Running evaluation multiple times gives different results each time. "
f"The model has Dropout(p={dropout_p}) and BatchNorm layers. "
"Fix the evaluation so it produces stable, deterministic metrics."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="missing_eval_mode",
solution_hint=f"model.eval() and torch.no_grad() must be called before evaluation; dropout p={dropout_p} stays active in train mode",
alert=ALERTS[TASK_MISSING_EVAL_MODE],
)
# ──────────────────────────────────────────────────────────────
# TASK 7 β€” Compound: Shape Mismatch + Wrong Device (Medium-Hard)
# ──────────────────────────────────────────────────────────────
def _compound_shape_device_scenario(rng: random.Random) -> BugScenario:
hidden_size = rng.choice([128, 256])
wrong_size = rng.choice([32, 16])
num_classes = rng.choice([5, 10])
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class MultiLayerNet(nn.Module):
def __init__(self):
super().__init__()
self.backbone = nn.Sequential(
nn.Linear(256, {hidden_size}),
nn.ReLU(),
nn.Linear({hidden_size}, {hidden_size}),
nn.ReLU(),
)
self.classifier = nn.Linear({wrong_size}, {num_classes})
def forward(self, x):
features = self.backbone(x)
return self.classifier(features)
X = torch.randn(300, 256)
y = torch.randint(0, {num_classes}, (300,))
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultiLayerNet().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(3):
for xb, yb in loader:
optimizer.zero_grad()
pred = model(xb)
loss = criterion(pred, yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
print("Training finished")
'''
error_output = (
"This script has TWO bugs that must both be fixed.\n\n"
f"Bug 1 β€” Shape mismatch:\n"
f" RuntimeError: mat1 and mat2 shapes cannot be multiplied "
f"({hidden_size} cannot be broadcast to {wrong_size})\n\n"
"Bug 2 β€” Device mismatch:\n"
" RuntimeError: Expected all tensors to be on the same device!\n\n"
"Fix BOTH bugs. Script should train 3 epochs without error."
)
return BugScenario(
task_id=TASK_COMPOUND_SHAPE_DEVICE,
task_description=(
"This PyTorch script has TWO bugs that must both be fixed. "
"There is a shape mismatch in the model architecture AND a device placement error. "
"Fix both bugs so the script trains for 3 epochs without any errors."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="compound_shape_device",
solution_hint=f"fix 1: classifier input must be {hidden_size} not {wrong_size}; fix 2: move xb and yb to device in training loop",
num_bugs=2,
alert=ALERTS[TASK_COMPOUND_SHAPE_DEVICE],
)
# ──────────────────────────────────────────────────────────────
# TASK 8 β€” Compound: Data Leakage + Missing Eval Mode (Expert)
# ──────────────────────────────────────────────────────────────
def _compound_leakage_eval_scenario(rng: random.Random) -> BugScenario:
dropout_p = rng.choice([0.3, 0.4])
hidden = rng.choice([64, 128])
num_classes = rng.choice([3, 4])
buggy_code = f'''import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(42)
class TabularNet(nn.Module):
def __init__(self, input_dim, num_classes):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, {hidden}),
nn.BatchNorm1d({hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, {hidden}),
nn.ReLU(),
nn.Dropout(p={dropout_p}),
nn.Linear({hidden}, num_classes),
)
def forward(self, x):
return self.net(x)
torch.manual_seed(42)
N, D, C = 1000, 20, {num_classes}
X_raw = torch.randn(N, D)
true_weights = torch.randn(D, C)
y_all = (X_raw @ true_weights).argmax(dim=1)
mean = X_raw.mean(dim=0)
std = X_raw.std(dim=0) + 1e-8
X_normalized = (X_raw - mean) / std
split = int(0.8 * N)
X_train, X_test = X_normalized[:split], X_normalized[split:]
y_train, y_test = y_all[:split], y_all[split:]
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
model = TabularNet(D, C)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(10):
model.train()
for xb, yb in train_loader:
optimizer.zero_grad()
loss = criterion(model(xb), yb)
loss.backward()
optimizer.step()
print(f"Epoch {{epoch+1}} complete")
test_preds = model(X_test).argmax(dim=1)
accuracy = (test_preds == y_test).float().mean().item()
print(f"Test accuracy: {{accuracy:.4f}}")
print("Evaluation complete")
print("Training finished")
'''
error_output = (
"Script runs to completion with no errors.\n"
"Reported test accuracy: 0.9700 (varies slightly between runs)\n\n"
"This script has TWO silent bugs:\n"
"Bug 1 β€” Data leakage: normalization statistics computed from entire dataset before split.\n"
f"Bug 2 β€” Missing eval mode: model.eval() not called; Dropout(p={dropout_p}) active during evaluation."
)
return BugScenario(
task_id=TASK_COMPOUND_LEAKAGE_EVAL,
task_description=(
"This PyTorch script runs cleanly and reports impressive metrics β€” but contains "
"TWO silent bugs that make the evaluation invalid. "
"There is a data leakage bug in preprocessing AND a missing eval mode bug. "
"Fix both so the evaluation is correct and deterministic."
),
buggy_code=buggy_code,
error_output=error_output,
correct_bug_type="compound_leakage_eval",
solution_hint=f"fix 1: compute mean/std only from X_train after split; fix 2: add model.eval() and torch.no_grad() before evaluation",
num_bugs=2,
alert=ALERTS[TASK_COMPOUND_LEAKAGE_EVAL],
)