"""Code generation benchmark adapters.""" from __future__ import annotations import json from collections import defaultdict from typing import Any from adapters import DatasetAdapter # Injected at runtime by _set_helpers() _highlight_code = None _code_offset = None _extract_test_classes = None # --------------------------------------------------------------------------- # REval adapter (HuggingFace: JetBrains-Research/REval) # --------------------------------------------------------------------------- def _format_typed_value(val: dict) -> str: """Convert a {__type__, __value__} dict from REval states into a Python repr string.""" t = val.get("__type__") v = val.get("__value__") if t in ("int", "float", "str", "bool", "NoneType"): return repr(v) elif t == "list": return "[" + ", ".join(_format_typed_value(item) for item in v) + "]" elif t == "tuple": items = ", ".join(_format_typed_value(item) for item in v) return f"({items},)" if len(v) == 1 else f"({items})" elif t == "set": return "{" + ", ".join(_format_typed_value(item) for item in v) + "}" else: return repr(v) class REvalAdapter(DatasetAdapter): slug = "reval" display_name = "REval" has_ground_truth = True has_tasks = True def __init__(self, problems_ds, tasks_ds, executions_ds, states_ds): self._problems = problems_ds self._tasks: dict[str, list] = {} for row in tasks_ds: self._tasks[row["task_id"]] = json.loads(row["tasks"]) self._executions: dict[tuple[str, int], dict] = {} for row in executions_ds: self._executions[(row["task_id"], row["input_idx"])] = { "status": row["status"], "trace": row["trace"], "coverage": row["coverage"], } self._states: dict[tuple[str, int], list] = {} for row in states_ds: self._states[(row["task_id"], row["input_idx"])] = json.loads(row["states"]) def problem_count(self) -> int: return len(self._problems) def get_problem_summary(self, idx: int) -> dict[str, Any]: row = self._problems[idx] return { "idx": idx, "task_id": row["task_id"], "entry_point": row["entry_point"], 
"num_inputs": len(row["inputs"]), "source": "ClassEval" if row["test"] else "HumanEval", } def get_problem_detail(self, idx: int) -> dict[str, Any]: problem = self._problems[idx] task_id = problem["task_id"] task_list = self._tasks.get(task_id, []) code = problem["code"] offset = _code_offset(code) code = code[offset:] highlighted_code = _highlight_code(code) tasks_info = [] for task_item in task_list: adjusted_items = [] for item in task_item.get("task", []): adj = dict(item) if "lineno" in adj: adj["lineno"] -= offset adjusted_items.append(adj) input_idx = task_item["input_idx"] inputs = problem["inputs"] outputs = problem["outputs"] inp = inputs[input_idx] if input_idx < len(inputs) else "" out = outputs[input_idx] if input_idx < len(outputs) else "" task_info = { "input_idx": input_idx, "input": inp, "output": out, "task_items": adjusted_items, } if "output_pred" in task_item: task_info["output_pred"] = task_item["output_pred"] task_lines = set() for item in adjusted_items: if "lineno" in item: task_lines.add(item["lineno"]) task_info["task_lines"] = sorted(task_lines) tasks_info.append(task_info) if problem["test"]: tc_list = _extract_test_classes(problem["test"], problem["entry_point"]) for task_info in tasks_info: idx_in_tc = task_info["input_idx"] if idx_in_tc < len(tc_list): task_info["test_class_name"] = tc_list[idx_in_tc]["name"] task_info["test_class_code"] = tc_list[idx_in_tc]["code"] return { "idx": idx, "task_id": problem["task_id"], "entry_point": problem["entry_point"], "code": code, "highlighted_code": highlighted_code, "inputs": list(problem["inputs"]), "outputs": list(problem["outputs"]), "test": problem["test"], "tasks": tasks_info, "source": "ClassEval" if problem["test"] else "HumanEval", "has_ground_truth": True, "has_tasks": True, } def get_ground_truth(self, idx: int, input_idx: int) -> dict[str, Any]: problem = self._problems[idx] task_id = problem["task_id"] exec_rec = self._executions.get((task_id, input_idx)) if exec_rec is None: 
return {"status": "unavailable", "message": "No execution data for this input"} if exec_rec["status"] == "error": return {"status": "error", "message": "Execution failed for this input"} code = problem["code"] offset = _code_offset(code) coverage_1indexed = [ln + 1 - offset for ln in exec_rec["coverage"]] total_lines = len(code[offset:].splitlines()) task_list = self._tasks.get(task_id, []) task_items = [] for t in task_list: if t["input_idx"] == input_idx: task_items = t.get("task", []) break states_list = self._states.get((task_id, input_idx), []) variable_answers = [] for item in task_items: lineno = item["lineno"] var = item["var"] values = [] for s in states_list: if s["lineno"] == lineno and var in s.get("locals", {}): values.append(s["locals"][var]) if not values: answer_str = "(not available)" elif len(values) == 1: answer_str = _format_typed_value(values[0]) else: seen = [] for v in values: fmt = _format_typed_value(v) if fmt not in seen: seen.append(fmt) answer_str = "[" + ", ".join(seen) + "]" if len(seen) > 1 else seen[0] variable_answers.append( { "lineno": lineno - offset, "var": var, "answer_str": answer_str, } ) trace = exec_rec["trace"] next_lines_answers = [] processed_linenos: set[int] = set() for item in task_items: lineno = item["lineno"] if lineno in processed_linenos: continue processed_linenos.add(lineno) nexts: set[int] = set() for i, ln in enumerate(trace): if ln == lineno and i + 1 < len(trace): nexts.add(trace[i + 1]) next_lines_answers.append( { "lineno": lineno, "next_lines": sorted(nexts) if nexts else [-1], } ) return { "status": "ok", "coverage": coverage_1indexed, "total_lines": total_lines, "variable_answers": variable_answers, "next_lines_answers": next_lines_answers, } # --------------------------------------------------------------------------- # HumanEval+ adapter (HuggingFace: evalplus/humanevalplus) # --------------------------------------------------------------------------- class HumanEvalPlusAdapter(DatasetAdapter): slug 
= "humanevalplus" display_name = "HumanEval+" has_ground_truth = False has_tasks = False def __init__(self, hf_dataset): self._ds = hf_dataset def problem_count(self) -> int: return len(self._ds) def get_problem_summary(self, idx: int) -> dict[str, Any]: row = self._ds[idx] return { "idx": idx, "task_id": row["task_id"], "entry_point": row["entry_point"], "num_inputs": 0, "source": "HumanEval+", } def get_problem_detail(self, idx: int) -> dict[str, Any]: row = self._ds[idx] code = row["prompt"] + row["canonical_solution"] return { "idx": idx, "task_id": row["task_id"], "entry_point": row["entry_point"], "code": code, "highlighted_code": _highlight_code(code), "inputs": [], "outputs": [], "test": row["test"], "tasks": [], "source": "HumanEval+", "has_ground_truth": False, "has_tasks": False, } # --------------------------------------------------------------------------- # BigOBench adapter (HuggingFace: facebook/BigOBench) # --------------------------------------------------------------------------- class BigOBenchAdapter(DatasetAdapter): slug = "bigobench" display_name = "BigOBench" has_ground_truth = False has_tasks = False def __init__(self, problems: list[dict[str, Any]]): self._problems = problems def problem_count(self) -> int: return len(self._problems) def get_problem_summary(self, idx: int) -> dict[str, Any]: prob = self._problems[idx] return { "idx": idx, "task_id": prob["problem_id"], "entry_point": prob["problem_name"], "num_inputs": len(prob["solutions"]), "source": "BigOBench", } def get_problem_detail(self, idx: int) -> dict[str, Any]: prob = self._problems[idx] solutions = [] for sol in prob["solutions"]: solutions.append( { "solution_id": sol["solution_id"], "code": sol["solution_code"], "highlighted_code": _highlight_code(sol["solution_code"]), "time_complexity": sol.get("time_complexity"), "space_complexity": sol.get("space_complexity"), } ) return { "idx": idx, "task_id": prob["problem_id"], "entry_point": prob["problem_name"], "code": 
solutions[0]["code"] if solutions else "", "highlighted_code": solutions[0]["highlighted_code"] if solutions else "", "inputs": [], "outputs": [], "test": None, "tasks": [], "source": "BigOBench", "has_ground_truth": False, "has_tasks": False, "description": prob["description"], "solutions": solutions, } def merge_bigobench(ds_time, ds_space) -> list[dict[str, Any]]: """Merge time and space complexity test sets by problem_id.""" solutions: dict[tuple[str, str], dict[str, Any]] = {} problem_meta: dict[str, dict[str, str]] = {} for row in ds_time: pid, sid = row["problem_id"], row["solution_id"] problem_meta[pid] = { "problem_name": row["problem_name"], "description": row["description"], } solutions[(pid, sid)] = { "solution_id": sid, "solution_code": row["solution_code"], "time_complexity": row["time_complexity_inferred"], "space_complexity": None, } for row in ds_space: pid, sid = row["problem_id"], row["solution_id"] problem_meta.setdefault( pid, { "problem_name": row["problem_name"], "description": row["description"], }, ) key = (pid, sid) if key in solutions: solutions[key]["space_complexity"] = row["space_complexity_inferred"] else: solutions[key] = { "solution_id": sid, "solution_code": row["solution_code"], "time_complexity": None, "space_complexity": row["space_complexity_inferred"], } by_problem: dict[str, list[dict[str, Any]]] = defaultdict(list) for (pid, _sid), sol in solutions.items(): by_problem[pid].append(sol) problems = [] for pid in sorted(by_problem.keys()): meta = problem_meta[pid] problems.append( { "problem_id": pid, "problem_name": meta["problem_name"], "description": meta["description"], "solutions": by_problem[pid], } ) return problems # --------------------------------------------------------------------------- # MBPP+ adapter (HuggingFace: evalplus/mbppplus) # --------------------------------------------------------------------------- class MBPPPlusAdapter(DatasetAdapter): slug = "mbppplus" display_name = "MBPP+" has_ground_truth = False 
has_tasks = False def __init__(self, hf_dataset): self._ds = hf_dataset def problem_count(self) -> int: return len(self._ds) def get_problem_summary(self, idx: int) -> dict[str, Any]: row = self._ds[idx] return { "idx": idx, "task_id": str(row["task_id"]), "entry_point": row["prompt"][:60].replace("\n", " ").strip(), "num_inputs": len(row["test_list"]), "source": "MBPP+", } def get_problem_detail(self, idx: int) -> dict[str, Any]: row = self._ds[idx] code = row["code"] return { "idx": idx, "task_id": str(row["task_id"]), "entry_point": row["prompt"][:60].replace("\n", " ").strip(), "code": code, "highlighted_code": _highlight_code(code), "inputs": [], "outputs": [], "test": "\n".join(row["test_list"]), "tasks": [], "source": "MBPP+", "has_ground_truth": False, "has_tasks": False, "description": row["prompt"], } # --------------------------------------------------------------------------- # ClassEval adapter (HuggingFace: FudanSELab/ClassEval) # --------------------------------------------------------------------------- class ClassEvalAdapter(DatasetAdapter): slug = "classeval" display_name = "ClassEval" has_ground_truth = False has_tasks = False def __init__(self, hf_dataset): self._ds = hf_dataset def problem_count(self) -> int: return len(self._ds) def get_problem_summary(self, idx: int) -> dict[str, Any]: row = self._ds[idx] return { "idx": idx, "task_id": row["task_id"], "entry_point": row["class_name"], "num_inputs": len(row["methods_info"]), "source": "ClassEval", } def get_problem_detail(self, idx: int) -> dict[str, Any]: row = self._ds[idx] code = row["solution_code"] return { "idx": idx, "task_id": row["task_id"], "entry_point": row["class_name"], "code": code, "highlighted_code": _highlight_code(code), "inputs": [], "outputs": [], "test": row["test"], "tasks": [], "source": "ClassEval", "has_ground_truth": False, "has_tasks": False, "description": row["class_description"], "skeleton": row["skeleton"], } # 
# ---------------------------------------------------------------------------
# LiveCodeBench adapter (HuggingFace: livecodebench/code_generation_lite)
# ---------------------------------------------------------------------------


