| | |
| | from __future__ import annotations |
| |
|
| | import json |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Callable, Dict, List, Optional, Tuple |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from rdkit import Chem, DataStructs |
| | from rdkit.Chem import AllChem |
| | from . import sascorer |
| |
|
| | |
def canonicalize_smiles(smiles: str) -> Optional[str]:
    """Return the RDKit-canonical form of *smiles*, or None when empty/unparsable."""
    text = (smiles or "").strip()
    if not text:
        return None
    mol = Chem.MolFromSmiles(text)
    return None if mol is None else Chem.MolToSmiles(mol, canonical=True)
| |
|
| |
|
| | |
| | |
| | |
@dataclass
class DiscoverySpec:
    """Configuration for one discovery run (data paths, search definition, tunables)."""

    # Input locations.
    dataset: List[str]   # parquet file path(s) holding predicted properties
    polyinfo: str        # POLYINFO resource path (stored, not read in this module)
    polyinfo_csv: str    # CSV of known polymers (SMILES + name/class columns)

    # Search definition.
    hard_constraints: Dict[str, Dict[str, float]]  # property -> {"min": x} and/or {"max": y}
    objectives: List[Dict[str, str]]  # each: {"property": ..., "goal": "minimize"|"maximize"}

    # Tunables.
    max_pool: int = 200000        # spec builders keep this equal to pareto_max
    pareto_max: int = 50000       # subsample cap applied before Pareto layering
    max_candidates: int = 30      # final number of diverse picks
    max_pareto_fronts: int = 5    # Pareto layers admitted to the candidate pool
    min_distance: float = 0.30    # min Tanimoto distance enforced between picks
    fingerprint: str = "morgan"   # NOTE(review): code paths here always use Morgan regardless — confirm this field is consumed elsewhere
    random_seed: int = 7
    use_canonical_smiles: bool = True  # canonicalize SMILES via RDKit when True
    use_full_data: bool = False        # when True, skip the pareto_max subsample cap
    trust_weights: Dict[str, float] | None = None       # overrides for the trust-score blend
    selection_weights: Dict[str, float] | None = None   # overrides for the pareto/trust blend
| |
|
| |
|
| | |
| | |
| | |
def mean_col(prop_key: str) -> str:
    """Column name holding the predicted mean for *prop_key*."""
    return "mean_" + prop_key.lower()
| |
|
def std_col(prop_key: str) -> str:
    """Column name holding the prediction std-dev for *prop_key*."""
    return "std_" + prop_key.lower()
| |
|
| |
|
def normalize_weights(weights: Dict[str, float], defaults: Dict[str, float]) -> Dict[str, float]:
    """
    Merge user *weights* over *defaults*, clamp negatives to zero, and
    rescale so the result sums to 1. Falls back to a copy of *defaults*
    when everything is zero or uncoercible.
    """
    merged: Dict[str, float] = {}
    for key, fallback in defaults.items():
        try:
            value = float(weights.get(key, fallback))
        except Exception:
            value = float(fallback)
        merged[key] = value if value > 0.0 else 0.0
    total = float(sum(merged.values()))
    if total <= 0.0:
        # NOTE: returned defaults are not re-normalized here.
        return dict(defaults)
    return {key: value / total for key, value in merged.items()}
