{ "cells": [ { "cell_type": "code", "execution_count": 1, "id": "f6327efc", "metadata": {}, "outputs": [ { "ename": "ImportError", "evalue": "attempted relative import with no known parent package", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mImportError\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn[1], line 7\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mshutil\u001b[39;00m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mtyping\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m Union, Any, Callable, List, Dict, Tuple\n\u001b[0;32m----> 7\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mscicode\u001b[39;00m\n\u001b[1;32m 9\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mnumpy\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mnp\u001b[39;00m \u001b[38;5;66;03m# Many SciCode tests use numpy\u001b[39;00m\n\u001b[1;32m 10\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mbenchmark\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m CodingBenchmark\n", "File \u001b[0;32m/gpfs/radev/pi/ying_rex/tl688/selfevolve/EvoAgentX/evoagentx/benchmark/scicode.py:10\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mscicode\u001b[39;00m\n\u001b[1;32m 9\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mnumpy\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mnp\u001b[39;00m \u001b[38;5;66;03m# Many SciCode tests use numpy\u001b[39;00m\n\u001b[0;32m---> 10\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mbenchmark\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m CodingBenchmark\n\u001b[1;32m 11\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m 
import os
import re
import gzip
import shutil
from typing import Union, Any, Callable, List, Dict, Tuple

import scicode

import numpy as np  # Many SciCode tests use numpy

# NOTE(fix): relative imports ("from .benchmark import ...") raise
# "ImportError: attempted relative import with no known parent package"
# when this code is executed in a notebook (see this cell's stored error
# output). Import through the installed `evoagentx` package instead; the
# package layout is confirmed by the traceback path
# (.../EvoAgentX/evoagentx/benchmark/scicode.py).
from evoagentx.benchmark.benchmark import CodingBenchmark
from evoagentx.core.logging import logger
from evoagentx.utils.utils import download_file
from evoagentx.core.module_utils import load_json
from evoagentx.utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data


# ----------------------------
# Raw SciCode (community) data
# ----------------------------

SCICODE_DEFAULT_URL = "https://raw.githubusercontent.com/scicode-bench/scicode/main/data/scicode.jsonl.gz"  # If you mirror elsewhere, update here.


def download_raw_scicode_data(save_folder: str, url: str = SCICODE_DEFAULT_URL) -> str:
    """
    Download and unzip the raw SciCode jsonl(.gz) to `save_folder`.

    Args:
        save_folder: Directory to place the files in (created if missing).
        url: Source URL of the gzipped jsonl dump.

    Returns:
        str: Path to the unzipped jsonl file.
    """
    os.makedirs(save_folder, exist_ok=True)
    gz_path = os.path.join(save_folder, "scicode.jsonl.gz")
    jsonl_path = os.path.join(save_folder, "scicode.jsonl")

    logger.info(f"Downloading SciCode data from {url} ...")
    download_file(url=url, save_file=gz_path)

    logger.info("Unzipping SciCode data ...")
    with gzip.open(gz_path, "rb") as f_in, open(jsonl_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Remove the compressed archive once extracted.
    if os.path.exists(gz_path):
        os.remove(gz_path)

    return jsonl_path


# ----------------------------
# Schema helpers
# ----------------------------

def _extract_entry_point_from_header(header: str) -> str:
    """
    Given a SciCode 'function_header' string like:
        "def get_alpha(recvec, alpha_scaling=5):\n    '''...'''"
    return "get_alpha".

    Raises:
        ValueError: if no ``def <name>(`` pattern can be found.
    """
    m = re.search(r"def\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", header)
    if not m:
        raise ValueError("Could not parse entry point from function_header")
    return m.group(1)


def _coerce_scicode_row_to_examples(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    SciCode rows may contain a single task or multiple step tasks.
    We normalize them to a list of examples with a unified structure:
        {
          "task_id": "SciCode/<name>[#stepK]",
          "prompt": <str>,
          "entry_point": <str>,
          "canonical_solution": <str>,
          "tests": List[str],   # list of python test snippets
          "imports": str        # optional import prelude (e.g., 'import numpy as np')
        }
    """
    examples: List[Dict[str, Any]] = []

    # Different dumps can be list-based or dict-based; resolve the task name
    # safely for both. (The old `0 in row` check tested *value* membership on
    # lists and raised IndexError on empty rows.)
    if isinstance(row, list):
        name = str(row[0]) if row else "unknown"
    else:
        name = str(row.get("name", "unknown"))

    if isinstance(row, list):
        # Heuristic index layout (based on observed dumps):
        #   [name, <meta>, description, <meta>, docstring, imports,
        #    steps(list[dict]) or code, tests(list[str]) or None]
        # We try to find fields by semantic type rather than position.
        description = None
        doc_or_header = None
        imports_block = None
        steps_or_code = None
        tests = None

        # Try assigning by scanning
        for item in row:
            if isinstance(item, str) and item.strip().startswith('"""'):
                # docstring/prompt block for the top-level task
                doc_or_header = item
            elif isinstance(item, str) and item.lstrip().startswith(("import ", "from ")):
                # NOTE(fix): previously any string *containing* "from " was
                # misclassified as the import prelude; require a prefix.
                imports_block = item
            elif isinstance(item, list):
                # Could be steps OR tests
                if item and isinstance(item[0], dict) and "function_header" in item[0]:
                    steps_or_code = item
                elif item and isinstance(item[0], str) and item[0].strip().startswith(("ref", "assert", "from ")):
                    tests = item
            elif isinstance(item, dict):
                # Some SciCode variants may directly be dicts per step; treat as steps
                steps_or_code = [item]

        # If we have step dictionaries, produce one example per step
        if isinstance(steps_or_code, list) and steps_or_code and isinstance(steps_or_code[0], dict):
            for idx, step in enumerate(steps_or_code):
                header = step.get("function_header") or step.get("header") or ""
                code = step.get("ground_truth_code") or step.get("solution") or ""
                step_tests = step.get("test_cases") or []
                entry_point = _extract_entry_point_from_header(header)
                prompt = header  # keep header as the model prompt (header + docstring already embedded)
                examples.append(
                    {
                        "task_id": f"SciCode/{name}#step{idx+1}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            # Single task variant: try to locate a consolidated code blob in
            # the row and treat its last defined function as the entry point.
            code_blob = None
            for item in row:
                if isinstance(item, str) and "def " in item and "return" in item:
                    code_blob = item
                    break
            if code_blob:
                # Heuristic: the last "def ..." in the blob is the target entry point
                headers = list(re.finditer(r"(?ms)^(def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(.*?\):\s*\n)", code_blob))
                if headers:
                    last_header = headers[-1].group(1)
                    entry_point = _extract_entry_point_from_header(last_header)
                else:
                    entry_point = "solution"

                # Treat the entire blob as canonical_solution and build a
                # minimal prompt from the docstring if one was found.
                prompt = doc_or_header or f"def {entry_point}(*args, **kwargs):\n    '''Fill in the function body.'''\n    ..."
                examples.append(
                    {
                        "task_id": f"SciCode/{name}",
                        "prompt": prompt,
                        "entry_point": entry_point,
                        "canonical_solution": code_blob,
                        "tests": tests or [],
                        "imports": imports_block or "",
                    }
                )

    else:
        # Dict-style row (fallback): expect keys by name
        steps = row.get("steps", [])
        imports_block = row.get("imports", "")
        task_name = row.get("name", "unknown")

        if steps:
            for idx, step in enumerate(steps):
                header = step.get("function_header", "")
                code = step.get("ground_truth_code", "")
                step_tests = step.get("test_cases", [])
                entry_point = _extract_entry_point_from_header(header)
                examples.append(
                    {
                        "task_id": f"SciCode/{task_name}#step{idx+1}",
                        "prompt": header,
                        "entry_point": entry_point,
                        "canonical_solution": code,
                        "tests": step_tests,
                        "imports": imports_block or "",
                    }
                )
        else:
            header = row.get("function_header", "")
            code = row.get("ground_truth_code", "")
            tests = row.get("test_cases", [])
            entry_point = _extract_entry_point_from_header(header) if header else "solution"
            prompt = header or f"def {entry_point}(*args, **kwargs):\n    pass"
            examples.append(
                {
                    "task_id": f"SciCode/{task_name}",
                    "prompt": prompt,
                    "entry_point": entry_point,
                    "canonical_solution": code,
                    "tests": tests,
                    "imports": imports_block or "",
                }
            )

    return examples


def load_scicode_data(jsonl_path: str) -> List[Dict[str, Any]]:
    """
    Load SciCode jsonl and expand into normalized examples.
    Malformed rows are skipped with a warning rather than aborting the load.
    """
    raw = load_json(jsonl_path, type="jsonl")
    all_examples: List[Dict[str, Any]] = []
    for row in raw:
        try:
            all_examples.extend(_coerce_scicode_row_to_examples(row))
        except Exception as e:
            logger.warning(f"[SciCode] Skipping a malformed row due to: {e}")
    return all_examples


# ----------------------------
# Benchmark classes
# ----------------------------

class SciCode(CodingBenchmark):
    """
    Benchmark class for evaluating code generation on SciCode.

    SciCode problems provide:
      - function_header (prompt stub)
      - ground_truth_code (reference implementation)
      - test_cases (list[str] of python asserts)

    We normalize each item and evaluate by executing the candidate implementation
    against the provided test cases. Since many SciCode tests reference a variable
    named `target`, we heuristically pre-compute `target` from the reference
    implementation when necessary, or set it to True for boolean-allclose tests.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        path = os.path.expanduser(path or "~/.evoagentx/data/scicode")
        self.k = k
        super().__init__(name=type(self).__name__, path=path, mode=mode, timeout=timeout, **kwargs)

    # ---------- Data loading ----------

    def _load_data(self):
        data_path = os.path.join(self.path, "scicode.jsonl")
        if not os.path.exists(data_path):
            data_path = download_raw_scicode_data(self.path)

        # NOTE(fix): previously the dev/test splits pointed at hardcoded
        # user-specific absolute paths (/home/tl688/...), which breaks on any
        # other machine. Look for split files under `self.path`; fall back to
        # the downloaded raw jsonl (everything in one split).
        dev_path = os.path.join(self.path, "subproblems_dev.jsonl")
        test_path = os.path.join(self.path, "subproblems_test.jsonl")

        if self.mode in ("dev", "all"):
            self._dev_data = load_scicode_data(dev_path if os.path.exists(dev_path) else data_path)
        if self.mode in ("test", "all"):
            self._test_data = load_scicode_data(test_path if os.path.exists(test_path) else data_path)

    def _get_label(self, example: Any):
        """
        For SciCode we treat the label as the full test suite plus metadata.
        """
        return {
            "task_id": example["task_id"],
            "entry_point": example["entry_point"],
            "tests": example.get("tests", []),
            "canonical_solution": example.get("canonical_solution", ""),
            "imports": example.get("imports", ""),
        }

    def _get_id(self, example: Any):
        return example["task_id"]

    # ---------- Evaluation ----------

    @staticmethod
    def _build_reference_namespace(imports: str, canonical_solution: str) -> Dict[str, Any]:
        """
        Build an execution namespace that defines the reference function.

        SECURITY NOTE: `exec` runs benchmark-provided code; only use with
        trusted SciCode data.
        """
        ns: Dict[str, Any] = {"np": np, "scicode": scicode}
        if imports:
            exec(imports, ns, ns)  # e.g., "import numpy as np\nfrom scipy.special import erfc"
        if canonical_solution:
            exec(canonical_solution, ns, ns)
        return ns

    @staticmethod
    def _extract_candidate_exprs_from_test(test_src: str) -> List[str]:
        """
        Heuristically extract expressions that are compared against `target` inside
        np.allclose(<expr>, target) or equality checks like "<expr> == target".
        Returns a list of python expressions (as strings) that we should evaluate
        with the *reference* implementation to generate `target`.

        This is a pragmatic parser covering the most common SciCode patterns.
        """
        exprs: List[str] = []

        # NOTE(fix): the group syntax was previously "(?P.+?)", which is an
        # invalid pattern (re.error at runtime) — the named group must be
        # "(?P<expr>...)" to match the m.group("expr") calls below.

        # Pattern A: np.allclose(<expr>, target)
        for m in re.finditer(r"np\.allclose\s*\(\s*(?P<expr>.+?)\s*,\s*target\s*\)", test_src, flags=re.DOTALL):
            exprs.append(m.group("expr"))

        # Pattern B: assert <expr> == target
        for m in re.finditer(r"assert\s+(?P<expr>.+?)\s*==\s*target", test_src):
            exprs.append(m.group("expr"))

        # Pattern C: assert <expr>, target (first arg should be True)
        # In this case, target is expected to be True; no need to compute it.
        # Handled by leaving exprs empty and later defaulting target=True.

        # Pattern D: slices like target[0], target[1] are already captured by
        # Pattern A; the extracted expr may include "[0]" or "[1]".
        return exprs

    @staticmethod
    def _compute_target_list(exprs: List[str], ref_ns: Dict[str, Any]) -> Any:
        """
        Given a list of expressions (strings), evaluate them in the reference namespace.
        If multiple expressions are found, we pack them into a tuple in the same order.
        If no expression found, return True (to support tests of the form
        `assert <expr>, target`).
        """
        if not exprs:
            return True
        values = []
        for ex in exprs:
            # Each expression gets a fresh local namespace; globals come from
            # the reference namespace (trusted benchmark code only).
            local_ns: Dict[str, Any] = {}
            val = eval(ex, ref_ns, local_ns)
            values.append(val)
        if len(values) == 1:
            return values[0]
        return tuple(values)

    def _make_harness(self, task_id: str, entry_point: str, imports: str, canonical_solution: str, tests: List[str], candidate_src: str) -> str:
        """
        Prepare the candidate body for execution.

        The actual orchestration (computing `target` per test, binding it, and
        running the snippet) happens in `evaluate()` via repeated calls to
        `check_solution`; this hook currently passes the candidate source
        through unchanged — imports are prepended at runtime via the test body.
        """
        return candidate_src

    def handle_special_cases(self, task_id: str, solution: str, test: str) -> Tuple[str, str]:
        """
        Hook: adjust solution/test for edge cases in SciCode, if needed.
        Currently, we leave as-is and fallback to the base handler.
        """
        return super().handle_special_cases(task_id=task_id, solution=solution, test=test)

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate candidate solution(s) against SciCode test cases.

        Strategy:
          - For each candidate solution:
              - For each test snippet:
                  1) Build reference namespace; compute `target` (heuristics).
                  2) Build candidate code by concatenating example['prompt'] + candidate solution.
                  3) Execute the test with `target` and candidate in the sandbox via `check_solution`.
          - Aggregate per-test pass/fail into a single boolean for the example.
          - Compute pass@k across candidates.
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)

        def _pyrepr(obj):
            # Light-weight serializer for numpy arrays / tuples / lists / scalars,
            # so `target` can be re-created via eval inside the sandbox.
            if isinstance(obj, np.ndarray):
                return f"np.array({repr(obj.tolist())})"
            return repr(obj)

        results = []
        for solution in prediction:
            # Each `label` item corresponds to the SAME example in our usage
            # (benchmark runs per example), but we preserve the structure
            # consistent with the base class.
            solution_states = []
            for label_data in label:
                task_id = label_data["task_id"]
                entry_point = label_data["entry_point"]
                tests = label_data.get("tests", [])
                imports = label_data.get("imports", "")
                canonical_solution = label_data.get("canonical_solution", "")

                # Build reference env for computing `target`
                ref_ns = self._build_reference_namespace(imports=imports, canonical_solution=canonical_solution)

                # Build candidate code (prompt + solution)
                prompt = self.get_example_by_id(task_id)["prompt"]
                candidate_code = prompt + "\n" + solution

                # Run each test individually; any failure => whole example fails
                all_ok = True
                for raw_test in tests if tests else ["# no tests provided\nassert True, True"]:
                    # Heuristically precompute `target`
                    exprs = self._extract_candidate_exprs_from_test(raw_test)
                    try:
                        target_value = self._compute_target_list(exprs, ref_ns)
                    except Exception as e:
                        # If we cannot compute target from the reference, fall back to True
                        logger.warning(f"[SciCode] Fallback target=True for {task_id} due to: {e}")
                        target_value = True

                    # Compose a runnable unit-test block: inject `imports`,
                    # bind `target`, then execute the original test code.
                    unit_test = (
                        (imports or "")
                        + "\n"
                        + "target = __TARGET_VALUE__\n"
                        + raw_test
                    )

                    # `check_solution` runs code in a separate exec, so we
                    # stringify the target (repr + eval round-trip). If that
                    # fails, degrade to boolean True.
                    try:
                        unit_test = unit_test.replace(
                            "__TARGET_VALUE__", _pyrepr(target_value)
                        )
                    except Exception:
                        unit_test = unit_test.replace("__TARGET_VALUE__", "True")

                    # Optional special-case patching hook
                    candidate_code_patched, unit_test_patched = self.handle_special_cases(
                        task_id=task_id, solution=candidate_code, test=unit_test
                    )

                    # Execute
                    state, message = self.check_solution(
                        task_id=task_id,
                        solution=candidate_code_patched,
                        test=unit_test_patched,
                        entry_point=entry_point,
                    )
                    if state != self.SUCCESS:
                        all_ok = False
                        break

                solution_states.append(self.SUCCESS if all_ok else self.FAILURE)
            results.append(len(solution_states) == len(label) and all(s == self.SUCCESS for s in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)
        return pass_at_k


class AFlowSciCode(SciCode):
    """
    AFlow-specific implementation of SciCode benchmark.
    Uses AFLOW_DATASET_FILES_MAP['scicode'] for split files (if provided by your distribution).
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        path = os.path.expanduser(path or "~/.evoagentx/data/aflow/scicode")
        super().__init__(path=path, mode=mode, timeout=timeout, k=k, **kwargs)

    def _load_data_from_file(self, file_name: str):
        """
        Load one AFlow split file from `self.path`, downloading the AFlow
        SciCode bundle on first use. Returns None when `file_name` is None.
        """
        if file_name is None:
            return None
        file_path = os.path.join(self.path, file_name)
        if not os.path.exists(file_path):
            logger.info("Downloading AFlow SciCode split files ...")
            download_aflow_benchmark_data(dataset="scicode", save_folder=self.path)
        return load_json(path=file_path, type="jsonl")

    def _load_data(self):
        # Prefer AFLOW split files when available; otherwise fall back to raw download.
        if "scicode" not in AFLOW_DATASET_FILES_MAP:
            logger.warning("AFLOW_DATASET_FILES_MAP has no entry for 'scicode'; falling back to raw SciCode jsonl.")
            return super()._load_data()

        splits = AFLOW_DATASET_FILES_MAP["scicode"]
        data_all: Dict[str, List[Dict[str, Any]]] = {}

        for split in ("train", "dev", "test"):
            fname = splits.get(split)
            if fname:
                logger.info(f"Loading {split} data from {fname}")
                raw_split = self._load_data_from_file(file_name=fname)
                # Normalize rows to examples
                examples: List[Dict[str, Any]] = []
                for row in raw_split or []:
                    try:
                        examples.extend(_coerce_scicode_row_to_examples(row))
                    except Exception as e:
                        logger.warning(f"[AFlowSciCode] Skipping a malformed row in {split} due to: {e}")
                data_all[split] = examples
            else:
                data_all[split] = None

        if self.mode in ("train", "all"):
            self._train_data = data_all.get("train")
        if self.mode in ("dev", "all"):
            self._dev_data = data_all.get("dev")
        if self.mode in ("test", "all"):
            self._test_data = data_all.get("test")

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        """
        Generate a solution asynchronously and return pass@1 for the example.
        """
        prompt, entry_point = example["prompt"], example["entry_point"]
        solution = await graph(prompt, entry_point)
        label = self._get_label(example)
        metrics = await super().async_evaluate(prediction=solution, label=label)
        return metrics.get("pass@1", 0.0)
"0d5fd437", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.13" } }, "nbformat": 4, "nbformat_minor": 5 }