"""Gradio demo comparing TabICL, RandomForest and CatBoost on molecular regression.

The app featurizes SMILES strings (RDKit2D, Mordred or CheMeleon), standardizes
features and targets, fits the three regressors on a train split, and reports
RMSE / R² plus a parity plot per model on a test split.
"""

from __future__ import annotations

import time
from functools import lru_cache
from pathlib import Path
from typing import Iterable

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor
from rdkit import Chem
from rdkit.Chem import Descriptors
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from tabicl import TabICLRegressor

DEFAULT_TRAIN_PATH = Path(__file__).resolve().parent / "data" / "default_train.csv"
DEFAULT_TEST_PATH = Path(__file__).resolve().parent / "data" / "default_test.csv"
REQUIRED_COLUMNS = ["smiles", "value"]

# Location of the pretrained CheMeleon message-passing weights.
_CHEMELEON_MP_URL = "https://zenodo.org/records/15460715/files/chemeleon_mp.pt"


def _validate_schema(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Return a cleaned copy of *df* after checking it against the CSV schema.

    Args:
        df: Raw frame as read from a CSV file.
        name: Human-readable label used in error messages (e.g. "Train CSV").

    Returns:
        A copy of *df* whose columns are exactly ``REQUIRED_COLUMNS``, with
        whitespace-stripped string SMILES and numeric values.

    Raises:
        ValueError: If the columns differ from ``REQUIRED_COLUMNS``, if any
            SMILES is missing/empty or cannot be parsed by RDKit, or if any
            value is missing or non-numeric.
    """
    cols = [str(c).strip() for c in df.columns]
    if cols != REQUIRED_COLUMNS:
        raise ValueError(
            f"{name} must have exactly these columns in order: {REQUIRED_COLUMNS}. "
            f"Found: {cols}"
        )

    clean = df.copy()
    # The comparison above used stripped names; apply them to the copy so that
    # headers such as " smiles" do not fail the column lookups below.
    clean.columns = cols

    # Record missing cells *before* casting: astype(str) turns NaN into the
    # literal string "nan", which would hide them from the emptiness check.
    missing_smiles = clean["smiles"].isna()
    clean["smiles"] = clean["smiles"].astype(str).str.strip()
    clean["value"] = pd.to_numeric(clean["value"], errors="coerce")

    if missing_smiles.any() or (clean["smiles"] == "").any():
        raise ValueError(f"{name} contains empty smiles values.")
    if clean["value"].isna().any():
        raise ValueError(f"{name} contains non-numeric or missing value entries.")

    # Stop at the first unparsable SMILES; only that one is reported.
    first_invalid = next(
        (s for s in clean["smiles"] if Chem.MolFromSmiles(s) is None),
        None,
    )
    if first_invalid is not None:
        raise ValueError(
            f"{name} contains invalid SMILES. First invalid example: {first_invalid}"
        )
    return clean


def _load_input_data(
    use_default_split: bool,
    train_file: str | None,
    test_file: str | None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Load and validate the train/test frames.

    Args:
        use_default_split: When true, read the bundled default CSV files and
            ignore *train_file* / *test_file*.
        train_file: Path of the uploaded train CSV, or ``None``.
        test_file: Path of the uploaded test CSV, or ``None``.

    Returns:
        ``(train_df, test_df)``, both validated by ``_validate_schema``.

    Raises:
        ValueError: If uploads are required but missing, if either file fails
            schema validation, or if the train/test frames have fewer than
            2 / 1 rows respectively.
    """
    if use_default_split:
        train_source, test_source = DEFAULT_TRAIN_PATH, DEFAULT_TEST_PATH
    else:
        if train_file is None or test_file is None:
            raise ValueError(
                "Please upload both train and test CSV files, or enable default split."
            )
        train_source, test_source = train_file, test_file

    train_df = _validate_schema(pd.read_csv(train_source), "Train CSV")
    test_df = _validate_schema(pd.read_csv(test_source), "Test CSV")

    if len(train_df) < 2:
        raise ValueError("Train CSV must contain at least 2 rows.")
    if len(test_df) < 1:
        raise ValueError("Test CSV must contain at least 1 row.")
    return train_df, test_df


@lru_cache(maxsize=1)
def _get_mordred_calculator():
    """Build (once) and return the Mordred calculator with 3D descriptors disabled."""
    # Imported lazily so the dependency is only loaded when Mordred is selected.
    from mordred import Calculator, descriptors

    calc = Calculator(descriptors, ignore_3D=True)
    calc.config(timeout=1)
    return calc


def _mordred_features(smiles: Iterable[str]) -> np.ndarray:
    """Compute Mordred descriptors for *smiles* as a finite float32 matrix.

    Missing and non-finite descriptor values are replaced by ``0.0``.
    """
    mols = [Chem.MolFromSmiles(s) for s in smiles]
    calc = _get_mordred_calculator()
    arr = calc.pandas(mols, nproc=1).fill_missing().to_numpy(dtype=np.float32)
    return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)


