| |
| from __future__ import annotations |
|
|
| import json |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Callable, Dict, List, Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from rdkit import Chem, DataStructs |
| from rdkit.Chem import AllChem |
| from . import sascorer |
|
|
| |
def canonicalize_smiles(smiles: str) -> Optional[str]:
    """Return the RDKit canonical SMILES for *smiles*, or None if blank/unparseable."""
    stripped = (smiles or "").strip()
    if not stripped:
        return None
    mol = Chem.MolFromSmiles(stripped)
    return None if mol is None else Chem.MolToSmiles(mol, canonical=True)
|
|
|
|
| |
| |
| |
@dataclass
class DiscoverySpec:
    """Fully-resolved configuration for one discovery run.

    Built from a user JSON spec plus resolved data paths (see spec_from_dict
    / parse_spec); consumed by run_discovery and build_pareto_plot_df.
    """

    # Input locations.
    dataset: List[str]  # parquet file path(s) holding mean_*/std_* property columns
    polyinfo: str  # POLYINFO resource path (not read in this module — TODO confirm caller use)
    polyinfo_csv: str  # CSV with SMILES / Polymer_Class / polymer_name reference data

    # Filtering and ranking inputs.
    hard_constraints: Dict[str, Dict[str, float]]  # property -> {"min": ..., "max": ...}
    objectives: List[Dict[str, str]]  # [{"property": ..., "goal": "minimize"|"maximize"}]

    # Pool sizing and selection parameters.
    max_pool: int = 200000  # kept equal to pareto_max by both spec parsers
    pareto_max: int = 50000  # subsample cap applied before Pareto layering
    max_candidates: int = 30  # maximum number of selected candidates
    max_pareto_fronts: int = 5  # how many fronts feed the candidate pool
    min_distance: float = 0.30  # minimum Tanimoto distance between selections
    fingerprint: str = "morgan"  # fingerprint family label
    random_seed: int = 7  # seed for the subsampling RNG
    use_canonical_smiles: bool = True  # canonicalize SMILES keys via RDKit
    use_full_data: bool = False  # when True, skip the pareto_max subsampling cap
    trust_weights: Dict[str, float] | None = None  # overrides for the trust-score blend
    selection_weights: Dict[str, float] | None = None  # overrides for pareto/trust blend
|
|
|
|
| |
| |
| |
| PROPERTY_META: Dict[str, Dict[str, str]] = { |
| |
| "tm": {"name": "Melting temperature", "unit": "K"}, |
| "tg": {"name": "Glass transition temperature", "unit": "K"}, |
| "td": {"name": "Thermal diffusivity", "unit": "m^2/s"}, |
| "tc": {"name": "Thermal conductivity", "unit": "W/m-K"}, |
| "cp": {"name": "Specific heat capacity", "unit": "J/kg-K"}, |
| |
| "young": {"name": "Young's modulus", "unit": "GPa"}, |
| "shear": {"name": "Shear modulus", "unit": "GPa"}, |
| "bulk": {"name": "Bulk modulus", "unit": "GPa"}, |
| "poisson": {"name": "Poisson ratio", "unit": "-"}, |
| |
| "visc": {"name": "Viscosity", "unit": "Pa-s"}, |
| "dif": {"name": "Diffusivity", "unit": "cm^2/s"}, |
| |
| "phe": {"name": "He permeability", "unit": "Barrer"}, |
| "ph2": {"name": "H2 permeability", "unit": "Barrer"}, |
| "pco2": {"name": "CO2 permeability", "unit": "Barrer"}, |
| "pn2": {"name": "N2 permeability", "unit": "Barrer"}, |
| "po2": {"name": "O2 permeability", "unit": "Barrer"}, |
| "pch4": {"name": "CH4 permeability", "unit": "Barrer"}, |
| |
| "alpha": {"name": "Polarizability", "unit": "a.u."}, |
| "homo": {"name": "HOMO energy", "unit": "eV"}, |
| "lumo": {"name": "LUMO energy", "unit": "eV"}, |
| "bandgap": {"name": "Band gap", "unit": "eV"}, |
| "mu": {"name": "Dipole moment", "unit": "Debye"}, |
| "etotal": {"name": "Total electronic energy", "unit": "eV"}, |
| "ri": {"name": "Refractive index", "unit": "-"}, |
| "dc": {"name": "Dielectric constant", "unit": "-"}, |
| "pe": {"name": "Permittivity", "unit": "-"}, |
| |
| "rg": {"name": "Radius of gyration", "unit": "A"}, |
| "rho": {"name": "Density", "unit": "g/cm^3"}, |
| } |
|
|
|
|
| |
| |
| |
def mean_col(prop_key: str) -> str:
    """Dataframe column holding the predicted mean of *prop_key* (e.g. "Tg" -> "mean_tg")."""
    return "mean_" + prop_key.lower()
