""" TPD/TPR Simulator and Dataset Generation for Heterogeneous Catalysis. Implements 6 temperature-programmed desorption/reaction mechanisms: 1. FirstOrder - first-order desorption (Polanyi-Wigner, n=1) 2. SecondOrder - second-order/recombinative desorption (n=2) 3. LH_Surface - Langmuir-Hinshelwood bimolecular surface reaction 4. MvK - Mars-van Krevelen lattice oxygen mechanism 5. FirstOrderCovDep - first-order with coverage-dependent activation energy 6. DiffLimited - diffusion-limited desorption from porous materials Each mechanism is solved as an ODE with a linear temperature ramp T = T0 + beta*t. Supports multi-heating-rate generation (analogous to multi-scan-rate CVs in EC). Data format mirrors the electrochemistry pipeline for compatibility. """ import os import sys import json import argparse import numpy as np from tqdm import tqdm from multiprocessing import Pool, cpu_count import scipy.integrate # ============================================================================= # Mechanism registry # ============================================================================= TPD_MECHANISM_LIST = [ 'FirstOrder', 'SecondOrder', 'LH_Surface', 'MvK', 'FirstOrderCovDep', 'DiffLimited', ] TPD_MECHANISM_TO_ID = {m: i for i, m in enumerate(TPD_MECHANISM_LIST)} TPD_MECHANISM_PARAMS = { 'FirstOrder': { 'names': ['Ed', 'log10(nu)', 'theta_0'], 'dim': 3, }, 'SecondOrder': { 'names': ['Ed', 'log10(nu)', 'theta_0'], 'dim': 3, }, 'LH_Surface': { 'names': ['Ea', 'log10(nu)', 'theta_A0', 'theta_B0'], 'dim': 4, }, 'MvK': { 'names': ['Ea_red', 'Ea_reox', 'log10(nu_red)', 'theta_O0'], 'dim': 4, }, 'FirstOrderCovDep': { 'names': ['Ed0', 'alpha_cov', 'log10(nu)', 'theta_0'], 'dim': 4, }, 'DiffLimited': { 'names': ['Ed', 'log10(nu)', 'log10(D0)', 'E_diff', 'theta_0'], 'dim': 5, }, } # ============================================================================= # Simulation constants # ============================================================================= # We work in dimensionless temperature units: T_dimless = T_physical / T_ref. # Activation energies are also dimensionless: Ed_dimless = Ed_physical / (R * T_ref). # With T_ref = 1 K conceptually, T values are just Kelvin and Ed = Ea/(R) in K. # In practice we use T in Kelvin directly and Ed in units of K (= Ea/R). # This avoids carrying R everywhere. T_REF = 1.0 # Reference temperature (K); energies in K (= Ea/R) N_POINTS_DEFAULT = 500 # time steps per TPD curve # ============================================================================= # ODE Simulators # ============================================================================= def run_tpd_first_order(Ed, nu, theta_0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT): """ First-order desorption: d(theta)/dt = -nu * theta * exp(-Ed / T(t)) where T(t) = T_start + beta * t. Parameters ---------- Ed : float Dimensionless desorption energy (= Ea / R, in Kelvin). nu : float Pre-exponential factor (s^-1). theta_0 : float Initial fractional surface coverage [0, 1]. beta : float Heating rate (K/s). T_start, T_end : float Temperature ramp range (K). n_points : int Number of output points. Returns ------- dict with 'temperature', 'rate', 'time', 'coverage', 'params' """ t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) def rhs(t, y): theta = y[0] T = T_start + beta * t if T < 1.0: T = 1.0 rate = -nu * theta * np.exp(-Ed / T) return [rate] sol = scipy.integrate.solve_ivp( rhs, [0, t_end], [theta_0], t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta = sol.y[0] temperature = T_start + beta * sol.t rate = nu * np.maximum(theta, 0) * np.exp(-Ed / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': theta.astype(np.float32), 'params': { 'mechanism': 'FirstOrder', 'Ed': float(Ed), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), }, } def run_tpd_second_order(Ed, nu, theta_0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT): """ Second-order (recombinative) desorption: d(theta)/dt = -nu * theta^2 * exp(-Ed / T(t)) Peak position shifts to lower T with increasing theta_0 (key diagnostic). """ t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) def rhs(t, y): theta = y[0] T = T_start + beta * t if T < 1.0: T = 1.0 rate = -nu * theta ** 2 * np.exp(-Ed / T) return [rate] sol = scipy.integrate.solve_ivp( rhs, [0, t_end], [theta_0], t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta = sol.y[0] temperature = T_start + beta * sol.t rate = nu * np.maximum(theta, 0) ** 2 * np.exp(-Ed / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': theta.astype(np.float32), 'params': { 'mechanism': 'SecondOrder', 'Ed': float(Ed), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), }, } def run_tpd_lh_surface(Ea, nu, theta_A0, theta_B0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT): """ Langmuir-Hinshelwood bimolecular surface reaction: A(ads) + B(ads) -> products d(theta_A)/dt = -nu * theta_A * theta_B * exp(-Ea / T) d(theta_B)/dt = -nu * theta_A * theta_B * exp(-Ea / T) rate = nu * theta_A * theta_B * exp(-Ea / T) """ t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) def rhs(t, y): theta_A, theta_B = y T = T_start + beta * t if T < 1.0: T = 1.0 r = -nu * theta_A * theta_B * np.exp(-Ea / T) return [r, r] sol = scipy.integrate.solve_ivp( rhs, [0, t_end], [theta_A0, theta_B0], t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta_A = sol.y[0] theta_B = sol.y[1] temperature = T_start + beta * sol.t rate = nu * np.maximum(theta_A, 0) * np.maximum(theta_B, 0) * \ np.exp(-Ea / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': np.stack([theta_A, theta_B], axis=-1).astype(np.float32), 'params': { 'mechanism': 'LH_Surface', 'Ea': float(Ea), 'nu': float(nu), 'theta_A0': float(theta_A0), 'theta_B0': float(theta_B0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), }, } def run_tpd_mvk(Ea_red, Ea_reox, nu_red, theta_O0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT): """ Mars-van Krevelen lattice oxygen mechanism: Reduction: rate_red = nu_red * theta_O * exp(-Ea_red / T) Reoxidation: rate_reox = nu_reox * (1 - theta_O) * exp(-Ea_reox / T) d(theta_O)/dt = rate_reox - rate_red Observable rate = rate_red (consumption of lattice oxygen) nu_reox is fixed at nu_red * 0.1 to reduce parameter count while keeping the two-process competition that creates the distinctive MvK peak shapes. """ nu_reox = nu_red * 0.1 t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) def rhs(t, y): theta_O = y[0] T = T_start + beta * t if T < 1.0: T = 1.0 r_red = nu_red * theta_O * np.exp(-Ea_red / T) r_reox = nu_reox * (1.0 - theta_O) * np.exp(-Ea_reox / T) return [r_reox - r_red] sol = scipy.integrate.solve_ivp( rhs, [0, t_end], [theta_O0], t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta_O = sol.y[0] temperature = T_start + beta * sol.t rate = nu_red * np.maximum(theta_O, 0) * \ np.exp(-Ea_red / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': theta_O.astype(np.float32), 'params': { 'mechanism': 'MvK', 'Ea_red': float(Ea_red), 'Ea_reox': float(Ea_reox), 'nu_red': float(nu_red), 'nu_reox': float(nu_reox), 'theta_O0': float(theta_O0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), }, } def run_tpd_first_order_covdep(Ed0, alpha_cov, nu, theta_0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT): """ First-order desorption with coverage-dependent activation energy: Ed(theta) = Ed0 + alpha_cov * theta d(theta)/dt = -nu * theta * exp(-Ed(theta) / T(t)) Repulsive lateral interactions (alpha_cov > 0) cause the peak to sharpen and shift to lower T as coverage decreases — a signature that standard Redhead analysis misinterprets as a change in Ed. Parameters ---------- Ed0 : float Zero-coverage desorption energy (K, = Ea0/R). alpha_cov : float Coverage-dependence coefficient (K). Positive = repulsive interactions. nu : float Pre-exponential factor (s^-1). theta_0 : float Initial fractional surface coverage [0, 1]. beta : float Heating rate (K/s). """ t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) def rhs(t, y): theta = y[0] T = T_start + beta * t if T < 1.0: T = 1.0 Ed_eff = Ed0 + alpha_cov * theta rate = -nu * theta * np.exp(-Ed_eff / T) return [rate] sol = scipy.integrate.solve_ivp( rhs, [0, t_end], [theta_0], t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta = sol.y[0] temperature = T_start + beta * sol.t Ed_eff = Ed0 + alpha_cov * np.maximum(theta, 0) rate = nu * np.maximum(theta, 0) * np.exp(-Ed_eff / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': theta.astype(np.float32), 'params': { 'mechanism': 'FirstOrderCovDep', 'Ed0': float(Ed0), 'alpha_cov': float(alpha_cov), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), }, } def run_tpd_diff_limited(Ed, nu, D0, E_diff, theta_0, beta, T_start, T_end, n_points=N_POINTS_DEFAULT, n_shells=20): """ Diffusion-limited desorption from a porous/layered material. Models a 1D spherical particle with n_shells concentric shells. Surface desorption follows first-order kinetics; replenishment of the surface layer is limited by intra-particle diffusion with an Arrhenius-type diffusivity D(T) = D0 * exp(-E_diff / T). This produces characteristic broadened, asymmetric peaks with long high-temperature tails that traditional Redhead/Kissinger methods cannot fit — the apparent activation energy depends on particle size and diffusivity. Parameters ---------- Ed : float Surface desorption energy (K, = Ea/R). nu : float Pre-exponential factor for desorption (s^-1). D0 : float Diffusion pre-exponential (s^-1, dimensionless Fourier units). E_diff : float Diffusion activation energy (K, = Ea_diff/R). theta_0 : float Initial uniform loading in all shells [0, 1]. beta : float Heating rate (K/s). n_shells : int Number of radial shells for the discretized diffusion. """ t_end = (T_end - T_start) / beta t_eval = np.linspace(0, t_end, n_points) # Radial grid: shells at r_i/R = (i+0.5)/n_shells, i=0..n_shells-1 # Shell 0 = center, shell n_shells-1 = surface dr = 1.0 / n_shells r = np.array([(i + 0.5) * dr for i in range(n_shells)]) r_face = np.array([i * dr for i in range(n_shells + 1)]) # Shell volumes (spherical): V_i = 4/3 pi (r_face[i+1]^3 - r_face[i]^3) vol = (4.0 / 3.0) * np.pi * (r_face[1:] ** 3 - r_face[:-1] ** 3) y0 = np.full(n_shells, theta_0) def rhs(t, y): T = T_start + beta * t if T < 1.0: T = 1.0 D = D0 * np.exp(-E_diff / T) dydt = np.zeros(n_shells) # Diffusion between adjacent shells (spherical coordinates) for i in range(n_shells - 1): area = 4.0 * np.pi * r_face[i + 1] ** 2 flux = D * area * (y[i] - y[i + 1]) / dr dydt[i] -= flux / vol[i] dydt[i + 1] += flux / vol[i + 1] # Surface desorption from outermost shell only k_des = nu * np.exp(-Ed / T) dydt[-1] -= k_des * y[-1] return dydt sol = scipy.integrate.solve_ivp( rhs, [0, t_end], y0, t_eval=t_eval, method='BDF', rtol=1e-8, atol=1e-10, max_step=t_end / 50, ) theta_all = sol.y # [n_shells, n_points] temperature = T_start + beta * sol.t # Observable rate = surface desorption rate theta_surf = np.maximum(theta_all[-1], 0) rate = nu * theta_surf * np.exp(-Ed / np.maximum(temperature, 1.0)) return { 'temperature': temperature.astype(np.float32), 'rate': rate.astype(np.float32), 'time': sol.t.astype(np.float32), 'coverage': theta_surf.astype(np.float32), 'params': { 'mechanism': 'DiffLimited', 'Ed': float(Ed), 'nu': float(nu), 'D0': float(D0), 'E_diff': float(E_diff), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), 'n_shells': n_shells, }, } # ============================================================================= # Simulation dispatch # ============================================================================= def _run_single_tpd(params): """Run a single TPD simulation, dispatching to the correct mechanism.""" mech = params['mechanism'] beta = params['beta'] T_start = params['T_start'] T_end = params['T_end'] n_points = params.get('n_points', N_POINTS_DEFAULT) if mech == 'FirstOrder': return run_tpd_first_order( params['Ed'], params['nu'], params['theta_0'], beta, T_start, T_end, n_points, ) elif mech == 'SecondOrder': return run_tpd_second_order( params['Ed'], params['nu'], params['theta_0'], beta, T_start, T_end, n_points, ) elif mech == 'LH_Surface': return run_tpd_lh_surface( params['Ea'], params['nu'], params['theta_A0'], params['theta_B0'], beta, T_start, T_end, n_points, ) elif mech == 'MvK': return run_tpd_mvk( params['Ea_red'], params['Ea_reox'], params['nu_red'], params['theta_O0'], beta, T_start, T_end, n_points, ) elif mech == 'FirstOrderCovDep': return run_tpd_first_order_covdep( params['Ed0'], params['alpha_cov'], params['nu'], params['theta_0'], beta, T_start, T_end, n_points, ) elif mech == 'DiffLimited': return run_tpd_diff_limited( params['Ed'], params['nu'], params['D0'], params['E_diff'], params['theta_0'], beta, T_start, T_end, n_points, ) else: raise ValueError(f"Unknown TPD mechanism: {mech}") # ============================================================================= # Parameter sampling # ============================================================================= def _sample_common_tpd_params(rng): """Sample common TPD experiment parameters.""" T_start = rng.uniform(300, 400) T_end = rng.uniform(900, 1200) return T_start, T_end def _estimate_T_peak(Ed, log10_nu, beta): """Redhead estimate of peak temperature for first-order TPD. Solves Ed/T_peak^2 ≈ (nu/beta) * exp(-Ed/T_peak) iteratively. Good enough for rejection sampling.""" # Initial guess: T_peak ≈ Ed / (ln(nu*Ed/beta) - ln(T_peak^2)) # Simplified: T_peak ≈ Ed / (log10_nu * ln(10) - ln(Ed/beta)) ln_nu = log10_nu * np.log(10) T_est = Ed / (ln_nu - np.log(max(Ed / max(beta, 0.01), 1.0))) T_est = np.clip(T_est, 100, 2000) # One Newton step for _ in range(3): exp_term = np.exp(-Ed / max(T_est, 1.0)) f = Ed / (T_est ** 2) - (10 ** log10_nu / max(beta, 0.01)) * exp_term df = -2 * Ed / (T_est ** 3) - (10 ** log10_nu / max(beta, 0.01)) * exp_term * Ed / (T_est ** 2) if abs(df) > 1e-30: T_est = T_est - f / df T_est = np.clip(T_est, 100, 2000) return T_est def _sample_Ed_nu_beta(rng, T_start, T_end, max_attempts=50): """Sample (Ed, nu, beta) ensuring the estimated peak temperature falls within the measurement window [T_start+margin, T_end-margin].""" T_lo = T_start + 0.15 * (T_end - T_start) T_hi = T_end - 0.10 * (T_end - T_start) for _ in range(max_attempts): log10_nu = rng.uniform(12, 16) log10_beta = rng.uniform(-0.5, 1.5) beta = 10 ** log10_beta # Sample T_peak target, then back-solve for Ed T_peak_target = rng.uniform(T_lo, T_hi) ln_nu = log10_nu * np.log(10) # From Redhead: Ed ≈ T_peak * (ln(nu*T_peak/beta) - ln(T_peak)) # ≈ T_peak * (ln_nu + ln(T_peak) - ln(beta) - 3.64) Ed = T_peak_target * (ln_nu + np.log(T_peak_target) - np.log(beta) - 3.64) if Ed < 3000 or Ed > 50000: continue T_est = _estimate_T_peak(Ed, log10_nu, beta) if T_lo <= T_est <= T_hi: return Ed, 10 ** log10_nu, beta # Fallback: conservative parameters that always produce a mid-range peak beta = 10 ** rng.uniform(0.0, 1.0) T_peak_target = rng.uniform(T_lo, T_hi) log10_nu = 13.0 Ed = T_peak_target * (13.0 * np.log(10) + np.log(T_peak_target) - np.log(beta) - 3.64) return Ed, 10 ** log10_nu, beta def sample_first_order_params(rng): """Sample parameters for first-order desorption.""" T_start, T_end = _sample_common_tpd_params(rng) Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end) theta_0 = rng.uniform(0.1, 1.0) return { 'mechanism': 'FirstOrder', 'Ed': float(Ed), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_second_order_params(rng): """Sample parameters for second-order desorption.""" T_start, T_end = _sample_common_tpd_params(rng) Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end) theta_0 = rng.uniform(0.1, 1.0) return { 'mechanism': 'SecondOrder', 'Ed': float(Ed), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_lh_surface_params(rng): """Sample parameters for LH bimolecular surface reaction.""" T_start, T_end = _sample_common_tpd_params(rng) Ea, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end) theta_A0 = rng.uniform(0.1, 1.0) theta_B0 = rng.uniform(0.1, 1.0) return { 'mechanism': 'LH_Surface', 'Ea': float(Ea), 'nu': float(nu), 'theta_A0': float(theta_A0), 'theta_B0': float(theta_B0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_mvk_params(rng): """Sample parameters for Mars-van Krevelen mechanism.""" T_start, T_end = _sample_common_tpd_params(rng) Ea_red, nu_red, beta = _sample_Ed_nu_beta(rng, T_start, T_end) # Reoxidation energy: sample independently but also constrain to reasonable range Ea_reox, _, _ = _sample_Ed_nu_beta(rng, T_start, T_end) theta_O0 = rng.uniform(0.5, 1.0) return { 'mechanism': 'MvK', 'Ea_red': float(Ea_red), 'Ea_reox': float(Ea_reox), 'nu_red': float(nu_red), 'theta_O0': float(theta_O0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_first_order_covdep_params(rng): """Sample parameters for coverage-dependent first-order desorption. Ed(theta) = Ed0 + alpha_cov * theta. alpha_cov > 0 means repulsive lateral interactions (common for CO on metals). We sample Ed0 via the Redhead constraint at the *initial* coverage so the peak lands in the measurement window, then add a coverage-dependence coefficient that shifts the effective energy by up to ~30% of Ed0. """ T_start, T_end = _sample_common_tpd_params(rng) Ed0, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end) # alpha_cov in [0.05*Ed0, 0.35*Ed0]: ensures noticeable but not # overwhelming coverage dependence alpha_cov = rng.uniform(0.05, 0.35) * Ed0 theta_0 = rng.uniform(0.3, 1.0) return { 'mechanism': 'FirstOrderCovDep', 'Ed0': float(Ed0), 'alpha_cov': float(alpha_cov), 'nu': float(nu), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_diff_limited_params(rng): """Sample parameters for diffusion-limited desorption. The key physics: if E_diff is comparable to Ed, diffusion is the rate-limiting step and the TPD peak broadens with a long tail. If E_diff << Ed, diffusion is fast and the curve looks like standard first-order. We sample to ensure a range of diffusion-limitation regimes. """ T_start, T_end = _sample_common_tpd_params(rng) Ed, nu, beta = _sample_Ed_nu_beta(rng, T_start, T_end) # D0: dimensionless diffusion pre-exponential. # In Fourier number units (D*t/R^2), D0 ~ 1e2 to 1e6 gives a range # from strongly diffusion-limited to nearly surface-kinetics-limited. log10_D0 = rng.uniform(2.0, 6.0) D0 = 10 ** log10_D0 # E_diff: diffusion activation energy. # Ratio E_diff/Ed ~ 0.3 to 0.9 covers weakly to strongly limited regimes. E_diff = rng.uniform(0.3, 0.9) * Ed theta_0 = rng.uniform(0.3, 1.0) return { 'mechanism': 'DiffLimited', 'Ed': float(Ed), 'nu': float(nu), 'D0': float(D0), 'E_diff': float(E_diff), 'theta_0': float(theta_0), 'beta': float(beta), 'T_start': float(T_start), 'T_end': float(T_end), } def sample_tpd_params(rng, mechanism=None): """Sample TPD parameters, optionally for a specific mechanism.""" if mechanism is None: mechanism = rng.choice(TPD_MECHANISM_LIST) samplers = { 'FirstOrder': sample_first_order_params, 'SecondOrder': sample_second_order_params, 'LH_Surface': sample_lh_surface_params, 'MvK': sample_mvk_params, 'FirstOrderCovDep': sample_first_order_covdep_params, 'DiffLimited': sample_diff_limited_params, } return samplers[mechanism](rng) # ============================================================================= # Noise # ============================================================================= def _add_noise(signal, rng, noise_range=(0.001, 0.02)): """Add Gaussian noise to a signal. Returns (noisy_signal, sigma_noise). Clamps result to >= 0 since desorption/reaction rates are non-negative.""" sigma_noise = rng.uniform(*noise_range) peak = np.max(np.abs(signal)) + 1e-20 noise = sigma_noise * peak * rng.standard_normal(signal.shape) noisy = signal + noise.astype(signal.dtype) np.maximum(noisy, 0, out=noisy) return noisy, float(sigma_noise) # ============================================================================= # Multi-heating-rate sampling # ============================================================================= def _sample_heating_rates(rng, n_rates, log_beta_range=(-0.5, 1.5)): """Sample log-spaced heating rates spanning the given range.""" lo, hi = log_beta_range if n_rates == 1: return np.array([10 ** rng.uniform(lo, hi)]) anchors = np.linspace(lo, hi, n_rates) jitter = (hi - lo) / (n_rates - 1) * 0.3 log_betas = np.array([rng.uniform(a - jitter, a + jitter) for a in anchors]) log_betas = np.clip(log_betas, lo, hi) log_betas.sort() return 10 ** log_betas # ============================================================================= # Dataset generation # ============================================================================= def generate_sample_single(idx, outdir, seed, mechanism=None, n_heating_rates=1, add_noise=True): """ Generate and save a single TPD sample (single or multi-heating-rate). When n_heating_rates > 1, the same kinetic parameters are simulated at multiple heating rates and saved together. """ rng = np.random.default_rng(seed + idx) try: params = sample_tpd_params(rng, mechanism=mechanism) actual_mechanism = params['mechanism'] mechanism_id = TPD_MECHANISM_TO_ID[actual_mechanism] if n_heating_rates <= 1: result = _run_single_tpd(params) rate = result['rate'].copy() sigma_noise = 0.0 if add_noise: rate, sigma_noise = _add_noise(rate, rng) save_params = dict(params) save_params['sigma_noise'] = sigma_noise np.savez_compressed( os.path.join(outdir, f"sample_{idx:06d}.npz"), temperature=result['temperature'], rate=rate, time=result['time'], params=save_params, mechanism_id=np.int32(mechanism_id), ) meta = { 'idx': idx, 'success': True, 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), 'n_time': len(result['time']), 'beta': float(params['beta']), 'n_heating_rates': 1, 'sigma_noise': sigma_noise, } return meta # Multi-heating-rate: same kinetic params, different heating rates heating_rates = _sample_heating_rates(rng, n_heating_rates) temperatures, rates, times = [], [], [] for beta in heating_rates: p = dict(params) p['beta'] = float(beta) result = _run_single_tpd(p) temp = result['temperature'].copy() rate = result['rate'].copy() if add_noise: rate, _ = _add_noise(rate, rng) temperatures.append(temp) rates.append(rate) times.append(result['time'].copy()) # Pad to same length max_t = max(len(t) for t in temperatures) n_hr = len(heating_rates) temp_arr = np.zeros((n_hr, max_t), dtype=np.float32) rate_arr = np.zeros((n_hr, max_t), dtype=np.float32) time_arr = np.zeros((n_hr, max_t), dtype=np.float32) lengths = np.zeros(n_hr, dtype=np.int32) for i in range(n_hr): t_len = len(temperatures[i]) temp_arr[i, :t_len] = temperatures[i] rate_arr[i, :t_len] = rates[i] time_arr[i, :t_len] = times[i] lengths[i] = t_len save_params = dict(params) # Remove the single beta; heating_rates array is saved separately save_params.pop('beta', None) np.savez_compressed( os.path.join(outdir, f"sample_{idx:06d}.npz"), temperature=temp_arr, rate=rate_arr, time=time_arr, heating_rates=heating_rates.astype(np.float32), lengths=lengths, params=save_params, mechanism_id=np.int32(mechanism_id), n_heating_rates=np.int32(n_heating_rates), ) meta = { 'idx': idx, 'success': True, 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), 'n_time_max': int(max_t), 'heating_rates': [float(b) for b in heating_rates], 'n_heating_rates': n_heating_rates, } return meta except Exception as e: return { 'idx': idx, 'success': False, 'error': str(e), } def _worker_generate(args): """Worker function for multiprocessing (must be at module level).""" idx, outdir, seed, mechanism, n_heating_rates, add_noise = args return generate_sample_single(idx, outdir, seed, mechanism, n_heating_rates, add_noise) def generate_dataset( n_samples=1000, outdir="data_tpd/raw", seed=42, n_workers=None, mechanism=None, multi_mechanism=False, n_per_mechanism=None, n_heating_rates=1, add_noise=True, ): """Generate a dataset of TPD simulations.""" os.makedirs(outdir, exist_ok=True) if n_workers is None: n_workers = max(1, cpu_count() - 1) if multi_mechanism: if n_per_mechanism is None: n_per_mechanism = n_samples total = n_per_mechanism * len(TPD_MECHANISM_LIST) print(f"Generating multi-mechanism TPD dataset: " f"{n_per_mechanism} per mechanism x {len(TPD_MECHANISM_LIST)} = {total}") args_list = [] for mech_idx, mech in enumerate(TPD_MECHANISM_LIST): offset = mech_idx * n_per_mechanism for i in range(n_per_mechanism): args_list.append((offset + i, outdir, seed, mech, n_heating_rates, add_noise)) n_samples = total else: args_list = [(i, outdir, seed, mechanism, n_heating_rates, add_noise) for i in range(n_samples)] n_workers = min(n_workers, n_samples) print(f"Generating {n_samples} TPD samples...") print(f"Output directory: {outdir}") print(f"Heating rates per sample: {n_heating_rates}") print(f"Using {n_workers} worker(s)") metadata = [] if n_workers == 1: for args in tqdm(args_list, desc="Generating samples"): meta = _worker_generate(args) metadata.append(meta) if not meta['success']: print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown')}") else: try: with Pool(processes=n_workers) as pool: for meta in tqdm( pool.imap_unordered(_worker_generate, args_list, chunksize=max(1, n_samples // (n_workers * 4))), total=n_samples, desc="Generating samples", ): metadata.append(meta) if not meta['success']: tqdm.write(f"Sample {meta['idx']} failed: " f"{meta.get('error', 'Unknown')}") except (PermissionError, OSError) as e: print(f"\nWarning: Multiprocessing failed ({e}). " "Falling back to sequential...") metadata = [] for args in tqdm(args_list, desc="Generating samples (sequential)"): meta = _worker_generate(args) metadata.append(meta) if not meta['success']: print(f"\nSample {meta['idx']} failed: " f"{meta.get('error', 'Unknown')}") metadata = sorted(metadata, key=lambda x: x['idx']) n_success = sum(1 for m in metadata if m['success']) mech_counts = {} for m in metadata: if m['success']: mech = m.get('mechanism', 'Unknown') mech_counts[mech] = mech_counts.get(mech, 0) + 1 summary = { 'n_samples': n_samples, 'n_success': n_success, 'n_heating_rates': n_heating_rates, 'add_noise': add_noise, 'seed': seed, 'n_workers': n_workers, 'multi_mechanism': multi_mechanism, 'mechanism_counts': mech_counts, 'samples': metadata, } with open(os.path.join(outdir, "metadata.json"), "w") as f: json.dump(summary, f, indent=2) print(f"\nGeneration complete: {n_success}/{n_samples} successful") print(f"Mechanism counts: {mech_counts}") print(f"Metadata saved to {os.path.join(outdir, 'metadata.json')}") return metadata # ============================================================================= # Main # ============================================================================= if __name__ == "__main__": parser = argparse.ArgumentParser( description="Generate TPD dataset for catalysis mechanism identification" ) parser.add_argument("--n_samples", type=int, default=1000) parser.add_argument("--outdir", type=str, default="data_tpd/raw") parser.add_argument("--seed", type=int, default=42) parser.add_argument("--n_workers", type=int, default=None) parser.add_argument("--mechanism", type=str, default=None, choices=TPD_MECHANISM_LIST) parser.add_argument("--multi_mechanism", action="store_true") parser.add_argument("--n_per_mechanism", type=int, default=None) parser.add_argument("--n_heating_rates", type=int, default=1) parser.add_argument("--no_noise", action="store_true") parser.add_argument("--test", action="store_true", help="Run a single test simulation and plot") args = parser.parse_args() if args.test: print(f"Running test TPD simulations for all {len(TPD_MECHANISM_LIST)} mechanisms...\n") rng = np.random.default_rng(42) for mech in TPD_MECHANISM_LIST: params = sample_tpd_params(rng, mechanism=mech) result = _run_single_tpd(params) peak_rate = np.max(result['rate']) peak_T = result['temperature'][np.argmax(result['rate'])] print(f"{mech}:") print(f" T range: [{result['temperature'][0]:.0f}, " f"{result['temperature'][-1]:.0f}] K") print(f" Peak rate: {peak_rate:.4e} at T = {peak_T:.0f} K") print(f" Time steps: {len(result['time'])}") print(f" Params: {params}") print() try: import matplotlib.pyplot as plt fig, axes = plt.subplots(2, 2, figsize=(12, 10)) for ax, mech in zip(axes.flat, TPD_MECHANISM_LIST): params = sample_tpd_params(rng, mechanism=mech) for beta in [0.5, 2.0, 10.0]: p = dict(params) p['beta'] = beta result = _run_single_tpd(p) ax.plot(result['temperature'], result['rate'], label=f'beta={beta:.1f} K/s') ax.set_xlabel('Temperature (K)') ax.set_ylabel('Desorption/Reaction Rate') ax.set_title(mech) ax.legend(fontsize=8) plt.tight_layout() plt.savefig('test_tpd_simulation.png', dpi=150) print("Plot saved to test_tpd_simulation.png") except ImportError: print("matplotlib not available, skipping plot") else: generate_dataset( n_samples=args.n_samples, outdir=args.outdir, seed=args.seed, n_workers=args.n_workers, mechanism=args.mechanism, multi_mechanism=args.multi_mechanism, n_per_mechanism=args.n_per_mechanism, n_heating_rates=args.n_heating_rates, add_noise=not args.no_noise, )