@lru_cache(maxsize=1)
def _get_rdkit2d_descriptors() -> tuple[tuple[str, object], ...]:
    """Return RDKit's ``(name, function)`` descriptor list as an immutable tuple."""
    return tuple(Descriptors._descList)


def _rdkit2d_features(smiles: Iterable[str]) -> np.ndarray:
    """Compute RDKit 2D descriptors for *smiles* as a finite float32 matrix.

    A descriptor that raises for a molecule contributes ``0.0`` for that cell;
    NaN and infinite values are likewise replaced by ``0.0``.
    """
    descriptors = _get_rdkit2d_descriptors()
    rows: list[list[float]] = []
    for smile in smiles:
        mol = Chem.MolFromSmiles(smile)
        values: list[float] = []
        for _, descriptor_fn in descriptors:
            try:
                values.append(float(descriptor_fn(mol)))
            except Exception:
                # Deliberate best effort: one failing descriptor must not
                # abort featurization of the whole dataset.
                values.append(0.0)
        rows.append(values)

    arr = np.asarray(rows, dtype=np.float32)
    return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float32)


class CheMeleonFingerprint:
    """Callable that maps a batch of SMILES to CheMeleon fingerprints on CPU.

    On construction the pretrained message-passing weights are loaded from
    ``~/.chemprop/chemeleon_mp.pt``, downloading them first if the file does
    not exist.
    """

    def __init__(self) -> None:
        # Heavy dependencies are imported lazily so they are only loaded when
        # the CheMeleon featurizer is actually selected.
        from urllib.request import urlretrieve

        import torch
        from chemprop import featurizers, nn
        from chemprop.models import MPNN
        from chemprop.nn import RegressionFFN

        self._torch = torch
        self._mol_graph_featurizer = featurizers.SimpleMoleculeMolGraphFeaturizer()
        agg = nn.MeanAggregation()

        ckpt_dir = Path.home() / ".chemprop"
        ckpt_dir.mkdir(parents=True, exist_ok=True)
        mp_path = ckpt_dir / "chemeleon_mp.pt"
        if not mp_path.exists():
            # Download to a temporary name and rename afterwards, so that an
            # interrupted transfer never leaves a truncated checkpoint that
            # would pass the exists() check on the next start.
            partial_path = mp_path.with_name(mp_path.name + ".part")
            try:
                urlretrieve(_CHEMELEON_MP_URL, partial_path)
                partial_path.replace(mp_path)
            finally:
                partial_path.unlink(missing_ok=True)

        chemeleon_mp = torch.load(mp_path, map_location="cpu", weights_only=True)
        mp = nn.BondMessagePassing(**chemeleon_mp["hyper_parameters"])
        mp.load_state_dict(chemeleon_mp["state_dict"])
        self.model = MPNN(
            message_passing=mp,
            agg=agg,
            predictor=RegressionFFN(input_dim=mp.output_dim),
        )
        self.model.eval()
        self.model.to(device="cpu")

    def __call__(self, smiles_batch: list[str]) -> np.ndarray:
        """Return the model fingerprints for *smiles_batch* as a NumPy array."""
        from chemprop.data import BatchMolGraph

        mol_graphs = [
            self._mol_graph_featurizer(Chem.MolFromSmiles(s)) for s in smiles_batch
        ]
        bmg = BatchMolGraph(mol_graphs)
        bmg.to(device=self.model.device)
        with self._torch.no_grad():
            return self.model.fingerprint(bmg).numpy(force=True)


@lru_cache(maxsize=1)
def _get_chemeleon_fingerprinter() -> CheMeleonFingerprint:
    """Build (once) and return the shared ``CheMeleonFingerprint`` instance."""
    return CheMeleonFingerprint()


def _chemeleon_features(smiles: Iterable[str], batch_size: int = 128) -> np.ndarray:
    """Compute CheMeleon fingerprints for *smiles* in batches of *batch_size*.

    Returns:
        A float32 matrix with one row per input SMILES.
    """
    smiles_list = list(smiles)
    fingerprinter = _get_chemeleon_fingerprinter()
    batches = [
        np.asarray(
            fingerprinter(smiles_list[start : start + batch_size]),
            dtype=np.float32,
        )
        for start in range(0, len(smiles_list), batch_size)
    ]
    return np.vstack(batches).astype(np.float32)


