mlip-arena / serve /ranks /stability.py
github-actions[ci]
Clean sync from main branch - 2025-10-16 23:00:12
afe68b4
from pathlib import Path
from typing import Literal
import numpy as np
import pandas as pd
import streamlit as st
from mlip_arena.models import REGISTRY
# Root directory of the stability benchmark results, laid out as
# <family>/<model>-<run_type>.parquet (consumed by get_data below).
DATA_DIR = Path(__file__).parents[2] / "benchmarks" / "stability"
@st.cache_data
def get_data(model_list, run_type: Literal["heating", "compression"]) -> pd.DataFrame:
    """Load and concatenate benchmark parquet files for the given models.

    Parameters
    ----------
    model_list : iterable of model identifiers (keys of ``REGISTRY``).
    run_type : which benchmark run to load, "heating" or "compression".

    Returns
    -------
    pd.DataFrame
        All found runs stacked together, tagged with a ``method`` column
        naming the producing model; an empty DataFrame when no parquet
        file exists for any requested model.
    """
    frames = []
    for model in model_list:
        name = str(model)
        parquet_path = (
            DATA_DIR / REGISTRY[name]["family"].lower() / f"{name}-{run_type}.parquet"
        )
        # Models that have not produced results yet are silently skipped.
        if not parquet_path.exists():
            continue
        frame = pd.read_parquet(parquet_path)
        frame["method"] = name
        frames.append(frame)
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
@st.cache_data
def prepare_scatter_df(df_in: pd.DataFrame, max_points: int = 20000) -> pd.DataFrame:
    """Prepare scatter dataframe with marker sizes scaled by total steps.

    Parameters
    ----------
    df_in : DataFrame with at least ``natoms`` and ``steps_per_second``
        columns; an optional ``total_steps`` column drives marker sizing.
    max_points : cap on the number of rows kept (random downsample beyond it).

    Returns
    -------
    pd.DataFrame
        Copy of the valid rows with a ``_marker_size`` column in [5, 45]
        (linear in ``total_steps``), or the constant 8 when ``total_steps``
        is absent or contains no usable values.
    """
    dfp = df_in.dropna(subset=["natoms", "steps_per_second"]).copy()
    if dfp.empty:
        return dfp
    # Downsample if too many points (fixed seed keeps the plot stable).
    if len(dfp) > max_points:
        dfp = dfp.sample(max_points, random_state=1)
    # Guard against an all-NaN total_steps column: its median is NaN, which
    # would propagate NaN into every marker size. Fall back to the constant.
    if "total_steps" in dfp.columns and dfp["total_steps"].notna().any():
        ts_local = dfp["total_steps"].fillna(dfp["total_steps"].median()).astype(float)
        ts_range = ts_local.max() - ts_local.min()
        # Avoid division by zero when all values coincide.
        scaled = (ts_local - ts_local.min()) / (ts_range if ts_range != 0 else 1.0)
        dfp["_marker_size"] = (scaled * 40) + 5
    else:
        dfp["_marker_size"] = 8
    return dfp
@st.cache_data
def compute_power_law_fits(df_in: pd.DataFrame) -> dict:
    """Fit per-method power-law scaling: steps/s ~ a * N^(-n).

    A straight line is fitted in log-log space for every method with at
    least 3 strictly positive (natoms, steps_per_second) samples.

    Returns
    -------
    dict mapping method name -> (a, n), where ``a`` is the prefactor and
    ``n`` the (sign-flipped) scaling exponent.
    """
    fits = {}
    for method, group in df_in.groupby("method"):
        valid = group.dropna(subset=["natoms", "steps_per_second"])
        # log() requires strictly positive values on both axes.
        positive = (valid["natoms"] > 0) & (valid["steps_per_second"] > 0)
        valid = valid[positive]
        if len(valid) < 3:
            continue
        try:
            log_natoms = np.log(valid["natoms"].astype(float))
            log_sps = np.log(valid["steps_per_second"].astype(float))
            slope, intercept = np.polyfit(log_natoms, log_sps, 1)
            # Back-transform: intercept -> prefactor a, -slope -> exponent n.
            fits[method] = (float(np.exp(intercept)), float(-slope))
        except Exception:
            # Skip methods whose fit fails (e.g. degenerate data).
            continue
    return fits