| |
|
def spec_from_dict(obj: dict, dataset_path: List[str], polyinfo_path: str, polyinfo_csv_path: str) -> DiscoverySpec:
    """Build a DiscoverySpec from an already-parsed JSON dict plus data paths."""
    cap = int(obj.get("pareto_max", 50000))
    # NOTE(review): skip_smiles_canonicalization defaults to True, so
    # canonicalization is OFF unless the key is explicitly set falsy —
    # confirm that default is intentional (DiscoverySpec's own default is True).
    return DiscoverySpec(
        dataset=list(dataset_path),
        polyinfo=polyinfo_path,
        polyinfo_csv=polyinfo_csv_path,
        hard_constraints=obj.get("hard_constraints", {}),
        objectives=obj.get("objectives", []),
        max_pool=cap,  # kept in lockstep with pareto_max
        pareto_max=cap,
        max_candidates=int(obj.get("max_candidates", 30)),
        max_pareto_fronts=int(obj.get("max_pareto_fronts", 5)),
        min_distance=float(obj.get("min_distance", 0.30)),
        fingerprint=str(obj.get("fingerprint", "morgan")),
        random_seed=int(obj.get("random_seed", 7)),
        use_canonical_smiles=not bool(obj.get("skip_smiles_canonicalization", True)),
        use_full_data=bool(obj.get("use_full_data", False)),
        trust_weights=obj.get("trust_weights"),
        selection_weights=obj.get("selection_weights"),
    )
| |
|
| | |
| | |
| | |
def load_parquet_columns(path: str | List[str], columns: List[str]) -> pd.DataFrame:
    """
    Load only requested columns from Parquet (critical for 1M rows).
    Accepts a single path or a list of paths and concatenates rows.

    Column resolution is case-insensitive: a requested name missing from the
    file is matched to a column differing only in case; names with no match
    at all come back filled with NaN. Output column order follows *columns*.
    """
    def _load_one(fp: str, req_cols: List[str]) -> pd.DataFrame:
        available: list[str]
        try:
            # Read just the schema (cheap) to learn the actual column names.
            import pyarrow.parquet as pq

            pf = pq.ParquetFile(fp)
            available = [str(c) for c in pf.schema.names]
        except Exception:
            # No pyarrow / unreadable schema: fall back to a plain load and
            # let pandas raise if the requested columns are absent.
            return pd.read_parquet(fp, columns=req_cols)

        available_set = set(available)
        lower_to_actual = {c.lower(): c for c in available}

        # Map each requested name to the actual file column (exact match
        # wins over case-insensitive match); unmatched names are dropped here
        # and back-filled with NaN below.
        resolved: dict[str, str] = {}
        for req in req_cols:
            if req in available_set:
                resolved[req] = req
                continue
            alt = lower_to_actual.get(str(req).lower())
            if alt is not None:
                resolved[req] = alt

        use_cols = sorted(set(resolved.values()))
        if not use_cols:
            # Nothing resolvable: empty frame with the requested header.
            return pd.DataFrame(columns=req_cols)

        out = pd.read_parquet(fp, columns=use_cols)
        for req in req_cols:
            src = resolved.get(req)
            if src is None:
                out[req] = np.nan  # requested but not present in the file
            elif src != req:
                out[req] = out[src]  # alias case-variant column to the requested name
        # Reindex to the requested names/order (drops the case-variant originals).
        return out[req_cols]

    if isinstance(path, (list, tuple)):
        frames = [_load_one(p, columns) for p in path]
        if not frames:
            return pd.DataFrame(columns=columns)
        return pd.concat(frames, ignore_index=True)
    return _load_one(path, columns)
| |
|
| |
|
def normalize_smiles(smiles: str, use_canonical_smiles: bool) -> Optional[str]:
    """
    Normalize a SMILES string.

    Returns None for empty/blank input. When *use_canonical_smiles* is False,
    the stripped string is returned as-is (no RDKit parse); otherwise the
    RDKit-canonical SMILES is returned, or None if parsing fails.

    (The original trailing ``if use_canonical_smiles: ... return s`` was
    unreachable — canonicalization is the only path past the early return —
    so it has been removed; behavior is unchanged.)
    """
    s = (smiles or "").strip()
    if not s:
        return None
    if not use_canonical_smiles:
        return s
    m = Chem.MolFromSmiles(s)
    if m is None:
        return None
    return Chem.MolToSmiles(m, canonical=True)
