|
|
import os |
|
|
import re |
|
|
import gzip |
|
|
import shutil |
|
|
import pandas as pd |
|
|
import contextlib |
|
|
import signal |
|
|
import scicode |
|
|
import signal |
|
|
from contextlib import contextmanager |
|
|
import numpy as np |
|
|
from typing import Union, Any, Callable, List, Dict, Tuple |
|
|
from .benchmark import CodingBenchmark |
|
|
from ..core.logging import logger |
|
|
from ..utils.utils import download_file |
|
|
from ..core.module_utils import load_json |
|
|
from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data |
|
|
|
|
|
|
|
|
class TimeoutException(Exception):
    """Raised when a code section guarded by `time_limit` exceeds its budget."""
|
|
|
|
|
@contextmanager
def time_limit(seconds):
    """Abort the enclosed block with TimeoutException after `seconds` (SIGALRM-based, Unix only)."""

    def _on_alarm(signum, frame):
        # Fired by the kernel when the alarm expires.
        raise TimeoutException("Timed out!")

    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Always cancel the pending alarm, even if the body raised.
        signal.alarm(0)
|
|
|
|
|
# Canonical upstream location of the gzipped SciCode dataset (jsonl.gz).
SCICODE_DEFAULT_URL = "https://raw.githubusercontent.com/scicode-bench/scicode/main/data/scicode.jsonl.gz"
|
|
|
|
|
|
|
|
def download_raw_scicode_data(save_folder: str, url: str = SCICODE_DEFAULT_URL) -> str:
    """
    Fetch the raw SciCode jsonl(.gz) archive into `save_folder` and decompress it.

    Args:
        save_folder: Directory to download into (created if missing).
        url: Source URL of the gzipped jsonl file.

    Returns:
        str: Path to the decompressed jsonl file.
    """
    os.makedirs(save_folder, exist_ok=True)
    archive_path = os.path.join(save_folder, "scicode.jsonl.gz")
    jsonl_out = os.path.join(save_folder, "scicode.jsonl")

    logger.info(f"Downloading SciCode data from {url} ...")
    download_file(url=url, save_file=archive_path)

    logger.info("Unzipping SciCode data ...")
    with gzip.open(archive_path, "rb") as src, open(jsonl_out, "wb") as dst:
        shutil.copyfileobj(src, dst)
    # Remove the compressed archive once unpacked; keep only the jsonl.
    if os.path.exists(archive_path):
        os.remove(archive_path)

    return jsonl_out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_entry_point_from_header(header: str) -> str: |