def _build_features(
    featurizer_name: str,
    train_smiles: Iterable[str],
    test_smiles: Iterable[str],
) -> tuple[np.ndarray, np.ndarray]:
    """Featurize the train and test SMILES with the named featurizer.

    Args:
        featurizer_name: One of ``"RDKit2D"``, ``"Mordred"`` or ``"CheMeleon"``.
        train_smiles: SMILES strings of the train split.
        test_smiles: SMILES strings of the test split.

    Returns:
        ``(x_train, x_test)`` feature matrices.

    Raises:
        ValueError: If *featurizer_name* is not supported.
    """
    featurizers_by_name = {
        "RDKit2D": _rdkit2d_features,
        "Mordred": _mordred_features,
        "CheMeleon": _chemeleon_features,
    }
    featurize = featurizers_by_name.get(featurizer_name)
    if featurize is None:
        raise ValueError(f"Unsupported featurizer: {featurizer_name}")
    return featurize(train_smiles), featurize(test_smiles)


def _scale_xy(
    x_train: np.ndarray,
    x_test: np.ndarray,
    y_train: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, StandardScaler]:
    """Standardize features and targets using statistics of the train split.

    Scaled features are clipped to ``[-6, 6]`` after standardization.

    Returns:
        ``(x_train_scaled, x_test_scaled, y_train_scaled, y_scaler)`` where
        *y_scaler* is the fitted target scaler, needed to map predictions back
        to the original target scale.
    """
    x_scaler = StandardScaler()
    x_train_scaled = x_scaler.fit_transform(x_train)
    x_test_scaled = x_scaler.transform(x_test)
    x_train_scaled = np.clip(x_train_scaled, -6.0, 6.0).astype(np.float32)
    x_test_scaled = np.clip(x_test_scaled, -6.0, 6.0).astype(np.float32)

    y_scaler = StandardScaler()
    y_train_scaled = (
        y_scaler.fit_transform(y_train.reshape(-1, 1)).ravel().astype(np.float32)
    )
    return x_train_scaled, x_test_scaled, y_train_scaled, y_scaler


def _parity_plot(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    title: str,
    rmse: float,
    r2: float,
) -> plt.Figure:
    """Create a predicted-vs-true scatter plot with the identity line.

    Args:
        y_true: Reference target values.
        y_pred: Model predictions, same length as *y_true*.
        title: First line of the plot title.
        rmse: RMSE shown in the title.
        r2: R² shown in the title.

    Returns:
        The finished figure, already detached from pyplot's global registry.
    """
    fig, ax = plt.subplots(figsize=(4.8, 4.2), dpi=140)

    lo = float(min(np.min(y_true), np.min(y_pred)))
    hi = float(max(np.max(y_true), np.max(y_pred)))
    # Pad the shared axis range; the floor keeps it non-degenerate when all
    # values coincide.
    pad = max((hi - lo) * 0.05, 1e-6)
    lo -= pad
    hi += pad

    ax.scatter(y_true, y_pred, s=35, alpha=0.85)
    ax.plot([lo, hi], [lo, hi], "k--", linewidth=1.3)
    ax.set_xlim(lo, hi)
    ax.set_ylim(lo, hi)
    ax.set_xlabel("True value")
    ax.set_ylabel("Predicted value")
    ax.set_title(f"{title}\nRMSE={rmse:.4f} | R²={r2:.4f}")
    ax.grid(alpha=0.2)
    fig.tight_layout()

    # pyplot keeps every figure it creates alive until it is closed. Without
    # this, a long-running server accumulates three figures per request. The
    # Figure object itself stays usable for rendering after being closed.
    plt.close(fig)
    return fig


