Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import time | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Iterable | |
| import gradio as gr | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| from catboost import CatBoostRegressor | |
| from rdkit import Chem | |
| from rdkit.Chem import Descriptors | |
| from sklearn.ensemble import RandomForestRegressor | |
| from sklearn.metrics import mean_squared_error, r2_score | |
| from sklearn.preprocessing import StandardScaler | |
| from tabicl import TabICLRegressor | |
# Bundled example split, stored in the "data" directory next to this module.
DEFAULT_TRAIN_PATH = Path(__file__).resolve().parent.joinpath("data", "default_train.csv")
DEFAULT_TEST_PATH = Path(__file__).resolve().parent.joinpath("data", "default_test.csv")

# Column names (and their order) that every input CSV must match exactly.
REQUIRED_COLUMNS = ["smiles", "value"]
def _validate_schema(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Check that ``df`` follows the ``smiles,value`` schema and return a cleaned copy.

    Args:
        df: Raw frame as read from a CSV file.
        name: Human-readable label of the input, used as the prefix of error messages.

    Returns:
        A copy of ``df`` whose column labels are whitespace-stripped, whose
        ``smiles`` column holds stripped strings and whose ``value`` column is
        numeric.

    Raises:
        ValueError: If the columns differ from ``REQUIRED_COLUMNS``, a SMILES is
            missing or empty, a value is missing or non-numeric, or RDKit cannot
            parse a SMILES string.
    """
    cols = [str(c).strip() for c in df.columns]
    if cols != REQUIRED_COLUMNS:
        raise ValueError(
            f"{name} must have exactly these columns in order: {REQUIRED_COLUMNS}. "
            f"Found: {cols}"
        )

    clean = df.copy()
    # The header check above compares *stripped* labels, so adopt them here too;
    # otherwise a header such as "smiles, value" passes the check and then fails
    # with a KeyError on the column lookups below.
    clean.columns = cols

    # Record missing SMILES before the string cast: astype(str) would turn NaN
    # into the literal "nan" and hide it from the emptiness check.
    smiles_missing = clean["smiles"].isna()
    clean["smiles"] = clean["smiles"].astype(str).str.strip()
    clean["value"] = pd.to_numeric(clean["value"], errors="coerce")

    if smiles_missing.any() or (clean["smiles"] == "").any():
        raise ValueError(f"{name} contains empty smiles values.")
    if clean["value"].isna().any():
        raise ValueError(f"{name} contains non-numeric or missing value entries.")

    # Only the first offending SMILES is reported, so stop at the first failure.
    first_invalid = next(
        (s for s in clean["smiles"].tolist() if Chem.MolFromSmiles(s) is None),
        None,
    )
    if first_invalid is not None:
        raise ValueError(
            f"{name} contains invalid SMILES. First invalid example: {first_invalid}"
        )
    return clean
| def _load_input_data( | |
| use_default_split: bool, | |
| train_file: str | None, | |
| test_file: str | None, | |
| ) -> tuple[pd.DataFrame, pd.DataFrame]: | |
| if use_default_split: | |
| train_df = pd.read_csv(DEFAULT_TRAIN_PATH) | |
| test_df = pd.read_csv(DEFAULT_TEST_PATH) | |
| else: | |
| if train_file is None or test_file is None: | |
| raise ValueError( | |
| "Please upload both train and test CSV files, or enable default split." | |
| ) | |
| train_df = pd.read_csv(train_file) | |
| test_df = pd.read_csv(test_file) | |
| train_df = _validate_schema(train_df, "Train CSV") | |
| test_df = _validate_schema(test_df, "Test CSV") | |
| if len(train_df) < 2: | |
| raise ValueError("Train CSV must contain at least 2 rows.") | |
| if len(test_df) < 1: | |
| raise ValueError("Test CSV must contain at least 1 row.") | |
| return train_df, test_df | |
| def _get_mordred_calculator(): | |
| from mordred import Calculator, descriptors | |
| calc = Calculator(descriptors, ignore_3D=True) | |
| calc.config(timeout=1) | |
| return calc | |
def _mordred_features(smiles: Iterable[str]) -> np.ndarray:
    """Compute Mordred descriptors for each SMILES string.

    Args:
        smiles: SMILES strings to featurize.

    Returns:
        A float32 array of descriptor values in which missing entries, NaN and
        +/-inf have all been replaced by 0.0.
    """
    molecules = [Chem.MolFromSmiles(smi) for smi in smiles]
    calculator = _get_mordred_calculator()

    descriptor_frame = calculator.pandas(molecules, nproc=1)
    raw = descriptor_frame.fill_missing().to_numpy(dtype=np.float32)

    # Zero out anything non-finite before handing the matrix on.
    finite = np.nan_to_num(raw, nan=0.0, posinf=0.0, neginf=0.0)
    return finite.astype(np.float32)
| def _get_rdkit2d_descriptors() -> tuple[tuple[str, object], ...]: | |
| return tuple(Descriptors._descList) | |
def _rdkit2d_features(smiles: Iterable[str]) -> np.ndarray:
    """Evaluate every RDKit descriptor on every SMILES string.

    Args:
        smiles: SMILES strings to featurize.

    Returns:
        A float32 array with one row per molecule and one column per descriptor.
        A descriptor that raises contributes 0.0, and NaN or +/-inf results are
        replaced by 0.0.
    """

    def _evaluate(descriptor_fn, mol) -> float:
        # Best effort: a failing descriptor must not abort the whole run.
        try:
            return float(descriptor_fn(mol))
        except Exception:
            return 0.0

    descriptor_table = _get_rdkit2d_descriptors()
    # map() is lazy, so each molecule is parsed right before its row is built.
    matrix = [
        [_evaluate(descriptor_fn, mol) for _name, descriptor_fn in descriptor_table]
        for mol in map(Chem.MolFromSmiles, smiles)
    ]

    features = np.asarray(matrix, dtype=np.float32)
    features = np.nan_to_num(features, nan=0.0, posinf=0.0, neginf=0.0)
    return features.astype(np.float32)
class CheMeleonFingerprint:
    """Callable that turns SMILES strings into CheMeleon fingerprints on CPU.

    On construction the message-passing checkpoint is loaded from
    ``~/.chemprop/chemeleon_mp.pt`` (downloading it first if the file is absent)
    and wrapped in a chemprop ``MPNN`` that is put in eval mode on the CPU.
    """

    # Location of the message-passing checkpoint.
    _CHECKPOINT_URL = "https://zenodo.org/records/15460715/files/chemeleon_mp.pt"

    def __init__(self) -> None:
        """Load (and if needed download) the checkpoint and build the model."""
        # Heavy optional dependencies are imported lazily, only when this
        # featurizer is actually instantiated.
        from urllib.request import urlretrieve

        import torch
        from chemprop import featurizers, nn
        from chemprop.models import MPNN
        from chemprop.nn import RegressionFFN

        self._torch = torch
        self._mol_graph_featurizer = featurizers.SimpleMoleculeMolGraphFeaturizer()
        agg = nn.MeanAggregation()

        ckpt_dir = Path.home() / ".chemprop"
        ckpt_dir.mkdir(parents=True, exist_ok=True)
        mp_path = ckpt_dir / "chemeleon_mp.pt"
        if not mp_path.exists():
            # Download to a sibling ".part" file and rename only on success, so
            # an interrupted transfer never leaves a truncated checkpoint at the
            # final path (which the exists() check would accept on later runs).
            partial_path = mp_path.with_name(mp_path.name + ".part")
            try:
                urlretrieve(self._CHECKPOINT_URL, partial_path)
                partial_path.replace(mp_path)
            finally:
                # No-op after a successful rename; removes leftovers on failure.
                partial_path.unlink(missing_ok=True)

        chemeleon_mp = torch.load(mp_path, map_location="cpu", weights_only=True)
        mp = nn.BondMessagePassing(**chemeleon_mp["hyper_parameters"])
        mp.load_state_dict(chemeleon_mp["state_dict"])
        self.model = MPNN(
            message_passing=mp,
            agg=agg,
            predictor=RegressionFFN(input_dim=mp.output_dim),
        )
        self.model.eval()
        self.model.to(device="cpu")

    def __call__(self, smiles_batch: list[str]) -> np.ndarray:
        """Return the model fingerprints for one batch of SMILES strings.

        Args:
            smiles_batch: SMILES strings to encode together.

        Returns:
            The output of ``self.model.fingerprint`` converted to a NumPy array.
        """
        from chemprop.data import BatchMolGraph

        bmg = BatchMolGraph(
            [self._mol_graph_featurizer(Chem.MolFromSmiles(s)) for s in smiles_batch]
        )
        bmg.to(device=self.model.device)
        # Inference only: no gradients are needed.
        with self._torch.no_grad():
            return self.model.fingerprint(bmg).numpy(force=True)
| def _get_chemeleon_fingerprinter() -> CheMeleonFingerprint: | |
| return CheMeleonFingerprint() | |
def _chemeleon_features(smiles: Iterable[str], batch_size: int = 128) -> np.ndarray:
    """Compute CheMeleon fingerprints for the given SMILES in fixed-size batches.

    Args:
        smiles: SMILES strings to featurize.
        batch_size: Maximum number of molecules passed to the fingerprinter at once.

    Returns:
        A float32 array formed by vertically stacking the per-batch fingerprints.
    """
    all_smiles = list(smiles)
    encode = _get_chemeleon_fingerprinter()

    chunks: list[np.ndarray] = [
        np.asarray(encode(all_smiles[offset : offset + batch_size]), dtype=np.float32)
        for offset in range(0, len(all_smiles), batch_size)
    ]
    stacked = np.vstack(chunks)
    return stacked.astype(np.float32)
| def _build_features( | |
| featurizer_name: str, | |
| train_smiles: Iterable[str], | |
| test_smiles: Iterable[str], | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| if featurizer_name == "RDKit2D": | |
| x_train = _rdkit2d_features(train_smiles) | |
| x_test = _rdkit2d_features(test_smiles) | |
| return x_train, x_test | |
| if featurizer_name == "Mordred": | |
| x_train = _mordred_features(train_smiles) | |
| x_test = _mordred_features(test_smiles) | |
| return x_train, x_test | |
| if featurizer_name == "CheMeleon": | |
| x_train = _chemeleon_features(train_smiles) | |
| x_test = _chemeleon_features(test_smiles) | |
| return x_train, x_test | |
| raise ValueError(f"Unsupported featurizer: {featurizer_name}") | |
def _scale_xy(
    x_train: np.ndarray,
    x_test: np.ndarray,
    y_train: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, StandardScaler]:
    """Standardize features and targets using statistics of the training data.

    Args:
        x_train: Training feature matrix; the feature scaler is fitted on it.
        x_test: Test feature matrix; transformed with the fitted feature scaler.
        y_train: Training targets; a separate scaler is fitted on them.

    Returns:
        ``(x_train_scaled, x_test_scaled, y_train_scaled, y_scaler)``. The scaled
        features are clipped to [-6, 6] and everything is float32. ``y_scaler``
        is returned so predictions can be mapped back to the original scale.
    """
    clip_limit = 6.0

    feature_scaler = StandardScaler()
    scaled_train = np.clip(
        feature_scaler.fit_transform(x_train), -clip_limit, clip_limit
    ).astype(np.float32)
    scaled_test = np.clip(
        feature_scaler.transform(x_test), -clip_limit, clip_limit
    ).astype(np.float32)

    y_scaler = StandardScaler()
    # StandardScaler expects a 2-D input, hence the column reshape.
    target_column = y_train.reshape(-1, 1)
    scaled_targets = y_scaler.fit_transform(target_column).ravel().astype(np.float32)

    return scaled_train, scaled_test, scaled_targets, y_scaler
def _parity_plot(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    title: str,
    rmse: float,
    r2: float,
):
    """Draw a predicted-vs-true scatter plot with the identity line.

    Args:
        y_true: Reference target values (x axis).
        y_pred: Predicted values (y axis).
        title: First line of the plot title.
        rmse: RMSE value shown in the title.
        r2: R² value shown in the title.

    Returns:
        The matplotlib figure containing the plot.
    """
    # Build the figure directly instead of through pyplot: pyplot keeps every
    # figure it creates in global state until it is explicitly closed, and this
    # function is called on every run of a long-lived server.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(4.8, 4.2), dpi=140)
    ax = fig.subplots()

    # Use one shared range for both axes so the identity line is the diagonal.
    lo = float(min(np.min(y_true), np.min(y_pred)))
    hi = float(max(np.max(y_true), np.max(y_pred)))
    # The lower bound on the padding keeps the range non-degenerate when all
    # values coincide.
    pad = max((hi - lo) * 0.05, 1e-6)
    lo -= pad
    hi += pad

    ax.scatter(y_true, y_pred, s=35, alpha=0.85)
    ax.plot([lo, hi], [lo, hi], "k--", linewidth=1.3)
    ax.set_xlim(lo, hi)
    ax.set_ylim(lo, hi)
    ax.set_xlabel("True value")
    ax.set_ylabel("Predicted value")
    ax.set_title(f"{title}\nRMSE={rmse:.4f} | R²={r2:.4f}")
    ax.grid(alpha=0.2)
    fig.tight_layout()
    return fig
def _run_models(
    featurizer_name: str,
    use_default_split: bool,
    train_file: str | None,
    test_file: str | None,
):
    """Featurize the data, then fit and evaluate TabICL, RandomForest and CatBoost.

    Features and targets are standardized with training statistics; each model
    is fitted on the scaled targets and its predictions are mapped back to the
    original scale before scoring.

    Args:
        featurizer_name: Name of the featurizer passed to ``_build_features``.
        use_default_split: Whether to use the bundled default CSVs.
        train_file: Path of the uploaded training CSV, or ``None``.
        test_file: Path of the uploaded test CSV, or ``None``.

    Returns:
        ``(metrics, tabicl_fig, rf_fig, catboost_fig, summary)`` where ``metrics``
        is a DataFrame with one row per model sorted by ascending RMSE, the three
        figures are parity plots, and ``summary`` is a one-line status string.

    Raises:
        ValueError: Propagated from data loading or featurizer selection.
    """
    train_df, test_df = _load_input_data(use_default_split, train_file, test_file)
    x_train, x_test = _build_features(
        featurizer_name=featurizer_name,
        train_smiles=train_df["smiles"].tolist(),
        test_smiles=test_df["smiles"].tolist(),
    )
    y_train = train_df["value"].to_numpy(dtype=np.float32)
    y_test = test_df["value"].to_numpy(dtype=np.float32)
    x_train_s, x_test_s, y_train_s, y_scaler = _scale_xy(x_train, x_test, y_train)

    # All models are restricted to a single CPU thread/job.
    models = {
        "TabICL": TabICLRegressor(
            n_estimators=1,
            random_state=42,
            device="cpu",
            n_jobs=1,
            disk_offload_dir=str((Path(__file__).resolve().parent / "tabicl_offload").resolve()),
        ),
        "RandomForest": RandomForestRegressor(random_state=42, n_jobs=1),
        "CatBoost": CatBoostRegressor(
            iterations=100,
            random_seed=42,
            thread_count=1,
            verbose=False,
            allow_writing_files=False,
        ),
    }

    rows: list[dict] = []
    figures: dict[str, object] = {}
    for model_name, model in models.items():
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # (or go negative) when the system clock is adjusted mid-run.
        started = time.perf_counter()
        model.fit(x_train_s, y_train_s)
        y_pred_s = np.asarray(model.predict(x_test_s), dtype=np.float32).ravel()
        # Undo the target standardization so metrics are in original units.
        y_pred = y_scaler.inverse_transform(y_pred_s.reshape(-1, 1)).ravel().astype(np.float32)
        # The reported runtime covers fit + predict + inverse scaling.
        runtime_s = float(time.perf_counter() - started)

        rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
        # R² is reported as NaN when there is only a single test sample.
        r2 = float(r2_score(y_test, y_pred)) if len(y_test) > 1 else float("nan")
        rows.append(
            {
                "model": model_name,
                "rmse": rmse,
                "r2": r2,
                "runtime_s": runtime_s,
                "n_train": int(len(y_train)),
                "n_test": int(len(y_test)),
                "n_features": int(x_train.shape[1]),
                "featurizer": featurizer_name,
            }
        )
        figures[model_name] = _parity_plot(y_test, y_pred, model_name, rmse, r2)

    metrics = pd.DataFrame(rows).sort_values("rmse", ascending=True).reset_index(drop=True)
    summary = (
        f"Done. Featurizer={featurizer_name} | train={len(train_df)} rows | "
        f"test={len(test_df)} rows | features={x_train.shape[1]}"
    )
    return (
        metrics,
        figures["TabICL"],
        figures["RandomForest"],
        figures["CatBoost"],
        summary,
    )
def run_demo(
    featurizer_name: str,
    use_default_split: bool,
    train_file,
    test_file,
):
    """UI callback: run the model comparison and never raise.

    Args:
        featurizer_name: Selected featurizer name.
        use_default_split: Checkbox value; coerced to ``bool``.
        train_file: Uploaded training file, or ``None``; converted with ``str``.
        test_file: Uploaded test file, or ``None``; converted with ``str``.

    Returns:
        The 5-tuple produced by ``_run_models``. On any exception the tuple is
        ``(empty DataFrame, None, None, None, "Error: <message>")`` instead.
    """
    train_path = str(train_file) if train_file is not None else None
    test_path = str(test_file) if test_file is not None else None

    try:
        outputs = _run_models(
            featurizer_name=featurizer_name,
            use_default_split=bool(use_default_split),
            train_file=train_path,
            test_file=test_path,
        )
    except Exception as exc:
        # Top-level UI boundary: report the failure in the status field and
        # blank out the table and the three plots.
        no_plot = None
        return (pd.DataFrame(), no_plot, no_plot, no_plot, f"Error: {exc}")
    return outputs
# Markdown rendered as the first element of the page (see the gr.Blocks layout).
DESCRIPTION = """
# TabICLmolprop Demo (CPU)
This Space compares **TabICL**, **RandomForest**, and **CatBoost** on molecular regression.
- Featurizer options: **RDKit2D**, **CheMeleon**, or **Mordred**
- Default data: fixed **DCN** split with **100 train** and **10 test** rows
- Custom data schema: CSV with exactly two columns in this order: `smiles,value`
Full Benchmark Repo: [https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop](https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop)
"""

# Markdown rendered as the last element of the page.
# NOTE(review): the constant name is misspelled ("AKNOWLEDGEMENTS"); it is kept
# as-is because the layout code refers to it by this name.
AKNOWLEDGEMENTS = """
## Acknowledgements
This code uses [CheMeleon](https://github.com/JacksonBurns/chemeleon).
The code also uses the [TabICLv2](https://github.com/soda-inria/tabicl) Model.
Example dataset from here: [Graph neural networks for ignition quality prediction](https://git.rwth-aachen.de/avt-svt/public/graph_neural_network_for_fuel_ignition_quality)
"""
# Page layout. Components appear in the order they are created below.
with gr.Blocks() as demo:
    gr.Markdown(DESCRIPTION)

    # Row 1: featurizer choice and the default-data toggle.
    with gr.Row():
        featurizer = gr.Dropdown(
            choices=["RDKit2D", "CheMeleon", "Mordred"],
            value="RDKit2D",
            label="Featurizer",
        )
        use_default = gr.Checkbox(value=True, label="Use default DCN 100/10 split")

    # Row 2: optional user uploads; type="filepath" hands the callback a path.
    with gr.Row():
        train_csv = gr.File(label="Train CSV (smiles,value)", file_types=[".csv"], type="filepath")
        test_csv = gr.File(label="Test CSV (smiles,value)", file_types=[".csv"], type="filepath")

    run_btn = gr.Button("Run Models")
    metrics_out = gr.Dataframe(label="Metrics", wrap=True)

    # Row 3: one parity plot per model.
    with gr.Row():
        # NOTE(review): "tapicl_plot" looks like a typo for "tabicl_plot".
        tapicl_plot = gr.Plot(label="TabICL parity")
        rf_plot = gr.Plot(label="RandomForest parity")
        cat_plot = gr.Plot(label="CatBoost parity")

    status = gr.Textbox(label="Status", lines=2)

    # The outputs list must stay in the same order as the 5-tuple returned by
    # run_demo: metrics table, three plots, status text.
    run_btn.click(
        fn=run_demo,
        inputs=[featurizer, use_default, train_csv, test_csv],
        outputs=[metrics_out, tapicl_plot, rf_plot, cat_plot, status],
    )
    gr.Markdown(AKNOWLEDGEMENTS)
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    # NOTE(review): share=True asks Gradio for a public share link; presumably
    # unnecessary when the app is hosted as a Space — confirm it is intended.
    demo.launch(share=True)