# In [1]:
import os
import re
import gzip
import shutil
from typing import Union, Any, Callable, List, Dict, Tuple

import scicode

import numpy as np # Many SciCode tests use numpy
from .benchmark import CodingBenchmark
from ..core.logging import logger
from ..utils.utils import download_file
from ..core.module_utils import load_json
from ..utils.aflow_utils.data_utils import AFLOW_DATASET_FILES_MAP, download_aflow_benchmark_data


# ----------------------------
# Raw SciCode (community) data
# ----------------------------

SCICODE_DEFAULT_URL = "https://raw.githubusercontent.com/scicode-bench/scicode/main/data/scicode.jsonl.gz" # If you mirror elsewhere, update here.


def download_raw_scicode_data(save_folder: str, url: str = SCICODE_DEFAULT_URL) -> str:
    """
    Fetch the gzipped SciCode jsonl from `url` and decompress it inside `save_folder`.

    The folder is created when missing. The archive is stored as
    ``scicode.jsonl.gz``, decompressed to ``scicode.jsonl`` and then deleted.

    Args:
        save_folder: Directory that receives the data.
        url: Location of the gzipped jsonl file.

    Returns:
        str: Path to the unzipped jsonl file.
    """
    os.makedirs(save_folder, exist_ok=True)
    archive_path, extracted_path = (
        os.path.join(save_folder, file_name)
        for file_name in ("scicode.jsonl.gz", "scicode.jsonl")
    )

    logger.info(f"Downloading SciCode data from {url} ...")
    download_file(url=url, save_file=archive_path)

    logger.info("Unzipping SciCode data ...")
    with gzip.open(archive_path, "rb") as compressed:
        with open(extracted_path, "wb") as plain:
            shutil.copyfileobj(compressed, plain)

    # The archive is only an intermediate artifact; keep just the jsonl.
    if os.path.exists(archive_path):
        os.remove(archive_path)

    return extracted_path


# ----------------------------
# Schema helpers
# ----------------------------

def _extract_entry_point_from_header(header: str) -> str:
    """
    Return the name of the first function defined in a SciCode 'function_header'.

    Example:
        "def get_alpha(recvec, alpha_scaling=5):\\n    '''...'''"  ->  "get_alpha"

    Raises:
        ValueError: If no ``def <name>(`` construct is found in `header`.
    """
    found = next(
        re.finditer(r"def\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", header),
        None,
    )
    if found:
        return found.group(1)
    raise ValueError("Could not parse entry point from function_header")


def _looks_like_import_block(text: str) -> bool:
    """Return True when `text` starts (after optional whitespace) with an import statement."""
    # Anchored at the start on purpose: a prose description that merely contains
    # the word "from" must not be mistaken for an import prelude.
    return re.match(r"\s*(?:import\s+\S|from\s+\S+\s+import\s)", text) is not None


def _examples_from_steps(task_name: str, steps: List[Dict[str, Any]], imports_block: str) -> List[Dict[str, Any]]:
    """Build one normalized example per step dictionary of a multi-step SciCode task."""
    examples: List[Dict[str, Any]] = []
    for idx, step in enumerate(steps, start=1):
        # The header (signature + docstring) doubles as the model prompt.
        header = step.get("function_header") or step.get("header") or ""
        examples.append(
            {
                "task_id": f"SciCode/{task_name}#step{idx}",
                "prompt": header,
                "entry_point": _extract_entry_point_from_header(header),
                "canonical_solution": step.get("ground_truth_code") or step.get("solution") or "",
                "tests": step.get("test_cases") or [],
                "imports": imports_block or "",
            }
        )
    return examples


