| """ |
| OpenEvolve Native Database — island-based MAP-Elites search for SkyDiscover. |
| |
| Faithful port of the OpenEvolve search algorithm (MAP-Elites with island-based |
| population model). All core search logic — sampling, MAP-Elites grid, |
| archive management, migration — mirrors the reference implementation at |
| ``openevolve/database.py``. |
| |
| SkyDiscover adaptations (minimum necessary): |
| - ``sample()`` returns ``(Program, List[Program])`` — the framework's |
| ``DiscoveryController`` normalises both plain and dict-wrapped returns. |
| - Uses ``Program.solution`` (SkyDiscover's field name). |
| - Island rotation lives in ``sample()``, generation increment + migration |
| check live in ``add()``, because SkyDiscover's ``DiscoveryController`` does |
| not call these as separate steps. |
| |
| Named ``openevolve_native`` (not ``openevolve``) to avoid any confusion with |
| the external ``openevolve`` package. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import random |
| import time |
| import uuid |
| from typing import Any, Dict, List, Optional, Set, Tuple |
|
|
| from skydiscover.config import DatabaseConfig |
| from skydiscover.search.base_database import Program, ProgramDatabase |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
|
|
| def _safe_numeric_average(metrics: Dict[str, Any]) -> float: |
| """Average of numeric metric values, ignoring non-numeric entries.""" |
| if not metrics: |
| return 0.0 |
| numeric_values = [] |
| for value in metrics.values(): |
| if isinstance(value, (int, float)): |
| try: |
| fv = float(value) |
| if fv == fv: |
| numeric_values.append(fv) |
| except (ValueError, TypeError, OverflowError): |
| continue |
| return sum(numeric_values) / len(numeric_values) if numeric_values else 0.0 |
|
|
|
|
| def _get_fitness( |
| metrics: Dict[str, Any], |
| feature_dimensions: List[str] = (), |
| ) -> float: |
| """Fitness score, preferring ``combined_score`` and excluding feature dims.""" |
| if not metrics: |
| return 0.0 |
| if "combined_score" in metrics: |
| try: |
| return float(metrics["combined_score"]) |
| except (ValueError, TypeError): |
| pass |
|
|
| feature_dims = set(feature_dimensions) if feature_dimensions else set() |
| fitness_metrics: Dict[str, float] = {} |
| for key, value in metrics.items(): |
| if key not in feature_dims and isinstance(value, (int, float)): |
| try: |
| fv = float(value) |
| if fv == fv: |
| fitness_metrics[key] = fv |
| except (ValueError, TypeError, OverflowError): |
| continue |
|
|
| if not fitness_metrics: |
| return _safe_numeric_average(metrics) |
| return _safe_numeric_average(fitness_metrics) |
|
|
|
|
| |
| |
| |
|
|
|
|
class OpenEvolveNativeDatabase(ProgramDatabase):
    """Island-based MAP-Elites database — native port of OpenEvolve.

    Programs are partitioned into islands. Each island keeps its own feature
    map (feature-cell key -> program id), its own best-program id and its own
    generation counter; a single archive set of program ids is shared by all
    islands. ``sample()`` rotates the current island after every call, and
    ``add()`` advances the target island's generation counter and triggers
    migration when the migration interval has elapsed.
    """
|
|
| |
| |
| |
|
|
def __init__(self, name: str, config: DatabaseConfig, **kwargs: Any):
    """Initialise island, archive and feature-map state from ``config``.

    Each setting is read with ``getattr`` so a config object that lacks an
    attribute falls back to the default given here.

    Args:
        name: Database name, forwarded to the base-class initializer.
        config: Configuration object providing the search settings.
        **kwargs: Extra keyword arguments forwarded to the base class.
    """

    def setting(attr: str, default: Any) -> Any:
        return getattr(config, attr, default)

    # Search hyper-parameters.
    self.num_islands: int = setting("num_islands", 5)
    self.population_size: int = setting("population_size", 40)
    self.archive_size: int = setting("archive_size", 100)
    self.exploration_ratio: float = setting("exploration_ratio", 0.2)
    self.exploitation_ratio: float = setting("exploitation_ratio", 0.7)
    self.elite_selection_ratio: float = setting("elite_selection_ratio", 0.1)
    self.feature_dimensions: List[str] = list(
        setting("feature_dimensions", ["complexity", "diversity"])
    )

    # ``feature_bins`` may be a single int or a per-dimension dict.
    raw_bins = setting("feature_bins", 10)
    if isinstance(raw_bins, int):
        dims = max(len(self.feature_dimensions), 1)
        # Lower bound derived from archive_size ** (1 / dims), nudged upward.
        floor_bins = int(pow(self.archive_size, 1 / dims) + 0.99)
        self.feature_bins: int = max(raw_bins, floor_bins)
    else:
        self.feature_bins = 10

    if isinstance(raw_bins, dict):
        self.feature_bins_per_dim: Dict[str, int] = raw_bins
    else:
        self.feature_bins_per_dim = {
            dim: self.feature_bins for dim in self.feature_dimensions
        }

    self.diversity_reference_size: int = setting("diversity_reference_size", 20)
    self.migration_interval: int = setting("migration_interval", 10)
    self.migration_rate: float = setting("migration_rate", 0.1)

    # Per-island bookkeeping.
    island_range = range(self.num_islands)
    self.islands: List[Set[str]] = [set() for _ in island_range]
    self.island_feature_maps: List[Dict[str, str]] = [{} for _ in island_range]
    self.island_best_programs: List[Optional[str]] = [None] * self.num_islands
    self.island_generations: List[int] = [0] * self.num_islands
    self.current_island: int = 0
    self.last_migration_generation: int = 0

    # Archive of program ids shared by all islands.
    self.archive: Set[str] = set()

    # Running min/max/history per feature, used for scaling into bins.
    self.feature_stats: Dict[str, Dict[str, Any]] = {}

    # Diversity cache keyed by hash(solution), plus the reference solutions.
    self.diversity_cache: Dict[int, Dict[str, float]] = {}
    self.diversity_cache_size: int = 1000
    self.diversity_reference_set: List[str] = []

    # NOTE(review): the state above is set before the base initializer runs,
    # presumably because the base class may use it — confirm.
    super().__init__(name, config, **kwargs)

    # Seeds the module-level ``random`` generator when a seed is configured.
    seed = setting("random_seed", None)
    if seed is not None:
        random.seed(seed)

    logger.info(
        "OpenEvolveNativeDatabase: %d islands, pop=%d, archive=%d, "
        "features=%s, bins=%d, migration_interval=%d, migration_rate=%.2f",
        self.num_islands,
        self.population_size,
        self.archive_size,
        self.feature_dimensions,
        self.feature_bins,
        self.migration_interval,
        self.migration_rate,
    )
|
|
| |
| |
| |
|
|
def sample(
    self,
    num_context_programs: Optional[int] = 4,
    **kwargs: Any,
) -> Tuple[Program, List[Program]]:
    """Pick a parent and its context programs, then rotate the island.

    Args:
        num_context_programs: Maximum number of context programs; ``None``
            is treated as 4.
        **kwargs: Accepted but not used by this method.

    Returns:
        ``(parent, other_context_programs)``.

    Raises:
        ValueError: If the database holds no programs.
    """
    if not self.programs:
        raise ValueError("Cannot sample: no programs in database")

    parent = self._sample_parent()
    context_size = 4 if num_context_programs is None else num_context_programs
    other_context_programs = self._sample_other_context_programs(parent, n=context_size)

    logger.debug(
        "Sampled parent %s from island %d, %d other context programs",
        parent.id,
        self.current_island,
        len(other_context_programs),
    )

    # Round-robin over islands: the next call samples from the next island.
    self.current_island = (self.current_island + 1) % self.num_islands

    return parent, other_context_programs
|
|
| |
| |
| |
|
|
def add(
    self,
    program: Program,
    iteration: Optional[int] = None,
    **kwargs: Any,
) -> str:
    """Store ``program`` and update island, feature-map and archive state.

    Args:
        program: Program to insert.
        iteration: When given, recorded as ``program.iteration_found`` and
            folded into ``self.last_iteration``.
        **kwargs: ``target_island`` forces the destination island;
            ``_is_migration`` marks an insertion done by migration, which
            skips the generation increment and the migration check.

    Returns:
        The id of the stored program.
    """
    forced_island: Optional[int] = kwargs.get("target_island")
    from_migration: bool = kwargs.get("_is_migration", False)

    if iteration is not None:
        program.iteration_found = iteration
        self.last_iteration = max(self.last_iteration, iteration)

    self.programs[program.id] = program
    coords = self._calculate_feature_coords(program)

    # Island priority: explicit target, then the parent's island, then the
    # island currently being sampled.
    island_idx = self.current_island
    if forced_island is not None:
        island_idx = forced_island
    elif program.parent_id:
        parent = self.programs.get(program.parent_id)
        if parent and "island" in parent.metadata:
            island_idx = parent.metadata["island"]
    island_idx %= self.num_islands

    # The newcomer takes the cell when it is free, when its occupant is no
    # longer stored, or when the newcomer is better.
    cell_key = self._feature_coords_to_key(coords)
    cell_map = self.island_feature_maps[island_idx]
    occupied = cell_key in cell_map
    incumbent_id = cell_map[cell_key] if occupied else None
    incumbent_alive = occupied and incumbent_id in self.programs

    if not incumbent_alive or self._is_better(program, self.programs[incumbent_id]):
        if incumbent_alive:
            # The newcomer inherits the incumbent's archive slot.
            if incumbent_id in self.archive:
                self.archive.discard(incumbent_id)
                self.archive.add(program.id)
            # Keep island membership in line with the feature map.
            self.islands[island_idx].discard(incumbent_id)
        cell_map[cell_key] = program.id

    self.islands[island_idx].add(program.id)
    program.metadata["island"] = island_idx

    self._update_archive(program)
    self._enforce_population_limit(exclude_program_id=program.id)
    self._update_best_program(program)
    self._update_island_best_program(program, island_idx)

    if self.config.db_path:
        self._save_program(program)

    # Only regular insertions advance the generation and may start a
    # migration; migration itself inserts through this method.
    if not from_migration:
        self.island_generations[island_idx] += 1
        if self._should_migrate():
            self._migrate_programs()

    logger.debug("Added program %s to island %d", program.id, island_idx)
    return program.id
|
|
| |
| |
| |
|
|
| def _sample_parent(self) -> Program: |
| rand_val = random.random() |
| if rand_val < self.exploration_ratio: |
| return self._sample_exploration_parent() |
| elif rand_val < self.exploration_ratio + self.exploitation_ratio: |
| return self._sample_exploitation_parent() |
| else: |
| return self._sample_random_parent() |
|
|
| def _sample_exploration_parent(self) -> Program: |
| """Random program from current island (diverse sampling).""" |
| island_programs = self.islands[self.current_island] |
|
|
| if not island_programs: |
| return self._seed_empty_island(self.current_island) |
|
|
| valid = [pid for pid in island_programs if pid in self.programs] |
| |
| if len(valid) < len(island_programs): |
| for stale in island_programs - set(valid): |
| self.islands[self.current_island].discard(stale) |
|
|
| if not valid: |
| return self._seed_empty_island(self.current_island) |
|
|
| return self.programs[random.choice(valid)] |
|
|
| def _sample_exploitation_parent(self) -> Program: |
| """Elite program from archive, preferring current island.""" |
| if not self.archive: |
| return self._sample_exploration_parent() |
|
|
| valid_archive = [pid for pid in self.archive if pid in self.programs] |
| |
| if len(valid_archive) < len(self.archive): |
| for stale in self.archive - set(valid_archive): |
| self.archive.discard(stale) |
|
|
| if not valid_archive: |
| return self._sample_exploration_parent() |
|
|
| |
| in_island = [ |
| pid |
| for pid in valid_archive |
| if self.programs[pid].metadata.get("island") == self.current_island |
| ] |
|
|
| if in_island: |
| return self.programs[random.choice(in_island)] |
| return self.programs[random.choice(valid_archive)] |
|
|
| def _sample_random_parent(self) -> Program: |
| """Uniformly random program from entire population.""" |
| return self.programs[random.choice(list(self.programs.keys()))] |
|
|
| def _seed_empty_island(self, island_idx: int) -> Program: |
| """Seed an empty island with a copy of the best program.""" |
| if self.best_program_id and self.best_program_id in self.programs: |
| best = self.programs[self.best_program_id] |
| copy = Program( |
| id=str(uuid.uuid4()), |
| solution=best.solution, |
| language=best.language, |
| parent_id=best.id, |
| generation=best.generation, |
| metrics=best.metrics.copy(), |
| metadata={"island": island_idx}, |
| iteration_found=self.last_iteration, |
| ) |
| self.programs[copy.id] = copy |
| self.islands[island_idx].add(copy.id) |
| return copy |
| return next(iter(self.programs.values())) |
|
|
| |
| |
| |
|
|
| def _sample_other_context_programs(self, parent: Program, n: int = 4) -> List[Program]: |
| """Sample other context programs from parent's island. |
| |
| Strategy (matching OpenEvolve): |
| 1. Island best (if different from parent) |
| 2. Top elite programs from island |
| 3. Programs from nearby MAP-Elites cells (±2 perturbation) |
| 4. Random fill from island |
| """ |
| parent_island = parent.metadata.get("island", self.current_island) |
| island_program_ids = list(self.islands[parent_island]) |
| island_programs = [self.programs[pid] for pid in island_program_ids if pid in self.programs] |
|
|
| if not island_programs: |
| return [] |
|
|
| other_context_programs: List[Program] = [] |
| used_ids: set = {parent.id} |
|
|
| |
| island_best_id = self.island_best_programs[parent_island] |
| if ( |
| island_best_id is not None |
| and island_best_id != parent.id |
| and island_best_id in self.programs |
| ): |
| other_context_programs.append(self.programs[island_best_id]) |
| used_ids.add(island_best_id) |
| elif island_best_id is not None and island_best_id not in self.programs: |
| self.island_best_programs[parent_island] = None |
|
|
| |
| top_n = max(1, int(n * self.elite_selection_ratio)) |
| top_island = sorted( |
| island_programs, |
| key=lambda p: _get_fitness(p.metrics, self.feature_dimensions), |
| reverse=True, |
| )[:top_n] |
| for prog in top_island: |
| if prog.id not in used_ids: |
| other_context_programs.append(prog) |
| used_ids.add(prog.id) |
|
|
| |
| if len(island_programs) > n and len(other_context_programs) < n: |
| remaining_slots = n - len(other_context_programs) |
| feature_coords = self._calculate_feature_coords(parent) |
|
|
| |
| cell_map: Dict[str, str] = {} |
| for pid in island_program_ids: |
| if pid in self.programs: |
| coords = self._calculate_feature_coords(self.programs[pid]) |
| cell_map[self._feature_coords_to_key(coords)] = pid |
|
|
| nearby: List[Program] = [] |
| for _ in range(remaining_slots * 3): |
| perturbed = [ |
| max(0, min(self.feature_bins - 1, c + random.randint(-2, 2))) |
| for c in feature_coords |
| ] |
| key = self._feature_coords_to_key(perturbed) |
| if key in cell_map: |
| pid = cell_map[key] |
| if ( |
| pid not in used_ids |
| and pid not in {p.id for p in nearby} |
| and pid in self.programs |
| ): |
| nearby.append(self.programs[pid]) |
| if len(nearby) >= remaining_slots: |
| break |
|
|
| |
| if len(other_context_programs) + len(nearby) < n: |
| remaining = n - len(other_context_programs) - len(nearby) |
| all_used = used_ids | {p.id for p in nearby} |
| available = [ |
| pid |
| for pid in island_program_ids |
| if pid not in all_used and pid in self.programs |
| ] |
| if available: |
| sampled = random.sample(available, min(remaining, len(available))) |
| nearby.extend(self.programs[pid] for pid in sampled) |
|
|
| other_context_programs.extend(nearby) |
|
|
| return other_context_programs[:n] |
|
|
| |
| |
| |
|
|
| def _calculate_feature_coords(self, program: Program) -> List[int]: |
| coords: List[int] = [] |
| for dim in self.feature_dimensions: |
| |
| if dim in program.metrics: |
| coords.append(self._to_bin(dim, program.metrics[dim])) |
| |
| elif dim == "complexity": |
| coords.append(self._to_bin("complexity", float(len(program.solution)))) |
| elif dim == "diversity": |
| if len(self.programs) < 2: |
| coords.append(0) |
| else: |
| coords.append(self._to_bin("diversity", self._get_cached_diversity(program))) |
| elif dim == "score": |
| if not program.metrics: |
| coords.append(0) |
| else: |
| coords.append( |
| self._to_bin( |
| "score", |
| _get_fitness(program.metrics, self.feature_dimensions), |
| ) |
| ) |
| else: |
| raise ValueError( |
| f"Feature dimension '{dim}' not found in program metrics. " |
| f"Available metrics: {list(program.metrics.keys())}. " |
| f"Built-in features: 'complexity', 'diversity', 'score'." |
| ) |
| return coords |
|
|
| def _to_bin(self, dim: str, value: float) -> int: |
| """Update running stats, min-max scale, and return bin index.""" |
| self._update_feature_stats(dim, value) |
| scaled = self._scale_feature_value(dim, value) |
| num_bins = self.feature_bins_per_dim.get(dim, self.feature_bins) |
| return max(0, min(num_bins - 1, int(scaled * num_bins))) |
|
|
| @staticmethod |
| def _feature_coords_to_key(coords: List[int]) -> str: |
| return "-".join(str(c) for c in coords) |
|
|
| |
| |
| |
|
|
| def _update_feature_stats(self, feature_name: str, value: float) -> None: |
| if feature_name not in self.feature_stats: |
| self.feature_stats[feature_name] = { |
| "min": value, |
| "max": value, |
| "values": [], |
| } |
| stats = self.feature_stats[feature_name] |
| stats["min"] = min(stats["min"], value) |
| stats["max"] = max(stats["max"], value) |
| stats["values"].append(value) |
| if len(stats["values"]) > 1000: |
| stats["values"] = stats["values"][-1000:] |
|
|
| def _scale_feature_value(self, feature_name: str, value: float) -> float: |
| if feature_name not in self.feature_stats: |
| return min(1.0, max(0.0, value)) |
| stats = self.feature_stats[feature_name] |
| min_val, max_val = stats["min"], stats["max"] |
| if max_val == min_val: |
| return 0.5 |
| return min(1.0, max(0.0, (value - min_val) / (max_val - min_val))) |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def _fast_code_diversity(code1: str, code2: str) -> float: |
| if code1 == code2: |
| return 0.0 |
| length_diff = abs(len(code1) - len(code2)) |
| line_diff = abs(code1.count("\n") - code2.count("\n")) |
| char_diff = len(set(code1).symmetric_difference(set(code2))) |
| return length_diff * 0.1 + line_diff * 10 + char_diff * 0.5 |
|
|
| def _get_cached_diversity(self, program: Program) -> float: |
| code_hash = hash(program.solution) |
|
|
| if code_hash in self.diversity_cache: |
| return self.diversity_cache[code_hash]["value"] |
|
|
| if ( |
| not self.diversity_reference_set |
| or len(self.diversity_reference_set) < self.diversity_reference_size |
| ): |
| self._update_diversity_reference_set() |
|
|
| scores = [ |
| self._fast_code_diversity(program.solution, ref) |
| for ref in self.diversity_reference_set |
| if ref != program.solution |
| ] |
| diversity = sum(scores) / max(1, len(scores)) if scores else 0.0 |
|
|
| |
| if len(self.diversity_cache) >= self.diversity_cache_size: |
| oldest = min(self.diversity_cache, key=lambda h: self.diversity_cache[h]["timestamp"]) |
| del self.diversity_cache[oldest] |
|
|
| self.diversity_cache[code_hash] = { |
| "value": diversity, |
| "timestamp": time.time(), |
| } |
| return diversity |
|
|
| def _update_diversity_reference_set(self) -> None: |
| if not self.programs: |
| return |
| all_progs = list(self.programs.values()) |
|
|
| if len(all_progs) <= self.diversity_reference_size: |
| self.diversity_reference_set = [p.solution for p in all_progs] |
| return |
|
|
| |
| selected: List[Program] = [] |
| remaining = all_progs.copy() |
|
|
| selected.append(remaining.pop(random.randint(0, len(remaining) - 1))) |
|
|
| while len(selected) < self.diversity_reference_size and remaining: |
| best_idx, best_div = -1, -1.0 |
| for i, cand in enumerate(remaining): |
| min_d = min(self._fast_code_diversity(cand.solution, s.solution) for s in selected) |
| if min_d > best_div: |
| best_div = min_d |
| best_idx = i |
| if best_idx >= 0: |
| selected.append(remaining.pop(best_idx)) |
|
|
| self.diversity_reference_set = [p.solution for p in selected] |
|
|
| |
| |
| |
|
|
| def _is_better(self, program1: Program, program2: Program) -> bool: |
| if not program1.metrics and not program2.metrics: |
| return program1.timestamp > program2.timestamp |
| if program1.metrics and not program2.metrics: |
| return True |
| if not program1.metrics and program2.metrics: |
| return False |
| return _get_fitness(program1.metrics, self.feature_dimensions) > _get_fitness( |
| program2.metrics, self.feature_dimensions |
| ) |
|
|
| |
| |
| |
|
|
| def _update_archive(self, program: Program) -> None: |
| if len(self.archive) < self.archive_size: |
| self.archive.add(program.id) |
| return |
|
|
| |
| valid = [] |
| for pid in list(self.archive): |
| if pid in self.programs: |
| valid.append(self.programs[pid]) |
| else: |
| self.archive.discard(pid) |
|
|
| if len(self.archive) < self.archive_size: |
| self.archive.add(program.id) |
| return |
|
|
| |
| if valid: |
| worst = min( |
| valid, |
| key=lambda p: _get_fitness(p.metrics, self.feature_dimensions), |
| ) |
| if self._is_better(program, worst): |
| self.archive.discard(worst.id) |
| self.archive.add(program.id) |
| else: |
| self.archive.add(program.id) |
|
|
| |
| |
| |
|
|
| def _update_best_program(self, program: Program) -> None: |
| if self.best_program_id is None: |
| self.best_program_id = program.id |
| return |
| if self.best_program_id not in self.programs: |
| self.best_program_id = program.id |
| return |
| current_best = self.programs[self.best_program_id] |
| if self._is_better(program, current_best): |
| old_id = self.best_program_id |
| self.best_program_id = program.id |
| if "combined_score" in program.metrics and "combined_score" in current_best.metrics: |
| logger.info( |
| "New best program %s replaces %s (%.4f -> %.4f)", |
| program.id, |
| old_id, |
| current_best.metrics["combined_score"], |
| program.metrics["combined_score"], |
| ) |
|
|
| def _update_island_best_program(self, program: Program, island_idx: int) -> None: |
| if island_idx >= len(self.island_best_programs): |
| return |
| cur_id = self.island_best_programs[island_idx] |
| if cur_id is None or cur_id not in self.programs: |
| self.island_best_programs[island_idx] = program.id |
| return |
| if self._is_better(program, self.programs[cur_id]): |
| self.island_best_programs[island_idx] = program.id |
|
|
| |
| |
| |
|
|
| def _enforce_population_limit(self, exclude_program_id: Optional[str] = None) -> None: |
| if len(self.programs) <= self.population_size: |
| return |
|
|
| num_to_remove = len(self.programs) - self.population_size |
| sorted_progs = sorted( |
| self.programs.values(), |
| key=lambda p: _get_fitness(p.metrics, self.feature_dimensions), |
| ) |
|
|
| protected = {self.best_program_id, exclude_program_id} - {None} |
| to_remove: List[Program] = [] |
| for prog in sorted_progs: |
| if len(to_remove) >= num_to_remove: |
| break |
| if prog.id not in protected: |
| to_remove.append(prog) |
|
|
| for prog in to_remove: |
| pid = prog.id |
| self.programs.pop(pid, None) |
|
|
| for imap in self.island_feature_maps: |
| for k in [k for k, v in imap.items() if v == pid]: |
| del imap[k] |
|
|
| for island in self.islands: |
| island.discard(pid) |
|
|
| self.archive.discard(pid) |
|
|
| self._cleanup_stale_island_bests() |
| logger.info( |
| "Population limit: removed %d, now %d", |
| len(to_remove), |
| len(self.programs), |
| ) |
|
|
| def _cleanup_stale_island_bests(self) -> None: |
| for i, best_id in enumerate(self.island_best_programs): |
| if best_id is None: |
| continue |
| if best_id not in self.programs or best_id not in self.islands[i]: |
| self.island_best_programs[i] = None |
| |
| progs = [self.programs[pid] for pid in self.islands[i] if pid in self.programs] |
| if progs: |
| self.island_best_programs[i] = max( |
| progs, |
| key=lambda p: _get_fitness(p.metrics, self.feature_dimensions), |
| ).id |
|
|
| |
| |
| |
|
|
| def _should_migrate(self) -> bool: |
| if self.num_islands < 2: |
| return False |
| return ( |
| max(self.island_generations) - self.last_migration_generation |
| ) >= self.migration_interval |
|
|
| def _migrate_programs(self) -> None: |
| if self.num_islands < 2: |
| return |
| logger.info("Performing migration between islands") |
|
|
| for i, island in enumerate(self.islands): |
| if not island: |
| continue |
| island_progs = [self.programs[pid] for pid in island if pid in self.programs] |
| if not island_progs: |
| continue |
|
|
| island_progs.sort( |
| key=lambda p: _get_fitness(p.metrics, self.feature_dimensions), |
| reverse=True, |
| ) |
| num_migrants = max(1, int(len(island_progs) * self.migration_rate)) |
| migrants = island_progs[:num_migrants] |
|
|
| targets = [ |
| (i + 1) % self.num_islands, |
| (i - 1) % self.num_islands, |
| ] |
|
|
| for migrant in migrants: |
| |
| |
| if migrant.metadata.get("migrant", False): |
| continue |
|
|
| for target in targets: |
| |
| target_progs = [ |
| self.programs[pid] for pid in self.islands[target] if pid in self.programs |
| ] |
| if any(p.solution == migrant.solution for p in target_progs): |
| continue |
|
|
| copy = Program( |
| id=str(uuid.uuid4()), |
| solution=migrant.solution, |
| language=migrant.language, |
| parent_id=migrant.id, |
| generation=migrant.generation, |
| metrics=migrant.metrics.copy(), |
| metadata={ |
| **migrant.metadata, |
| "island": target, |
| "migrant": True, |
| }, |
| ) |
| self.add( |
| copy, |
| target_island=target, |
| _is_migration=True, |
| ) |
|
|
| self.last_migration_generation = max(self.island_generations) |
| logger.info( |
| "Migration completed at generation %d", |
| self.last_migration_generation, |
| ) |
|
|
| |
| |
| |
|
|
def save(self, path: Optional[str] = None, iteration: int = 0) -> None:
    """Save the base-class state, then write the island metadata as JSON.

    The metadata goes to ``openevolve_native_metadata.json`` inside ``path``
    (or ``config.db_path`` when ``path`` is not given). Without a target
    directory only the base-class save runs.

    Args:
        path: Target directory; falls back to ``config.db_path``.
        iteration: Forwarded to the base-class ``save``.
    """
    super().save(path=path, iteration=iteration)

    target_dir = path or getattr(self.config, "db_path", None)
    if not target_dir:
        return

    snapshot = {
        "island_feature_maps": self.island_feature_maps,
        # Sets are not JSON-serializable, so they are written as lists.
        "islands": [list(members) for members in self.islands],
        "archive": list(self.archive),
        "island_best_programs": self.island_best_programs,
        "current_island": self.current_island,
        "island_generations": self.island_generations,
        "last_migration_generation": self.last_migration_generation,
        "feature_stats": self._serialize_feature_stats(),
    }
    os.makedirs(target_dir, exist_ok=True)
    meta_file = os.path.join(target_dir, "openevolve_native_metadata.json")
    with open(meta_file, "w") as handle:
        json.dump(snapshot, handle)
|
|
def load(self, path: str) -> None:
    """Load the base-class state, then restore the island metadata.

    When ``openevolve_native_metadata.json`` is missing from ``path`` the
    loaded programs are spread over the islands round-robin. Otherwise each
    saved field is restored (absent fields get an empty default) and the
    island sets are rebuilt from the saved membership lists.

    Args:
        path: Directory previously written by ``save``.
    """
    super().load(path)

    meta_file = os.path.join(path, "openevolve_native_metadata.json")
    if not os.path.exists(meta_file):
        logger.warning(
            "No openevolve_native_metadata.json found; distributing programs round-robin"
        )
        self._distribute_programs_to_islands()
        return

    with open(meta_file, "r") as handle:
        snapshot = json.load(handle)

    island_count = self.num_islands
    self.island_feature_maps = snapshot.get(
        "island_feature_maps", [{} for _ in range(island_count)]
    )
    self.archive = set(snapshot.get("archive", []))
    self.island_best_programs = snapshot.get("island_best_programs", [None] * island_count)
    self.current_island = snapshot.get("current_island", 0)
    self.island_generations = snapshot.get("island_generations", [0] * island_count)
    self.last_migration_generation = snapshot.get("last_migration_generation", 0)
    self.feature_stats = self._deserialize_feature_stats(snapshot.get("feature_stats", {}))

    self._reconstruct_islands(snapshot.get("islands", []))
    self._log_island_status()
|
|
| |
| |
| |
|
|
| def _reconstruct_islands(self, saved_islands: List[List[str]]) -> None: |
| num_islands = max(len(saved_islands), self.num_islands) |
| self.islands = [set() for _ in range(num_islands)] |
|
|
| for island_idx, program_ids in enumerate(saved_islands): |
| if island_idx >= len(self.islands): |
| continue |
| for pid in program_ids: |
| if pid in self.programs: |
| self.islands[island_idx].add(pid) |
| self.programs[pid].metadata["island"] = island_idx |
|
|
| |
| self.archive = {pid for pid in self.archive if pid in self.programs} |
| for imap in self.island_feature_maps: |
| for k in [k for k, pid in imap.items() if pid not in self.programs]: |
| del imap[k] |
|
|
| self._cleanup_stale_island_bests() |
|
|
| if self.best_program_id and self.best_program_id not in self.programs: |
| self.best_program_id = None |
|
|
| |
| if self.programs and sum(len(s) for s in self.islands) == 0: |
| self._distribute_programs_to_islands() |
|
|
| |
| while len(self.island_generations) < len(self.islands): |
| self.island_generations.append(0) |
| while len(self.island_best_programs) < len(self.islands): |
| self.island_best_programs.append(None) |
|
|
| def _distribute_programs_to_islands(self) -> None: |
| for i, pid in enumerate(self.programs): |
| idx = i % self.num_islands |
| self.islands[idx].add(pid) |
| self.programs[pid].metadata["island"] = idx |
|
|
| |
| |
| |
|
|
| def _serialize_feature_stats(self) -> Dict[str, Any]: |
| out: Dict[str, Any] = {} |
| for name, stats in self.feature_stats.items(): |
| s: Dict[str, Any] = {} |
| for k, v in stats.items(): |
| if k == "values": |
| s[k] = v[-100:] if isinstance(v, list) and len(v) > 100 else v |
| elif hasattr(v, "item"): |
| s[k] = v.item() |
| else: |
| s[k] = v |
| out[name] = s |
| return out |
|
|
| def _deserialize_feature_stats(self, raw: Dict[str, Any]) -> Dict[str, Any]: |
| if not raw: |
| return {} |
| out: Dict[str, Any] = {} |
| for name, stats in raw.items(): |
| if isinstance(stats, dict): |
| out[name] = { |
| "min": float(stats.get("min", 0.0)), |
| "max": float(stats.get("max", 1.0)), |
| "values": list(stats.get("values", [])), |
| } |
| return out |
|
|
| |
| |
| |
|
|
def _log_island_status(self) -> None:
    """Log one summary line per island.

    Each line reports the number of stored programs, occupied feature cells,
    the generation counter and the best and average fitness; the current
    island is tagged with `` [current]``.
    """
    for idx, member_ids in enumerate(self.islands):
        scores = [
            _get_fitness(self.programs[pid].metrics, self.feature_dimensions)
            for pid in member_ids
            if pid in self.programs
        ]
        if scores:
            best = max(scores)
            avg = sum(scores) / len(scores)
        else:
            best = avg = 0.0

        # Per-island lists may be shorter than the island list.
        cells = 0
        if idx < len(self.island_feature_maps):
            cells = len(self.island_feature_maps[idx])
        gen = 0
        if idx < len(self.island_generations):
            gen = self.island_generations[idx]

        logger.info(
            "Island %d: %d programs, %d cells, gen=%d, best=%.4f, avg=%.4f%s",
            idx,
            len(scores),
            cells,
            gen,
            best,
            avg,
            " [current]" if idx == self.current_island else "",
        )
|
|