# ml4se-evals-visualization / adapters / code_generation.py
# (Scraped page header preserved as a comment so the module parses:
#  author egor-bogomolov, commit 9a8a9c5,
#  "Add 28 benchmark datasets with rich visualization views".)
"""Code generation benchmark adapters."""
from __future__ import annotations
import json
from collections import defaultdict
from typing import Any
from adapters import DatasetAdapter
# Injected at runtime by _set_helpers()
_highlight_code = None  # callable(code, language="python"?) -> highlighted markup; exact signature assumed — confirm in _set_helpers
_code_offset = None  # callable(code) -> int offset stripped from the stored code; semantics assumed — confirm in _set_helpers
_extract_test_classes = None  # callable(test_src, entry_point) -> list of {"name", "code"} dicts, inferred from its use below
# ---------------------------------------------------------------------------
# REval adapter (HuggingFace: JetBrains-Research/REval)
# ---------------------------------------------------------------------------
def _format_typed_value(val: dict) -> str:
"""Convert a {__type__, __value__} dict from REval states into a Python repr string."""
t = val.get("__type__")
v = val.get("__value__")
if t in ("int", "float", "str", "bool", "NoneType"):
return repr(v)
elif t == "list":
return "[" + ", ".join(_format_typed_value(item) for item in v) + "]"
elif t == "tuple":
items = ", ".join(_format_typed_value(item) for item in v)
return f"({items},)" if len(v) == 1 else f"({items})"
elif t == "set":
return "{" + ", ".join(_format_typed_value(item) for item in v) + "}"
else:
return repr(v)
class REvalAdapter(DatasetAdapter):
    """Adapter for REval (JetBrains-Research/REval), a runtime-reasoning benchmark.

    Joins four dataset splits: problems, per-problem task lists,
    per-(task, input) execution records, and per-(task, input)
    variable-state snapshots.
    """

    slug = "reval"
    display_name = "REval"
    has_ground_truth = True
    has_tasks = True

    def __init__(self, problems_ds, tasks_ds, executions_ds, states_ds):
        self._problems = problems_ds
        # task_id -> list of task dicts (JSON decoded once, up front).
        self._tasks: dict[str, list] = {}
        for row in tasks_ds:
            self._tasks[row["task_id"]] = json.loads(row["tasks"])
        # (task_id, input_idx) -> execution record: status, line trace,
        # and covered-line list.
        self._executions: dict[tuple[str, int], dict] = {}
        for row in executions_ds:
            self._executions[(row["task_id"], row["input_idx"])] = {
                "status": row["status"],
                "trace": row["trace"],
                "coverage": row["coverage"],
            }
        # (task_id, input_idx) -> list of recorded local-variable states.
        self._states: dict[tuple[str, int], list] = {}
        for row in states_ds:
            self._states[(row["task_id"], row["input_idx"])] = json.loads(row["states"])

    def problem_count(self) -> int:
        """Return the number of problems."""
        return len(self._problems)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight listing entry for problem *idx*."""
        row = self._problems[idx]
        return {
            "idx": idx,
            "task_id": row["task_id"],
            "entry_point": row["entry_point"],
            "num_inputs": len(row["inputs"]),
            # Problems carrying a test class come from ClassEval; the rest
            # come from HumanEval.
            "source": "ClassEval" if row["test"] else "HumanEval",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full view model for problem *idx*, with task line
        numbers rebased into the stripped code.
        """
        problem = self._problems[idx]
        task_id = problem["task_id"]
        task_list = self._tasks.get(task_id, [])
        code = problem["code"]
        offset = _code_offset(code)
        # NOTE(review): `offset` is used here as a *character* slice index but
        # below as a *line-number* delta — confirm _code_offset returns a
        # value valid for both uses.
        code = code[offset:]
        highlighted_code = _highlight_code(code)
        tasks_info = []
        for task_item in task_list:
            # Rebase each task item's line number into the stripped code.
            adjusted_items = []
            for item in task_item.get("task", []):
                adj = dict(item)
                if "lineno" in adj:
                    adj["lineno"] -= offset
                adjusted_items.append(adj)
            input_idx = task_item["input_idx"]
            inputs = problem["inputs"]
            outputs = problem["outputs"]
            # Guard against task input indices beyond the stored input list.
            inp = inputs[input_idx] if input_idx < len(inputs) else ""
            out = outputs[input_idx] if input_idx < len(outputs) else ""
            task_info = {
                "input_idx": input_idx,
                "input": inp,
                "output": out,
                "task_items": adjusted_items,
            }
            if "output_pred" in task_item:
                task_info["output_pred"] = task_item["output_pred"]
            # Distinct (rebased) line numbers referenced by this task, sorted.
            task_lines = set()
            for item in adjusted_items:
                if "lineno" in item:
                    task_lines.add(item["lineno"])
            task_info["task_lines"] = sorted(task_lines)
            tasks_info.append(task_info)
        if problem["test"]:
            # ClassEval-derived problems: pair each task's input index with
            # the corresponding extracted test class.
            tc_list = _extract_test_classes(problem["test"], problem["entry_point"])
            for task_info in tasks_info:
                idx_in_tc = task_info["input_idx"]
                if idx_in_tc < len(tc_list):
                    task_info["test_class_name"] = tc_list[idx_in_tc]["name"]
                    task_info["test_class_code"] = tc_list[idx_in_tc]["code"]
        return {
            "idx": idx,
            "task_id": problem["task_id"],
            "entry_point": problem["entry_point"],
            "code": code,
            "highlighted_code": highlighted_code,
            "inputs": list(problem["inputs"]),
            "outputs": list(problem["outputs"]),
            "test": problem["test"],
            "tasks": tasks_info,
            "source": "ClassEval" if problem["test"] else "HumanEval",
            "has_ground_truth": True,
            "has_tasks": True,
        }

    def get_ground_truth(self, idx: int, input_idx: int) -> dict[str, Any]:
        """Return execution-derived answers for problem *idx* on *input_idx*.

        Returns a status dict: "unavailable"/"error" when no usable execution
        record exists, otherwise "ok" with covered lines, per-variable value
        answers, and next-line answers.
        """
        problem = self._problems[idx]
        task_id = problem["task_id"]
        exec_rec = self._executions.get((task_id, input_idx))
        if exec_rec is None:
            return {"status": "unavailable", "message": "No execution data for this input"}
        if exec_rec["status"] == "error":
            return {"status": "error", "message": "Execution failed for this input"}
        code = problem["code"]
        offset = _code_offset(code)
        # Rebase covered lines into the stripped code; the "+ 1" suggests the
        # stored coverage is 0-indexed — TODO confirm against the dataset.
        coverage_1indexed = [ln + 1 - offset for ln in exec_rec["coverage"]]
        total_lines = len(code[offset:].splitlines())
        task_list = self._tasks.get(task_id, [])
        # Find the task entry for this specific input (at most one expected).
        task_items = []
        for t in task_list:
            if t["input_idx"] == input_idx:
                task_items = t.get("task", [])
                break
        states_list = self._states.get((task_id, input_idx), [])
        variable_answers = []
        for item in task_items:
            lineno = item["lineno"]
            var = item["var"]
            # Collect every recorded value of `var` at this line, in order.
            values = []
            for s in states_list:
                if s["lineno"] == lineno and var in s.get("locals", {}):
                    values.append(s["locals"][var])
            if not values:
                answer_str = "(not available)"
            elif len(values) == 1:
                answer_str = _format_typed_value(values[0])
            else:
                # Deduplicate formatted values, preserving first-seen order.
                seen = []
                for v in values:
                    fmt = _format_typed_value(v)
                    if fmt not in seen:
                        seen.append(fmt)
                answer_str = "[" + ", ".join(seen) + "]" if len(seen) > 1 else seen[0]
            variable_answers.append(
                {
                    # Rebased into the stripped code, like task_items above.
                    "lineno": lineno - offset,
                    "var": var,
                    "answer_str": answer_str,
                }
            )
        trace = exec_rec["trace"]
        next_lines_answers = []
        processed_linenos: set[int] = set()
        for item in task_items:
            lineno = item["lineno"]
            if lineno in processed_linenos:
                continue
            processed_linenos.add(lineno)
            # All successor lines observed immediately after `lineno` in the
            # execution trace.
            nexts: set[int] = set()
            for i, ln in enumerate(trace):
                if ln == lineno and i + 1 < len(trace):
                    nexts.add(trace[i + 1])
            # NOTE(review): unlike variable_answers, these line numbers are
            # NOT shifted by `offset` — confirm the consumer expects raw
            # (pre-strip) numbering here.
            next_lines_answers.append(
                {
                    "lineno": lineno,
                    # -1 sentinel: the line never had an observed successor.
                    "next_lines": sorted(nexts) if nexts else [-1],
                }
            )
        return {
            "status": "ok",
            "coverage": coverage_1indexed,
            "total_lines": total_lines,
            "variable_answers": variable_answers,
            "next_lines_answers": next_lines_answers,
        }