def _coerce_list_row(row: List[Any]) -> List[Dict[str, Any]]:
    """Normalize a list-based SciCode row by classifying its items by their shape."""
    # The first element is taken as the task name; an empty row has none.
    name = str(row[0]) if row else "unknown"

    doc_or_header = None
    imports_block = None
    steps_or_code = None
    tests = None

    # The positional layout is not fixed, so every item is classified by type/content.
    # Later items of the same category overwrite earlier ones.
    for item in row:
        if isinstance(item, str):
            if item.strip().startswith('"""'):
                # docstring/prompt block for the top-level task
                doc_or_header = item
            elif _looks_like_import_block(item):
                imports_block = item
        elif isinstance(item, list):
            # Could be steps OR tests
            if item and isinstance(item[0], dict) and "function_header" in item[0]:
                steps_or_code = item
            elif item and isinstance(item[0], str) and item[0].strip().startswith(("ref", "assert", "from ")):
                tests = item
        elif isinstance(item, dict):
            # A bare dict is treated as a single step.
            steps_or_code = [item]

    # Multi-step variant: one example per step.
    if isinstance(steps_or_code, list) and steps_or_code and isinstance(steps_or_code[0], dict):
        return _examples_from_steps(name, steps_or_code, imports_block or "")

    # Single-task variant: use the first string that looks like function source.
    code_blob = next(
        (item for item in row if isinstance(item, str) and "def " in item and "return" in item),
        None,
    )
    if not code_blob:
        return []

    # Heuristic: the last top-level "def ..." in the blob is the target entry point.
    headers = list(re.finditer(r"(?ms)^(def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(.*?\):\s*\n)", code_blob))
    if headers:
        entry_point = _extract_entry_point_from_header(headers[-1].group(1))
    else:
        entry_point = "solution"

    # The whole blob is the canonical solution; the prompt is the docstring when
    # one was found, otherwise a minimal stub.
    prompt = doc_or_header or f"def {entry_point}(*args, **kwargs):\n '''Fill in the function body.'''\n ..."
    return [
        {
            "task_id": f"SciCode/{name}",
            "prompt": prompt,
            "entry_point": entry_point,
            "canonical_solution": code_blob,
            "tests": tests or [],
            "imports": imports_block or "",
        }
    ]


def _coerce_dict_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Normalize a dict-based SciCode row, reading fields by key."""
    steps = row.get("steps", [])
    imports_block = row.get("imports", "")
    task_name = row.get("name", "unknown")

    if steps:
        return _examples_from_steps(task_name, steps, imports_block)

    header = row.get("function_header", "")
    entry_point = _extract_entry_point_from_header(header) if header else "solution"
    return [
        {
            "task_id": f"SciCode/{task_name}",
            "prompt": header or f"def {entry_point}(*args, **kwargs):\n pass",
            "entry_point": entry_point,
            "canonical_solution": row.get("ground_truth_code", ""),
            "tests": row.get("test_cases", []),
            "imports": imports_block or "",
        }
    ]


def _coerce_scicode_row_to_examples(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Normalize one raw SciCode row into a list of examples.

    SciCode rows may contain a single task or multiple step tasks, and dumps can
    be list-based or dict-based. Each produced example has the structure:
        {
            "task_id": "SciCode/<name>" or "SciCode/<name>#step<k>",
            "prompt": <function header, docstring block, or generated stub>,
            "entry_point": <name of the function under test>,
            "canonical_solution": <reference implementation source>,
            "tests": List[str],   # list of python test snippets
            "imports": str        # optional import prelude (e.g., 'import numpy as np')
        }

    Args:
        row: A decoded jsonl record, either a list or a dict.

    Returns:
        The normalized examples; may be empty for a list row that contains
        neither step dictionaries nor a recognizable code string.

    Raises:
        TypeError: If `row` is neither a list nor a dict.
        ValueError: If a step header does not contain a parsable ``def``.
    """
    if isinstance(row, list):
        return _coerce_list_row(row)
    if isinstance(row, dict):
        return _coerce_dict_row(row)
    raise TypeError(f"Unsupported SciCode row type: {type(row).__name__}")


def load_scicode_data(jsonl_path: str) -> List[Dict[str, Any]]:
    """
    Read a SciCode jsonl file and flatten every row into normalized examples.

    Rows that cannot be normalized are logged with a warning and skipped.
    """
    normalized: List[Dict[str, Any]] = []
    for record in load_json(jsonl_path, type="jsonl"):
        try:
            expanded = _coerce_scicode_row_to_examples(record)
        except Exception as e:
            logger.warning(f"[SciCode] Skipping a malformed row due to: {e}")
            continue
        normalized.extend(expanded)
    return normalized


# ----------------------------
# Benchmark classes
# ----------------------------