class LiveCodeBenchAdapter(DatasetAdapter):
    """Adapter for LiveCodeBench competitive-programming problems."""

    slug = "livecodebench"
    display_name = "LiveCodeBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row["question_id"],
            "entry_point": row["question_title"],
            "num_inputs": 0,
            "source": row["platform"],
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        # public_test_cases is a JSON-encoded list; tolerate missing/bad data.
        test_cases = []
        try:
            test_cases = json.loads(row["public_test_cases"]) if row["public_test_cases"] else []
        except (json.JSONDecodeError, TypeError):
            pass
        inputs = [tc.get("input", "") for tc in test_cases]
        outputs = [tc.get("output", "") for tc in test_cases]
        # No reference solution is shipped; show starter code when non-blank.
        starter = row.get("starter_code", "") or ""
        code = starter if starter.strip() else ""
        return {
            "idx": idx,
            "task_id": row["question_id"],
            "entry_point": row["question_title"],
            "code": code,
            "highlighted_code": _highlight_code(code) if code else "",
            "inputs": inputs,
            "outputs": outputs,
            "test": None,
            "tasks": [],
            "source": row["platform"],
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row["question_content"],
            "difficulty": row.get("difficulty", ""),
            "contest_date": row.get("contest_date", ""),
        }


# ---------------------------------------------------------------------------
# CodeContests adapter (HuggingFace: deepmind/code_contests)
# ---------------------------------------------------------------------------