def _run_models(
    featurizer_name: str,
    use_default_split: bool,
    train_file: str | None,
    test_file: str | None,
) -> tuple[pd.DataFrame, object, object, object, str]:
    """Load data, featurize, fit all models and collect metrics and plots.

    Returns:
        ``(metrics, tabicl_fig, random_forest_fig, catboost_fig, summary)``
        where *metrics* is sorted by ascending RMSE.

    Raises:
        ValueError: Propagated from data loading, validation or featurizer
            selection.
    """
    train_df, test_df = _load_input_data(use_default_split, train_file, test_file)
    x_train, x_test = _build_features(
        featurizer_name=featurizer_name,
        train_smiles=train_df["smiles"].tolist(),
        test_smiles=test_df["smiles"].tolist(),
    )
    y_train = train_df["value"].to_numpy(dtype=np.float32)
    y_test = test_df["value"].to_numpy(dtype=np.float32)
    x_train_s, x_test_s, y_train_s, y_scaler = _scale_xy(x_train, x_test, y_train)

    offload_dir = (Path(__file__).resolve().parent / "tabicl_offload").resolve()
    models = {
        "TabICL": TabICLRegressor(
            n_estimators=1,
            random_state=42,
            device="cpu",
            n_jobs=1,
            disk_offload_dir=str(offload_dir),
        ),
        "RandomForest": RandomForestRegressor(random_state=42, n_jobs=1),
        "CatBoost": CatBoostRegressor(
            iterations=100,
            random_seed=42,
            thread_count=1,
            verbose=False,
            allow_writing_files=False,
        ),
    }

    rows: list[dict] = []
    figures: dict[str, object] = {}
    for model_name, model in models.items():
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by wall-clock adjustments.
        t0 = time.perf_counter()
        model.fit(x_train_s, y_train_s)
        y_pred_s = np.asarray(model.predict(x_test_s), dtype=np.float32).ravel()
        # Map predictions back to the original target scale before scoring.
        y_pred = (
            y_scaler.inverse_transform(y_pred_s.reshape(-1, 1))
            .ravel()
            .astype(np.float32)
        )
        runtime_s = float(time.perf_counter() - t0)

        rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
        # R² is only computed when there is more than one test sample.
        r2 = float(r2_score(y_test, y_pred)) if len(y_test) > 1 else float("nan")

        rows.append(
            {
                "model": model_name,
                "rmse": rmse,
                "r2": r2,
                "runtime_s": runtime_s,
                "n_train": int(len(y_train)),
                "n_test": int(len(y_test)),
                "n_features": int(x_train.shape[1]),
                "featurizer": featurizer_name,
            }
        )
        figures[model_name] = _parity_plot(y_test, y_pred, model_name, rmse, r2)

    metrics = (
        pd.DataFrame(rows).sort_values("rmse", ascending=True).reset_index(drop=True)
    )
    summary = (
        f"Done. Featurizer={featurizer_name} | train={len(train_df)} rows | "
        f"test={len(test_df)} rows | features={x_train.shape[1]}"
    )
    return (
        metrics,
        figures["TabICL"],
        figures["RandomForest"],
        figures["CatBoost"],
        summary,
    )


def run_demo(
    featurizer_name: str,
    use_default_split: bool,
    train_file,
    test_file,
):
    """Gradio click handler: run the comparison and never raise.

    Any exception is converted into an empty metrics table, empty plots and an
    ``"Error: ..."`` status message so the UI can display it.
    """
    train_path = None if train_file is None else str(train_file)
    test_path = None if test_file is None else str(test_file)
    try:
        return _run_models(
            featurizer_name=featurizer_name,
            use_default_split=bool(use_default_split),
            train_file=train_path,
            test_file=test_path,
        )
    except Exception as exc:
        # UI boundary: surface every failure in the status box rather than
        # letting the handler raise.
        return (
            pd.DataFrame(),
            None,
            None,
            None,
            f"Error: {exc}",
        )


DESCRIPTION = """
# TabICLmolprop Demo (CPU)

This Space compares **TabICL**, **RandomForest**, and **CatBoost** on molecular regression.

- Featurizer options: **RDKit2D**, **CheMeleon**, or **Mordred**
- Default data: fixed **DCN** split with **100 train** and **10 test** rows
- Custom data schema: CSV with exactly two columns in this order: `smiles,value`

Full Benchmark Repo: [https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop](https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop)
"""

AKNOWLEDGEMENTS = """
## Acknowledgements

This code uses [CheMeleon](https://github.com/JacksonBurns/chemeleon).
The code also uses the [TabICLv2](https://github.com/soda-inria/tabicl) Model.
Example dataset from here: [Graph neural networks for ignition quality prediction](https://git.rwth-aachen.de/avt-svt/public/graph_neural_network_for_fuel_ignition_quality)
"""

with gr.Blocks() as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Row():
        featurizer = gr.Dropdown(
            choices=["RDKit2D", "CheMeleon", "Mordred"],
            value="RDKit2D",
            label="Featurizer",
        )
        use_default = gr.Checkbox(value=True, label="Use default DCN 100/10 split")

    with gr.Row():
        train_csv = gr.File(
            label="Train CSV (smiles,value)",
            file_types=[".csv"],
            type="filepath",
        )
        test_csv = gr.File(
            label="Test CSV (smiles,value)",
            file_types=[".csv"],
            type="filepath",
        )

    run_btn = gr.Button("Run Models")
    metrics_out = gr.Dataframe(label="Metrics", wrap=True)

    with gr.Row():
        tapicl_plot = gr.Plot(label="TabICL parity")
        rf_plot = gr.Plot(label="RandomForest parity")
        cat_plot = gr.Plot(label="CatBoost parity")

    status = gr.Textbox(label="Status", lines=2)

    run_btn.click(
        fn=run_demo,
        inputs=[featurizer, use_default, train_csv, test_csv],
        outputs=[metrics_out, tapicl_plot, rf_plot, cat_plot, status],
    )

    gr.Markdown(AKNOWLEDGEMENTS)


if __name__ == "__main__":
    demo.launch(share=True)