|
|
""" |
|
|
Given a SciCode 'function_header' string like: |
|
|
"def get_alpha(recvec, alpha_scaling=5):\n '''...'''" |
|
|
return "get_alpha". |
|
|
""" |
|
|
m = re.search(r"def\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", header) |
|
|
if not m: |
|
|
raise ValueError("Could not parse entry point from function_header") |
|
|
return m.group(1) |
|
|
|
|
|
|
|
|
def _coerce_scicode_row_to_examples(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    SciCode rows may contain a single task or multiple step tasks.
    We normalize them to a list of examples with a unified structure:
    {
        "task_id": "SciCode/<name>#<sub_id>",
        "prompt": <function_header + optional docstring block>,
        "entry_point": <func_name>,
        "canonical_solution": <ground_truth_code>,
        "tests": List[str],  # list of python test snippets
        "imports": str  # optional import prelude (e.g., 'import numpy as np')
    }

    Despite the annotation, `row` may be either a legacy list-shaped record or
    a dict-shaped record; both layouts are handled below.
    """
    examples: List[Dict[str, Any]] = []

    # Task name: positional element 0 for list rows, else the "name" key.
    # NOTE(review): for dict rows `0 in row` tests for an *integer key* 0,
    # which is almost certainly never present — confirm this short-circuit
    # is intentional (for list rows it tests membership of the value 0
    # before the isinstance check ever runs).
    name = str(row[0]) if 0 in row or isinstance(row, list) else str(row.get("name", "unknown"))

    if isinstance(row, list):
        # ---- Legacy list-shaped row: classify each element heuristically. ----
        description = None  # never assigned below; kept for layout parity
        doc_or_header = None
        imports_block = None
        steps_or_code = None
        tests = None

        for item in row:
            if isinstance(item, str) and item.strip().startswith('"""'):
                # Triple-quoted block: treated as the task description/header.
                doc_or_header = item
            elif isinstance(item, str) and (item.startswith("import ") or "from " in item):
                # Import prelude (note: "from " anywhere in the string matches).
                imports_block = item
            elif isinstance(item, list):
                if item and isinstance(item[0], dict) and "function_header" in item[0]:
                    # List of per-step dicts.
                    steps_or_code = item
                elif item and isinstance(item[0], str) and item[0].strip().startswith(("ref", "assert", "from ")):
                    # List of test snippets.
                    tests = item
            elif isinstance(item, dict):
                # Single step dict — wrap so both cases iterate uniformly.
                steps_or_code = [item]

        if isinstance(steps_or_code, list) and steps_or_code and isinstance(steps_or_code[0], dict):
            # Multi-step task: emit one example per step.
            for idx, step in enumerate(steps_or_code):
                header = step.get("function_header") or step.get("header") or ""
                code = step.get("ground_truth_code") or step.get("solution") or ""
                step_tests = step.get("test_cases") or []
                entry_point = _extract_entry_point_from_header(header)
                prompt = header
                examples.append(
                    {
                        "task_id": f"SciCode/{name}#step{idx+1}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            # No structured steps: fall back to the first raw code blob found
            # in the row (a string that contains both "def " and "return").
            code_blob = None
            for item in row:
                if isinstance(item, str) and "def " in item and "return" in item:
                    code_blob = item
                    break

            if code_blob:
                # Use the *last* function defined in the blob as the entry point.
                headers = list(re.finditer(r"(?ms)^(def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(.*?\):\s*\n)", code_blob))
                if headers:
                    last_header = headers[-1].group(1)
                    entry_point = _extract_entry_point_from_header(last_header)
                else:
                    entry_point = "solution"

                # Prefer the docstring block as the prompt; else a stub signature.
                prompt = doc_or_header or f"def {entry_point}(*args, **kwargs):\n '''Fill in the function body.'''\n ..."
                examples.append(
                    {
                        "task_id": f"SciCode/{name}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code_blob,
                        "tests": tests or [],
                        "imports": imports_block or "",
                    }
                )
            # If no code blob was found, the list row yields no examples.

    else:
        # ---- Dict-shaped row (current SciCode format). ----
        steps = row.get("steps", [])
        imports_block = row.get("required_dependencies", "")
        # NOTE(review): the task name comes from "step_number", not "name" —
        # presumably the sub-problem identifier; confirm against the dataset.
        task_name = row.get("step_number", "unknown")

        if steps:
            # Multi-step problem: one example per step dict.
            for idx, step in enumerate(steps):
                header = step.get("function_header", "")
                code = step.get("ground_truth_code", "")
                step_tests = step.get("test_cases", [])
                entry_point = _extract_entry_point_from_header(header)
                examples.append(
                    {
                        "task_id": f"SciCode/{task_name}#step{idx+1}",
                        "prompt": header,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            # Flat single-step row: fields live directly on the row dict.
            header = row.get("function_header", "")
            prompt_update = row.get("step_description_prompt", "")
            code = row.get("ground_truth_code", "")
            tests = row.get("test_cases", [])
            returnline = row.get("return_line", "")  # read but unused below
            entry_point = _extract_entry_point_from_header(header) if header else "solution"
            bkgd = row.get("step_background","")
            prompt = header or f"def {entry_point}(*args, **kwargs):\n "
            examples.append(
                {
                    "task_id": f"SciCode/{task_name}",
                    # Prompt = background + step description + header + fill-in instruction.
                    "prompt": bkgd+prompt_update+prompt+'''Fill in the function body.\n''',
                    "entry_point": entry_point,
                    "canonical_solution": code,
                    "tests": tests,
                    "imports": imports_block or "",
                }
            )

    return examples
|
|
|
|
|
|
|
|
def load_scicode_data(jsonl_path: str) -> List[Dict[str, Any]]:
    """
    Load a SciCode jsonl file and expand each row into normalized examples.

    Rows that cannot be coerced are skipped with a warning rather than
    aborting the whole load.
    """
    rows = load_json(jsonl_path, type="jsonl")

    expanded: List[Dict[str, Any]] = []
    for row in rows:
        try:
            expanded.extend(_coerce_scicode_row_to_examples(row))
        except Exception as e:
            logger.warning(f"[SciCode] Skipping a malformed row due to: {e}")
    return expanded
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SciCode(CodingBenchmark):
    """
    Benchmark class for evaluating code generation on SciCode.

    SciCode problems provide:
    - function_header (prompt stub)
    - ground_truth_code (reference implementation)
    - test_cases (list[str] of python asserts)

    We normalize each item and evaluate by executing the candidate implementation
    against the provided test cases. Since many SciCode tests reference a variable
    named `target`, we heuristically pre-compute `target` from the reference
    implementation when necessary, or set it to True for boolean-allclose tests.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        # `k` is the pass@k cutoff (int) or list of cutoffs.
        path = os.path.expanduser(path or "~/.evoagentx/data/scicode")
        self.k = k
        self.name = "scicode"
        # NOTE(review): the base class receives the class name ("SciCode")
        # while self.name was just set to "scicode" — confirm which one
        # downstream consumers read.
        super().__init__(name=type(self).__name__, path=path, mode=mode, timeout=timeout, **kwargs)

    def _load_data(self):
        # Load normalized sub-problem examples plus the pickled ground-truth
        # DataFrames that hold the pre-computed `target` value per test case.
        # NOTE(review): these absolute paths are machine-specific and ignore
        # `self.path` — they will not resolve outside the author's host.
        if self.mode in ("dev", "all"):
            self._dev_data = load_scicode_data("/home/tl688/pitl688/selfevolve/SciCode/eval/data/subproblems_dev.jsonl")
            self._data_ground = pd.read_pickle("/home/tl688/pitl688/selfevolve/SciCode/eval/data/problems_dev.pkl")
        if self.mode in ("test", "all"):
            self._test_data = load_scicode_data("/home/tl688/pitl688/selfevolve/SciCode/eval/data/subproblems_test.jsonl")
            self._test_data_ground = pd.read_pickle("/home/tl688/pitl688/selfevolve/SciCode/eval/data/problems_test.pkl")
            try:
                # "all" mode: merge dev + test ground truth into one frame.
                self._data_ground = pd.concat((self._data_ground, self._test_data_ground))
            except:  # NOTE(review): bare except — in "test" mode _data_ground
                # is not yet set (AttributeError); fall back to test-only frame.
                self._data_ground = self._test_data_ground

    def _get_label(self, example: Any):
        """
        For SciCode we treat the label as the full test suite plus metadata.
        """
        return {
            "task_id": example["task_id"],
            "entry_point": example["entry_point"],
            "tests": example.get("tests", []),
            "canonical_solution": example.get("canonical_solution", ""),
            "imports": example.get("imports", ""),
        }

    def _get_id(self, example: Any):
        # An example's unique identifier is its task_id string.
        return example["task_id"]

    @staticmethod
    def _build_reference_namespace(imports: str, canonical_solution: str) -> Dict[str, Any]:
        """
        Build an execution namespace that defines the reference function.

        SECURITY NOTE: this exec()s dataset-provided code; only run on
        trusted benchmark data.
        """
        ns: Dict[str, Any] = {"np": np, "scicode": scicode}
        if imports:
            exec(imports, ns, ns)
        if canonical_solution:
            exec(canonical_solution, ns, ns)
        return ns

    @staticmethod
    def _extract_candidate_exprs_from_test(test_src: str) -> List[str]:
        """
        Heuristically extract expressions that are compared against `target` inside np.allclose(..., target)
        or equality checks like "== target" / ", target)" etc. Returns a list of python expressions (as strings)
        that we should evaluate with the *reference* implementation to generate `target`.

        This is a pragmatic parser covering the most common SciCode patterns.
        """
        exprs: List[str] = []

        # Pattern 1: np.allclose(<expr>, target) — DOTALL so the expression
        # may span multiple lines.
        for m in re.finditer(r"np\.allclose\s*\(\s*(?P<expr>.+?)\s*,\s*target\s*\)", test_src, flags=re.DOTALL):
            exprs.append(m.group("expr"))

        # Pattern 2: assert <expr> == target (single line).
        for m in re.finditer(r"assert\s+(?P<expr>.+?)\s*==\s*target", test_src):
            exprs.append(m.group("expr"))

        return exprs

    @staticmethod
    def _compute_target_list(exprs: List[str], ref_ns: Dict[str, Any]) -> Any:
        """
        Given a list of expressions (strings), evaluate them in the reference namespace.
        If multiple expressions are found, we pack them into a tuple in the same order.
        If no expression found, return True (to support tests of the form `assert <bool>, target`).

        SECURITY NOTE: this eval()s dataset-derived expressions; only run on
        trusted benchmark data.
        """
        if not exprs:
            return True
        values = []
        for ex in exprs:
            local_ns: Dict[str, Any] = {}
            val = eval(ex, ref_ns, local_ns)
            values.append(val)
        if len(values) == 1:
            return values[0]
        return tuple(values)

    def _make_harness(self, task_id: str, entry_point: str, imports: str, canonical_solution: str, tests: List[str], candidate_src: str) -> str:
        """
        Construct an executable harness that:
        1) Defines imports
        2) Defines candidate implementation (prompt + candidate completion)
        3) Pre-computes `target` using the reference implementation for each test (heuristics)
        4) Executes the original test snippet with `target` bound.
        We run each test independently within the same process, stopping on first failure.

        NOTE(review): currently a no-op — the candidate source is returned
        unchanged and every other parameter is ignored; the docstring above
        describes intended, not implemented, behavior.
        """
        return candidate_src

    def handle_special_cases(self, task_id: str, solution: str, test: str) -> Tuple[str, str]:
        """
        Hook: adjust solution/test for SciCode edge cases.

        Strips a fenced ```python ... ``` markdown block from the solution
        (keeping only the code payload) before delegating to the base handler.
        """
        import re  # shadows the module-level import; currently unused here
        start = "```python"
        end = "```"
        s = solution
        if start in s and end in s:
            # Keep everything between the first opening fence and the last
            # closing fence.
            solution = s[s.find(start)+len(start):s.rfind(end)]
        # Debug tracing of the cleaned solution.
        print("solution start")
        print(solution)
        print("solution end")
        return super().handle_special_cases(task_id=task_id, solution=solution, test=test)

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate the solution code.

        Args:
            prediction (str | List[str]): The solution code(s).
            label (dict | List[dict]): The unit test code(s).

        Returns:
            dict: The evaluation metrics (pass@k).
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)
        import pickle  # NOTE(review): unused leftover, as is `data` below

        data = {"prediction": prediction, "label": label}

        results = []
        for solution in prediction:
            print(solution)  # debug tracing
            solution_states = []
            for label_data in label:
                task_id = label_data["task_id"]
                prompt = self.get_example_by_id(task_id)["prompt"]
                unit_test = label_data["tests"]
                # Look up the pre-computed expected value for this exact
                # test-case text in the ground-truth frame.
                extract_target = self._data_ground[self._data_ground['test_cases']==unit_test]['target'].values[0]
                unit_test = label_data['imports'] + "\n" +label_data["tests"]

                # Inline the concrete target value into the test source by
                # textually replacing the free variable name `target`,
                # converting numpy containers to plain python literals first.
                if "numpy.ndarray" in str(type(extract_target)) and 'numpy.bool_' != str(type(extract_target)):
                    unit_test = unit_test.replace('target', str(extract_target.tolist()))
                elif 'tuple' in str(type(extract_target)):
                    try:
                        # Tuple of numpy arrays -> tuple of lists.
                        update_target = tuple([i.tolist() for i in extract_target])
                        unit_test = unit_test.replace('target', str(update_target))
                    except:
                        # Elements without .tolist(): fall back to raw repr.
                        unit_test = unit_test.replace('target', str(extract_target))
                elif 'dict' in str(type(extract_target)):
                    update_target = dict()
                    for i in extract_target.keys():
                        update_target[i] = extract_target[i].tolist()
                    unit_test = unit_test.replace('target', str(update_target))
                else:
                    # Scalars / booleans: plain string substitution.
                    unit_test = unit_test.replace('target', str(extract_target))

                entry_point = label_data["entry_point"]
                state, message = self.check_solution_scicode(
                    task_id=task_id,
                    solution=prompt + solution,
                    test=unit_test,
                    entry_point=entry_point
                )
                # NOTE(review): on the first failing test we break *before*
                # appending the state or recording the error message, so
                # error_list only ever holds messages from passing tests —
                # confirm this is intended.
                if state != self.SUCCESS:
                    break
                solution_states.append(state)
                self.error_list[task_id] = message.split('\n')[0]
            # A solution passes only if every label's test suite succeeded.
            results.append(len(solution_states)==len(label) and all(state==self.SUCCESS for state in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)

        return pass_at_k
|
|
|
|
|
|
|
|
class AFlowSciCode(SciCode):
    """
    AFlow-specific implementation of SciCode benchmark.
    Uses AFLOW_DATASET_FILES_MAP['scicode'] for split files (if provided by your distribution).

    NOTE(review): despite the docstring, this class currently loads from the
    same hard-coded absolute paths as SciCode._load_data rather than from
    AFLOW_DATASET_FILES_MAP — confirm which source is intended.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        # Eagerly load both splits and their ground-truth frames *before*
        # calling the base __init__ (which may trigger _load_data again).
        # NOTE(review): machine-specific absolute paths, duplicated from
        # SciCode._load_data.
        self._dev_data = load_scicode_data("/home/tl688/pitl688/selfevolve/SciCode/eval/data/subproblems_dev.jsonl")
        self._data_ground = pd.read_pickle("/home/tl688/pitl688/selfevolve/SciCode/eval/data/problems_dev.pkl")
        self._test_data = load_scicode_data("/home/tl688/pitl688/selfevolve/SciCode/eval/data/subproblems_test.jsonl")
        self._test_data_ground = pd.read_pickle("/home/tl688/pitl688/selfevolve/SciCode/eval/data/problems_test.pkl")
        try:
            self._data_ground = pd.concat((self._data_ground, self._test_data_ground))
        except:  # bare except — kept as-is; see note in SciCode._load_data
            self._data_ground = self._test_data_ground
        self.k = k
        super().__init__(path=path, mode=mode, timeout=timeout, k=k, **kwargs)

    def extract_test_cases_with_entry_point(self, entry_point: str):
        """
        Extract test cases with the given entry point.

        Returns the hard-coded (empty) test for known entry points, else the
        first matching case from `self._test_cases`, else None.
        """
        # NOTE(review): these entry-point names look copied from a
        # HumanEval-style benchmark and all map to empty strings — confirm
        # they are meaningful for SciCode.
        hardcoded_cases = {
            "find_zero": "",
            "decode_cyclic": "",
            "decode_shift": "",
            "by_length": "",
            "add": "",
            "triangle_area": "",
            "correct_bracketing": "",
            "solve": "",
            "sum_squares": "",
            "starts_one_ends": "",
        }
        if entry_point in hardcoded_cases:
            return hardcoded_cases[entry_point]

        # NOTE(review): `self._test_cases` is not assigned anywhere in this
        # file — presumably populated by the base class; verify before use.
        for case in self._test_cases:
            if case["entry_point"] == entry_point:
                return case["test"]

        return None

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        # Run the agent graph to produce a candidate solution for this
        # example, then score it via the (async) base evaluator and report
        # the pass@1 metric.
        prompt, entry_point = example["prompt"], example["entry_point"]
        solution = await graph(prompt, entry_point)
        label = self._get_label(example)
        metrics = await super().async_evaluate(prediction=solution, label=label)
        return metrics["pass@1"]

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate the solution code.

        Args:
            prediction (str | List[str]): The solution code(s).
            label (dict | List[dict]): The unit test code(s).

        Returns:
            dict: The evaluation metrics (pass@k).

        NOTE(review): near-duplicate of SciCode.evaluate (minus the debug
        prints) — consider consolidating in a future change.
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)
        results = []
        for solution in prediction:
            solution_states = []
            for label_data in label:
                task_id = label_data["task_id"]
                prompt = self.get_example_by_id(task_id)["prompt"]
                unit_test = label_data["tests"]
                # Look up the pre-computed expected value for this exact
                # test-case text in the ground-truth frame.
                extract_target = self._data_ground[self._data_ground['test_cases']==unit_test]['target'].values[0]
                unit_test = label_data['imports'] + "\n" +label_data["tests"]

                # Inline the concrete target value into the test source by
                # textually replacing the free variable name `target`.
                if "numpy.ndarray" in str(type(extract_target)) and 'numpy.bool_' != str(type(extract_target)):
                    unit_test = unit_test.replace('target', str(extract_target.tolist()))
                elif 'tuple' in str(type(extract_target)):
                    try:
                        update_target = tuple([i.tolist() for i in extract_target])
                        unit_test = unit_test.replace('target', str(update_target))
                    except:
                        unit_test = unit_test.replace('target', str(extract_target))
                elif 'dict' in str(type(extract_target)):
                    update_target = dict()
                    for i in extract_target.keys():
                        update_target[i] = extract_target[i].tolist()
                    unit_test = unit_test.replace('target', str(update_target))
                else:
                    unit_test = unit_test.replace('target', str(extract_target))

                entry_point = label_data["entry_point"]
                state, message = self.check_solution_scicode(
                    task_id=task_id,
                    solution=prompt + solution,
                    test=unit_test,
                    entry_point=entry_point
                )

                # NOTE(review): as in SciCode.evaluate, a failing state breaks
                # out before the error message is recorded.
                if state != self.SUCCESS:
                    break
                solution_states.append(state)
                self.error_list[task_id] = message.split('\n')[0]
            results.append(len(solution_states)==len(label) and all(state==self.SUCCESS for state in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)

        return pass_at_k