JustinTX's picture
Add files using upload-large-folder tool
b0e88cf verified
import numpy as np
from typing import List, Tuple, Dict, Any
import json
import os
try:
from skydiscover.evaluation.evaluation_result import EvaluationResult
except ImportError:
from dataclasses import dataclass, field
from typing import Union
@dataclass
class EvaluationResult:
metrics: Dict[str, float]
artifacts: Dict[str, Union[str, bytes]] = field(default_factory=dict)
import importlib.util
TASK_FILE = os.getenv("ARC_TASK_FILE", "training")
TASK_NUM = os.getenv("TASK_NUM", 0)
DATA_ROOT = os.getenv("DATA_ROOT", os.path.join(os.path.dirname(os.path.abspath(__file__)), "data"))
INCLUDE_TEST = os.getenv("ARC_EVAL_INCLUDE_TEST", "0").lower() in ("1", "true", "yes")
USE_TEST_IN_SCORE = os.getenv("ARC_EVAL_USE_TEST_FOR_SCORE", "0").lower() in ("1", "true", "yes")
def cell_accuracy_single(pred: np.ndarray, gt: np.ndarray) -> float:
"""
Compute continuous cell-level accuracy between prediction and ground truth.
Returns a float in [0, 1]. Handles shape mismatches gracefully.
"""
if pred.shape != gt.shape:
# Partial credit for getting shape partially right
shape_score = 0.0
if len(pred.shape) == len(gt.shape) == 2:
row_match = 1.0 if pred.shape[0] == gt.shape[0] else 0.0
col_match = 1.0 if pred.shape[1] == gt.shape[1] else 0.0
shape_score = (row_match + col_match) * 0.1 # up to 0.2 for correct dimensions
return shape_score
# Cell-level accuracy
total_cells = gt.size
if total_cells == 0:
return 1.0
correct_cells = int(np.sum(pred == gt))
return correct_cells / total_cells
def best_attempt_cell_accuracy(attempts: List[np.ndarray], gt: np.ndarray) -> float:
"""Return the best cell accuracy across all attempts for one example."""
return max(cell_accuracy_single(a, gt) for a in attempts)
def pass_at_2_accuracy_single(
attempts: List[np.ndarray],
gt: np.ndarray
) -> Tuple[int, Dict[int, Any]]:
"""
Compute pass@2 accuracy for a single ARC test case.
Args:
attempts: List of 2 numpy arrays representing model attempts.
gt: Ground-truth output as a 2D numpy array.
Returns:
pass_at_2: int (1 if any attempt is perfectly correct, else 0)
diagnostics: dict mapping attempt index -> diagnostic info.
If sizes match, includes indices of incorrect cells.
"""
assert len(attempts) == 2, "Expected exactly 2 attempts for pass@2 evaluation."
diagnostics = {}
passed = False
for i, pred in enumerate(attempts):
attempt_info = {}
# Size check
if pred.shape != gt.shape:
attempt_info["size_match"] = False
attempt_info["pred_shape"] = list(pred.shape)
attempt_info["gt_shape"] = list(gt.shape)
attempt_info["incorrect_indices"] = None
attempt_info["cell_accuracy"] = 0.0
attempt_passed = False
else:
attempt_info["size_match"] = True
# Find incorrect cells
incorrect_mask = pred != gt
incorrect_indices = np.argwhere(incorrect_mask)
attempt_info["incorrect_indices"] = incorrect_indices.tolist()
attempt_info["num_incorrect"] = int(incorrect_mask.sum())
attempt_info["num_total"] = int(gt.size)
attempt_info["cell_accuracy"] = float(np.sum(~incorrect_mask)) / gt.size
# Perfect match
if incorrect_mask.sum() == 0:
attempt_passed = True
else:
attempt_passed = False
attempt_info["perfect_match"] = attempt_passed
passed = attempt_passed or passed
diagnostics[i] = attempt_info
pass_at_2 = 1 if passed else 0
return pass_at_2, diagnostics
def pass_at_2_accuracy_multi_test(
all_attempts: List[List[np.ndarray]],
all_gt: List[np.ndarray]
) -> Tuple[List[int], List[Dict[int, Any]]]:
"""
Compute pass@2 accuracy across multiple ARC test cases.
Args:
all_attempts: List of lists of 2 numpy arrays for each test case.
all_gt: List of ground-truth outputs as 2D numpy arrays.
"""
assert len(all_attempts) == len(all_gt), "Mismatched number of test cases."
all_diagnostics = []
all_pass = []
for attempts, gt in zip(all_attempts, all_gt):
pass_at_2, diagnostics = pass_at_2_accuracy_single(attempts, gt)
all_pass.append(pass_at_2)
all_diagnostics.append(diagnostics)
return all_pass, all_diagnostics
def extract_failure_artifacts(diagnostics, pred=None, gt=None):
"""
Extract failure artifacts from diagnostics for a given example.
Includes actual vs expected output snippets for better LLM feedback.
"""
artifacts = {}
if not diagnostics["size_match"]:
artifacts["error_type"] = "SizeMismatch"
artifacts["error_message"] = (
f"Output shape {diagnostics['pred_shape']} does not match "
f"expected shape {diagnostics['gt_shape']}."
)
artifacts["suggestion"] = (
f"Your output has shape {diagnostics['pred_shape']} but the correct output "
f"has shape {diagnostics['gt_shape']}. Review how you determine output dimensions."
)
else:
num_incorrect = diagnostics['num_incorrect']
num_total = diagnostics['num_total']
accuracy = diagnostics['cell_accuracy']
artifacts["error_type"] = "IncorrectCells"
artifacts["error_message"] = (
f"{num_incorrect}/{num_total} cells incorrect "
f"(cell accuracy: {accuracy:.1%})."
)
# Show a compact diff of expected vs actual for first few wrong cells
if diagnostics['incorrect_indices'] and pred is not None and gt is not None:
wrong = diagnostics['incorrect_indices'][:8] # first 8 wrong cells
diff_lines = []
for r, c in wrong:
diff_lines.append(f" [{r},{c}]: got {int(pred[r,c])}, expected {int(gt[r,c])}")
artifacts["cell_diffs"] = "\n".join(diff_lines)
if len(diagnostics['incorrect_indices']) > 8:
artifacts["cell_diffs"] += f"\n ... and {len(diagnostics['incorrect_indices'])-8} more"
artifacts["suggestion"] = (
f"Your solution gets {accuracy:.1%} of cells correct. "
f"Review the transformation logic for the failing cells."
)
return artifacts
def evaluate(program_path):
"""
Evaluate the program on ARC task training (and optionally test) examples.
Returns a combined_score that blends:
- pass@2 (binary perfect-match, weighted 0.6)
- cell accuracy (continuous partial credit, weighted 0.4)
This gives evolution gradient signal even when no example is solved perfectly.
"""
spec = importlib.util.spec_from_file_location("program_module", program_path)
program_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(program_module)
if not hasattr(program_module, 'transform_grid_attempt_1') or not hasattr(program_module, 'transform_grid_attempt_2'):
print(f"Stage 1 validation failed: Program must define 'transform_grid_attempt_1' and 'transform_grid_attempt_2' functions.")
error_artifacts = {
"error_type": "MissingFunction",
"error_message": "Stage 1: Program is missing required 'transform_grid_attempt_1' and 'transform_grid_attempt_2' functions.",
"suggestion": "Make sure your program includes a functions named 'transform_grid_attempt_1' and 'transform_grid_attempt_2' that take as an argument a 2D numpy array and return a 2D numpy array."
}
return EvaluationResult(
metrics={
"runs_successfully": 0.0,
"combined_score": 0.0,
"error": "Missing transform_grid_attempt_1 and transform_grid_attempt_2 functions"
},
artifacts=error_artifacts
)
# Load ARC tasks
challenge_path = os.path.join(DATA_ROOT, f"arc-agi_{TASK_FILE}_challenges.json")
with open(challenge_path, 'r') as f:
tasks = json.load(f)
task_id = list(tasks.keys())[int(TASK_NUM)]
task = tasks[task_id]
train_inputs = [np.array(inp["input"]) for inp in task['train']]
train_gts = [np.array(gt["output"]) for gt in task['train']]
train_attempts = []
# Generate attempts for training data
for inp in train_inputs:
attempt_1 = program_module.transform_grid_attempt_1(inp)
if not isinstance(attempt_1, np.ndarray):
print(f"transform_grid_attempt_1 did not return a numpy array")
error_artifacts = {
"error_type": "InvalidReturnType",
"error_message": "Stage 1: transform_grid_attempt_1 did not return a numpy array.",
"suggestion": "Make sure your transform_grid_attempt_1 function returns a 2D numpy array."
}
return EvaluationResult(
metrics={
"runs_successfully": 0.0,
"combined_score": 0.0,
"error": "transform_grid_attempt_1 did not return a numpy array"
},
artifacts=error_artifacts
)
attempt_2 = program_module.transform_grid_attempt_2(inp)
if not isinstance(attempt_2, np.ndarray):
print(f"transform_grid_attempt_2 did not return a numpy array")
error_artifacts = {
"error_type": "InvalidReturnType",
"error_message": "Stage 1: transform_grid_attempt_2 did not return a numpy array.",
"suggestion": "Make sure your transform_grid_attempt_2 function returns a 2D numpy array."
}
return EvaluationResult(
metrics={
"runs_successfully": 0.0,
"combined_score": 0.0,
"error": "transform_grid_attempt_2 did not return a numpy array"
},
artifacts=error_artifacts
)
train_attempts.append([attempt_1, attempt_2])
pass_at_2_train, train_diagnostics_list = pass_at_2_accuracy_multi_test(train_attempts, train_gts)
# Compute both binary pass@2 and continuous cell accuracy
train_pass_score = sum(pass_at_2_train) / len(pass_at_2_train)
train_cell_acc = sum(
best_attempt_cell_accuracy(attempts, gt)
for attempts, gt in zip(train_attempts, train_gts)
) / len(train_gts)
# Blended score: pass@2 (60%) + cell accuracy (40%) gives gradient signal
train_score = 0.6 * train_pass_score + 0.4 * train_cell_acc
metrics = {
"runs_successfully": 1.0,
"combined_score": train_score,
"train_combined_score": train_score,
"train_pass_at_2_score": train_pass_score,
"train_cell_accuracy": round(train_cell_acc, 4),
}
error_artifacts = {}
for i, (train_pass, train_diagnostics) in enumerate(zip(pass_at_2_train, train_diagnostics_list)):
example_name = f"train_example_{i}"
metrics[f"{example_name}_pass_at_2"] = train_pass
best_acc = best_attempt_cell_accuracy(train_attempts[i], train_gts[i])
metrics[f"{example_name}_cell_accuracy"] = round(best_acc, 4)
for attempt in train_diagnostics:
attempt_pass = train_diagnostics[attempt]["perfect_match"]
metrics[f"{example_name}_attempt_{attempt}"] = attempt_pass
if not attempt_pass:
pred = train_attempts[i][attempt]
gt = train_gts[i]
error_artifacts[f"{example_name}_attempt_{attempt}_diagnostics"] = extract_failure_artifacts(
train_diagnostics[attempt], pred=pred, gt=gt
)
# Optional: include test feedback (uses solutions if available)
if INCLUDE_TEST:
solution_path = os.path.join(DATA_ROOT, f"arc-agi_{TASK_FILE}_solutions.json")
if os.path.isfile(solution_path):
with open(solution_path, 'r') as f:
solutions = json.load(f)
task_id = list(tasks.keys())[int(TASK_NUM)]
solution = solutions.get(task_id)
if solution is not None and "test" in task:
if len(task["test"]) != len(solution):
raise ValueError(
f"Train/test data mismatch: task {task_id} has {len(task['test'])} test inputs "
f"but {len(solution)} solution outputs. Check that arc-agi_{TASK_FILE}_challenges.json "
f"and arc-agi_{TASK_FILE}_solutions.json were generated together."
)
test_inputs = [np.array(inp["input"]) for inp in task['test']]
test_gts = [np.array(gt) for gt in solution]
test_attempts = []
for inp in test_inputs:
attempt_1 = program_module.transform_grid_attempt_1(inp)
if not isinstance(attempt_1, np.ndarray):
print(f"transform_grid_attempt_1 did not return a numpy array (test)")
return EvaluationResult(
metrics={
"runs_successfully": 0.0,
"combined_score": 0.0,
"error": "transform_grid_attempt_1 did not return a numpy array (test)"
},
artifacts={
"error_type": "InvalidReturnType",
"error_message": "Stage 1: transform_grid_attempt_1 did not return a numpy array (test).",
"suggestion": "Make sure transform_grid_attempt_1 returns a 2D numpy array."
}
)
attempt_2 = program_module.transform_grid_attempt_2(inp)
if not isinstance(attempt_2, np.ndarray):
print(f"transform_grid_attempt_2 did not return a numpy array (test)")
return EvaluationResult(
metrics={
"runs_successfully": 0.0,
"combined_score": 0.0,
"error": "transform_grid_attempt_2 did not return a numpy array (test)"
},
artifacts={
"error_type": "InvalidReturnType",
"error_message": "Stage 1: transform_grid_attempt_2 did not return a numpy array (test).",
"suggestion": "Make sure transform_grid_attempt_2 returns a 2D numpy array."
}
)
test_attempts.append([attempt_1, attempt_2])
pass_at_2_test, test_diagnostics_list = pass_at_2_accuracy_multi_test(test_attempts, test_gts)
test_pass_score = sum(pass_at_2_test) / len(pass_at_2_test)
test_cell_acc = sum(
best_attempt_cell_accuracy(attempts, gt)
for attempts, gt in zip(test_attempts, test_gts)
) / len(test_gts)
test_score = 0.6 * test_pass_score + 0.4 * test_cell_acc
metrics["test_combined_score"] = test_score
metrics["test_pass_at_2_score"] = test_pass_score
metrics["test_cell_accuracy"] = round(test_cell_acc, 4)
metrics["test_included"] = 1
for i, (test_pass, test_diagnostics) in enumerate(zip(pass_at_2_test, test_diagnostics_list)):
example_name = f"test_example_{i}"
metrics[f"{example_name}_pass_at_2"] = test_pass
best_acc = best_attempt_cell_accuracy(test_attempts[i], test_gts[i])
metrics[f"{example_name}_cell_accuracy"] = round(best_acc, 4)
for attempt in test_diagnostics:
metrics[f"{example_name}_attempt_{attempt}"] = test_diagnostics[attempt]["perfect_match"]
if test_pass == 0:
first_failing_idx = next(
(a for a in test_diagnostics if not test_diagnostics[a]["perfect_match"]),
0,
)
pred = test_attempts[i][first_failing_idx]
gt = test_gts[i]
error_artifacts[f"{example_name}"] = extract_failure_artifacts(
test_diagnostics[first_failing_idx], pred=pred, gt=gt
)
if USE_TEST_IN_SCORE:
metrics["combined_score"] = (train_score + test_score) / 2.0
else:
metrics["test_included"] = 0
else:
metrics["test_included"] = 0
return EvaluationResult(
metrics=metrics,
artifacts=error_artifacts
)
def _evaluate_as_dict(program_path):
"""Adapter: calls evaluate() and converts EvaluationResult to a plain dict."""
result = evaluate(program_path)
d = dict(result.metrics)
for k, v in result.artifacts.items():
d[k] = v
return d
if __name__ == "__main__":
# Backwards-compat: bridges old evaluate() -> EvaluationResult to the
# container JSON protocol. wrapper.py is copied from
# skydiscover/evaluation/wrapper.py.
from wrapper import run
run(_evaluate_as_dict)