# Queimo's picture
# Upload folder using huggingface_hub
# c7c3124 verified
from __future__ import annotations
import time
from functools import lru_cache
from pathlib import Path
from typing import Iterable
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor
from rdkit import Chem
from rdkit.Chem import Descriptors
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
from tabicl import TabICLRegressor
# Directory that contains this file; the bundled default split lives in its
# "data" sub-directory.
_APP_DIR = Path(__file__).resolve().parent
_DATA_DIR = _APP_DIR / "data"

# CSV files used when the default split is requested.
DEFAULT_TRAIN_PATH = _DATA_DIR / "default_train.csv"
DEFAULT_TEST_PATH = _DATA_DIR / "default_test.csv"

# Exact header (names and order) every input CSV must have.
REQUIRED_COLUMNS = ["smiles", "value"]
def _validate_schema(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Validate a ``smiles,value`` table and return a cleaned copy.

    Args:
        df: Raw table as read from CSV.
        name: Human-readable label for the table, used in error messages.

    Returns:
        A copy of ``df`` whose column names are whitespace-stripped, whose
        ``smiles`` column holds stripped strings and whose ``value`` column
        is numeric.

    Raises:
        ValueError: If the header differs from ``REQUIRED_COLUMNS``, a SMILES
            cell is missing or empty, a value is missing or non-numeric, or
            RDKit cannot parse a SMILES string.
    """
    cols = [str(c).strip() for c in df.columns]
    if cols != REQUIRED_COLUMNS:
        raise ValueError(
            f"{name} must have exactly these columns in order: {REQUIRED_COLUMNS}. "
            f"Found: {cols}"
        )

    clean = df.copy()
    # The header was accepted after stripping, so adopt the stripped names.
    # Otherwise a header such as " smiles" passes the check above and the
    # column lookups below fail with a KeyError.
    clean.columns = cols

    # Record missing cells before the string cast: astype(str) can turn a
    # missing cell into the literal text "nan", which would hide it from the
    # emptiness check and surface as a misleading "invalid SMILES" error.
    missing_smiles = clean["smiles"].isna()
    clean["smiles"] = clean["smiles"].astype(str).str.strip()
    clean["value"] = pd.to_numeric(clean["value"], errors="coerce")

    if missing_smiles.any() or (clean["smiles"] == "").any():
        raise ValueError(f"{name} contains empty smiles values.")
    if clean["value"].isna().any():
        raise ValueError(f"{name} contains non-numeric or missing value entries.")

    # Only the first offender is reported, so stop parsing as soon as it is found.
    first_invalid = next(
        (s for s in clean["smiles"].tolist() if Chem.MolFromSmiles(s) is None),
        None,
    )
    if first_invalid is not None:
        raise ValueError(
            f"{name} contains invalid SMILES. First invalid example: {first_invalid}"
        )
    return clean
def _load_input_data(
use_default_split: bool,
train_file: str | None,
test_file: str | None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
if use_default_split:
train_df = pd.read_csv(DEFAULT_TRAIN_PATH)
test_df = pd.read_csv(DEFAULT_TEST_PATH)
else:
if train_file is None or test_file is None:
raise ValueError(
"Please upload both train and test CSV files, or enable default split."
)
train_df = pd.read_csv(train_file)
test_df = pd.read_csv(test_file)
train_df = _validate_schema(train_df, "Train CSV")
test_df = _validate_schema(test_df, "Test CSV")
if len(train_df) < 2:
raise ValueError("Train CSV must contain at least 2 rows.")
if len(test_df) < 1:
raise ValueError("Test CSV must contain at least 1 row.")
return train_df, test_df
@lru_cache(maxsize=1)
def _get_mordred_calculator():
    """Create the Mordred calculator on first use and reuse it afterwards.

    The result is memoised by ``lru_cache``, so the calculator is built at
    most once per process.
    """
    # Imported here rather than at module level, so mordred is only loaded
    # when this featurizer is actually requested.
    from mordred import Calculator
    from mordred import descriptors as descriptor_modules

    calculator = Calculator(descriptor_modules, ignore_3D=True)
    # NOTE(review): presumably a per-descriptor time limit — confirm the
    # unit and semantics against the Mordred documentation.
    calculator.config(timeout=1)
    return calculator
def _mordred_features(smiles: Iterable[str]) -> np.ndarray:
    """Compute Mordred descriptors for each SMILES string.

    Args:
        smiles: SMILES strings to featurize.

    Returns:
        A float32 array with NaN and +/-inf entries replaced by 0.
    """
    molecules = list(map(Chem.MolFromSmiles, smiles))
    calculator = _get_mordred_calculator()

    descriptor_table = calculator.pandas(molecules, nproc=1)
    raw = descriptor_table.fill_missing().to_numpy(dtype=np.float32)

    # Replace every non-finite entry with zero.
    finite = np.nan_to_num(raw, nan=0.0, posinf=0.0, neginf=0.0)
    return finite.astype(np.float32)
@lru_cache(maxsize=1)
def _get_rdkit2d_descriptors() -> tuple[tuple[str, object], ...]:
    """Return an immutable snapshot of RDKit's descriptor list.

    The snapshot is taken from ``Descriptors._descList`` and memoised by
    ``lru_cache``, so the tuple is built at most once per process.
    """
    descriptor_table = tuple(Descriptors._descList)
    return descriptor_table
def _rdkit2d_features(smiles: Iterable[str]) -> np.ndarray:
    """Evaluate every RDKit descriptor for each SMILES string.

    Args:
        smiles: SMILES strings to featurize.

    Returns:
        A float32 array with one row per molecule and one column per
        descriptor; NaN and +/-inf entries are replaced by 0.
    """

    def _evaluate(descriptor_fn, mol) -> float:
        # Best effort: a descriptor that raises contributes 0.0 instead of
        # aborting the whole featurization.
        try:
            return float(descriptor_fn(mol))
        except Exception:
            return 0.0

    descriptor_fns = [fn for _, fn in _get_rdkit2d_descriptors()]
    table = [
        [_evaluate(fn, mol) for fn in descriptor_fns]
        for mol in map(Chem.MolFromSmiles, smiles)
    ]

    matrix = np.asarray(table, dtype=np.float32)
    matrix = np.nan_to_num(matrix, nan=0.0, posinf=0.0, neginf=0.0)
    return matrix.astype(np.float32)
class CheMeleonFingerprint:
    """Callable that maps SMILES strings to CheMeleon fingerprints.

    Construction loads the message-passing checkpoint from
    ``~/.chemprop/chemeleon_mp.pt`` (downloading it first if the file is not
    there) and wraps it in a chemprop ``MPNN`` kept on the CPU in eval mode.
    """

    def __init__(self) -> None:
        """Load (and if necessary download) the checkpoint and build the model."""
        # Heavy optional dependencies are imported here rather than at module
        # level, so they are only loaded when this featurizer is used.
        from urllib.request import urlretrieve

        import torch
        from chemprop import featurizers, nn
        from chemprop.models import MPNN
        from chemprop.nn import RegressionFFN

        self._torch = torch
        self._mol_graph_featurizer = featurizers.SimpleMoleculeMolGraphFeaturizer()
        agg = nn.MeanAggregation()

        ckpt_dir = Path.home() / ".chemprop"
        ckpt_dir.mkdir(parents=True, exist_ok=True)
        mp_path = ckpt_dir / "chemeleon_mp.pt"
        if not mp_path.exists():
            # Download to a side file and rename only on success. Writing
            # straight to mp_path would leave a truncated checkpoint behind
            # if the transfer is interrupted, and the exists() check above
            # would then accept that broken file on every later start.
            partial_path = mp_path.with_name(mp_path.name + ".part")
            try:
                urlretrieve(
                    "https://zenodo.org/records/15460715/files/chemeleon_mp.pt",
                    partial_path,
                )
                partial_path.replace(mp_path)
            finally:
                # No-op after a successful rename; removes leftovers on failure.
                partial_path.unlink(missing_ok=True)

        chemeleon_mp = torch.load(mp_path, map_location="cpu", weights_only=True)
        mp = nn.BondMessagePassing(**chemeleon_mp["hyper_parameters"])
        mp.load_state_dict(chemeleon_mp["state_dict"])
        self.model = MPNN(
            message_passing=mp,
            agg=agg,
            predictor=RegressionFFN(input_dim=mp.output_dim),
        )
        self.model.eval()
        self.model.to(device="cpu")

    def __call__(self, smiles_batch: list[str]) -> np.ndarray:
        """Return the model fingerprints for one batch of SMILES strings.

        Args:
            smiles_batch: SMILES strings forming a single batch.

        Returns:
            The output of ``self.model.fingerprint`` converted to a NumPy
            array.
        """
        from chemprop.data import BatchMolGraph

        bmg = BatchMolGraph(
            [self._mol_graph_featurizer(Chem.MolFromSmiles(s)) for s in smiles_batch]
        )
        bmg.to(device=self.model.device)
        # Inference only: no gradients are needed.
        with self._torch.no_grad():
            return self.model.fingerprint(bmg).numpy(force=True)
@lru_cache(maxsize=1)
def _get_chemeleon_fingerprinter() -> CheMeleonFingerprint:
    """Construct the CheMeleon fingerprinter on first use and reuse it.

    Memoised by ``lru_cache``, so ``CheMeleonFingerprint()`` is constructed
    at most once per process.
    """
    fingerprinter = CheMeleonFingerprint()
    return fingerprinter
def _chemeleon_features(smiles: Iterable[str], batch_size: int = 128) -> np.ndarray:
    """Compute CheMeleon fingerprints in fixed-size batches.

    Args:
        smiles: SMILES strings to featurize.
        batch_size: Maximum number of molecules passed to the fingerprinter
            per call.

    Returns:
        A float32 array made by stacking the per-batch results vertically.
    """
    all_smiles = list(smiles)
    embed = _get_chemeleon_fingerprinter()

    chunk_starts = range(0, len(all_smiles), batch_size)
    embedded_chunks = [
        np.asarray(embed(all_smiles[begin : begin + batch_size]), dtype=np.float32)
        for begin in chunk_starts
    ]
    return np.vstack(embedded_chunks).astype(np.float32)
def _build_features(
featurizer_name: str,
train_smiles: Iterable[str],
test_smiles: Iterable[str],
) -> tuple[np.ndarray, np.ndarray]:
if featurizer_name == "RDKit2D":
x_train = _rdkit2d_features(train_smiles)
x_test = _rdkit2d_features(test_smiles)
return x_train, x_test
if featurizer_name == "Mordred":
x_train = _mordred_features(train_smiles)
x_test = _mordred_features(test_smiles)
return x_train, x_test
if featurizer_name == "CheMeleon":
x_train = _chemeleon_features(train_smiles)
x_test = _chemeleon_features(test_smiles)
return x_train, x_test
raise ValueError(f"Unsupported featurizer: {featurizer_name}")
def _scale_xy(
    x_train: np.ndarray,
    x_test: np.ndarray,
    y_train: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, StandardScaler]:
    """Standardize features and targets using training-set statistics.

    Args:
        x_train: Training feature matrix.
        x_test: Test feature matrix; transformed with the scaler fitted on
            ``x_train``.
        y_train: Training targets.

    Returns:
        ``(x_train_scaled, x_test_scaled, y_train_scaled, y_scaler)``. The
        scaled features are clipped to ``[-6, 6]``; all arrays are float32.
        ``y_scaler`` is the fitted target scaler.
    """

    def _clip_to_float32(scaled: np.ndarray) -> np.ndarray:
        # Limit standardized features to +/-6 before the float32 cast.
        return np.clip(scaled, -6.0, 6.0).astype(np.float32)

    feature_scaler = StandardScaler()
    train_scaled = _clip_to_float32(feature_scaler.fit_transform(x_train))
    test_scaled = _clip_to_float32(feature_scaler.transform(x_test))

    target_scaler = StandardScaler()
    # The scaler expects a 2-D input, hence the single-column reshape.
    target_column = y_train.reshape(-1, 1)
    target_scaled = target_scaler.fit_transform(target_column).ravel().astype(np.float32)

    return train_scaled, test_scaled, target_scaled, target_scaler
def _parity_plot(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    title: str,
    rmse: float,
    r2: float,
):
    """Draw a predicted-vs-true scatter plot with an identity line.

    Args:
        y_true: Reference values (x axis).
        y_pred: Predicted values (y axis).
        title: First line of the plot title.
        rmse: RMSE shown in the title.
        r2: R² shown in the title.

    Returns:
        The Matplotlib figure containing the plot.
    """
    # Build the figure without pyplot. Figures created through plt.subplots
    # stay registered in pyplot's global state until they are closed
    # explicitly, so a long-running app would accumulate one figure per
    # model on every run. A bare Figure is released once nothing refers to it.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(4.8, 4.2), dpi=140)
    ax = fig.subplots()

    # Use one shared range for both axes so the identity line is the diagonal.
    lo = float(min(np.min(y_true), np.min(y_pred)))
    hi = float(max(np.max(y_true), np.max(y_pred)))
    # 5 % margin; the floor keeps the range non-empty when all values coincide.
    pad = max((hi - lo) * 0.05, 1e-6)
    lo -= pad
    hi += pad

    ax.scatter(y_true, y_pred, s=35, alpha=0.85)
    ax.plot([lo, hi], [lo, hi], "k--", linewidth=1.3)
    ax.set_xlim(lo, hi)
    ax.set_ylim(lo, hi)
    ax.set_xlabel("True value")
    ax.set_ylabel("Predicted value")
    ax.set_title(f"{title}\nRMSE={rmse:.4f} | R²={r2:.4f}")
    ax.grid(alpha=0.2)
    fig.tight_layout()
    return fig
def _run_models(
    featurizer_name: str,
    use_default_split: bool,
    train_file: str | None,
    test_file: str | None,
):
    """Fit TabICL, RandomForest and CatBoost on one split and evaluate them.

    Args:
        featurizer_name: Name of the featurizer passed to ``_build_features``.
        use_default_split: Whether to use the bundled default split.
        train_file: Path of the uploaded training CSV, or ``None``.
        test_file: Path of the uploaded test CSV, or ``None``.

    Returns:
        A 5-tuple ``(metrics, tabicl_fig, rf_fig, catboost_fig, summary)``
        where ``metrics`` is a DataFrame with one row per model sorted by
        ascending RMSE, the three figures are parity plots, and ``summary``
        is a one-line status string.

    Raises:
        ValueError: Propagated from data loading or feature building.
    """
    train_df, test_df = _load_input_data(use_default_split, train_file, test_file)
    x_train, x_test = _build_features(
        featurizer_name=featurizer_name,
        train_smiles=train_df["smiles"].tolist(),
        test_smiles=test_df["smiles"].tolist(),
    )
    y_train = train_df["value"].to_numpy(dtype=np.float32)
    y_test = test_df["value"].to_numpy(dtype=np.float32)
    x_train_s, x_test_s, y_train_s, y_scaler = _scale_xy(x_train, x_test, y_train)

    # All models run single-threaded on the CPU with fixed seeds.
    models = {
        "TabICL": TabICLRegressor(
            n_estimators=1,
            random_state=42,
            device="cpu",
            n_jobs=1,
            disk_offload_dir=str((Path(__file__).resolve().parent / "tabicl_offload").resolve()),
        ),
        "RandomForest": RandomForestRegressor(random_state=42, n_jobs=1),
        "CatBoost": CatBoostRegressor(
            iterations=100,
            random_seed=42,
            thread_count=1,
            verbose=False,
            allow_writing_files=False,
        ),
    }

    rows: list[dict] = []
    figures: dict[str, object] = {}
    for model_name, model in models.items():
        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments the way time.time() can.
        t0 = time.perf_counter()
        model.fit(x_train_s, y_train_s)
        y_pred_s = np.asarray(model.predict(x_test_s), dtype=np.float32).ravel()
        # Models are trained on standardized targets; map predictions back
        # to the original scale before scoring.
        y_pred = y_scaler.inverse_transform(y_pred_s.reshape(-1, 1)).ravel().astype(np.float32)
        runtime_s = float(time.perf_counter() - t0)

        rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
        # R² is reported as NaN when there is only a single test sample.
        r2 = float(r2_score(y_test, y_pred)) if len(y_test) > 1 else float("nan")
        rows.append(
            {
                "model": model_name,
                "rmse": rmse,
                "r2": r2,
                "runtime_s": runtime_s,
                "n_train": int(len(y_train)),
                "n_test": int(len(y_test)),
                "n_features": int(x_train.shape[1]),
                "featurizer": featurizer_name,
            }
        )
        figures[model_name] = _parity_plot(y_test, y_pred, model_name, rmse, r2)

    metrics = pd.DataFrame(rows).sort_values("rmse", ascending=True).reset_index(drop=True)
    summary = (
        f"Done. Featurizer={featurizer_name} | train={len(train_df)} rows | "
        f"test={len(test_df)} rows | features={x_train.shape[1]}"
    )
    return (
        metrics,
        figures["TabICL"],
        figures["RandomForest"],
        figures["CatBoost"],
        summary,
    )
def run_demo(
    featurizer_name: str,
    use_default_split: bool,
    train_file,
    test_file,
):
    """Run the comparison for the UI, turning failures into a status message.

    Args:
        featurizer_name: Selected featurizer name.
        use_default_split: Whether the default split checkbox is ticked.
        train_file: Uploaded training file, or ``None``.
        test_file: Uploaded test file, or ``None``.

    Returns:
        The 5-tuple produced by ``_run_models`` on success. On any exception
        raised while running the models: an empty DataFrame, three ``None``
        placeholders and an ``"Error: ..."`` message.
    """

    def _as_path(upload):
        # Uploads are forwarded as plain strings; None means "nothing uploaded".
        return None if upload is None else str(upload)

    train_path = _as_path(train_file)
    test_path = _as_path(test_file)

    try:
        outputs = _run_models(
            featurizer_name=featurizer_name,
            use_default_split=bool(use_default_split),
            train_file=train_path,
            test_file=test_path,
        )
    except Exception as exc:
        # UI boundary: show the problem in the status box instead of raising.
        empty_metrics = pd.DataFrame()
        return (empty_metrics, None, None, None, f"Error: {exc}")
    return outputs
# Markdown rendered at the top of the page (passed to gr.Markdown below).
DESCRIPTION = """
# TabICLmolprop Demo (CPU)
This Space compares **TabICL**, **RandomForest**, and **CatBoost** on molecular regression.
- Featurizer options: **RDKit2D**, **CheMeleon**, or **Mordred**
- Default data: fixed **DCN** split with **100 train** and **10 test** rows
- Custom data schema: CSV with exactly two columns in this order: `smiles,value`
Full Benchmark Repo: [https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop](https://git.rwth-aachen.de/avt-svt/public/tabpfn-molprop)
"""
# Markdown rendered at the bottom of the page (passed to gr.Markdown below).
# NOTE: the identifier is spelled "AKNOWLEDGEMENTS"; it is kept as-is because
# the layout code refers to it by this name.
AKNOWLEDGEMENTS = """
## Acknowledgements
This code uses [CheMeleon](https://github.com/JacksonBurns/chemeleon).
The code also uses the [TabICLv2](https://github.com/soda-inria/tabicl) Model.
Example dataset from here: [Graph neural networks for ignition quality prediction](https://git.rwth-aachen.de/avt-svt/public/graph_neural_network_for_fuel_ignition_quality)
"""
# Page layout. Components are created in display order inside the Blocks
# context, so the statement order below is significant.
# NOTE(review): the nesting of components inside each gr.Row() is assumed
# from the statement order — confirm against the intended layout.
with gr.Blocks() as demo:
    gr.Markdown(DESCRIPTION)
    # Run configuration: featurizer choice and default-split toggle.
    with gr.Row():
        featurizer = gr.Dropdown(
            choices=["RDKit2D", "CheMeleon", "Mordred"],
            value="RDKit2D",
            label="Featurizer",
        )
        use_default = gr.Checkbox(value=True, label="Use default DCN 100/10 split")
    # Optional custom data; type="filepath" hands run_demo a path per upload.
    with gr.Row():
        train_csv = gr.File(label="Train CSV (smiles,value)", file_types=[".csv"], type="filepath")
        test_csv = gr.File(label="Test CSV (smiles,value)", file_types=[".csv"], type="filepath")
    run_btn = gr.Button("Run Models")
    metrics_out = gr.Dataframe(label="Metrics", wrap=True)
    # One parity plot per model.
    with gr.Row():
        tapicl_plot = gr.Plot(label="TabICL parity")
        rf_plot = gr.Plot(label="RandomForest parity")
        cat_plot = gr.Plot(label="CatBoost parity")
    status = gr.Textbox(label="Status", lines=2)
    # The outputs list must match the order of the 5-tuple returned by run_demo.
    run_btn.click(
        fn=run_demo,
        inputs=[featurizer, use_default, train_csv, test_csv],
        outputs=[metrics_out, tapicl_plot, rf_plot, cat_plot, status],
    )
    gr.Markdown(AKNOWLEDGEMENTS)
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(share=True)