|
|
def std_col(prop_key: str) -> str:
    """Dataframe column holding the predicted std of *prop_key* (e.g. "Tg" -> "std_tg")."""
    return "std_" + prop_key.lower()
|
|
|
|
def normalize_weights(weights: Dict[str, float], defaults: Dict[str, float]) -> Dict[str, float]:
    """Merge *weights* over *defaults*, clip negatives to zero, renormalize to sum 1.

    Values that cannot be coerced to float fall back to the default; if
    everything clips to zero, a copy of *defaults* is returned unchanged.
    Only keys present in *defaults* are considered.
    """
    merged: Dict[str, float] = {}
    for key, default_val in defaults.items():
        raw = weights.get(key, default_val)
        try:
            val = float(raw)
        except Exception:
            val = float(default_val)
        # Negative (or NaN) weights are treated as zero.
        merged[key] = val if val > 0.0 else 0.0
    total = float(sum(merged.values()))
    if total <= 0.0:
        return defaults.copy()
    return {key: float(val / total) for key, val in merged.items()}
|
|
def spec_from_dict(obj: dict, dataset_path: List[str], polyinfo_path: str, polyinfo_csv_path: str) -> DiscoverySpec:
    """Build a DiscoverySpec from an already-parsed JSON dict plus resolved data paths.

    Missing keys fall back to the documented defaults; max_pool is kept in
    lockstep with pareto_max.
    """
    cap = int(obj.get("pareto_max", 50000))
    # Canonicalization is enabled only when the spec explicitly sets
    # skip_smiles_canonicalization to a falsy value (default: skipped).
    canonicalize = not bool(obj.get("skip_smiles_canonicalization", True))
    return DiscoverySpec(
        dataset=list(dataset_path),
        polyinfo=polyinfo_path,
        polyinfo_csv=polyinfo_csv_path,
        hard_constraints=obj.get("hard_constraints", {}),
        objectives=obj.get("objectives", []),
        # Pool cap mirrors the Pareto cap.
        max_pool=cap,
        pareto_max=cap,
        max_candidates=int(obj.get("max_candidates", 30)),
        max_pareto_fronts=int(obj.get("max_pareto_fronts", 5)),
        min_distance=float(obj.get("min_distance", 0.30)),
        fingerprint=str(obj.get("fingerprint", "morgan")),
        random_seed=int(obj.get("random_seed", 7)),
        use_canonical_smiles=canonicalize,
        use_full_data=bool(obj.get("use_full_data", False)),
        trust_weights=obj.get("trust_weights"),
        selection_weights=obj.get("selection_weights"),
    )
|
|
| |
| |
| |
def load_parquet_columns(path: str | List[str], columns: List[str]) -> pd.DataFrame:
    """
    Read only *columns* from one or several Parquet files (critical for ~1M rows).

    Column matching is case-insensitive against the file schema; requested
    columns absent from a file come back as all-NaN columns. A list of paths
    is concatenated row-wise.
    """

    def _read_single(file_path: str, requested: List[str]) -> pd.DataFrame:
        try:
            import pyarrow.parquet as pq

            schema_names = [str(name) for name in pq.ParquetFile(file_path).schema.names]
        except Exception:
            # No pyarrow schema access: fall back to letting pandas resolve
            # the requested columns directly.
            return pd.read_parquet(file_path, columns=requested)

        exact_names = set(schema_names)
        by_lowercase = {name.lower(): name for name in schema_names}

        # Map each requested name to an actual column; exact match wins,
        # then case-insensitive; unresolved names are left out of the map.
        mapping: dict[str, str] = {}
        for name in requested:
            if name in exact_names:
                mapping[name] = name
                continue
            actual = by_lowercase.get(str(name).lower())
            if actual is not None:
                mapping[name] = actual

        physical_cols = sorted(set(mapping.values()))
        if not physical_cols:
            return pd.DataFrame(columns=requested)

        frame = pd.read_parquet(file_path, columns=physical_cols)
        for name in requested:
            actual = mapping.get(name)
            if actual is None:
                frame[name] = np.nan  # requested but missing -> NaN column
            elif actual != name:
                frame[name] = frame[actual]  # alias under the requested spelling
        return frame[requested]

    if isinstance(path, (list, tuple)):
        parts = [_read_single(p, columns) for p in path]
        if not parts:
            return pd.DataFrame(columns=columns)
        return pd.concat(parts, ignore_index=True)
    return _read_single(path, columns)