@st.cache_data
def compute_auc(df: pd.DataFrame) -> dict:
    """Compute the area under the valid-run curve per method.

    Runs (deduplicated by ``formula``) are histogrammed over
    ``normalized_final_step`` on [0, 1]; the curve at each bin edge is the
    fraction of runs that survived past it, and its trapezoidal integral is
    the score — higher means runs stay valid for longer.

    Parameters
    ----------
    df : DataFrame with ``method``, ``formula`` and
        ``normalized_final_step`` columns.

    Returns
    -------
    dict mapping method name -> AUC (float).
    """
    # np.trapz was removed in NumPy 2.0 in favor of np.trapezoid; resolve
    # whichever is available so this works on both major versions.
    trapezoid = getattr(np, "trapezoid", None) or getattr(np, "trapz", None)
    aucs = {}
    for method, dfm in df.groupby("method"):
        # One sample per composition so repeated runs don't dominate.
        dfm = dfm.drop_duplicates(["formula"])
        if dfm.empty:
            continue
        hist, bin_edges = np.histogram(
            dfm["normalized_final_step"], bins=np.linspace(0, 1, 100)
        )
        cumulative_population = np.cumsum(hist)
        # Fraction of runs whose final step exceeds each bin edge.
        valid_curve = (cumulative_population[-1] - cumulative_population) / len(dfm)
        aucs[method] = trapezoid(valid_curve, bin_edges[:-1])  # trapezoidal integration
    return aucs
# Load data
# Every registered model is requested for both run types; get_data skips
# models without result files, so missing benchmarks simply drop out.
df_nvt = get_data(list(REGISTRY.keys()), run_type="heating")
df_npt = get_data(list(REGISTRY.keys()), run_type="compression")
# Compute metrics
# AUC of the valid-run curve (higher = better) and the power-law scaling
# exponent of steps/s vs. system size (lower = better), per run type.
aucs_nvt = compute_auc(df_nvt)
aucs_npt = compute_auc(df_npt)
fits_nvt = compute_power_law_fits(df_nvt)
fits_npt = compute_power_law_fits(df_npt)
# Build summary table: one row per model seen in either run type. Missing
# metrics become NaN; a missing power-law fit yields a NaN exponent.
all_methods = set(aucs_nvt) | set(aucs_npt)
table = pd.DataFrame(
    [
        {
            "Model": method,
            "AUC (Heating)": aucs_nvt.get(method, np.nan),
            "AUC (Compression)": aucs_npt.get(method, np.nan),
            "Scaling exponent (Heating)": fits_nvt.get(method, (np.nan, np.nan))[1],
            "Scaling exponent (Compression)": fits_npt.get(method, (np.nan, np.nan))[1],
        }
        for method in all_methods
    ]
).set_index("Model")
# Aggregate rank = sum of per-metric ranks. Higher AUC is better (ranked
# descending); a lower scaling exponent is better (ranked ascending).
# Models with missing metrics are ranked at the bottom of that metric.
table["Rank"] = (
    table["AUC (Heating)"].rank(ascending=False, na_option="bottom")
    + table["AUC (Compression)"].rank(ascending=False, na_option="bottom")
    + table["Scaling exponent (Heating)"].rank(ascending=True, na_option="bottom")
    + table["Scaling exponent (Compression)"].rank(ascending=True, na_option="bottom")
)
table = table.sort_values("Rank")
# Keep the raw rank sum as "Rank aggr." and collapse it into the final
# ordinal "Rank" (ties share the lowest rank via method="min").
table["Rank aggr."] = table["Rank"].astype(int)
table["Rank"] = table["Rank aggr."].rank(method="min").astype(int)
table = table.reindex(
    columns=[
        "Rank",
        "Rank aggr.",
        "AUC (Heating)",
        "AUC (Compression)",
        "Scaling exponent (Heating)",
        "Scaling exponent (Compression)",
    ]
)
@st.cache_data
def get_table():
    """Return the precomputed module-level ranking table (cached accessor)."""
    return table
def render():
    """Render the styled ranking table in the Streamlit app."""
    auc_cols = ["AUC (Heating)", "AUC (Compression)"]
    exponent_cols = ["Scaling exponent (Heating)", "Scaling exponent (Compression)"]
    # Color the rank columns blue; AUC uses a reversed green map (high AUC
    # = dark) while exponents use the normal map (low exponent = light).
    styler = table.style.background_gradient(
        cmap="Blues",
        subset=["Rank", "Rank aggr."],
    )
    styler = styler.background_gradient(cmap="Greens_r", subset=auc_cols)
    styler = styler.background_gradient(cmap="Greens", subset=exponent_cols)
    # Three-decimal formatting for all metric columns; NaN shown as "-".
    styler = styler.format(
        "{:.3f}",
        subset=auc_cols + exponent_cols,
        na_rep="-",
    )
    st.dataframe(styler, use_container_width=True)