import os
import re
import gzip
import shutil
import contextlib
import signal
from contextlib import contextmanager
from typing import Union, Any, Callable, List, Dict, Tuple

import pandas as pd
import scicode
import numpy as np  # Many SciCode tests use numpy

from .benchmark import CodingBenchmark
from ..core.logging import logger
from ..utils.utils import download_file
from ..core.module_utils import load_json
from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data


class TimeoutException(Exception):
    """Raised by :func:`time_limit` when the guarded block exceeds its budget."""


@contextmanager
def time_limit(seconds):
    """
    Context manager that raises ``TimeoutException`` after ``seconds`` seconds.

    Implemented with ``signal.alarm``; the SIGALRM handler that was installed
    before entering the block is restored on exit.
    """

    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; None cannot be passed back in, so skip restoring then.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)


# If you mirror elsewhere, update here.
SCICODE_DEFAULT_URL = "https://raw.githubusercontent.com/scicode-bench/scicode/main/data/scicode.jsonl.gz"

# Directory holding `subproblems_{dev,test}.jsonl` and `problems_{dev,test}.pkl`.
# The default is the location this module has always read from; set the
# SCICODE_DATA_DIR environment variable to point somewhere else.
SCICODE_LOCAL_DATA_DIR = os.environ.get(
    "SCICODE_DATA_DIR", "/home/tl688/pitl688/selfevolve/SciCode/eval/data"
)


