# SalZa2004's picture
# updated applications
# 315d4ad
from .population import Population
from .molecule import Molecule
from core.predictors.pure_component.property_predictor import PropertyPredictor
from core.config import EvolutionConfig
from crem.crem import mutate_mol
from rdkit import Chem
import pandas as pd
import numpy as np
import random
from typing import List, Tuple
from core.data_prep import df # Initial dataset for sampling
from pathlib import Path
class MolecularEvolution:
    """Main evolutionary algorithm coordinator.

    Seeds a ``Population`` from the module-level dataset ``df``, then for each
    generation keeps the population's survivors, mutates them with CReM,
    scores the mutants through ``PropertyPredictor`` and rebuilds the
    population from survivors plus offspring. ``evolve`` is the public entry
    point; every other method is a helper for it.
    """

    BASE_DIR = Path(__file__).resolve().parent.parent.parent
    REP_DB_PATH = BASE_DIR / "data" / "fragments" / "diesel_fragments.db"

    # Properties that must pass ``PropertyPredictor.is_valid`` for a candidate
    # to be kept by ``_create_molecules``.
    FILTERED_PROPERTIES = ("bp", "density", "lhv", "dynamic_viscosity")

    # Cut-offs applied to the result tables in ``_generate_results``.
    MIN_CN = 50          # maximize mode: keep rows with cn strictly above this
    MAX_CN_ERROR = 5     # target mode: keep rows with cn_error strictly below this
    MAX_YSI = 50         # keep rows with ysi strictly below this

    # Seeding of the initial population in ``evolve``.
    INITIAL_CN_BINS = 30
    INITIAL_SAMPLES_PER_BIN = 20
    INITIAL_SAMPLE_SEED = 42

    def __init__(self, config: EvolutionConfig):
        """Store ``config`` and build the predictor and an empty population from it."""
        self.config = config
        self.predictor = PropertyPredictor(config)
        self.population = Population(config)

    def _mutate_molecule(self, mol: Chem.Mol) -> List[str]:
        """Generate mutations for a molecule using CREM.

        Returns the non-empty mutant SMILES that are not already recorded in
        ``self.population.seen_smiles``. Any exception raised while mutating
        yields an empty list.
        """
        try:
            mutants = list(mutate_mol(
                mol,
                db_name=str(self.REP_DB_PATH),
                max_size=2,
                return_mol=False,
            ))
            seen = self.population.seen_smiles
            return [m for m in mutants if m and m not in seen]
        except Exception:
            # Deliberate best-effort: one failed mutation must not abort a
            # whole generation, so the parent simply contributes no children.
            return []

    def _create_molecules(self, smiles_list: List[str]) -> List[Molecule]:
        """Create Molecule objects from SMILES with predictions (OPTIMIZED).

        All properties are predicted in one ``predict_all_properties`` call.
        A candidate is dropped when its ``cn`` is ``None``, when YSI is being
        minimized and its ``ysi`` is ``None``, or when any property listed in
        ``FILTERED_PROPERTIES`` fails ``self.predictor.is_valid``.
        """
        if not smiles_list:
            return []

        # OPTIMIZATION: Single featurization + all predictions
        predictions = self.predictor.predict_all_properties(smiles_list)

        molecules = []
        for i, smiles in enumerate(smiles_list):
            # Extract predictions for this molecule
            props = {name: values[i] for name, values in predictions.items()}

            # Validate required properties
            if props.get("cn") is None:
                continue
            if self.config.minimize_ysi and props.get("ysi") is None:
                continue

            # Validate filtered properties
            if not all(
                self.predictor.is_valid(name, props.get(name))
                for name in self.FILTERED_PROPERTIES
            ):
                continue

            molecules.append(Molecule(
                smiles=smiles,
                cn=props["cn"],
                cn_error=abs(props["cn"] - self.config.target_cn),
                cn_score=props["cn"],  # For maximize mode
                bp=props.get("bp"),
                ysi=props.get("ysi"),
                density=props.get("density"),
                lhv=props.get("lhv"),
                dynamic_viscosity=props.get("dynamic_viscosity"),
            ))
        return molecules

    def initialize_population(self, initial_smiles: List[str]) -> int:
        """Initialize the population from initial SMILES.

        Returns whatever ``Population.add_molecules`` returns; ``evolve``
        treats ``0`` as "no valid initial molecules".
        """
        print("Predicting properties for initial population...")
        molecules = self._create_molecules(initial_smiles)
        return self.population.add_molecules(molecules)

    def _log_generation_stats(self, generation: int):
        """Print a one-line summary of the current population."""
        mols = self.population.molecules
        if not mols:
            # max()/min() below raise ValueError on an empty sequence.
            print(f"Gen {generation}/{self.config.generations} | Pop 0")
            return

        if self.config.maximize_cn:
            best_cn = max(mols, key=lambda m: m.cn)
            avg_cn = np.mean([m.cn for m in mols])
            print_msg = (f"Gen {generation}/{self.config.generations} | "
                         f"Pop {len(mols)} | "
                         f"Best CN: {best_cn.cn:.3f} | "
                         f"Avg CN: {avg_cn:.3f}")
        else:
            best_cn = min(mols, key=lambda m: m.cn_error)
            avg_cn_err = np.mean([m.cn_error for m in mols])
            print_msg = (f"Gen {generation}/{self.config.generations} | "
                         f"Pop {len(mols)} | "
                         f"Best CN err: {best_cn.cn_error:.3f} | "
                         f"Avg CN err: {avg_cn_err:.3f}")

        if self.config.minimize_ysi:
            front = self.population.pareto_front()
            best_ysi = min(mols, key=lambda m: m.ysi)
            avg_ysi = np.mean([m.ysi for m in mols])
            print_msg += (f" | Best YSI: {best_ysi.ysi:.3f} | "
                          f"Avg YSI: {avg_ysi:.3f} | "
                          f"Pareto: {len(front)}")
        print(print_msg)

    def _generate_offspring(self, survivors: List[Molecule]) -> List[Molecule]:
        """Generates offspring from survivors.

        Randomly picks parents, mutates them and evaluates the mutants in
        batches of at least ``config.batch_size`` SMILES. Stops once
        ``population_size - len(survivors)`` valid offspring exist or after
        ``target * config.max_offspring_attempts`` parent draws. Because
        evaluation is batched, the result may contain more molecules than the
        target. Returns an empty list when there are no survivors or no room
        for offspring.
        """
        target_count = self.config.population_size - len(survivors)
        print(f" → Generating offspring (target: {target_count})...")

        if not survivors or target_count <= 0:
            # random.choice() raises IndexError on an empty sequence.
            print(f" ✓ Generated {0} valid offspring")
            return []

        max_attempts = target_count * self.config.max_offspring_attempts
        pending = []       # SMILES waiting for the next batched evaluation
        queued = set()     # every SMILES already queued during this call
        new_molecules = []

        for _ in range(max_attempts):
            if len(new_molecules) >= target_count:
                break

            # Generate mutations
            parent = random.choice(survivors)
            mol = Chem.MolFromSmiles(parent.smiles)
            if mol is None:
                continue

            children = self._mutate_molecule(mol)
            for child in children[:self.config.mutations_per_parent]:
                # Different parents can yield the same child; evaluate it once.
                if child not in queued:
                    queued.add(child)
                    pending.append(child)

            # Process in larger batches (single featurization per batch)
            if len(pending) >= self.config.batch_size:
                print(f" → Evaluating batch of {len(pending)} (featurizing once)...")
                new_molecules.extend(self._create_molecules(pending))
                pending = []

        # Process remaining children
        if pending:
            print(f" → Evaluating final batch of {len(pending)}...")
            new_molecules.extend(self._create_molecules(pending))

        print(f" ✓ Generated {len(new_molecules)} valid offspring")
        return new_molecules

    def _run_evolution_loop(self):
        """Run the main evolution loop for ``config.generations`` generations."""
        for gen in range(1, self.config.generations + 1):
            self._log_generation_stats(gen)

            survivors = self.population.get_survivors()
            offspring = self._generate_offspring(survivors)

            # Create new population
            new_pop = Population(self.config)
            new_pop.add_molecules(survivors + offspring)
            self.population = new_pop

    def _select_and_sort(self, frame: pd.DataFrame, use_ysi: bool) -> pd.DataFrame:
        """Filter ``frame`` by the class cut-offs and sort it best-first.

        In maximize mode rows need ``cn > MIN_CN`` and are sorted by ``cn``
        descending; otherwise rows need ``cn_error < MAX_CN_ERROR`` and are
        sorted by ``cn_error`` ascending. With ``use_ysi`` the rows must also
        satisfy ``ysi < MAX_YSI`` and ``ysi`` (ascending) breaks ties.
        """
        if self.config.maximize_cn:
            mask = frame["cn"] > self.MIN_CN
            sort_cols, ascending = ["cn"], [False]
        else:
            mask = frame["cn_error"] < self.MAX_CN_ERROR
            sort_cols, ascending = ["cn_error"], [True]

        if use_ysi:
            mask = mask & (frame["ysi"] < self.MAX_YSI)
            sort_cols.append("ysi")
            ascending.append(True)

        return frame[mask].sort_values(sort_cols, ascending=ascending)

    def _generate_results(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Generate final results DataFrames.

        Returns ``(final_df, pareto_df)``. ``final_df`` is the filtered,
        sorted population with a 1-based ``rank`` column. ``pareto_df`` is the
        filtered, sorted Pareto front with ``rank`` as its first column, or an
        empty frame when YSI is not being minimized.
        """
        final_df = self.population.to_dataframe()

        # An empty frame may have no columns at all, so skip column lookups.
        if not final_df.empty:
            use_ysi = self.config.minimize_ysi and "ysi" in final_df.columns
            final_df = self._select_and_sort(final_df, use_ysi=use_ysi)

        # Overwrite rank safely
        final_df["rank"] = range(1, len(final_df) + 1)

        if not self.config.minimize_ysi:
            return final_df, pd.DataFrame()

        pareto_mols = self.population.pareto_front()
        pareto_df = pd.DataFrame([m.to_dict() for m in pareto_mols])
        if not pareto_df.empty:
            pareto_df = self._select_and_sort(pareto_df, use_ysi=True)
            # DataFrame.insert refuses an existing column name, so drop any
            # rank supplied by to_dict() before inserting the fresh ranking.
            if "rank" in pareto_df.columns:
                pareto_df = pareto_df.drop(columns="rank")
            pareto_df.insert(0, "rank", range(1, len(pareto_df) + 1))

        return final_df, pareto_df

    def evolve(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Run the evolutionary algorithm.

        Seeds the population by splitting ``df`` into up to
        ``INITIAL_CN_BINS`` quantile bins of ``cn`` and sampling up to
        ``INITIAL_SAMPLES_PER_BIN`` rows from each, runs the evolution loop
        and returns ``(final_df, pareto_df)`` from ``_generate_results``.
        Returns two empty frames when no initial molecule is valid.
        """
        # Initialize
        # duplicates="drop" merges repeated quantile edges instead of raising.
        df_bins = pd.qcut(df["cn"], q=self.INITIAL_CN_BINS, duplicates="drop")
        per_bin = self.INITIAL_SAMPLES_PER_BIN
        seed = self.INITIAL_SAMPLE_SEED
        initial_smiles = (
            df.groupby(df_bins, observed=False)
            # Cap at the bin size: sample() raises when asked for more rows
            # than the bin holds.
            .apply(lambda x: x.sample(min(len(x), per_bin), random_state=seed))
            .reset_index(drop=True)["SMILES"]
            .tolist()
        )
        init_count = self.initialize_population(initial_smiles)

        if init_count == 0:
            print("No valid initial molecules")
            return pd.DataFrame(), pd.DataFrame()

        print(f"✓ Initial population size: {init_count}\n")

        # Evolution
        self._run_evolution_loop()

        # Results
        return self._generate_results()