| """ |
| Input preparation utilities for discovery runs. |
| |
| Handles materializing user-provided programs and evaluators (file paths, |
| inline strings, callables) into concrete file paths on disk, plus cleanup |
| of any temporary files created in the process. |
| """ |
|
|
| import logging |
| import os |
| import shutil |
| import tempfile |
| import uuid |
| from pathlib import Path |
| from typing import Any, Callable, Dict, List, Optional, Union |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def prepare_program( |
| initial_program: Union[str, Path, List[str]], |
| temp_dir: Optional[str], |
| temp_files: List[str], |
| ) -> str: |
| """Resolve initial_program to a file path, writing a temp file if needed.""" |
| if isinstance(initial_program, (str, Path)) and os.path.exists(str(initial_program)): |
| return str(initial_program) |
|
|
| solution = ( |
| "\n".join(initial_program) if isinstance(initial_program, list) else str(initial_program) |
| ) |
|
|
| if "EVOLVE-BLOCK-START" not in solution: |
| solution = f"# EVOLVE-BLOCK-START\n{solution}\n# EVOLVE-BLOCK-END" |
|
|
| if temp_dir is None: |
| temp_dir = tempfile.gettempdir() |
|
|
| program_file = os.path.join(temp_dir, f"program_{uuid.uuid4().hex[:8]}.py") |
| with open(program_file, "w") as fh: |
| fh.write(solution) |
| temp_files.append(program_file) |
| return program_file |
|
|
|
|
| def prepare_evaluator( |
| evaluator: Union[str, Path, Callable], |
| temp_dir: Optional[str], |
| temp_files: List[str], |
| caller_module_name: str = "skydiscover.api", |
| ) -> str: |
| """Resolve evaluator to a file path, writing a temp file if needed. |
| |
| When *evaluator* is a callable, it is registered in the caller module's |
| globals so the generated wrapper script can import it at runtime. |
| ``caller_module_name`` must match the module whose globals hold the callable. |
| """ |
| if isinstance(evaluator, (str, Path)) and os.path.exists(str(evaluator)): |
| return str(evaluator) |
|
|
| if callable(evaluator): |
| import sys |
|
|
| caller_module = sys.modules.get(caller_module_name) |
| evaluator_id = f"_skydiscover_evaluator_{uuid.uuid4().hex[:8]}" |
| if caller_module is not None: |
| setattr(caller_module, evaluator_id, evaluator) |
| evaluator_code = ( |
| f"import {caller_module_name} as _api\n\n" |
| f"def evaluate(program_path):\n" |
| f" return getattr(_api, '{evaluator_id}')(program_path)\n" |
| ) |
| else: |
| evaluator_code = str(evaluator) |
| if "def evaluate" not in evaluator_code: |
| raise ValueError("Evaluator code must contain a 'def evaluate(program_path)' function") |
|
|
| if temp_dir is None: |
| temp_dir = tempfile.gettempdir() |
|
|
| eval_file = os.path.join(temp_dir, f"evaluator_{uuid.uuid4().hex[:8]}.py") |
| with open(eval_file, "w") as fh: |
| fh.write(evaluator_code) |
| temp_files.append(eval_file) |
| return eval_file |
|
|
|
|
| def cleanup_temp(temp_files: List[str], temp_dir: Optional[str]) -> None: |
| """Best-effort removal of temporary files and directories.""" |
| for path in temp_files: |
| try: |
| os.unlink(path) |
| except OSError as exc: |
| logger.warning("Failed to delete temp file %s: %s", path, exc) |
| if temp_dir and os.path.exists(temp_dir): |
| try: |
| shutil.rmtree(temp_dir) |
| except OSError as exc: |
| logger.warning("Failed to delete temp directory %s: %s", temp_dir, exc) |
|
|