# Integer language codes used by the code_contests solutions field.
_CC_LANG_NAMES = {0: "Unknown", 1: "Python 2", 2: "C++", 3: "Python 3", 4: "Java"}


class CodeContestsAdapter(DatasetAdapter):
    """Adapter for DeepMind CodeContests problems with multi-language solutions."""

    slug = "codecontests"
    display_name = "CodeContests"
    has_ground_truth = False
    has_tasks = False

    # Integer difficulty codes from the dataset schema.
    _DIFFICULTY_NAMES = {
        0: "Unknown",
        1: "Easy",
        2: "Medium",
        3: "Hard",
        4: "Harder",
        5: "Hardest",
        6: "External",
    }
    # Integer source-site codes from the dataset schema.
    _SOURCE_NAMES = {
        0: "Unknown",
        1: "CodeChef",
        2: "Codeforces",
        3: "HackerEarth",
        4: "CodeJam",
        5: "AtCoder",
        6: "Aizu",
    }

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        source_int = row.get("source", 0)
        source_name = self._SOURCE_NAMES.get(source_int, "Unknown")
        return {
            "idx": idx,
            "task_id": row["name"],
            "entry_point": row["name"],
            "num_inputs": len(row.get("public_tests", {}).get("input", [])),
            "source": source_name,
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        source_int = row.get("source", 0)
        source_name = self._SOURCE_NAMES.get(source_int, "Unknown")
        diff_int = row.get("difficulty", 0)
        diff_name = self._DIFFICULTY_NAMES.get(diff_int, "Unknown")
        # Solutions are parallel lists of language codes and code strings;
        # cap at 10 to keep the payload bounded.
        sols_data = row.get("solutions", {})
        sol_langs = sols_data.get("language", [])
        sol_codes = sols_data.get("solution", [])
        solutions = []
        for i, code in enumerate(sol_codes[:10]):
            lang_int = sol_langs[i] if i < len(sol_langs) else 0
            lang_name = _CC_LANG_NAMES.get(lang_int, "Unknown")
            # Map the dataset language code to a highlighter lexer key.
            lang_key = {1: "python", 2: "cpp", 3: "python", 4: "java"}.get(lang_int, "python")
            solutions.append(
                {
                    "solution_id": f"sol_{i}",
                    "code": code,
                    "highlighted_code": _highlight_code(code, language=lang_key),
                    "language": lang_name,
                }
            )
        pub_tests = row.get("public_tests", {})
        inputs = pub_tests.get("input", [])
        outputs = pub_tests.get("output", [])
        tags = list(row.get("cf_tags", []))
        return {
            "idx": idx,
            "task_id": row["name"],
            "entry_point": row["name"],
            "code": solutions[0]["code"] if solutions else "",
            "highlighted_code": solutions[0]["highlighted_code"] if solutions else "",
            "inputs": inputs,
            "outputs": outputs,
            "test": None,
            "tasks": [],
            "source": source_name,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row["description"],
            "difficulty": diff_name,
            "solutions": solutions,
            "cf_rating": row.get("cf_rating", 0),
            "tags": tags,
        }


# ---------------------------------------------------------------------------
# APPS adapter (HuggingFace: codeparrot/apps)
# ---------------------------------------------------------------------------


class APPSAdapter(DatasetAdapter):
    """Adapter for APPS; solutions and tests are JSON-encoded string fields."""

    slug = "apps"
    display_name = "APPS"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(row["problem_id"]),
            # APPS has no titles: use a truncated one-line question.
            "entry_point": row["question"][:60].replace("\n", " ").strip(),
            "num_inputs": 0,
            "source": row.get("difficulty", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        # "solutions" is a JSON list of code strings; keep at most 5.
        solutions = []
        if row.get("solutions"):
            try:
                sol_list = json.loads(row["solutions"])
                for i, code in enumerate(sol_list[:5]):
                    solutions.append(
                        {
                            "solution_id": f"sol_{i}",
                            "code": code,
                            "highlighted_code": _highlight_code(code),
                        }
                    )
            except (json.JSONDecodeError, TypeError):
                pass
        # "input_output" is a JSON dict with "inputs"/"outputs" lists.
        inputs, outputs = [], []
        if row.get("input_output"):
            try:
                io = json.loads(row["input_output"])
                inputs = io.get("inputs", [])
                outputs = io.get("outputs", [])
            except (json.JSONDecodeError, TypeError):
                pass
        # Prefer the first solution; fall back to starter code.
        code = solutions[0]["code"] if solutions else (row.get("starter_code") or "")
        return {
            "idx": idx,
            "task_id": str(row["problem_id"]),
            "entry_point": row["question"][:60].replace("\n", " ").strip(),
            "code": code,
            "highlighted_code": _highlight_code(code) if code else "",
            "inputs": inputs[:5],
            "outputs": outputs[:5],
            "test": None,
            "tasks": [],
            "source": row.get("difficulty", "unknown"),
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row["question"],
            "difficulty": row.get("difficulty", ""),
            # Only expose the alternatives list when there is more than one.
            "solutions": solutions if len(solutions) > 1 else [],
            "url": row.get("url", ""),
            "starter_code": row.get("starter_code", ""),
        }
# ---------------------------------------------------------------------------
# MBPP adapter (HuggingFace: google-research-datasets/mbpp)
# ---------------------------------------------------------------------------


class MBPPAdapter(DatasetAdapter):
    """Adapter for original MBPP; combines normal and challenge tests."""

    slug = "mbpp"
    display_name = "MBPP"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(row["task_id"]),
            # MBPP has no titles: use a truncated one-line task description.
            "entry_point": row["text"][:60].replace("\n", " ").strip(),
            "num_inputs": len(row.get("test_list", [])),
            "source": "MBPP",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        code = row["code"]
        test_list = row.get("test_list", [])
        challenge_tests = row.get("challenge_test_list", [])
        all_tests = test_list + challenge_tests
        return {
            "idx": idx,
            "task_id": str(row["task_id"]),
            "entry_point": row["text"][:60].replace("\n", " ").strip(),
            "code": code,
            "highlighted_code": _highlight_code(code),
            "inputs": [],
            "outputs": [],
            "test": "\n".join(all_tests),
            "tasks": [],
            "source": "MBPP",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row["text"],
        }


# ---------------------------------------------------------------------------
# CodeSearchNet adapter (HuggingFace: code-search-net/code_search_net)
# ---------------------------------------------------------------------------


class CodeSearchNetAdapter(DatasetAdapter):
    """Adapter for CodeSearchNet function/docstring pairs (multi-language)."""

    slug = "codesearchnet"
    display_name = "CodeSearchNet"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row.get("func_name", str(idx)),
            "entry_point": row.get("func_name", f"csn_{idx}"),
            "num_inputs": 0,
            "source": row.get("language", "unknown"),
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        code = row.get("func_code_string", "")
        # Highlight in the row's own language (CSN spans several languages).
        lang = row.get("language", "python")
        return {
            "idx": idx,
            "task_id": row.get("func_name", str(idx)),
            "entry_point": row.get("func_name", f"csn_{idx}"),
            "code": code,
            "highlighted_code": _highlight_code(code, language=lang),
            "inputs": [],
            "outputs": [],
            "test": None,
            "tasks": [],
            "source": lang,
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("func_documentation_string", ""),
        }


# ---------------------------------------------------------------------------
# BigCodeBench adapter (HuggingFace: bigcode/bigcodebench)
# ---------------------------------------------------------------------------


class BigCodeBenchAdapter(DatasetAdapter):
    """Adapter for BigCodeBench library-usage tasks."""

    slug = "bigcodebench"
    display_name = "BigCodeBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": row["task_id"],
            "entry_point": row.get("entry_point", "task_func"),
            "num_inputs": 0,
            "source": "BigCodeBench",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        # Full program = code prompt (imports + signature) + canonical body.
        code = row.get("code_prompt", "") + row.get("canonical_solution", "")
        libs = row.get("libs", "")
        return {
            "idx": idx,
            "task_id": row["task_id"],
            "entry_point": row.get("entry_point", "task_func"),
            "code": code,
            "highlighted_code": _highlight_code(code),
            "inputs": [],
            "outputs": [],
            "test": row.get("test", ""),
            "tasks": [],
            "source": "BigCodeBench",
            "has_ground_truth": False,
            "has_tasks": False,
            "description": row.get("complete_prompt", ""),
            "libs": libs,
        }


# ---------------------------------------------------------------------------
# EffiBench adapter (HuggingFace: DONG19/EffiBench)
# ---------------------------------------------------------------------------


class EffiBenchAdapter(DatasetAdapter):
    """Adapter for EffiBench efficiency-oriented coding problems."""

    slug = "effibench"
    display_name = "EffiBench"
    has_ground_truth = False
    has_tasks = False

    def __init__(self, hf_dataset):
        self._ds = hf_dataset

    def problem_count(self) -> int:
        return len(self._ds)

    def get_problem_summary(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        return {
            "idx": idx,
            "task_id": str(row.get("problem_idx", idx)),
            "entry_point": row.get("task_name", f"effibench_{idx}"),
            "num_inputs": 0,
            "source": "EffiBench",
        }

    def get_problem_detail(self, idx: int) -> dict[str, Any]:
        row = self._ds[idx]
        code = row.get("canonical_solution", "")
        return {
            "idx": idx,
            "task_id": str(row.get("problem_idx", idx)),
            "entry_point": row.get("task_name", f"effibench_{idx}"),
            "code": code,
            "highlighted_code": _highlight_code(code),
            "inputs": [],
            "outputs": [],
            "test": row.get("test_case", ""),
            "tasks": [],
            "source": "EffiBench",
            "has_ground_truth": False,
            "has_tasks": False,
            # Prefer the markdown description; fall back to plain description.
            "description": row.get("markdown_description", row.get("description", "")),
        }