| |
|
| |
|
def load_polyinfo_index(polyinfo_csv_path: str, use_canonical_smiles: bool = True) -> pd.DataFrame:
    """
    Load the POLYINFO reference CSV, keyed by normalized SMILES.

    Expected CSV columns: SMILES (or smiles), plus optional Polymer_Name /
    polymer_Name and Polymer_Class header variants. Missing name/class
    columns are filled with <NA>.

    Returns a dataframe indexed on smiles_key with columns
    polymer_name / polymer_class; rows with unparsable SMILES are dropped
    and duplicate keys keep the first occurrence.

    Raises ValueError when no SMILES column is present.
    """
    df = pd.read_csv(polyinfo_csv_path)

    # Normalize the SMILES column name (test membership on df.columns
    # directly; the former identity dict added nothing).
    if "SMILES" in df.columns:
        df = df.rename(columns={"SMILES": "smiles"})
    elif "smiles" not in df.columns:
        raise ValueError(f"{polyinfo_csv_path} missing SMILES/smiles column")

    # Accept common header variants for name/class.
    for src, dst in (
        ("Polymer_Name", "polymer_name"),
        ("polymer_Name", "polymer_name"),
        ("Polymer_Class", "polymer_class"),
    ):
        if src in df.columns:
            df = df.rename(columns={src: dst})

    if "polymer_name" not in df.columns:
        df["polymer_name"] = pd.NA
    if "polymer_class" not in df.columns:
        df["polymer_class"] = pd.NA

    df["smiles_key"] = df["smiles"].astype(str).map(lambda s: normalize_smiles(s, use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).drop_duplicates("smiles_key")
    df = df.set_index("smiles_key", drop=True)
    return df[["polymer_name", "polymer_class"]]
| |
|
| |
|
| | |
| | |
| | |
def pareto_front_mask(X: np.ndarray) -> np.ndarray:
    """
    Boolean mask of nondominated rows of X (shape (N, M)).
    All objectives are treated as minimized; negate maximized columns
    before calling.
    """
    n = X.shape[0]
    efficient = np.ones(n, dtype=bool)
    for i in range(n):
        if not efficient[i]:
            continue
        xi = X[i]
        # Does any row strictly dominate row i (<= everywhere, < somewhere)?
        if np.any(np.all(X <= xi, axis=1) & np.any(X < xi, axis=1)):
            efficient[i] = False
            continue
        # Row i survives; knock out every row it strictly dominates.
        efficient[np.all(xi <= X, axis=1) & np.any(xi < X, axis=1)] = False
        efficient[i] = True
    return efficient
| |
|
| |
|
def pareto_layers(X: np.ndarray, max_layers: int = 10) -> np.ndarray:
    """
    Assign each point its Pareto layer: 1 = front, 2 = second layer, ...
    Points not reached within *max_layers* peels keep layer 0.
    """
    layers = np.zeros(X.shape[0], dtype=int)
    pending = np.arange(X.shape[0])
    for depth in range(1, max_layers + 1):
        if pending.size == 0:
            break
        front = pareto_front_mask(X[pending])
        layers[pending[front]] = depth
        pending = pending[~front]
    return layers
| |
|
| |
|
def pareto_front_mask_chunked(
    X: np.ndarray,
    chunk_size: int = 100000,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> np.ndarray:
    """
    Exact global Pareto-front mask via chunk-local reduction:
      1) exact local front within each chunk,
      2) union of the local fronts,
      3) exact front over the union.
    This is exact for front-1. *progress_callback* receives
    (chunks_done, chunks_total).
    """
    n = X.shape[0]
    # Small input: no chunking needed.
    if n <= chunk_size:
        if progress_callback is not None:
            progress_callback(1, 1)
        return pareto_front_mask(X)

    survivors = []
    n_chunks = (n + chunk_size - 1) // chunk_size
    for chunk_no, start in enumerate(range(0, n, chunk_size), start=1):
        idx = np.arange(start, min(start + chunk_size, n))
        survivors.append(idx[pareto_front_mask(X[idx])])
        if progress_callback is not None:
            progress_callback(chunk_no, n_chunks)

    if not survivors:
        return np.zeros(n, dtype=bool)

    # Reconcile: exact front over all chunk-local survivors.
    pool = np.concatenate(survivors)
    global_front = pool[pareto_front_mask(X[pool])]

    mask = np.zeros(n, dtype=bool)
    mask[global_front] = True
    return mask
| |
|
| |
|
def pareto_layers_chunked(
    X: np.ndarray,
    max_layers: int = 10,
    chunk_size: int = 100000,
    progress_callback: Optional[Callable[[int, int, int], None]] = None,
) -> np.ndarray:
    """
    Exact Pareto layers by repeatedly peeling the exact chunked front.
    *progress_callback* receives (layer, chunks_done, chunks_total);
    unassigned points past *max_layers* keep layer 0.
    """
    n = X.shape[0]
    layers = np.zeros(n, dtype=int)
    pending = np.arange(n)

    depth = 1
    while pending.size > 0 and depth <= max_layers:
        # Bind the current depth so the forwarded callback reports the
        # layer being peeled.
        def forward(done: int, total: int, _depth: int = depth) -> None:
            if progress_callback is not None:
                progress_callback(_depth, done, total)

        front = pareto_front_mask_chunked(X[pending], chunk_size=chunk_size, progress_callback=forward)
        layers[pending[front]] = depth
        pending = pending[~front]
        depth += 1

    return layers
| |
|
| |
|
| | |
| | |
| | |
def morgan_fp(smiles: str, radius: int = 2, nbits: int = 2048):
    """Morgan bit-vector fingerprint for *smiles*; None when the SMILES is invalid."""
    mol = Chem.MolFromSmiles(smiles)
    return None if mol is None else AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=nbits)
| |
|
def tanimoto_distance(fp1, fp2) -> float:
    """Tanimoto distance (1 - similarity) between two RDKit fingerprints."""
    similarity = DataStructs.TanimotoSimilarity(fp1, fp2)
    return 1.0 - similarity
| |
|
def greedy_diverse_select(
    smiles_list: List[str],
    scores: np.ndarray,
    max_k: int,
    min_dist: float,
) -> List[int]:
    """
    Pick up to *max_k* indices into *smiles_list* by descending score while
    keeping every pair of picks at least *min_dist* apart in Tanimoto
    distance. Entries with unparsable SMILES are skipped.
    """
    fps = []
    keep = []
    for idx, smi in enumerate(smiles_list):
        fp = morgan_fp(smi)
        if fp is not None:
            fps.append(fp)
            keep.append(idx)

    if not keep:
        return []

    # Rank the valid entries best-first.
    ranking = np.argsort(-scores[keep])
    chosen = []
    chosen_fps = []

    for pos in ranking:
        candidate_fp = fps[pos]
        # Accept only if far enough from everything already chosen.
        far_enough = all(tanimoto_distance(candidate_fp, prev) >= min_dist for prev in chosen_fps)
        if far_enough:
            chosen.append(keep[pos])
            chosen_fps.append(candidate_fp)
            if len(chosen) >= max_k:
                break

    return chosen
| |
|
| |
|
| | |
| | |
| | |
def internal_consistency_penalty(row: pd.Series) -> float:
    """
    Fraction of simple physics/validity checks the row violates, in [0, 1].
    Returns 0.0 when no check is applicable. Rules are intentionally simple;
    extend as needed.
    """
    results: list = []

    # Positivity: most properties must be > 0; temperatures/bandgap >= 0.
    for prop in ["cp", "tc", "rho", "dif", "visc", "tg", "tm", "bandgap"]:
        col = mean_col(prop)
        if col in row.index and pd.notna(row[col]):
            if prop in ["bandgap", "tg", "tm"]:
                results.append(float(row[col]) >= 0.0)
            else:
                results.append(float(row[col]) > 0.0)

    # Poisson ratio bounded to [0, 0.5].
    poisson_col = mean_col("poisson")
    if poisson_col in row.index and pd.notna(row[poisson_col]):
        results.append(0.0 <= float(row[poisson_col]) <= 0.5)

    # Glass transition should not exceed melting temperature.
    tg_col, tm_col = mean_col("tg"), mean_col("tm")
    if tg_col in row.index and tm_col in row.index:
        if pd.notna(row[tg_col]) and pd.notna(row[tm_col]):
            results.append(float(row[tg_col]) <= float(row[tm_col]))

    if not results:
        return 0.0
    return sum(1 for ok in results if not ok) / len(results)
| |
|
| |
|
def synthesizability_score(smiles: str) -> float:
    """
    Synthesizability proxy in [0, 1] from the RDKit SA-score.
    SA-score runs roughly 1 (easy) to 10 (hard); we map 1 -> 1.0 and
    10 -> 0.0. Unparsable SMILES or scorer failures yield 0.0.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return 0.0

    try:
        raw = sascorer.calculateScore(mol)
    except Exception:
        return 0.0
    if raw is None:
        return 0.0

    scaled = 1.0 - (float(raw) - 1.0) / 9.0
    return float(np.clip(scaled, 0.0, 1.0))
| |
|
| |
|
def compute_trust_scores(
    df: pd.DataFrame,
    real_fps: List,
    real_smiles: List[str],
    trust_weights: Dict[str, float] | None = None,
) -> np.ndarray:
    """
    Per-row trust score in [0, 1] (higher = more trustworthy / lower risk).

    Weighted blend of:
      - similarity to the nearest real polymer (max bulk Tanimoto vs real_fps)
      - internal physical-consistency checks
      - uncertainty penalty from std_* columns, when present
      - synthesizability (SA-score proxy)

    ``real_smiles`` is currently unused but kept for interface stability.
    Fix vs. original: the ``std_*`` column scan is loop-invariant (it depends
    on the frame's columns, not the row) and is now computed once instead of
    once per row.
    """
    N = len(df)
    trust = np.zeros(N, dtype=float)
    tw_defaults = {"real": 0.45, "consistency": 0.25, "uncertainty": 0.10, "synth": 0.20}
    tw = normalize_weights(trust_weights or {}, tw_defaults)

    smiles_col = "smiles_key" if "smiles_key" in df.columns else "smiles_canon"
    # Hoisted out of the loop: uncertainty columns are a frame property.
    std_cols = [c for c in df.columns if c.startswith("std_")]

    for i in range(N):
        s = df.iloc[i][smiles_col]

        # Distance to the closest known real polymer (1.0 when unknown).
        fp = morgan_fp(s)
        if fp is None or not real_fps:
            d_real = 1.0
        else:
            sims = DataStructs.BulkTanimotoSimilarity(fp, real_fps)
            d_real = 1.0 - float(max(sims))

        pen_cons = internal_consistency_penalty(df.iloc[i])

        # Mean prediction std, squashed into [0, 1) via x / (x + 1).
        if std_cols:
            std_vals = df.iloc[i][std_cols].astype(float)
            std_vals = std_vals.replace([np.inf, -np.inf], np.nan).dropna()
            pen_unc = float(np.clip(std_vals.mean() / (std_vals.mean() + 1.0), 0.0, 1.0)) if len(std_vals) else 0.0
        else:
            pen_unc = 0.0

        s_syn = synthesizability_score(s)

        s_real = 1.0 - np.clip(d_real, 0.0, 1.0)

        trust[i] = (
            tw["real"] * s_real +
            tw["consistency"] * (1.0 - pen_cons) +
            tw["uncertainty"] * (1.0 - pen_unc) +
            tw["synth"] * s_syn
        )

    return np.clip(trust, 0.0, 1.0)
| |
|
| |
|
| | |
| | |
| | |
def run_discovery(
    spec: DiscoverySpec,
    progress_callback: Optional[Callable[[str, float], None]] = None,
) -> Tuple[pd.DataFrame, Dict[str, float], pd.DataFrame]:
    """
    End-to-end candidate discovery pipeline.

    Steps: load needed property columns from parquet -> normalize SMILES ->
    apply hard constraints -> optionally subsample to pareto_max -> exact
    chunked Pareto layering -> trust scoring against the POLYINFO reference ->
    greedy diverse selection.

    Returns (selected_candidates, stats, plot_df). *progress_callback* is
    invoked with (step_label, fraction_complete in [0, 1]).
    """
    def report(step: str, pct: float) -> None:
        if progress_callback is not None:
            progress_callback(step, pct)

    rng = np.random.default_rng(spec.random_seed)

    # Resolve which mean_/std_ columns the objectives and constraints need.
    report("Preparing columns…", 0.02)
    obj_props = [o["property"].lower() for o in spec.objectives]
    cons_props = [p.lower() for p in spec.hard_constraints.keys()]

    needed_props = sorted(set(obj_props + cons_props))
    cols = ["SMILES"] + [mean_col(p) for p in needed_props]

    # std_* columns feed the uncertainty component of the trust score.
    std_cols = [std_col(p) for p in needed_props]
    cols += std_cols

    report("Loading data from parquet…", 0.05)
    df = load_parquet_columns(spec.dataset, columns=[c for c in cols if c != "SMILES"] + ["SMILES"])
    # Tolerate a lowercase smiles column in the parquet schema.
    if "SMILES" not in df.columns and "smiles" in df.columns:
        df = df.rename(columns={"smiles": "SMILES"})
    normalize_step = "Canonicalizing SMILES…" if spec.use_canonical_smiles else "Skipping SMILES normalization…"
    report(normalize_step, 0.10)
    df["smiles_key"] = df["SMILES"].astype(str).map(lambda s: normalize_smiles(s, spec.use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).reset_index(drop=True)

    # Hard constraints: drop rows outside each property's [min, max] window.
    report("Applying constraints…", 0.22)
    for p, rule in spec.hard_constraints.items():
        p = p.lower()
        c = mean_col(p)
        if c not in df.columns:
            # A constrained property with no data column empties the pool.
            df = df.iloc[0:0]
            break
        if "min" in rule:
            df = df[df[c] >= float(rule["min"])]
        if "max" in rule:
            df = df[df[c] <= float(rule["max"])]

    n_after = len(df)
    if n_after == 0:
        empty_stats = {"n_total": 0, "n_after_constraints": 0, "n_pool": 0, "n_pareto_pool": 0, "n_selected": 0}
        return df, empty_stats, pd.DataFrame()

    n_pool = len(df)

    # Objective matrix: one column per objective, oriented for minimization.
    report("Building objective matrix…", 0.30)
    X = []
    for o in spec.objectives:
        prop = o["property"].lower()
        goal = o["goal"].lower()
        c = mean_col(prop)
        if c not in df.columns:
            raise ValueError(f"Objective column missing: {c}")
        v = df[c].to_numpy(dtype=float)
        if goal == "maximize":
            v = -v  # negate so the Pareto code can always minimize
        X.append(v)
    X = np.stack(X, axis=1)

    # Cap the Pareto pool via random subsampling unless full data is requested.
    if spec.use_full_data:
        report("Using full dataset (no Pareto cap)…", 0.35)
    elif len(df) > spec.pareto_max:
        idx = rng.choice(len(df), size=spec.pareto_max, replace=False)
        df = df.iloc[idx].reset_index(drop=True)
        X = X[idx]

    report("Computing Pareto layers…", 0.40)
    pareto_start = 0.40
    pareto_end = 0.54
    max_layers_for_pool = max(1, int(spec.max_pareto_fronts))
    # Remembers the chunk count of the first layer so later (smaller) layers
    # report progress on a comparable scale.
    pareto_chunk_ref = {"chunks_per_layer": None}

    def on_pareto_chunk(layer_i: int, done_chunks: int, total_chunks: int) -> None:
        # Maps (layer, chunk) progress onto the 0.40–0.54 slice of the bar.
        if pareto_chunk_ref["chunks_per_layer"] is None:
            pareto_chunk_ref["chunks_per_layer"] = max(1, int(total_chunks))
        ref_chunks = pareto_chunk_ref["chunks_per_layer"]
        total_units = max_layers_for_pool * ref_chunks
        done_units = min(total_units, ((layer_i - 1) * ref_chunks) + done_chunks)
        pareto_pct = int(round(100.0 * done_units / max(1, total_units)))

        layer_progress = done_chunks / max(1, total_chunks)
        overall = ((layer_i - 1) + layer_progress) / max_layers_for_pool
        pct = pareto_start + (pareto_end - pareto_start) * min(1.0, max(0.0, overall))
        report(
            f"Computing Pareto layers… {pareto_pct}% (Layer {layer_i}/{max_layers_for_pool}, chunk {done_chunks}/{total_chunks})",
            pct,
        )

    layers = pareto_layers_chunked(
        X,
        max_layers=max_layers_for_pool,
        chunk_size=100000,
        progress_callback=on_pareto_chunk,
    )
    report("Computing Pareto layers…", pareto_end)
    df["pareto_layer"] = layers
    plot_df = df[["smiles_key"] + [mean_col(p) for p in obj_props] + ["pareto_layer"]].copy()
    plot_df = plot_df.rename(columns={"smiles_key": "SMILES"})

    # Candidate pool: everything inside the first max_pareto_fronts layers.
    cand = df[df["pareto_layer"].between(1, max_layers_for_pool)].copy()
    if cand.empty:
        cand = df[df["pareto_layer"] == 1].copy()
    cand = cand.reset_index(drop=True)
    n_pareto = len(cand)

    report("Loading POLYINFO index…", 0.55)
    polyinfo = load_polyinfo_index(spec.polyinfo_csv, use_canonical_smiles=spec.use_canonical_smiles)
    real_smiles = polyinfo.index.to_list()

    report("Building real-polymer fingerprints…", 0.60)
    real_fps = []
    for s in real_smiles:
        fp = morgan_fp(s)
        if fp is not None:
            real_fps.append(fp)

    report("Computing trust scores…", 0.70)
    trust = compute_trust_scores(
        cand,
        real_fps=real_fps,
        real_smiles=real_smiles,
        trust_weights=spec.trust_weights,
    )
    cand["trust_score"] = trust

    report("Diversity selection…", 0.88)
    # Selection score blends Pareto rank (earlier layer = bigger bonus)
    # with the trust score, using normalized weights.
    sw_defaults = {"pareto": 0.60, "trust": 0.40}
    sw = normalize_weights(spec.selection_weights or {}, sw_defaults)
    pareto_bonus = (
        (max_layers_for_pool + 1) - np.clip(cand["pareto_layer"].to_numpy(dtype=int), 1, max_layers_for_pool)
    ) / float(max_layers_for_pool)
    sel_score = sw["pareto"] * pareto_bonus + sw["trust"] * cand["trust_score"].to_numpy(dtype=float)

    chosen_idx = greedy_diverse_select(
        smiles_list=cand["smiles_key"].tolist(),
        scores=sel_score,
        max_k=spec.max_candidates,
        min_dist=spec.min_distance,
    )
    out = cand.iloc[chosen_idx].copy().reset_index(drop=True)

    report("Finalizing results…", 0.96)
    # Attach known polymer names/classes by normalized SMILES key.
    out = out.set_index("smiles_key", drop=False)
    out = out.join(polyinfo, how="left")
    out = out.reset_index(drop=True)

    # Keep a stable, user-facing column subset (mean/std per needed property).
    keep = ["smiles_key", "polymer_name", "polymer_class", "pareto_layer", "trust_score"]
    for p in needed_props:
        mc = mean_col(p)
        sc = std_col(p)
        if mc in out.columns:
            keep.append(mc)
        if sc in out.columns:
            keep.append(sc)

    out = out[keep].rename(columns={"smiles_key": "SMILES"})

    # NOTE(review): n_total here is len(df) AFTER constraint filtering and the
    # optional subsample, so it can coincide with n_pool rather than the raw
    # row count — confirm whether the pre-filter total was intended.
    stats = {
        "n_total": float(len(df)),
        "n_after_constraints": float(n_after),
        "n_pool": float(n_pool),
        "n_pareto_pool": float(n_pareto),
        "n_selected": float(len(out)),
    }
    report("Done.", 1.0)
    return out, stats, plot_df
| |
|
| |
|
def build_pareto_plot_df(spec: DiscoverySpec, max_plot_points: int = 30000) -> pd.DataFrame:
    """
    Returns a small dataframe for plotting (sampled), with objective columns and pareto_layer.
    Does NOT compute trust/diversity. Safe for live plotting.

    Raises ValueError when an objective's mean_* column is absent after
    loading (consistent with run_discovery, instead of a bare KeyError).
    """
    rng = np.random.default_rng(spec.random_seed)

    obj_props = [o["property"].lower() for o in spec.objectives]
    cons_props = [p.lower() for p in spec.hard_constraints.keys()]
    needed_props = sorted(set(obj_props + cons_props))

    cols = ["SMILES"] + [mean_col(p) for p in needed_props]
    df = load_parquet_columns(spec.dataset, columns=cols)

    # Tolerate a lowercase smiles column in the parquet schema.
    if "SMILES" not in df.columns and "smiles" in df.columns:
        df = df.rename(columns={"smiles": "SMILES"})

    df["smiles_key"] = df["SMILES"].astype(str).map(lambda s: normalize_smiles(s, spec.use_canonical_smiles))
    df = df.dropna(subset=["smiles_key"]).reset_index(drop=True)

    # Hard constraints (same semantics as run_discovery).
    for p, rule in spec.hard_constraints.items():
        p = p.lower()
        c = mean_col(p)
        if c not in df.columns:
            return df.iloc[0:0]
        if "min" in rule:
            df = df[df[c] >= float(rule["min"])]
        if "max" in rule:
            df = df[df[c] <= float(rule["max"])]

    if len(df) == 0:
        return df

    # Sample down for responsive plotting.
    plot_cap = min(int(max_plot_points), int(spec.pareto_max))
    if len(df) > plot_cap:
        idx = rng.choice(len(df), size=plot_cap, replace=False)
        df = df.iloc[idx].reset_index(drop=True)

    # Objective matrix, oriented for minimization.
    X = []
    for o in spec.objectives:
        prop = o["property"].lower()
        goal = o["goal"].lower()
        c = mean_col(prop)
        if c not in df.columns:
            # Fix: explicit error instead of a KeyError from df[c] below.
            raise ValueError(f"Objective column missing: {c}")
        v = df[c].to_numpy(dtype=float)
        if goal == "maximize":
            v = -v
        X.append(v)
    X = np.stack(X, axis=1)

    df["pareto_layer"] = pareto_layers(X, max_layers=5)

    keep = ["smiles_key", "pareto_layer"] + [mean_col(p) for p in obj_props]
    out = df[keep].rename(columns={"smiles_key": "SMILES"})
    return out
| |
|
| |
|
def parse_spec(text: str, dataset_path: List[str], polyinfo_path: str, polyinfo_csv_path: str) -> DiscoverySpec:
    """
    Parse a JSON spec string into a DiscoverySpec.

    Fix: the body was a byte-for-byte duplicate of spec_from_dict's
    construction logic; it now delegates so the two cannot drift apart.
    """
    return spec_from_dict(json.loads(text), dataset_path, polyinfo_path, polyinfo_csv_path)
| |
|