# ---------------------------------------------------------------------------
# HumanEval+ adapter (HuggingFace: evalplus/humanevalplus)
# ---------------------------------------------------------------------------
class HumanEvalPlusAdapter(DatasetAdapter):
    """Adapter for HumanEval+ (evalplus/humanevalplus)."""

    slug = "humanevalplus"
    display_name = "HumanEval+"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        # Rows are consumed as-is; no preprocessing needed.
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Return a lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        summary = {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record["entry_point"],
            "num_inputs": 0,
            "source": "HumanEval+",
        }
        return summary

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Return the full view model for problem *idx*."""
        record = self._ds[idx]
        # Reference implementation = prompt header + canonical solution body.
        full_code = record["prompt"] + record["canonical_solution"]
        return {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record["entry_point"],
            "code": full_code,
            "highlighted_code": _highlight_code(full_code),
            "inputs": [],
            "outputs": [],
            "test": record["test"],
            "tasks": [],
            "source": "HumanEval+",
            "has_ground_truth": False,
            "has_tasks": False,
        }
# ---------------------------------------------------------------------------
# BigOBench adapter (HuggingFace: facebook/BigOBench)
# ---------------------------------------------------------------------------
class BigOBenchAdapter(DatasetAdapter):
    """Adapter for BigOBench (facebook/BigOBench) merged problems."""

    slug = "bigobench"
    display_name = "BigOBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, problems: list[dict[str, Any]]):
        # Expects the pre-merged problem list produced by merge_bigobench().
        self._problems = problems

    def problem_count(self) -> int:
        """Return the number of merged problems."""
        return len(self._problems)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Listing entry; num_inputs is reused to show the solution count."""
        record = self._problems[idx]
        return {
            "idx": idx,
            "task_id": record["problem_id"],
            "entry_point": record["problem_name"],
            "num_inputs": len(record["solutions"]),
            "source": "BigOBench",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model with per-solution complexity annotations."""
        record = self._problems[idx]
        rendered = [
            {
                "solution_id": sol["solution_id"],
                "code": sol["solution_code"],
                "highlighted_code": _highlight_code(sol["solution_code"]),
                "time_complexity": sol.get("time_complexity"),
                "space_complexity": sol.get("space_complexity"),
            }
            for sol in record["solutions"]
        ]
        # The first solution doubles as the headline code sample.
        first = rendered[0] if rendered else None
        return {
            "idx": idx,
            "task_id": record["problem_id"],
            "entry_point": record["problem_name"],
            "code": first["code"] if first else "",
            "highlighted_code": first["highlighted_code"] if first else "",
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": "BigOBench",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["description"],
            "solutions": rendered,
        }
def merge_bigobench(ds_time, ds_space) -> list[dict[str, Any]]:
    """Merge time and space complexity test sets by problem_id."""
    # (problem_id, solution_id) -> merged solution record, in first-seen order.
    merged: dict[tuple[str, str], dict[str, Any]] = {}
    # problem_id -> name/description metadata.
    meta: dict[str, dict[str, str]] = {}

    # Time rows seed the solution records (space filled in later).
    for rec in ds_time:
        pid = rec["problem_id"]
        meta[pid] = {
            "problem_name": rec["problem_name"],
            "description": rec["description"],
        }
        merged[(pid, rec["solution_id"])] = {
            "solution_id": rec["solution_id"],
            "solution_code": rec["solution_code"],
            "time_complexity": rec["time_complexity_inferred"],
            "space_complexity": None,
        }

    # Space rows either complete an existing record or create a space-only one.
    for rec in ds_space:
        pid = rec["problem_id"]
        if pid not in meta:
            meta[pid] = {
                "problem_name": rec["problem_name"],
                "description": rec["description"],
            }
        entry = merged.get((pid, rec["solution_id"]))
        if entry is not None:
            entry["space_complexity"] = rec["space_complexity_inferred"]
        else:
            merged[(pid, rec["solution_id"])] = {
                "solution_id": rec["solution_id"],
                "solution_code": rec["solution_code"],
                "time_complexity": None,
                "space_complexity": rec["space_complexity_inferred"],
            }

    # Group solutions under their problem, preserving insertion order.
    grouped: dict[str, list[dict[str, Any]]] = {}
    for (pid, _sid), entry in merged.items():
        grouped.setdefault(pid, []).append(entry)

    return [
        {
            "problem_id": pid,
            "problem_name": meta[pid]["problem_name"],
            "description": meta[pid]["description"],
            "solutions": grouped[pid],
        }
        for pid in sorted(grouped)
    ]
# ---------------------------------------------------------------------------
# MBPP+ adapter (HuggingFace: evalplus/mbppplus)
# ---------------------------------------------------------------------------
class MBPPPlusAdapter(DatasetAdapter):
    """Adapter for MBPP+ (evalplus/mbppplus)."""

    slug = "mbppplus"
    display_name = "MBPP+"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    @staticmethod
    def _title(record) -> str:
        # MBPP rows have no entry-point name; show a truncated one-line prompt.
        return record["prompt"][:60].replace("\n", " ").strip()

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(record["task_id"]),
            "entry_point": self._title(record),
            "num_inputs": len(record["test_list"]),
            "source": "MBPP+",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(record["task_id"]),
            "entry_point": self._title(record),
            "code": record["code"],
            "highlighted_code": _highlight_code(record["code"]),
            "inputs": [],
            "outputs": [],
            "test": "\n".join(record["test_list"]),
            "tasks": [],
            "source": "MBPP+",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["prompt"],
        }
# ---------------------------------------------------------------------------
# ClassEval adapter (HuggingFace: FudanSELab/ClassEval)
# ---------------------------------------------------------------------------
class ClassEvalAdapter(DatasetAdapter):
    """Adapter for ClassEval (FudanSELab/ClassEval)."""

    slug = "classeval"
    display_name = "ClassEval"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of class-level problems."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Listing entry; num_inputs is reused for the method count."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record["class_name"],
            "num_inputs": len(record["methods_info"]),
            "source": "ClassEval",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model including the class skeleton and reference code."""
        record = self._ds[idx]
        solution = record["solution_code"]
        return {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record["class_name"],
            "code": solution,
            "highlighted_code": _highlight_code(solution),
            "inputs": [],
            "outputs": [],
            "test": record["test"],
            "tasks": [],
            "source": "ClassEval",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["class_description"],
            "skeleton": record["skeleton"],
        }
# ---------------------------------------------------------------------------
# LiveCodeBench adapter (HuggingFace: livecodebench/code_generation_lite)
# ---------------------------------------------------------------------------
class LiveCodeBenchAdapter(DatasetAdapter):
    """Adapter for LiveCodeBench (livecodebench/code_generation_lite)."""

    slug = "livecodebench"
    display_name = "LiveCodeBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": record["question_id"],
            "entry_point": record["question_title"],
            "num_inputs": 0,
            "source": record["platform"],
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*."""
        record = self._ds[idx]
        # Public test cases are a JSON-encoded string; tolerate missing or
        # malformed payloads by falling back to an empty list.
        raw_cases = record["public_test_cases"]
        try:
            cases = json.loads(raw_cases) if raw_cases else []
        except (json.JSONDecodeError, TypeError):
            cases = []
        starter = record.get("starter_code", "") or ""
        # Show the starter code only when it is non-blank.
        code = starter if starter.strip() else ""
        return {
            "idx": idx,
            "task_id": record["question_id"],
            "entry_point": record["question_title"],
            "code": code,
            "highlighted_code": _highlight_code(code) if code else "",
            "inputs": [case.get("input", "") for case in cases],
            "outputs": [case.get("output", "") for case in cases],
            "test": None,
            "tasks": [],
            "source": record["platform"],
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["question_content"],
            "difficulty": record.get("difficulty", ""),
            "contest_date": record.get("contest_date", ""),
        }
# ---------------------------------------------------------------------------
# CodeContests adapter (HuggingFace: deepmind/code_contests)
# ---------------------------------------------------------------------------
# CodeContests `solutions.language` enum values mapped to display names.
_CC_LANG_NAMES = {0: "Unknown", 1: "Python 2", 2: "C++", 3: "Python 3", 4: "Java"}
class CodeContestsAdapter(DatasetAdapter):
    """Adapter for CodeContests (deepmind/code_contests)."""

    slug = "codecontests"
    display_name = "CodeContests"
    has_ground_truth = False
    has_tasks = False

    # Integer enums from the dataset schema mapped to display names.
    _DIFFICULTY_NAMES = {
        0: "Unknown",
        1: "Easy",
        2: "Medium",
        3: "Hard",
        4: "Harder",
        5: "Hardest",
        6: "External",
    }
    _SOURCE_NAMES = {
        0: "Unknown",
        1: "CodeChef",
        2: "Codeforces",
        3: "HackerEarth",
        4: "CodeJam",
        5: "AtCoder",
        6: "Aizu",
    }
    # Language enum -> lexer key for the syntax highlighter.
    _HIGHLIGHT_LANGS = {1: "python", 2: "cpp", 3: "python", 4: "java"}

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        platform = self._SOURCE_NAMES.get(record.get("source", 0), "Unknown")
        return {
            "idx": idx,
            "task_id": record["name"],
            "entry_point": record["name"],
            "num_inputs": len(record.get("public_tests", {}).get("input", [])),
            "source": platform,
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*, with up to ten solutions."""
        record = self._ds[idx]
        platform = self._SOURCE_NAMES.get(record.get("source", 0), "Unknown")
        difficulty = self._DIFFICULTY_NAMES.get(record.get("difficulty", 0), "Unknown")
        sols_data = record.get("solutions", {})
        langs = sols_data.get("language", [])
        codes = sols_data.get("solution", [])
        rendered = []
        # Cap at the first 10 solutions to keep the payload small.
        for i, src in enumerate(codes[:10]):
            lang_int = langs[i] if i < len(langs) else 0
            rendered.append(
                {
                    "solution_id": f"sol_{i}",
                    "code": src,
                    "highlighted_code": _highlight_code(
                        src, language=self._HIGHLIGHT_LANGS.get(lang_int, "python")
                    ),
                    "language": _CC_LANG_NAMES.get(lang_int, "Unknown"),
                }
            )
        public_tests = record.get("public_tests", {})
        return {
            "idx": idx,
            "task_id": record["name"],
            "entry_point": record["name"],
            "code": rendered[0]["code"] if rendered else "",
            "highlighted_code": rendered[0]["highlighted_code"] if rendered else "",
            "inputs": public_tests.get("input", []),
            "outputs": public_tests.get("output", []),
            "test": None,
            "tasks": [],
            "source": platform,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["description"],
            "difficulty": difficulty,
            "solutions": rendered,
            "cf_rating": record.get("cf_rating", 0),
            "tags": list(record.get("cf_tags", [])),
        }
# ---------------------------------------------------------------------------
# APPS adapter (HuggingFace: codeparrot/apps)
# ---------------------------------------------------------------------------
class APPSAdapter(DatasetAdapter):
    """Adapter for APPS (codeparrot/apps)."""

    slug = "apps"
    display_name = "APPS"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    @staticmethod
    def _title(record) -> str:
        # APPS has no function name; show a truncated one-line question.
        return record["question"][:60].replace("\n", " ").strip()

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(record["problem_id"]),
            "entry_point": self._title(record),
            "num_inputs": 0,
            "source": record.get("difficulty", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*."""
        record = self._ds[idx]
        # Solutions arrive as a JSON-encoded list; keep at most five and
        # silently skip a missing or malformed payload.
        rendered = []
        if record.get("solutions"):
            try:
                for i, src in enumerate(json.loads(record["solutions"])[:5]):
                    rendered.append(
                        {
                            "solution_id": f"sol_{i}",
                            "code": src,
                            "highlighted_code": _highlight_code(src),
                        }
                    )
            except (json.JSONDecodeError, TypeError):
                pass
        # Same tolerant decoding for the input/output spec.
        inputs, outputs = [], []
        if record.get("input_output"):
            try:
                io_spec = json.loads(record["input_output"])
                inputs = io_spec.get("inputs", [])
                outputs = io_spec.get("outputs", [])
            except (json.JSONDecodeError, TypeError):
                pass
        primary = rendered[0]["code"] if rendered else (record.get("starter_code") or "")
        return {
            "idx": idx,
            "task_id": str(record["problem_id"]),
            "entry_point": self._title(record),
            "code": primary,
            "highlighted_code": _highlight_code(primary) if primary else "",
            "inputs": inputs[:5],
            "outputs": outputs[:5],
            "test": None,
            "tasks": [],
            "source": record.get("difficulty", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["question"],
            "difficulty": record.get("difficulty", ""),
            # Only expose the solutions panel when there is more than one.
            "solutions": rendered if len(rendered) > 1 else [],
            "url": record.get("url", ""),
            "starter_code": record.get("starter_code", ""),
        }
# ---------------------------------------------------------------------------
# MBPP adapter (HuggingFace: google-research-datasets/mbpp)
# ---------------------------------------------------------------------------
class MBPPAdapter(DatasetAdapter):
    """Adapter for MBPP (google-research-datasets/mbpp)."""

    slug = "mbpp"
    display_name = "MBPP"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    @staticmethod
    def _title(record) -> str:
        # MBPP rows have no entry-point name; show a truncated description.
        return record["text"][:60].replace("\n", " ").strip()

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(record["task_id"]),
            "entry_point": self._title(record),
            "num_inputs": len(record.get("test_list", [])),
            "source": "MBPP",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model; regular and challenge tests are concatenated."""
        record = self._ds[idx]
        combined_tests = record.get("test_list", []) + record.get("challenge_test_list", [])
        return {
            "idx": idx,
            "task_id": str(record["task_id"]),
            "entry_point": self._title(record),
            "code": record["code"],
            "highlighted_code": _highlight_code(record["code"]),
            "inputs": [],
            "outputs": [],
            "test": "\n".join(combined_tests),
            "tasks": [],
            "source": "MBPP",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record["text"],
        }
# ---------------------------------------------------------------------------
# CodeSearchNet adapter (HuggingFace: code-search-net/code_search_net)
# ---------------------------------------------------------------------------
class CodeSearchNetAdapter(DatasetAdapter):
    """Adapter for CodeSearchNet (code-search-net/code_search_net)."""

    slug = "codesearchnet"
    display_name = "CodeSearchNet"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of functions in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for function *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": record.get("func_name", str(idx)),
            "entry_point": record.get("func_name", f"csn_{idx}"),
            "num_inputs": 0,
            "source": record.get("language", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model; the docstring serves as the description."""
        record = self._ds[idx]
        source_code = record.get("func_code_string", "")
        language = record.get("language", "python")
        return {
            "idx": idx,
            "task_id": record.get("func_name", str(idx)),
            "entry_point": record.get("func_name", f"csn_{idx}"),
            "code": source_code,
            "highlighted_code": _highlight_code(source_code, language=language),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": language,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record.get("func_documentation_string", ""),
        }
# ---------------------------------------------------------------------------
# BigCodeBench adapter (HuggingFace: bigcode/bigcodebench)
# ---------------------------------------------------------------------------
class BigCodeBenchAdapter(DatasetAdapter):
    """Adapter for BigCodeBench (bigcode/bigcodebench)."""

    slug = "bigcodebench"
    display_name = "BigCodeBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record.get("entry_point", "task_func"),
            "num_inputs": 0,
            "source": "BigCodeBench",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*."""
        record = self._ds[idx]
        # Reference implementation = code prompt + canonical solution body.
        full_code = record.get("code_prompt", "") + record.get("canonical_solution", "")
        return {
            "idx": idx,
            "task_id": record["task_id"],
            "entry_point": record.get("entry_point", "task_func"),
            "code": full_code,
            "highlighted_code": _highlight_code(full_code),
            "inputs": [],
            "outputs": [],
            "test": record.get("test", ""),
            "tasks": [],
            "source": "BigCodeBench",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": record.get("complete_prompt", ""),
            "libs": record.get("libs", ""),
        }
# ---------------------------------------------------------------------------
# EffiBench adapter (HuggingFace: DONG19/EffiBench)
# ---------------------------------------------------------------------------
class EffiBenchAdapter(DatasetAdapter):
    """Adapter for EffiBench (DONG19/EffiBench)."""

    slug = "effibench"
    display_name = "EffiBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        """Return the number of problems in the split."""
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        """Lightweight listing entry for problem *idx*."""
        record = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(record.get("problem_idx", idx)),
            "entry_point": record.get("task_name", f"effibench_{idx}"),
            "num_inputs": 0,
            "source": "EffiBench",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        """Full view model for problem *idx*."""
        record = self._ds[idx]
        reference = record.get("canonical_solution", "")
        return {
            "idx": idx,
            "task_id": str(record.get("problem_idx", idx)),
            "entry_point": record.get("task_name", f"effibench_{idx}"),
            "code": reference,
            "highlighted_code": _highlight_code(reference),
            "inputs": [],
            "outputs": [],
            "test": record.get("test_case", ""),
            "tasks": [],
            "source": "EffiBench",
            "has_ground_truth": False,
            "has_tasks": False,
            # Prefer the markdown description; the plain one is used only
            # when the markdown key is absent (not merely empty).
            "description": record.get("markdown_description", record.get("description", "")),
        }