class SciCode(CodingBenchmark):
    """
    Benchmark class for evaluating code generation on SciCode.

    SciCode problems provide:
        - function_header (prompt stub)
        - ground_truth_code (reference implementation)
        - test_cases (list[str] of python asserts)

    We normalize each item and evaluate by executing the candidate implementation
    against the provided test cases. Since many SciCode tests reference a variable
    named `target`, we heuristically pre-compute `target` from the reference
    implementation when necessary, or set it to True for boolean-allclose tests.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        """
        Args:
            path: Data directory; defaults to ``~/.evoagentx/data/scicode``.
            mode: Which split(s) to load ("dev", "test" or "all" are handled here).
            timeout: Forwarded to the base class.
            k: A single k or a list of k values for pass@k.
            **kwargs: Forwarded to the base class.
        """
        path = os.path.expanduser(path or "~/.evoagentx/data/scicode")
        # `k` is set before the base initializer runs, matching the original order.
        self.k = k
        super().__init__(name=type(self).__name__, path=path, mode=mode, timeout=timeout, **kwargs)

    # ---------- Data loading ----------

    def _load_data(self):
        """
        Load the dev/test splits from files located under `self.path`.

        Split files ``subproblems_dev.jsonl`` / ``subproblems_test.jsonl`` are used
        when present. Without a dedicated test file, the raw ``scicode.jsonl`` dump
        (downloaded on demand) is placed into the test split. A missing dev file
        leaves the dev split untouched.
        """
        dev_file = os.path.join(self.path, "subproblems_dev.jsonl")
        test_file = os.path.join(self.path, "subproblems_test.jsonl")

        if self.mode in ("dev", "all"):
            if os.path.exists(dev_file):
                self._dev_data = load_scicode_data(dev_file)
            else:
                logger.warning(f"[SciCode] No dev split found at {dev_file}; dev data not loaded.")

        if self.mode in ("test", "all"):
            if not os.path.exists(test_file):
                # For SciCode, we place everything into "test" split by default.
                test_file = os.path.join(self.path, "scicode.jsonl")
                if not os.path.exists(test_file):
                    test_file = download_raw_scicode_data(self.path)
            self._test_data = load_scicode_data(test_file)

    def _get_label(self, example: Any):
        """
        For SciCode we treat the label as the full test suite plus metadata.
        """
        return {
            "task_id": example["task_id"],
            "entry_point": example["entry_point"],
            "tests": example.get("tests", []),
            "canonical_solution": example.get("canonical_solution", ""),
            "imports": example.get("imports", ""),
        }

    def _get_id(self, example: Any):
        """Return the example's unique identifier (its ``task_id``)."""
        return example["task_id"]

    # ---------- Evaluation ----------

    @staticmethod
    def _build_reference_namespace(imports: str, canonical_solution: str) -> Dict[str, Any]:
        """
        Build an execution namespace that defines the reference function.

        The namespace is pre-seeded with ``np`` and ``scicode``; `imports` and
        `canonical_solution` are then executed into it, in that order.
        """
        ns: Dict[str, Any] = {"np": np, "scicode": scicode}
        # NOTE(security): both strings come from the dataset and are executed
        # in-process without sandboxing; only use trusted data files.
        if imports:
            exec(imports, ns, ns)  # e.g., "import numpy as np\nfrom scipy.special import erfc"
        if canonical_solution:
            exec(canonical_solution, ns, ns)
        return ns

    @staticmethod
    def _extract_candidate_exprs_from_test(test_src: str) -> List[str]:
        """
        Heuristically extract expressions that are compared against `target`.

        Recognized forms are ``np.allclose(<expr>, target)`` and
        ``assert <expr> == target``. The returned python expressions (as strings)
        are meant to be evaluated with the *reference* implementation to
        generate `target`.

        This is a pragmatic parser covering the most common SciCode patterns.
        """
        exprs: List[str] = []

        # Pattern A: np.allclose( <expr>, target )
        for m in re.finditer(r"np\.allclose\s*\(\s*(?P<expr>.+?)\s*,\s*target\s*\)", test_src, flags=re.DOTALL):
            exprs.append(m.group("expr"))

        # Pattern B: assert <expr> == target
        for m in re.finditer(r"assert\s+(?P<expr>.+?)\s*==\s*target", test_src):
            exprs.append(m.group("expr"))

        # Pattern C: assert <cond>, target (when the first arg should be True)
        # In this case, target is expected to be True; no need to compute it.
        # We handle it by leaving exprs empty and later defaulting target=True.

        # Pattern D: indexed targets such as np.allclose(func(...)[0], target[0]).
        # NOTE: Pattern A requires `target` to be followed directly by ")", so
        # these indexed forms are not matched and fall back to target=True.
        return exprs

    @staticmethod
    def _compute_target_list(exprs: List[str], ref_ns: Dict[str, Any]) -> Any:
        """
        Given a list of expressions (strings), evaluate them in the reference namespace.
        If multiple expressions are found, we pack them into a tuple in the same order.
        If no expression found, return True (to support tests of the form `assert <cond>, target`).
        """
        if not exprs:
            return True
        values = []
        for ex in exprs:
            # A fresh locals dict keeps temporaries out of `ref_ns`. Builtins are
            # NOT restricted here: the expression runs with full privileges.
            local_ns: Dict[str, Any] = {}
            values.append(eval(ex, ref_ns, local_ns))
        if len(values) == 1:
            return values[0]
        return tuple(values)

    @staticmethod
    def _serialize_target(obj: Any) -> str:
        """
        Render `obj` as python source that rebuilds it when `np` is in scope.

        Containers are handled recursively so that arrays nested in tuples/lists
        become ``np.array(...)`` and non-finite floats become ``float('nan')`` /
        ``float('inf')`` instead of undefined bare names. Array dtype is not
        preserved beyond what ``np.array`` infers from the nested lists.
        """
        serialize = SciCode._serialize_target
        if isinstance(obj, np.ndarray):
            return f"np.array({serialize(obj.tolist())})"
        if isinstance(obj, np.generic):
            # Convert numpy scalars to the matching builtin before rendering.
            return serialize(obj.item())
        if isinstance(obj, float):
            if obj != obj:
                return "float('nan')"
            if obj in (float("inf"), float("-inf")):
                return "float('inf')" if obj > 0 else "float('-inf')"
            return repr(obj)
        if isinstance(obj, tuple):
            inner = ", ".join(serialize(v) for v in obj)
            return f"({inner},)" if len(obj) == 1 else f"({inner})"
        if isinstance(obj, list):
            return "[" + ", ".join(serialize(v) for v in obj) + "]"
        if isinstance(obj, dict):
            return "{" + ", ".join(f"{serialize(k)}: {serialize(v)}" for k, v in obj.items()) + "}"
        return repr(obj)

    def _make_harness(self, task_id: str, entry_point: str, imports: str, canonical_solution: str, tests: List[str], candidate_src: str) -> str:
        """
        Return the candidate source to execute for a task.

        The candidate source is returned unchanged; all other parameters are
        accepted but not used. Target computation and test composition are done
        in `evaluate()`, which calls `check_solution` once per test.
        """
        return candidate_src

    def handle_special_cases(self, task_id: str, solution: str, test: str) -> Tuple[str, str]:
        """
        Hook: adjust solution/test for edge cases in SciCode, if needed.
        Currently, we leave as-is and fallback to the base handler.
        """
        return super().handle_special_cases(task_id=task_id, solution=solution, test=test)

    def evaluate(self, prediction: Any, label: Any) -> dict:
        """
        Evaluate candidate solution(s) against SciCode test cases.

        Strategy:
            - For each candidate solution:
                - For each test snippet:
                    1) Build reference namespace; compute `target` (heuristics).
                    2) Build candidate code by concatenating example['prompt'] + candidate solution.
                    3) Execute the test with `target` and candidate via `check_solution`.

            - Aggregate per-test pass/fail into a single boolean for the example.
            - Compute pass@k across candidates.

        Returns:
            dict: The result of `compute_pass_at_k` for the configured k value(s).
        """
        prediction, label = self._check_evaluation_inputs(prediction, label)

        results = []
        for solution in prediction:
            # Each `label` item corresponds to the SAME example in our usage (benchmark runs per example),
            # but we preserve the structure consistent with the base class.
            solution_states = []
            for label_data in label:
                task_id = label_data["task_id"]
                entry_point = label_data["entry_point"]
                tests = label_data.get("tests", [])
                imports = label_data.get("imports", "")
                canonical_solution = label_data.get("canonical_solution", "")

                # Build reference env for computing `target`
                ref_ns = self._build_reference_namespace(imports=imports, canonical_solution=canonical_solution)

                # Build candidate code (prompt + solution)
                prompt = self.get_example_by_id(task_id)["prompt"]
                candidate_code = prompt + "\n" + solution

                # Run each test individually; any failure => whole example fails
                all_ok = True
                for raw_test in tests if tests else ["# no tests provided\nassert True, True"]:
                    # Heuristically precompute `target`
                    exprs = self._extract_candidate_exprs_from_test(raw_test)
                    try:
                        target_value = self._compute_target_list(exprs, ref_ns)
                    except Exception as e:
                        # If we cannot compute target from the reference, fall back to True
                        logger.warning(f"[SciCode] Fallback target=True for {task_id} due to: {e}")
                        target_value = True

                    # `check_solution` receives source text only, so the target is
                    # embedded as python source. If rendering fails, degrade to True.
                    try:
                        target_src = self._serialize_target(target_value)
                    except Exception:
                        target_src = "True"

                    # Compose a runnable unit-test block: numpy (needed by the
                    # rendered target), dataset imports, `target`, then the test.
                    unit_test = (
                        "import numpy as np\n"
                        + (imports or "")
                        + "\n"
                        + f"target = {target_src}\n"
                        + raw_test
                    )

                    # Optional special-case patching hook
                    candidate_code_patched, unit_test_patched = self.handle_special_cases(
                        task_id=task_id, solution=candidate_code, test=unit_test
                    )

                    # Execute
                    state, message = self.check_solution(
                        task_id=task_id,
                        solution=candidate_code_patched,
                        test=unit_test_patched,
                        entry_point=entry_point,
                    )
                    if state != self.SUCCESS:
                        all_ok = False
                        break

                solution_states.append(self.SUCCESS if all_ok else self.FAILURE)
            results.append(len(solution_states) == len(label) and all(s == self.SUCCESS for s in solution_states))

        k_list = [self.k] if isinstance(self.k, int) else self.k
        pass_at_k = self.compute_pass_at_k(results, k_list)
        return pass_at_k