|
|
|
|
def normalize_smiles(smiles: str, use_canonical_smiles: bool) -> Optional[str]:
    """Normalize a SMILES string for use as a dedup/join key.

    Returns None for empty/blank input. When *use_canonical_smiles* is False,
    the stripped raw string is returned without any RDKit parsing (fast path,
    no validity check). Otherwise the string must parse, and its RDKit
    canonical form is returned (None if parsing fails).
    """
    s = (smiles or "").strip()
    if not s:
        return None
    if not use_canonical_smiles:
        # Fast path: trust the raw string as-is.
        return s
    m = Chem.MolFromSmiles(s)
    if m is None:
        return None
    # use_canonical_smiles is necessarily True here; the original trailing
    # `return s` branch was unreachable and has been removed.
    return Chem.MolToSmiles(m, canonical=True)
|
|
|
|
def load_polyinfo_index(polyinfo_csv_path: str, use_canonical_smiles: bool = True) -> pd.DataFrame:
    """
    Load the POLYINFO reference CSV and index it by normalized SMILES.

    Expected CSV columns: SMILES, Polymer_Class, polymer_name (or common
    case variants). Returns a dataframe indexed on smiles_key with columns
    polymer_name / polymer_class (pd.NA where the CSV lacks them).

    Raises ValueError when no SMILES/smiles column is present.
    """
    df = pd.read_csv(polyinfo_csv_path)

    # Normalize common header variants to the lowercase names used below.
    # (The original built an identity dict of columns here; a direct
    # membership test on df.columns is equivalent.)
    if "SMILES" in df.columns:
        df = df.rename(columns={"SMILES": "smiles"})
    elif "smiles" not in df.columns:
        raise ValueError(f"{polyinfo_csv_path} missing SMILES/smiles column")

    for variant in ("Polymer_Name", "polymer_Name"):
        if variant in df.columns:
            df = df.rename(columns={variant: "polymer_name"})
    if "Polymer_Class" in df.columns:
        df = df.rename(columns={"Polymer_Class": "polymer_class"})

    # Guarantee both output columns exist even when absent from the CSV.
    if "polymer_name" not in df.columns:
        df["polymer_name"] = pd.NA
    if "polymer_class" not in df.columns:
        df["polymer_class"] = pd.NA

    # Key on normalized SMILES; drop rows that fail normalization, dedupe.
    df["smiles_key"] = df["smiles"].astype(str).map(lambda s: normalize_smiles(s, use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).drop_duplicates("smiles_key")
    df = df.set_index("smiles_key", drop=True)
    return df[["polymer_name", "polymer_class"]]
|
|
|
|
| |
| |
| |
def pareto_front_mask(X: np.ndarray) -> np.ndarray:
    """
    Boolean mask of nondominated rows of X (shape (N, M)).

    Every column is treated as minimized; callers negate columns whose goal
    is maximization. Duplicate points do not dominate each other, so all
    copies stay on the front.
    """
    n_points = X.shape[0]
    keep = np.ones(n_points, dtype=bool)
    for i in range(n_points):
        if not keep[i]:
            continue
        xi = X[i]
        # Point i is dominated if some point is <= in every objective and
        # strictly < in at least one.
        dominated = np.any(np.all(X <= xi, axis=1) & np.any(X < xi, axis=1))
        if dominated:
            keep[i] = False
            continue
        # Point i survives: eliminate everything it strictly dominates.
        beaten = np.all(xi <= X, axis=1) & np.any(xi < X, axis=1)
        keep[beaten] = False
        keep[i] = True
    return keep
|
|
|
|
def pareto_layers(X: np.ndarray, max_layers: int = 10) -> np.ndarray:
    """
    Assign each row of X a Pareto layer: 1 = first front, 2 = second, ...

    Rows not reached within *max_layers* peels keep the value 0.
    """
    layer_of = np.zeros(X.shape[0], dtype=int)
    pool = np.arange(X.shape[0])
    for depth in range(1, max_layers + 1):
        if pool.size == 0:
            break
        front = pareto_front_mask(X[pool])
        layer_of[pool[front]] = depth
        pool = pool[~front]
    return layer_of
|
|
|
|
def pareto_front_mask_chunked(
    X: np.ndarray,
    chunk_size: int = 100000,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> np.ndarray:
    """
    Exact global Pareto-front mask computed chunk-wise to bound memory.

    Exactness for front 1: a globally nondominated point is also
    nondominated within its own chunk, so the union of chunk-local fronts
    contains the global front; one final exact pass over that union recovers
    it. *progress_callback(done_chunks, total_chunks)* fires per chunk.
    """
    n_points = X.shape[0]
    if n_points <= chunk_size:
        # Small enough for a single exact pass.
        if progress_callback is not None:
            progress_callback(1, 1)
        return pareto_front_mask(X)

    total_chunks = (n_points + chunk_size - 1) // chunk_size
    survivors = []
    for chunk_no, start in enumerate(range(0, n_points, chunk_size), start=1):
        idx = np.arange(start, min(start + chunk_size, n_points))
        survivors.append(idx[pareto_front_mask(X[idx])])
        if progress_callback is not None:
            progress_callback(chunk_no, total_chunks)

    if not survivors:
        return np.zeros(n_points, dtype=bool)

    # Reconcile: exact front over the union of all chunk-local fronts.
    union_idx = np.concatenate(survivors)
    global_front_idx = union_idx[pareto_front_mask(X[union_idx])]

    mask = np.zeros(n_points, dtype=bool)
    mask[global_front_idx] = True
    return mask
|
|
|
|
def pareto_layers_chunked(
    X: np.ndarray,
    max_layers: int = 10,
    chunk_size: int = 100000,
    progress_callback: Optional[Callable[[int, int, int], None]] = None,
) -> np.ndarray:
    """
    Exact Pareto layers via repeated exact chunked front extraction.

    *progress_callback(layer, done_chunks, total_chunks)* forwards per-chunk
    progress for each peeled layer.
    """
    layer_of = np.zeros(X.shape[0], dtype=int)
    pool = np.arange(X.shape[0])

    for depth in range(1, max_layers + 1):
        if pool.size == 0:
            break

        def forward_progress(done: int, total: int, _depth: int = depth) -> None:
            # Bind the current depth as a default arg so the closure cannot
            # pick up a later value of the loop variable.
            if progress_callback is not None:
                progress_callback(_depth, done, total)

        front = pareto_front_mask_chunked(
            X[pool], chunk_size=chunk_size, progress_callback=forward_progress
        )
        layer_of[pool[front]] = depth
        pool = pool[~front]

    return layer_of
|
|
|
|
| |
| |
| |
def morgan_fp(smiles: str, radius: int = 2, nbits: int = 2048):
    """Morgan (ECFP-like) bit-vector fingerprint for *smiles*; None if unparseable."""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    return AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nbits)
|
|
def tanimoto_distance(fp1, fp2) -> float:
    """Tanimoto distance (1 - similarity) between two RDKit bit vectors."""
    similarity = DataStructs.TanimotoSimilarity(fp1, fp2)
    return 1.0 - similarity
|
|
def greedy_diverse_select(
    smiles_list: List[str],
    scores: np.ndarray,
    max_k: int,
    min_dist: float,
) -> List[int]:
    """
    Greedy max-score selection under a minimum-diversity constraint.

    Candidates are visited in descending *scores* order; one is accepted only
    if its Tanimoto distance to every already-accepted candidate is at least
    *min_dist*. SMILES that fail to fingerprint are skipped entirely.
    Returns indices into *smiles_list*.
    """
    fingerprints = []
    original_indices = []
    for idx, smi in enumerate(smiles_list):
        fp = morgan_fp(smi)
        if fp is not None:
            fingerprints.append(fp)
            original_indices.append(idx)

    if not original_indices:
        return []

    # Rank the fingerprint-able candidates best-score first.
    ranking = np.argsort(-scores[original_indices])

    picked: List[int] = []
    picked_fps = []
    for rank_pos in ranking:
        candidate_fp = fingerprints[rank_pos]
        too_close = any(
            tanimoto_distance(candidate_fp, accepted_fp) < min_dist
            for accepted_fp in picked_fps
        )
        if too_close:
            continue
        picked.append(original_indices[rank_pos])
        picked_fps.append(candidate_fp)
        if len(picked) >= max_k:
            break

    return picked
|
|
|
|
| |
| |
| |
def internal_consistency_penalty(row: pd.Series) -> float:
    """
    Fraction of simple physics/validity checks that *row* fails, in [0, 1].

    Checks applied only where the relevant mean_* value is present:
      - strictly positive cp/tc/rho/dif/visc; nonnegative tg/tm/bandgap
      - Poisson ratio inside [0, 0.5]
      - Tg <= Tm
    Returns 0.0 when no check is applicable. Adjust/add rules later.
    """
    failures = 0
    applied = 0

    def record(passed: bool) -> None:
        nonlocal failures, applied
        applied += 1
        if not passed:
            failures += 1

    # Sign checks on scalar thermophysical quantities.
    may_be_zero = {"bandgap", "tg", "tm"}
    for prop in ["cp", "tc", "rho", "dif", "visc", "tg", "tm", "bandgap"]:
        col = mean_col(prop)
        if col in row.index and pd.notna(row[col]):
            value = float(row[col])
            record(value >= 0.0 if prop in may_be_zero else value > 0.0)

    # Poisson ratio is physically bounded for stable isotropic materials.
    poisson_col = mean_col("poisson")
    if poisson_col in row.index and pd.notna(row[poisson_col]):
        record(0.0 <= float(row[poisson_col]) <= 0.5)

    # Glass transition should not exceed the melting temperature.
    tg_col, tm_col = mean_col("tg"), mean_col("tm")
    if tg_col in row.index and tm_col in row.index:
        if pd.notna(row[tg_col]) and pd.notna(row[tm_col]):
            record(float(row[tg_col]) <= float(row[tm_col]))

    if applied == 0:
        return 0.0
    return failures / applied
|
|
|
|
def synthesizability_score(smiles: str) -> float:
    """
    Map the RDKit SA-score onto a [0, 1] synthesizability proxy.

    SA-score runs roughly from 1 (easy) to 10 (hard); this maps 1 -> 1.0 and
    10 -> 0.0, clipped. Unparseable SMILES or a failing/None SA computation
    scores 0.0.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return 0.0

    try:
        raw = sascorer.calculateScore(mol)
    except Exception:
        return 0.0
    if raw is None:
        return 0.0

    # Linear rescale: 1 -> 1.0, 10 -> 0.0, clipped into [0, 1].
    score = 1.0 - (float(raw) - 1.0) / 9.0
    return float(np.clip(score, 0.0, 1.0))
|
|
|
|
def compute_trust_scores(
    df: pd.DataFrame,
    real_fps: List,
    real_smiles: List[str],
    trust_weights: Dict[str, float] | None = None,
) -> np.ndarray:
    """
    Trust score per row in [0, 1] (higher = more trustworthy / lower risk).

    Weighted blend of four components (weights normalized to sum 1):
      - real:        similarity to the nearest real-polymer fingerprint
      - consistency: 1 - internal physics-consistency penalty
      - uncertainty: 1 - penalty from the mean of std_* columns (if any)
      - synth:       SA-score-based synthesizability proxy

    Note: *real_smiles* is accepted for interface compatibility but only
    *real_fps* is used by the nearest-real computation.
    """
    N = len(df)
    trust = np.zeros(N, dtype=float)
    tw_defaults = {"real": 0.45, "consistency": 0.25, "uncertainty": 0.10, "synth": 0.20}
    tw = normalize_weights(trust_weights or {}, tw_defaults)

    # Hoisted loop invariants: the column choices do not change per row
    # (previously std_cols was rebuilt on every iteration).
    smiles_col = "smiles_key" if "smiles_key" in df.columns else "smiles_canon"
    std_cols = [c for c in df.columns if c.startswith("std_")]

    for i in range(N):
        row = df.iloc[i]
        s = row[smiles_col]

        # Distance to the nearest known real polymer; 1.0 (max distance)
        # when the SMILES cannot be fingerprinted or no references exist.
        fp = morgan_fp(s)
        if fp is None or not real_fps:
            d_real = 1.0
        else:
            sims = DataStructs.BulkTanimotoSimilarity(fp, real_fps)
            d_real = 1.0 - float(max(sims))

        pen_cons = internal_consistency_penalty(row)

        # Uncertainty penalty saturates toward 1 as the mean std grows.
        if std_cols:
            std_vals = row[std_cols].astype(float)
            std_vals = std_vals.replace([np.inf, -np.inf], np.nan).dropna()
            pen_unc = float(np.clip(std_vals.mean() / (std_vals.mean() + 1.0), 0.0, 1.0)) if len(std_vals) else 0.0
        else:
            pen_unc = 0.0

        s_syn = synthesizability_score(s)

        # Convert nearest-real distance into a similarity-style score.
        s_real = 1.0 - np.clip(d_real, 0.0, 1.0)

        trust[i] = (
            tw["real"] * s_real +
            tw["consistency"] * (1.0 - pen_cons) +
            tw["uncertainty"] * (1.0 - pen_unc) +
            tw["synth"] * s_syn
        )

    return np.clip(trust, 0.0, 1.0)
|
|
|
|
| |
| |
| |
def run_discovery(
    spec: DiscoverySpec,
    progress_callback: Optional[Callable[[str, float], None]] = None,
) -> Tuple[pd.DataFrame, Dict[str, float], pd.DataFrame]:
    """Run the full discovery pipeline described by *spec*.

    Stages (approximate progress fraction reported via
    *progress_callback(step_label, fraction)*):
      1. Load only the needed mean_*/std_* columns from parquet.
      2. Normalize SMILES into smiles_key; drop unparseable rows.
      3. Apply hard min/max constraints per property.
      4. Build the minimization objective matrix (maximize goals negated).
      5. Optionally subsample to spec.pareto_max rows (unless use_full_data).
      6. Compute exact Pareto layers (chunked) and keep the top fronts.
      7. Score trust against the POLYINFO reference set.
      8. Greedy diversity selection by a blended pareto/trust score.

    Returns (selected candidates dataframe, stats dict, plot dataframe).
    """
    def report(step: str, pct: float) -> None:
        # Forward progress only when a callback was supplied.
        if progress_callback is not None:
            progress_callback(step, pct)

    rng = np.random.default_rng(spec.random_seed)

    # --- 1) Determine which columns must be loaded -------------------------
    report("Preparing columns…", 0.02)
    obj_props = [o["property"].lower() for o in spec.objectives]
    cons_props = [p.lower() for p in spec.hard_constraints.keys()]

    needed_props = sorted(set(obj_props + cons_props))
    cols = ["SMILES"] + [mean_col(p) for p in needed_props]

    # std_* columns feed the uncertainty component of the trust score.
    std_cols = [std_col(p) for p in needed_props]
    cols += std_cols

    # --- 2) Load and normalize SMILES --------------------------------------
    report("Loading data from parquet…", 0.05)
    df = load_parquet_columns(spec.dataset, columns=[c for c in cols if c != "SMILES"] + ["SMILES"])
    # Tolerate a lowercase smiles column in the parquet.
    if "SMILES" not in df.columns and "smiles" in df.columns:
        df = df.rename(columns={"smiles": "SMILES"})
    normalize_step = "Canonicalizing SMILES…" if spec.use_canonical_smiles else "Skipping SMILES normalization…"
    report(normalize_step, 0.10)
    df["smiles_key"] = df["SMILES"].astype(str).map(lambda s: normalize_smiles(s, spec.use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).reset_index(drop=True)

    # --- 3) Hard constraints ------------------------------------------------
    report("Applying constraints…", 0.22)
    for p, rule in spec.hard_constraints.items():
        p = p.lower()
        c = mean_col(p)
        if c not in df.columns:
            # A constrained property is absent entirely: nothing can satisfy it.
            df = df.iloc[0:0]
            break
        if "min" in rule:
            df = df[df[c] >= float(rule["min"])]
        if "max" in rule:
            df = df[df[c] <= float(rule["max"])]

    n_after = len(df)
    if n_after == 0:
        empty_stats = {"n_total": 0, "n_after_constraints": 0, "n_pool": 0, "n_pareto_pool": 0, "n_selected": 0}
        return df, empty_stats, pd.DataFrame()

    n_pool = len(df)

    # --- 4) Objective matrix (every column minimized) -----------------------
    report("Building objective matrix…", 0.30)
    X = []
    resolved_objectives = []
    for o in spec.objectives:
        prop = o["property"].lower()
        goal = o["goal"].lower()
        c = mean_col(prop)
        if c not in df.columns:
            continue
        v = df[c].to_numpy(dtype=float)
        if goal == "maximize":
            # Negate so the Pareto code can minimize everything uniformly.
            v = -v
        X.append(v)
        resolved_objectives.append({"property": prop, "goal": goal})
    if not X:
        # No requested objective resolved to a column: fall back to
        # maximizing the first available mean_* column so the run proceeds.
        fallback_col = next((c for c in df.columns if str(c).startswith("mean_")), None)
        if fallback_col is None:
            empty_stats = {"n_total": 0, "n_after_constraints": 0, "n_pool": 0, "n_pareto_pool": 0, "n_selected": 0}
            return df.iloc[0:0], empty_stats, pd.DataFrame()
        X = [df[fallback_col].to_numpy(dtype=float) * -1.0]
        resolved_objectives = [{"property": fallback_col.replace("mean_", ""), "goal": "maximize"}]
    X = np.stack(X, axis=1)
    obj_props = [o["property"] for o in resolved_objectives]

    # --- 5) Optional subsampling before Pareto layering ---------------------
    if spec.use_full_data:
        report("Using full dataset (no Pareto cap)…", 0.35)
    elif len(df) > spec.pareto_max:
        idx = rng.choice(len(df), size=spec.pareto_max, replace=False)
        df = df.iloc[idx].reset_index(drop=True)
        X = X[idx]

    # --- 6) Chunked Pareto layering with fine-grained progress --------------
    report("Computing Pareto layers…", 0.40)
    pareto_start = 0.40
    pareto_end = 0.54
    max_layers_for_pool = max(1, int(spec.max_pareto_fronts))
    # The chunk count of the first reported layer is reused as a per-layer
    # estimate so the percentage stays monotone across layers.
    pareto_chunk_ref = {"chunks_per_layer": None}

    def on_pareto_chunk(layer_i: int, done_chunks: int, total_chunks: int) -> None:
        if pareto_chunk_ref["chunks_per_layer"] is None:
            pareto_chunk_ref["chunks_per_layer"] = max(1, int(total_chunks))
        ref_chunks = pareto_chunk_ref["chunks_per_layer"]
        total_units = max_layers_for_pool * ref_chunks
        done_units = min(total_units, ((layer_i - 1) * ref_chunks) + done_chunks)
        pareto_pct = int(round(100.0 * done_units / max(1, total_units)))

        # Map (layer, chunk) progress into the [pareto_start, pareto_end] band.
        layer_progress = done_chunks / max(1, total_chunks)
        overall = ((layer_i - 1) + layer_progress) / max_layers_for_pool
        pct = pareto_start + (pareto_end - pareto_start) * min(1.0, max(0.0, overall))
        report(
            f"Computing Pareto layers… {pareto_pct}% (Layer {layer_i}/{max_layers_for_pool}, chunk {done_chunks}/{total_chunks})",
            pct,
        )

    layers = pareto_layers_chunked(
        X,
        max_layers=max_layers_for_pool,
        chunk_size=100000,
        progress_callback=on_pareto_chunk,
    )
    report("Computing Pareto layers…", pareto_end)
    df["pareto_layer"] = layers
    plot_df = df[["smiles_key"] + [mean_col(p) for p in obj_props] + ["pareto_layer"]].copy()
    plot_df = plot_df.rename(columns={"smiles_key": "SMILES"})

    # Candidate pool: everything on the first max_layers_for_pool fronts.
    cand = df[df["pareto_layer"].between(1, max_layers_for_pool)].copy()
    if cand.empty:
        cand = df[df["pareto_layer"] == 1].copy()
    cand = cand.reset_index(drop=True)
    n_pareto = len(cand)

    # --- 7) Trust scoring against the POLYINFO reference set ----------------
    report("Loading POLYINFO index…", 0.55)
    polyinfo = load_polyinfo_index(spec.polyinfo_csv, use_canonical_smiles=spec.use_canonical_smiles)
    real_smiles = polyinfo.index.to_list()

    report("Building real-polymer fingerprints…", 0.60)
    real_fps = []
    for s in real_smiles:
        fp = morgan_fp(s)
        if fp is not None:
            real_fps.append(fp)

    report("Computing trust scores…", 0.70)
    trust = compute_trust_scores(
        cand,
        real_fps=real_fps,
        real_smiles=real_smiles,
        trust_weights=spec.trust_weights,
    )
    cand["trust_score"] = trust

    # --- 8) Diversity-aware greedy selection --------------------------------
    report("Diversity selection…", 0.88)
    # Blend a layer bonus (front 1 gets the largest bonus) with trust.
    sw_defaults = {"pareto": 0.60, "trust": 0.40}
    sw = normalize_weights(spec.selection_weights or {}, sw_defaults)
    pareto_bonus = (
        (max_layers_for_pool + 1) - np.clip(cand["pareto_layer"].to_numpy(dtype=int), 1, max_layers_for_pool)
    ) / float(max_layers_for_pool)
    sel_score = sw["pareto"] * pareto_bonus + sw["trust"] * cand["trust_score"].to_numpy(dtype=float)

    chosen_idx = greedy_diverse_select(
        smiles_list=cand["smiles_key"].tolist(),
        scores=sel_score,
        max_k=spec.max_candidates,
        min_dist=spec.min_distance,
    )
    out = cand.iloc[chosen_idx].copy().reset_index(drop=True)

    # Attach polymer names/classes from POLYINFO where the key matches.
    report("Finalizing results…", 0.96)
    out = out.set_index("smiles_key", drop=False)
    out = out.join(polyinfo, how="left")
    out = out.reset_index(drop=True)

    # Keep a stable, user-facing column order: identity/ranking columns
    # first, then each property's mean/std pair.
    keep = ["smiles_key", "polymer_name", "polymer_class", "pareto_layer", "trust_score"]
    for p in needed_props:
        mc = mean_col(p)
        sc = std_col(p)
        if mc in out.columns:
            keep.append(mc)
        if sc in out.columns:
            keep.append(sc)

    out = out[keep].rename(columns={"smiles_key": "SMILES"})

    # NOTE(review): n_total is len(df) AFTER the optional pareto_max
    # subsampling above, so it can be smaller than the number of rows
    # actually loaded — confirm this is the intended meaning of "total".
    stats = {
        "n_total": float(len(df)),
        "n_after_constraints": float(n_after),
        "n_pool": float(n_pool),
        "n_pareto_pool": float(n_pareto),
        "n_selected": float(len(out)),
    }
    report("Done.", 1.0)
    return out, stats, plot_df
|
|
|
|
def build_pareto_plot_df(spec: DiscoverySpec, max_plot_points: int = 30000) -> pd.DataFrame:
    """
    Returns a small dataframe for plotting (sampled), with objective columns and pareto_layer.
    Does NOT compute trust/diversity. Safe for live plotting.

    Mirrors the load/normalize/constrain/objective steps of run_discovery,
    but caps the pool at min(max_plot_points, spec.pareto_max) and uses the
    non-chunked layering (the capped pool is small).
    """
    rng = np.random.default_rng(spec.random_seed)

    # Same column resolution as run_discovery, but without std_* columns.
    obj_props = [o["property"].lower() for o in spec.objectives]
    cons_props = [p.lower() for p in spec.hard_constraints.keys()]
    needed_props = sorted(set(obj_props + cons_props))

    cols = ["SMILES"] + [mean_col(p) for p in needed_props]
    df = load_parquet_columns(spec.dataset, columns=cols)

    # Tolerate a lowercase smiles column in the parquet.
    if "SMILES" not in df.columns and "smiles" in df.columns:
        df = df.rename(columns={"smiles": "SMILES"})

    df["smiles_key"] = df["SMILES"].astype(str).map(lambda s: normalize_smiles(s, spec.use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).reset_index(drop=True)

    # Hard constraints; a missing constrained column empties the result.
    for p, rule in spec.hard_constraints.items():
        p = p.lower()
        c = mean_col(p)
        if c not in df.columns:
            return df.iloc[0:0]
        if "min" in rule:
            df = df[df[c] >= float(rule["min"])]
        if "max" in rule:
            df = df[df[c] <= float(rule["max"])]

    if len(df) == 0:
        return df

    # Cap the number of plotted points (never above spec.pareto_max).
    plot_cap = min(int(max_plot_points), int(spec.pareto_max))
    if len(df) > plot_cap:
        idx = rng.choice(len(df), size=plot_cap, replace=False)
        df = df.iloc[idx].reset_index(drop=True)

    # Objective matrix: negate maximize goals so everything is minimized.
    X = []
    resolved_obj_props = []
    for o in spec.objectives:
        prop = o["property"].lower()
        goal = o["goal"].lower()
        c = mean_col(prop)
        if c not in df.columns:
            continue
        v = df[c].to_numpy(dtype=float)
        if goal == "maximize":
            v = -v
        X.append(v)
        resolved_obj_props.append(prop)
    if not X:
        # Fallback: maximize the first available mean_* column.
        fallback_col = next((c for c in df.columns if str(c).startswith("mean_")), None)
        if fallback_col is None:
            return df.iloc[0:0]
        X = [df[fallback_col].to_numpy(dtype=float) * -1.0]
        resolved_obj_props = [fallback_col.replace("mean_", "")]
    X = np.stack(X, axis=1)

    df["pareto_layer"] = pareto_layers(X, max_layers=5)

    # Only SMILES, layer, and the resolved objective columns are plotted.
    keep = ["smiles_key", "pareto_layer"] + [mean_col(p) for p in resolved_obj_props]
    out = df[keep].rename(columns={"smiles_key": "SMILES"})
    return out
|
|
|
|
def parse_spec(text: str, dataset_path: List[str], polyinfo_path: str, polyinfo_csv_path: str) -> DiscoverySpec:
    """Parse a JSON spec string into a DiscoverySpec.

    Raises json.JSONDecodeError on malformed JSON. Field handling is
    delegated to spec_from_dict so the two entry points cannot drift
    apart (previously this duplicated the entire field-by-field
    construction, including the pareto_max/max_pool coupling).
    """
    return spec_from_dict(json.loads(text), dataset_path, polyinfo_path, polyinfo_csv_path)
|
|