def download_raw_scicode_data(save_folder: str, url: str = SCICODE_DEFAULT_URL) -> str:
    """
    Download and unzip the raw SciCode jsonl(.gz) to `save_folder`.

    The downloaded archive is deleted once it has been decompressed.

    Returns:
        str: Path to the unzipped jsonl file.
    """
    os.makedirs(save_folder, exist_ok=True)
    gz_path = os.path.join(save_folder, "scicode.jsonl.gz")
    jsonl_path = os.path.join(save_folder, "scicode.jsonl")

    logger.info(f"Downloading SciCode data from {url} ...")
    download_file(url=url, save_file=gz_path)

    logger.info("Unzipping SciCode data ...")
    with gzip.open(gz_path, "rb") as f_in, open(jsonl_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    if os.path.exists(gz_path):
        os.remove(gz_path)

    return jsonl_path


# ----------------------------
# Schema helpers
# ----------------------------

def _extract_entry_point_from_header(header: str) -> str:
    """
    Given a SciCode 'function_header' string like:
        "def get_alpha(recvec, alpha_scaling=5):\n    '''...'''"
    return "get_alpha".

    Raises:
        ValueError: if no ``def name(`` pattern is found in `header`.
    """
    m = re.search(r"def\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", header)
    if not m:
        raise ValueError("Could not parse entry point from function_header")
    return m.group(1)


def _coerce_scicode_row_to_examples(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Normalize one raw SciCode row into a list of examples.

    SciCode rows may contain a single task or multiple step tasks, and may be
    list-based or dict-based. Every produced example has the structure:
        {
            "task_id": str,             # "SciCode/..." identifier
            "prompt": str,              # text handed to the model
            "entry_point": str,         # name of the function under test
            "canonical_solution": str,  # reference implementation
            "tests": ...,               # test code as stored in the row
            "imports": str              # optional import prelude (e.g., 'import numpy as np')
        }

    Raises:
        ValueError: if a step's function header cannot be parsed.
    """
    examples: List[Dict[str, Any]] = []

    # Different dumps can be list-based or dict-based; support both.
    if isinstance(row, list):
        # The first element is used as the task name when there is one.
        name = str(row[0]) if row else "unknown"

        # The list layout is positional; instead of relying on fixed indices we
        # classify each element by what it looks like.
        doc_or_header = None
        imports_block = None
        steps_or_code = None
        tests = None

        for item in row:
            if isinstance(item, str) and item.strip().startswith('"""'):
                # docstring/prompt block for the top-level task
                doc_or_header = item
            elif isinstance(item, str) and (item.startswith("import ") or "from " in item):
                imports_block = item
            elif isinstance(item, list):
                # Could be steps OR tests
                if item and isinstance(item[0], dict) and "function_header" in item[0]:
                    steps_or_code = item
                elif item and isinstance(item[0], str) and item[0].strip().startswith(("ref", "assert", "from ")):
                    tests = item
            elif isinstance(item, dict):
                # Some SciCode variants may directly be dicts per step; treat as steps
                steps_or_code = [item]

        # If we have step dictionaries, produce one example per step
        if isinstance(steps_or_code, list) and steps_or_code and isinstance(steps_or_code[0], dict):
            for idx, step in enumerate(steps_or_code):
                header = step.get("function_header") or step.get("header") or ""
                code = step.get("ground_truth_code") or step.get("solution") or ""
                step_tests = step.get("test_cases") or []
                entry_point = _extract_entry_point_from_header(header)
                # keep header as the model prompt (header + docstring already embedded)
                prompt = header
                examples.append(
                    {
                        "task_id": f"SciCode/{name}#step{idx+1}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            # Single task variant: look for one consolidated code string in the
            # row. If none is present this is a no-op (no example is emitted).
            code_blob = None
            for item in row:
                if isinstance(item, str) and "def " in item and "return" in item:
                    code_blob = item
                    break

            if code_blob:
                # Heuristic: the last "def ..." in the blob is the target entry point
                headers = list(re.finditer(r"(?ms)^(def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(.*?\):\s*\n)", code_blob))
                if headers:
                    last_header = headers[-1].group(1)
                    entry_point = _extract_entry_point_from_header(last_header)
                else:
                    entry_point = "solution"

                # The entire blob is the canonical solution; the prompt is the
                # docstring when present, otherwise a minimal stub.
                prompt = doc_or_header or f"def {entry_point}(*args, **kwargs):\n '''Fill in the function body.'''\n ..."
                examples.append(
                    {
                        "task_id": f"SciCode/{name}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code_blob,
                        "tests": tests or [],
                        "imports": imports_block or "",
                    }
                )
    else:
        # Dict-style row (fallback): expect keys by name
        steps = row.get("steps", [])
        imports_block = row.get("required_dependencies", "")
        task_name = row.get("step_number", "unknown")

        if steps:
            for idx, step in enumerate(steps):
                header = step.get("function_header", "")
                code = step.get("ground_truth_code", "")
                step_tests = step.get("test_cases", [])
                entry_point = _extract_entry_point_from_header(header)
                examples.append(
                    {
                        "task_id": f"SciCode/{task_name}#step{idx+1}",
                        "prompt": header,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            # The row itself is a single sub-problem.
            header = row.get("function_header", "")
            prompt_update = row.get("step_description_prompt", "")
            code = row.get("ground_truth_code", "")
            tests = row.get("test_cases", [])
            entry_point = _extract_entry_point_from_header(header) if header else "solution"
            bkgd = row.get("step_background", "")
            prompt = header or f"def {entry_point}(*args, **kwargs):\n "
            examples.append(
                {
                    "task_id": f"SciCode/{task_name}",
                    # background + step description + header + instruction line
                    "prompt": bkgd + prompt_update + prompt + '''Fill in the function body.\n''',
                    "entry_point": entry_point,
                    "canonical_solution": code,
                    "tests": tests,
                    "imports": imports_block or "",
                }
            )

    return examples


def load_scicode_data(jsonl_path: str) -> List[Dict[str, Any]]:
    """
    Load SciCode jsonl and expand into normalized examples.

    Rows that cannot be normalized are skipped with a warning rather than
    aborting the whole load.
    """
    raw = load_json(jsonl_path, type="jsonl")
    all_examples: List[Dict[str, Any]] = []
    for row in raw:
        try:
            all_examples.extend(_coerce_scicode_row_to_examples(row))
        except Exception as e:
            logger.warning(f"[SciCode] Skipping a malformed row due to: {e}")
    return all_examples


# ----------------------------
# Benchmark classes
# ----------------------------

class SciCode(CodingBenchmark):
    """
    Benchmark class for evaluating code generation on SciCode.

    SciCode problems provide:
      - function_header (prompt stub)
      - ground_truth_code (reference implementation)
      - test_cases (python test code)

    Tests reference a variable named `target`. During `evaluate` the expected
    value is looked up in the ground-truth table (`problems_*.pkl`, matched on
    its `test_cases` column) and inlined into the test source as a literal
    before the candidate is checked.

    The `_build_reference_namespace`, `_extract_candidate_exprs_from_test` and
    `_compute_target_list` helpers offer an alternative way to derive `target`
    from the reference implementation; `evaluate` does not call them.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        path = os.path.expanduser(path or "~/.evoagentx/data/scicode")
        self.k = k
        self.name = "scicode"
        super().__init__(name=type(self).__name__, path=path, mode=mode, timeout=timeout, **kwargs)

    # ---------- Data loading ----------

    @staticmethod
    def _load_split(split: str) -> Tuple[List[Dict[str, Any]], Any]:
        """
        Load one split ("dev" or "test") from `SCICODE_LOCAL_DATA_DIR`.

        Returns:
            (examples, ground): the normalized examples from
            `subproblems_<split>.jsonl` and the unpickled object from
            `problems_<split>.pkl`.
        """
        examples = load_scicode_data(os.path.join(SCICODE_LOCAL_DATA_DIR, f"subproblems_{split}.jsonl"))
        # NOTE: unpickling executes arbitrary code; only point this at trusted files.
        ground = pd.read_pickle(os.path.join(SCICODE_LOCAL_DATA_DIR, f"problems_{split}.pkl"))
        return examples, ground

    def _load_data(self):
        """
        Populate the dev/test examples and the ground-truth table for `self.mode`.

        Data is read from local files (see `SCICODE_LOCAL_DATA_DIR`);
        `download_raw_scicode_data` is available but is not used here.
        """
        if self.mode in ("dev", "all"):
            self._dev_data, self._data_ground = self._load_split("dev")
        if self.mode in ("test", "all"):
            self._test_data, self._test_data_ground = self._load_split("test")
            # Append to whatever ground truth is already present (dev data
            # loaded above, or data pre-loaded by a subclass); otherwise the
            # test table is the whole ground truth.
            existing_ground = getattr(self, "_data_ground", None)
            if existing_ground is None:
                self._data_ground = self._test_data_ground
            else:
                self._data_ground = pd.concat((existing_ground, self._test_data_ground))

    def _get_label(self, example: Any):
        """
        For SciCode we treat the label as the full test suite plus metadata.
        """
        return {
            "task_id": example["task_id"],
            "entry_point": example["entry_point"],
            "tests": example.get("tests", []),
            "canonical_solution": example.get("canonical_solution", ""),
            "imports": example.get("imports", ""),
        }

    def _get_id(self, example: Any):
        """Return the example's `task_id`."""
        return example["task_id"]

    # ---------- Evaluation ----------

    @staticmethod
    def _build_reference_namespace(imports: str, canonical_solution: str) -> Dict[str, Any]:
        """
        Build an execution namespace that defines the reference function.

        NOTE(security): `imports` and `canonical_solution` are run with `exec`;
        only use this with trusted benchmark data.
        """
        ns: Dict[str, Any] = {"np": np, "scicode": scicode}
        if imports:
            exec(imports, ns, ns)  # e.g., "import numpy as np\nfrom scipy.special import erfc"
        if canonical_solution:
            exec(canonical_solution, ns, ns)
        return ns

    @staticmethod
    def _extract_candidate_exprs_from_test(test_src: str) -> List[str]:
        """
        Heuristically extract the expressions that a test compares against `target`.

        Recognized patterns:
          A. ``np.allclose(<expr>, target)``
          B. ``assert <expr> == target``

        Tests of the form ``assert <expr>, target`` yield no expression; the
        caller then treats `target` as True. Comparisons against slices such
        as ``target[0]`` are not matched by either pattern.

        Returns:
            The matched expressions as strings, all pattern-A matches first,
            then pattern-B matches.
        """
        exprs: List[str] = []

        # Pattern A: np.allclose( <expr> , target )
        for m in re.finditer(r"np\.allclose\s*\(\s*(?P<expr>.+?)\s*,\s*target\s*\)", test_src, flags=re.DOTALL):
            exprs.append(m.group("expr"))

        # Pattern B: assert <expr> == target
        for m in re.finditer(r"assert\s+(?P<expr>.+?)\s*==\s*target", test_src):
            exprs.append(m.group("expr"))

        return exprs

    @staticmethod
    def _compute_target_list(exprs: List[str], ref_ns: Dict[str, Any]) -> Any:
        """
        Evaluate `exprs` in the reference namespace to obtain `target`.

        Returns True when `exprs` is empty, the single value when there is one
        expression, and a tuple (same order) when there are several.

        NOTE(security): expressions are run with `eval` and builtins are NOT
        restricted; only use this with trusted benchmark data.
        """
        if not exprs:
            return True

        values = []
        for ex in exprs:
            # A fresh locals dict per expression keeps evaluations independent.
            local_ns: Dict[str, Any] = {}
            values.append(eval(ex, ref_ns, local_ns))

        if len(values) == 1:
            return values[0]
        return tuple(values)

    def _make_harness(self, task_id: str, entry_point: str, imports: str, canonical_solution: str, tests: List[str], candidate_src: str) -> str:
        """
        Return the candidate source to execute.

        Currently a pass-through: `candidate_src` is returned unchanged and the
        other arguments are unused. Imports and the inlined `target` are added
        to the test source in `evaluate` instead.
        """
        return candidate_src

    def handle_special_cases(self, task_id: str, solution: str, test: str) -> Tuple[str, str]:
        """
        Strip a Markdown code fence from `solution`, then defer to the base handler.

        If `solution` contains "```python", everything from the first opening
        fence to the last "```" is kept. When the fence is never closed, the
        rest of the string after the opening fence is kept.
        """
        start = "```python"
        end = "```"
        if start in solution:
            body_start = solution.find(start) + len(start)
            body_end = solution.rfind(end)
            # With no closing fence, rfind() lands on the opening fence itself;
            # slicing up to it would yield an empty solution.
            if body_end < body_start:
                body_end = len(solution)
            solution = solution[body_start:body_end]
            logger.debug(f"[SciCode] extracted fenced solution for {task_id}:\n{solution}")
        return super().handle_special_cases(task_id=task_id, solution=solution, test=test)

    @staticmethod
    def _to_plain(value: Any) -> Any:
        """Convert numpy-like values (anything with `.tolist()`) to plain Python objects."""
        return value.tolist() if hasattr(value, "tolist") else value

    @classmethod
    def _inline_target(cls, unit_test: str, target: Any) -> str:
        """
        Replace `target` in the test source with a literal for the expected value.

        Arrays, and array-like members of tuples and dicts, are converted with
        `.tolist()` so their text form is list syntax rather than `array(...)`.

        NOTE(review): this is a plain substring replacement, so any identifier
        that merely contains "target" is rewritten as well.
        """
        if isinstance(target, np.ndarray):
            literal = str(target.tolist())
        elif isinstance(target, tuple):
            literal = str(tuple(cls._to_plain(item) for item in target))
        elif isinstance(target, dict):
            literal = str({key: cls._to_plain(val) for key, val in target.items()})
        else:
            literal = str(target)
        return unit_test.replace('target', literal)

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate the solution code.

        Args:
            prediction (str | List[str]): The solution code(s).
            label (dict | List[dict]): The unit test code(s).

        Returns:
            dict: The evaluation metrics (pass@k).

        Raises:
            IndexError: if a label's tests have no row in the ground-truth table.
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)

        results = []
        for solution in prediction:
            solution_states = []
            task_id, message = None, ""
            for label_data in label:
                task_id = label_data["task_id"]
                prompt = self.get_example_by_id(task_id)["prompt"]
                raw_test = label_data["tests"]

                # The expected value is stored in the ground-truth table, keyed
                # by the exact test source.
                targets = self._data_ground[self._data_ground['test_cases'] == raw_test]['target'].values
                if len(targets) == 0:
                    raise IndexError(f"[SciCode] no ground-truth target found for task {task_id}")

                unit_test = self._inline_target(label_data['imports'] + "\n" + raw_test, targets[0])
                entry_point = label_data["entry_point"]

                state, message = self.check_solution_scicode(
                    task_id=task_id,
                    solution=prompt + solution,
                    test=unit_test,
                    entry_point=entry_point
                )
                if state != self.SUCCESS:
                    break
                solution_states.append(state)

            # Record the first line of the last checker message for this solution.
            if task_id is not None:
                self.error_list[task_id] = message.split('\n')[0]
            results.append(len(solution_states) == len(label) and all(state == self.SUCCESS for state in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)
        return pass_at_k


class AFlowSciCode(SciCode):
    """
    AFlow-specific implementation of SciCode benchmark.

    Adds `async_evaluate` for scoring a single example produced by an AFlow
    graph; `evaluate` is inherited from `SciCode`.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        # Load both splits up front so the ground-truth table covers dev and
        # test regardless of `mode`.
        # NOTE(review): presumably the base initializer also triggers
        # `_load_data`, which reads these files again - confirm whether this
        # eager load is still needed.
        self._dev_data, dev_ground = self._load_split("dev")
        self._test_data, self._test_data_ground = self._load_split("test")
        self._data_ground = pd.concat((dev_ground, self._test_data_ground))
        self.k = k
        super().__init__(path=path, mode=mode, timeout=timeout, k=k, **kwargs)

    def extract_test_cases_with_entry_point(self, entry_point: str):
        """
        Extract test cases with the given entry point.

        Returns "" for the hard-coded entry points below, the matching case's
        `test` field from `self._test_cases` otherwise, or None if nothing
        matches.
        """
        hardcoded_cases = {
            "find_zero": "",
            "decode_cyclic": "",
            "decode_shift": "",
            "by_length": "",
            "add": "",
            "triangle_area": "",
            "correct_bracketing": "",
            "solve": "",
            "sum_squares": "",
            "starts_one_ends": "",
        }
        if entry_point in hardcoded_cases:
            return hardcoded_cases[entry_point]

        # `_test_cases` is not assigned anywhere in this module; treat a
        # missing attribute as "no cases" instead of raising AttributeError.
        for case in getattr(self, "_test_cases", None) or []:
            if case["entry_point"] == entry_point:
                return case["test"]

        return None

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        """
        Generate a solution for `example` with `graph` and return its pass@1.
        """
        # generate solution
        prompt, entry_point = example["prompt"], example["entry_point"]
        solution = await graph(prompt, entry_point)
        label = self._get_label(example)
        metrics = await super().async_evaluate(prediction=solution, label=label)
        return metrics["pass@1"]