class AFlowSciCode(SciCode):
    """
    SciCode benchmark variant for AFlow.

    Split files are taken from AFLOW_DATASET_FILES_MAP['scicode'] when that entry
    exists; otherwise loading is delegated to the parent class.
    """

    def __init__(self, path: str = None, mode: str = "all", timeout: int = 60, k: Union[int, list] = 1, **kwargs):
        """Resolve the data directory (default ``~/.evoagentx/data/aflow/scicode``) and defer to the parent."""
        resolved_path = os.path.expanduser(path or "~/.evoagentx/data/aflow/scicode")
        super().__init__(path=resolved_path, mode=mode, timeout=timeout, k=k, **kwargs)

    def _load_data_from_file(self, file_name: str):
        """
        Load one jsonl split file from `self.path`.

        Returns None when `file_name` is None. If the file is missing, the AFlow
        benchmark data for "scicode" is downloaded into `self.path` first.
        """
        if file_name is None:
            return None
        split_path = os.path.join(self.path, file_name)
        already_present = os.path.exists(split_path)
        if not already_present:
            logger.info("Downloading AFlow SciCode split files ...")
            download_aflow_benchmark_data(dataset="scicode", save_folder=self.path)
        return load_json(path=split_path, type="jsonl")

    def _load_data(self):
        """Load and normalize every configured AFlow split, then expose those selected by `self.mode`."""
        # Prefer AFLOW split files when available; otherwise fall back to raw download.
        if "scicode" not in AFLOW_DATASET_FILES_MAP:
            logger.warning("AFLOW_DATASET_FILES_MAP has no entry for 'scicode'; falling back to raw SciCode jsonl.")
            return super()._load_data()

        split_names = ("train", "dev", "test")
        file_map = AFLOW_DATASET_FILES_MAP["scicode"]
        normalized_by_split: Dict[str, List[Dict[str, Any]]] = {}

        for split in split_names:
            fname = file_map.get(split)
            if not fname:
                # No file configured for this split.
                normalized_by_split[split] = None
                continue

            logger.info(f"Loading {split} data from {fname}")
            rows = self._load_data_from_file(file_name=fname)
            # Normalize rows to examples, skipping the ones that fail.
            collected: List[Dict[str, Any]] = []
            for row in rows or []:
                try:
                    collected.extend(_coerce_scicode_row_to_examples(row))
                except Exception as e:
                    logger.warning(f"[AFlowSciCode] Skipping a malformed row in {split} due to: {e}")
            normalized_by_split[split] = collected

        # Publish only the splits requested through `mode`.
        for split in split_names:
            if self.mode in (split, "all"):
                setattr(self, f"_{split}_data", normalized_by_split.get(split))

    async def async_evaluate(self, graph: Callable, example: Any) -> float:
        """
        Generate a solution asynchronously and return pass@1 for the example.

        `graph` is awaited with the example's prompt and entry point; its result
        is scored through the parent's `async_evaluate`. A missing "pass@1"
        metric yields 0.0.
        """
        generated = await graph(example["prompt"], example["entry_point"])
        expected = self._get_label(example)
        scores = await super().async_evaluate(prediction=generated, label=expected)
        return scores.get("pass@1", 0.0)


# (cell output) ImportError: attempted relative import with no known parent package