# Page residue captured with this file ("Spaces: Sleeping"); not part of the module.
import random
import warnings
from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd
from crem.crem import mutate_mol
from rdkit import Chem

from core.config import EvolutionConfig
from core.data_prep import df  # Initial dataset for sampling
from core.predictors.pure_component.property_predictor import PropertyPredictor

from .molecule import Molecule
from .population import Population
class MolecularEvolution:
    """Main evolutionary algorithm coordinator.

    Ties together a ``PropertyPredictor`` (property predictions for SMILES),
    a ``Population`` (the current set of molecules) and CREM (``mutate_mol``)
    to iteratively mutate survivors and rebuild the population.

    The class-level constants below are the knobs that were previously
    hard-coded in the method bodies; their values are unchanged. They can be
    overridden on a subclass or on an instance.
    """

    BASE_DIR = Path(__file__).resolve().parent.parent.parent
    REP_DB_PATH = BASE_DIR / "data" / "fragments" / "diesel_fragments.db"

    # Thresholds applied when filtering the final result tables.
    CN_THRESHOLD = 50        # keep rows with cn > CN_THRESHOLD (maximize mode)
    CN_ERROR_THRESHOLD = 5   # keep rows with cn_error < CN_ERROR_THRESHOLD (target mode)
    YSI_THRESHOLD = 50       # keep rows with ysi < YSI_THRESHOLD (when minimizing YSI)

    # Sampling of the initial population from the module-level dataset ``df``.
    INITIAL_CN_BINS = 30          # number of CN quantile bins
    INITIAL_SAMPLES_PER_BIN = 20  # rows drawn from each bin (at most)
    INITIAL_SAMPLE_SEED = 42      # random_state passed to DataFrame.sample

    def __init__(self, config: EvolutionConfig):
        """Store ``config`` and build the predictor and an empty population from it."""
        self.config = config
        self.predictor = PropertyPredictor(config)
        self.population = Population(config)

    def _mutate_molecule(self, mol: Chem.Mol) -> List[str]:
        """Generate mutations for a molecule using CREM.

        Args:
            mol: RDKit molecule to mutate.

        Returns:
            Mutant SMILES strings that are non-empty and not already in
            ``self.population.seen_smiles``. Mutation is best-effort: if
            CREM raises, a ``RuntimeWarning`` is emitted and an empty list
            is returned instead of propagating the error.
        """
        try:
            mutants = list(mutate_mol(
                mol,
                db_name=str(self.REP_DB_PATH),
                max_size=2,
                return_mol=False,
            ))
        except Exception as exc:
            # Stay best-effort, but do not hide the cause (e.g. an unreadable
            # fragment database would otherwise silently yield no offspring).
            warnings.warn(
                f"CREM mutation failed: {exc!r}", RuntimeWarning, stacklevel=2
            )
            return []
        seen = self.population.seen_smiles
        return [smi for smi in mutants if smi and smi not in seen]

    def _create_molecules(self, smiles_list: List[str]) -> List[Molecule]:
        """Create Molecule objects from SMILES with predictions.

        All properties are predicted in a single call to
        ``predictor.predict_all_properties``; the result is read as a mapping
        from property name to a sequence indexed like ``smiles_list``.

        A SMILES is skipped when its ``cn`` prediction is ``None``, when YSI
        is being minimized and its ``ysi`` prediction is ``None``, or when
        ``predictor.is_valid`` rejects any of ``bp``, ``density``, ``lhv`` or
        ``dynamic_viscosity``.

        Args:
            smiles_list: SMILES strings to evaluate.

        Returns:
            One ``Molecule`` per SMILES that passed validation, in input order.
        """
        if not smiles_list:
            return []

        predictions = self.predictor.predict_all_properties(smiles_list)
        filtered_props = ('bp', 'density', 'lhv', 'dynamic_viscosity')

        molecules = []
        for i, smiles in enumerate(smiles_list):
            # Pick out this molecule's value for every predicted property.
            props = {k: v[i] for k, v in predictions.items()}

            # Required properties.
            if props.get('cn') is None:
                continue
            if self.config.minimize_ysi and props.get('ysi') is None:
                continue

            # Properties subject to the predictor's validity filter.
            if not all(self.predictor.is_valid(k, props.get(k)) for k in filtered_props):
                continue

            molecules.append(Molecule(
                smiles=smiles,
                cn=props['cn'],
                cn_error=abs(props['cn'] - self.config.target_cn),
                cn_score=props['cn'],  # For maximize mode
                bp=props.get('bp'),
                ysi=props.get('ysi'),
                density=props.get('density'),
                lhv=props.get('lhv'),
                dynamic_viscosity=props.get('dynamic_viscosity'),
            ))
        return molecules

    def initialize_population(self, initial_smiles: List[str]) -> int:
        """Initialize the population from initial SMILES.

        Returns:
            Whatever ``Population.add_molecules`` returns; ``evolve`` treats
            it as the number of molecules added.
        """
        print("Predicting properties for initial population...")
        molecules = self._create_molecules(initial_smiles)
        return self.population.add_molecules(molecules)

    def _log_generation_stats(self, generation: int):
        """Print a one-line summary of the current population.

        Reports best/average CN (maximize mode) or best/average CN error
        (target mode), plus best/average YSI and the Pareto-front size when
        YSI is being minimized. An empty population is reported as ``Pop 0``
        instead of raising from ``max()``/``min()`` on an empty sequence.
        """
        mols = self.population.molecules
        header = f"Gen {generation}/{self.config.generations} | "

        if not mols:
            print(header + "Pop 0")
            return

        if self.config.maximize_cn:
            best_cn = max(mols, key=lambda m: m.cn)
            avg_cn = np.mean([m.cn for m in mols])
            print_msg = (header +
                         f"Pop {len(mols)} | "
                         f"Best CN: {best_cn.cn:.3f} | "
                         f"Avg CN: {avg_cn:.3f}")
        else:
            best_cn = min(mols, key=lambda m: m.cn_error)
            avg_cn_err = np.mean([m.cn_error for m in mols])
            print_msg = (header +
                         f"Pop {len(mols)} | "
                         f"Best CN err: {best_cn.cn_error:.3f} | "
                         f"Avg CN err: {avg_cn_err:.3f}")

        if self.config.minimize_ysi:
            front = self.population.pareto_front()
            best_ysi = min(mols, key=lambda m: m.ysi)
            avg_ysi = np.mean([m.ysi for m in mols])
            print_msg += (f" | Best YSI: {best_ysi.ysi:.3f} | "
                          f"Avg YSI: {avg_ysi:.3f} | "
                          f"Pareto: {len(front)}")

        print(print_msg)

    def _generate_offspring(self, survivors: List[Molecule]) -> List[Molecule]:
        """Generate offspring by mutating randomly chosen survivors.

        Each attempt picks a random survivor, mutates it with CREM and queues
        up to ``config.mutations_per_parent`` of its children. Queued children
        are evaluated in batches of at least ``config.batch_size``. The loop
        stops once ``population_size - len(survivors)`` valid offspring have
        been evaluated or after ``target * config.max_offspring_attempts``
        attempts. A child SMILES is queued at most once per call, so the same
        structure is neither predicted twice nor returned twice.

        Args:
            survivors: Molecules to use as parents.

        Returns:
            The valid offspring; may exceed the target because whole batches
            are kept. Empty when ``survivors`` is empty.
        """
        target_count = self.config.population_size - len(survivors)
        max_attempts = target_count * self.config.max_offspring_attempts

        print(f" → Generating offspring (target: {target_count})...")

        # random.choice() raises IndexError on an empty sequence.
        if not survivors:
            return []

        pending = []          # children waiting for the next batch evaluation
        queued = set()        # every child SMILES queued so far in this call
        new_molecules = []

        for _ in range(max_attempts):
            if len(new_molecules) >= target_count:
                break

            parent = random.choice(survivors)
            mol = Chem.MolFromSmiles(parent.smiles)
            if mol is None:
                continue

            for child in self._mutate_molecule(mol)[:self.config.mutations_per_parent]:
                if child not in queued:
                    queued.add(child)
                    pending.append(child)

            # Evaluate in larger batches (single featurization per batch).
            if len(pending) >= self.config.batch_size:
                print(f" → Evaluating batch of {len(pending)} (featurizing once)...")
                new_molecules.extend(self._create_molecules(pending))
                pending = []

        # Evaluate whatever is left over.
        if pending:
            print(f" → Evaluating final batch of {len(pending)}...")
            new_molecules.extend(self._create_molecules(pending))

        print(f" ✓ Generated {len(new_molecules)} valid offspring")
        return new_molecules

    def _run_evolution_loop(self):
        """Run ``config.generations`` generations.

        Each generation logs statistics, takes the population's survivors,
        generates offspring from them, and replaces ``self.population`` with
        a fresh ``Population`` holding survivors plus offspring.
        """
        for gen in range(1, self.config.generations + 1):
            self._log_generation_stats(gen)

            survivors = self.population.get_survivors()
            offspring = self._generate_offspring(survivors)

            new_pop = Population(self.config)
            new_pop.add_molecules(survivors + offspring)
            self.population = new_pop

    def _filter_and_rank(self, frame: pd.DataFrame, rank_first: bool = False) -> pd.DataFrame:
        """Filter ``frame`` by the mode's thresholds, sort it and add a 1-based rank.

        Maximize mode keeps ``cn > CN_THRESHOLD`` sorted by descending ``cn``;
        target mode keeps ``cn_error < CN_ERROR_THRESHOLD`` sorted by
        ascending ``cn_error``. When YSI is minimized and a ``ysi`` column
        exists, ``ysi < YSI_THRESHOLD`` is also required and ``ysi``
        (ascending) is the secondary sort key. An empty frame is only ranked.

        Args:
            frame: Table with one row per molecule.
            rank_first: Put ``rank`` in the first column (replacing any
                existing ``rank`` column) instead of overwriting/appending it.
        """
        if frame.empty:
            # An empty frame may not even have the property columns.
            selected = frame.copy()
        else:
            if self.config.maximize_cn:
                keep = frame["cn"] > self.CN_THRESHOLD
                sort_cols, ascending = ["cn"], [False]
            else:
                keep = frame["cn_error"] < self.CN_ERROR_THRESHOLD
                sort_cols, ascending = ["cn_error"], [True]

            if self.config.minimize_ysi and "ysi" in frame.columns:
                keep = keep & (frame["ysi"] < self.YSI_THRESHOLD)
                sort_cols.append("ysi")
                ascending.append(True)

            selected = frame[keep].sort_values(sort_cols, ascending=ascending)

        ranks = range(1, len(selected) + 1)
        if rank_first:
            # DataFrame.insert raises ValueError if the column already exists.
            if "rank" in selected.columns:
                selected = selected.drop(columns="rank")
            selected.insert(0, "rank", ranks)
        else:
            selected["rank"] = ranks
        return selected

    def _generate_results(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Generate final results DataFrames.

        Returns:
            ``(final_df, pareto_df)``. ``final_df`` is the population table
            filtered, sorted and ranked for the configured mode.
            ``pareto_df`` is the same treatment applied to the Pareto front
            (rank in the first column) when YSI is minimized, otherwise an
            empty DataFrame.
        """
        final_df = self._filter_and_rank(self.population.to_dataframe())

        if not self.config.minimize_ysi:
            return final_df, pd.DataFrame()

        pareto_mols = self.population.pareto_front()
        pareto_df = pd.DataFrame([m.to_dict() for m in pareto_mols])
        if not pareto_df.empty:
            pareto_df = self._filter_and_rank(pareto_df, rank_first=True)
        return final_df, pareto_df

    def evolve(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Run the evolutionary algorithm.

        The initial population is sampled from the module-level dataset
        ``df``: its ``cn`` column is cut into ``INITIAL_CN_BINS`` quantile
        bins and up to ``INITIAL_SAMPLES_PER_BIN`` rows are drawn from each
        bin with a fixed seed; their ``SMILES`` seed the population.

        Returns:
            ``(final_df, pareto_df)`` from ``_generate_results``, or two empty
            DataFrames when ``initialize_population`` reports zero molecules.
        """
        per_bin = self.INITIAL_SAMPLES_PER_BIN
        seed = self.INITIAL_SAMPLE_SEED

        # duplicates="drop": tied quantile edges would otherwise raise.
        cn_bins = pd.qcut(df["cn"], q=self.INITIAL_CN_BINS, duplicates="drop")
        initial_smiles = (
            df.groupby(cn_bins, observed=False)
            # Clamp the sample size: sample() raises when asked for more rows
            # than the bin holds.
            .apply(lambda grp: grp.sample(min(len(grp), per_bin), random_state=seed))
            .reset_index(drop=True)["SMILES"]
            .tolist()
        )

        init_count = self.initialize_population(initial_smiles)
        if init_count == 0:
            print("No valid initial molecules")
            return pd.DataFrame(), pd.DataFrame()

        print(f"✓ Initial population size: {init_count}\n")

        self._run_evolution_loop()

        return self._generate_results()