| """ |
| Core Bio-ACDC optimization loop. |
| Manages the coevolution of biological LM populations and sequence tasks. |
| """ |
|
|
| import os |
| import json |
| import logging |
| import numpy as np |
| import torch |
| from typing import List, Dict, Optional, Tuple, Any |
| from dataclasses import dataclass, field |
| from pathlib import Path |
|
|
| from .archive import BioArchive, BioSolution |
| from .tasks import BioTaskPool |
| from .evaluator import BioEvaluator |
| from .mergers import BioModelMerger |
| from .mutators import BioMutator |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class BioACDCConfig:
    """Settings bundle for a Bio-ACDC optimization run.

    Every field has a default, so ``BioACDCConfig()`` is a valid
    configuration. Field order is part of the positional constructor
    signature and must not be rearranged.
    """

    # --- Population / evolution schedule ---
    archive_size: int = 50  # forwarded to the archive as its size
    num_generations: int = 100  # iterations of the evolution loop
    offspring_per_gen: int = 10  # merge attempts per generation

    # --- Model sources ---
    # Paths of the models evaluated at generation 0; a fresh list is built
    # per instance so configs never share one mutable default.
    seed_model_paths: List[str] = field(default_factory=list)
    base_model_path: Optional[str] = None

    # --- Tasks ---
    num_tasks_per_eval: int = 20  # tasks requested per model evaluation
    task_difficulty_threshold: float = 0.5

    # --- Mutation ---
    use_mutation: bool = True  # master switch for the mutation step
    mutation_rate: float = 0.3  # probability that a merged child is mutated

    # --- Evaluation ---
    eval_batch_size: int = 8
    max_sequence_length: int = 1024

    # --- Output ---
    output_dir: str = "./bio_acdc_output"  # root for models/archives/tasks
    save_frequency: int = 5  # checkpoint every N generations

    # --- Hardware ---
    # Resolved once, when the class body executes (i.e. at import time).
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
|
|
|
|
class BioACDC:
    """
    Main Bio-ACDC optimization controller.

    Coevolves a population of biological language models with synthetic
    sequence tasks to discover specialized experts.

    Each generation merges pairs of parent models into offspring (optionally
    mutating them), scores the offspring on tasks drawn from the task pool,
    passes them to the archive, and asks the task pool for new tasks.
    """

    # Settings forwarded to BioArchive.
    ARCHIVE_K_NEIGHBORS = 5
    ARCHIVE_DOMINATED_SCORE = 999.0
    # Number of tasks requested from the task pool after every generation.
    NEW_TASKS_PER_GENERATION = 5
    # Fixed seed so parent selection and mutation draws are reproducible.
    RNG_SEED = 42

    def __init__(
        self,
        config: "BioACDCConfig",
        task_pool: "BioTaskPool",
        evaluator: "BioEvaluator",
        merger: "BioModelMerger",
        mutator: Optional["BioMutator"] = None,
    ):
        """Store collaborators, create the archive and the output folders.

        Args:
            config: Run settings.
            task_pool: Source of evaluation tasks and of newly generated ones.
            evaluator: Scores a model path on a list of tasks.
            merger: Produces a merged model from two parent paths.
            mutator: Optional mutation operator; when ``None`` no offspring
                is ever mutated, regardless of ``config.use_mutation``.
        """
        self.config = config
        self.task_pool = task_pool
        self.evaluator = evaluator
        self.merger = merger
        self.mutator = mutator

        self.archive = BioArchive(
            archive_size=config.archive_size,
            k_neighbors=self.ARCHIVE_K_NEIGHBORS,
            dominated_score=self.ARCHIVE_DOMINATED_SCORE,
        )

        # Output layout: <output_dir>/{models,archives,tasks,evaluations}
        self.output_dir = Path(config.output_dir)
        self.model_dir = self.output_dir / "models"
        self.archive_dir = self.output_dir / "archives"
        self.task_dir = self.output_dir / "tasks"
        self.eval_dir = self.output_dir / "evaluations"

        for d in [self.model_dir, self.archive_dir, self.task_dir, self.eval_dir]:
            d.mkdir(parents=True, exist_ok=True)

        self.rng = np.random.RandomState(self.RNG_SEED)

        logger.info(f"Bio-ACDC initialized with config: {config}")

    @staticmethod
    def _summarize_metrics(tasks, metrics) -> Tuple[Dict[Any, float], float]:
        """Turn per-task metrics into ``(skill_vector, fitness)``.

        The skill vector maps every task's ``task_id`` to its metric, using
        0.0 for tasks missing from ``metrics``. Fitness is the mean of the
        skill vector, or 0.0 when there are no tasks.
        """
        skill_vector = {t.task_id: metrics.get(t.task_id, 0.0) for t in tasks}
        if not skill_vector:
            return skill_vector, 0.0
        # Cast so the stored fitness is a plain Python float, not np.float64.
        return skill_vector, float(np.mean(list(skill_vector.values())))

    def _evaluate(self, model_path: str) -> Tuple[Dict[Any, float], float]:
        """Score one model on a fresh draw of tasks from the task pool."""
        tasks = self.task_pool.get_tasks(n=self.config.num_tasks_per_eval)
        metrics = self.evaluator.evaluate_model(model_path, tasks)
        return self._summarize_metrics(tasks, metrics)

    def initialize_seed_population(self) -> None:
        """Load and evaluate seed models into the archive.

        Every path in ``config.seed_model_paths`` is evaluated and added to
        the archive as a generation-0 solution; the archive is then saved
        as generation 0.
        """
        seed_paths = self.config.seed_model_paths
        logger.info(f"Initializing seed population from {len(seed_paths)} models")

        for model_path in seed_paths:
            logger.info(f"Evaluating seed model: {model_path}")
            skill_vector, fitness = self._evaluate(model_path)

            solution = BioSolution(
                model_path=model_path,
                fitness=fitness,
                skill_vector=skill_vector,
                generation=0,
            )
            self.archive.add_solution(solution)
            logger.info(f"Seed model {model_path}: fitness={fitness:.4f}")

        self._save_archive(0)

    def evolve(self) -> List["BioSolution"]:
        """Run the main optimization loop.

        Returns:
            The archive's solutions after the final generation.
        """
        logger.info("Starting Bio-ACDC evolution")

        self.initialize_seed_population()

        total = self.config.num_generations
        save_every = self.config.save_frequency

        for generation in range(1, total + 1):
            logger.info(f"=== Generation {generation}/{total} ===")

            new_solutions = self._generate_offspring(generation)

            # Offspring are created unscored; fill in their results here.
            for sol in new_solutions:
                sol.skill_vector, sol.fitness = self._evaluate(sol.model_path)

            self.archive.update(new_solutions)

            self._update_task_pool(generation)

            # A save_frequency of 0 disables periodic checkpoints instead of
            # raising ZeroDivisionError.
            if save_every and generation % save_every == 0:
                self._save_archive(generation)
                self._save_tasks(generation)

            best = self.archive.get_best()
            if best is not None:
                logger.info(f"Generation {generation}: Best fitness={best.fitness:.4f}, Archive size={len(self.archive.solutions)}")

        # Always write the final state, even if it was not a checkpoint step.
        self._save_archive(total)

        return self.archive.solutions

    def _pick_parent_paths(self) -> Optional[List[str]]:
        """Draw two parent model paths without replacement.

        Parents come from the archive when it holds at least two solutions,
        otherwise from ``config.seed_model_paths``.

        Returns:
            Two model paths, or ``None`` when fewer than two candidates
            exist.
        """
        solutions = self.archive.solutions
        if len(solutions) >= 2:
            pool = [s.model_path for s in solutions]
        else:
            pool = list(self.config.seed_model_paths)

        if len(pool) < 2:
            return None

        # Sample indices rather than the objects themselves: this avoids
        # NumPy coercing the candidates into an array and yields plain str.
        picks = self.rng.choice(len(pool), size=2, replace=False)
        return [pool[int(k)] for k in picks]

    def _generate_offspring(self, generation: int) -> List["BioSolution"]:
        """Create new models by merging parents from archive.

        Makes ``config.offspring_per_gen`` attempts. A failed merge or
        mutation is logged and skipped, so the result may be shorter than
        requested. The returned solutions carry placeholder fitness and an
        empty skill vector.
        """
        new_solutions = []

        for i in range(self.config.offspring_per_gen):
            parent_paths = self._pick_parent_paths()
            if parent_paths is None:
                # The candidate pool cannot change inside this loop, so no
                # later attempt could succeed either.
                logger.warning(
                    "Cannot generate offspring: need at least two parent "
                    "candidates (archive solutions or seed models)"
                )
                break

            save_path = str(self.model_dir / f"gen_{generation}_ind_{i}")

            try:
                merged_path = self.merger.merge(
                    parent_paths=parent_paths,
                    save_path=save_path,
                )

                if self.config.use_mutation and self.mutator is not None:
                    if self.rng.random() < self.config.mutation_rate:
                        merged_path = self.mutator.mutate(
                            model_path=merged_path,
                            save_path=save_path + "_mutated",
                        )

                new_solutions.append(
                    BioSolution(
                        model_path=merged_path,
                        fitness=0.0,
                        skill_vector={},
                        generation=generation,
                        parent_paths=parent_paths,
                    )
                )
            except Exception as e:
                # Best effort: one broken child must not stop the generation.
                # logger.exception keeps the traceback for diagnosis.
                logger.exception(f"Failed to create offspring {i}: {e}")
                continue

        return new_solutions

    def _update_task_pool(self, generation: int) -> None:
        """Generate new tasks targeting archive weaknesses.

        Asks the task pool for ``NEW_TASKS_PER_GENERATION`` tasks using the
        archive's difficulty weights and writes each returned task to its
        own JSON file in the task directory.
        """
        difficulty_weights = self.archive.compute_difficulty_weights()

        # NOTE(review): whether generate_new_tasks also registers the tasks
        # in the pool is not visible here; this method only persists them.
        new_tasks = self.task_pool.generate_new_tasks(
            archive=self.archive,
            num_tasks=self.NEW_TASKS_PER_GENERATION,
            difficulty_weights=difficulty_weights,
        )

        for task in new_tasks:
            task_path = self.task_dir / f"gen_{generation}_{task.task_id}.json"
            with open(task_path, "w", encoding="utf-8") as f:
                json.dump(task.to_dict(), f, indent=2)

        logger.info(f"Generated {len(new_tasks)} new tasks")

    def _save_archive(self, generation: int) -> None:
        """Save archive to disk as ``archive_gen_<generation>.json``."""
        archive_path = self.archive_dir / f"archive_gen_{generation}.json"
        self.archive.save(archive_path)
        logger.info(f"Archive saved to {archive_path}")

    def _save_tasks(self, generation: int) -> None:
        """Save current task pool as ``tasks_gen_<generation>.json``."""
        tasks_path = self.task_dir / f"tasks_gen_{generation}.json"
        tasks_data = [t.to_dict() for t in self.task_pool.tasks]
        with open(tasks_path, "w", encoding="utf-8") as f:
            json.dump(tasks_data, f, indent=2)