| """ |
| Dataset Generation Script for DiffEC Flow Matching Model |
| |
| This script generates simulated electrochemical datasets using DiffEC simulators. |
| Each sample contains: |
| - Input: current (flux) and potential profiles over time |
| - Output: 1D temporal concentration distribution for oxidized/reduced species |
| |
| The data format is designed for training flow matching models that learn to |
| generate concentration profiles from electrochemical measurements. |
| |
| Supports multiprocessing for fast parallel dataset generation. |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import numpy as np |
| from tqdm import tqdm |
| from multiprocessing import Pool, cpu_count |
| from functools import partial |
|
|
| |
| import warnings |
| import scipy.linalg |
|
|
| warnings.filterwarnings("ignore", message=".*[Ii]ll.conditioned.*") |
|
|
|
|
| |
| |
| |
| CYCLES = 1 |
| DELTA_X = 2e-6 |
| DELTA_THETA = 5e-2 |
| EXPANDING_GRID_FACTOR = 1.05 |
| SIMULATION_SPACE_MULTIPLE = 6.0 |
|
|
|
|
| |
| |
| |
|
|
| def calc_n(Xi, deltaX, maxX, expanding_grid_factor): |
| """Calculate number of grid points.""" |
| current_X = Xi |
| n = 0 |
| dX = deltaX |
| while current_X < maxX: |
| current_X += dX |
| dX = dX * expanding_grid_factor |
| n += 1 |
| return n + 1 |
|
|
|
|
| def gen_grid(Xi, deltaX, maxX, expanding_grid_factor): |
| """Generate expanding spatial grid.""" |
| n = calc_n(Xi, deltaX, maxX, expanding_grid_factor) |
| X_grid = np.zeros(n) |
| X_grid[0] = Xi |
| dX = deltaX |
| for i in range(1, n): |
| X_grid[i] = X_grid[i-1] + dX |
| dX = dX * expanding_grid_factor |
| return X_grid, n |
|
|
|
|
| def ini_conc(n, C_A_bulk, C_B_bulk): |
| """Initialize concentration arrays.""" |
| conc = np.zeros(2 * n) |
| conc[:n] = C_A_bulk |
| conc[n:] = C_B_bulk |
| conc_d = conc.copy() |
| return conc, conc_d |
|
|
|
|
| def ini_coeff(n): |
| """Initialize coefficient arrays.""" |
| A_matrix = np.zeros((2*n, 2*n)) |
| aA = np.zeros(n) |
| bA = np.zeros(n) |
| cA = np.zeros(n) |
| aB = np.zeros(n) |
| bB = np.zeros(n) |
| cB = np.zeros(n) |
| return A_matrix, aA, bA, cA, aB, bB, cB |
|
|
|
|
| def calc_abc_linear(n, X_grid, deltaT, a, b, c, D): |
| """Calculate coefficients for linear diffusion.""" |
| for i in range(1, n-1): |
| deltaX_m = X_grid[i] - X_grid[i-1] |
| deltaX_p = X_grid[i+1] - X_grid[i] |
| a[i] = D * (-2.0 * deltaT) / (deltaX_m * (deltaX_m + deltaX_p)) |
| c[i] = D * (-2.0 * deltaT) / (deltaX_p * (deltaX_m + deltaX_p)) |
| b[i] = 1.0 - a[i] - c[i] |
| return a, b, c |
|
|
|
|
| def update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics, **kw): |
| """Update RHS vector.""" |
| conc_d[:] = conc[:] |
| if kinetics == 'Nernst': |
| conc_d[n-1] = 1.0 / (1.0 + np.exp(-Theta)) |
| conc_d[n] = 0.0 |
| elif kinetics in ('BV', 'MHC'): |
| conc_d[n-1] = 0.0 |
| conc_d[n] = 0.0 |
| elif kinetics == 'Ads': |
| conc_d[n-1] = 0.0 |
| conc_d[n] = 0.0 |
| conc_d[0] = C_A_bulk |
| conc_d[2*n-1] = C_B_bulk |
| return conc_d |
|
|
|
|
| |
| |
| |
|
|
| def _mhc_integrand_red(Theta_effective, reorg_e, hermgauss_degree=50): |
| """Gauss-Hermite quadrature for MHC reduction rate integral.""" |
| pts, wts = np.polynomial.hermite.hermgauss(hermgauss_degree) |
| y = 2 * np.sqrt(reorg_e) / ( |
| 1.0 + np.exp(-(reorg_e * (pts * 2.0 / np.sqrt(reorg_e) - 1.0) - Theta_effective)) |
| ) |
| return np.sum(wts * y) |
|
|
|
|
| def _mhc_integrand_ox(Theta_effective, reorg_e, hermgauss_degree=50): |
| """Gauss-Hermite quadrature for MHC oxidation rate integral.""" |
| pts, wts = np.polynomial.hermite.hermgauss(hermgauss_degree) |
| y = -2.0 * np.sqrt(reorg_e) / ( |
| 1.0 + np.exp(-reorg_e * (pts * 2.0 / np.sqrt(reorg_e) - 1.0) - Theta_effective) |
| ) |
| return np.sum(wts * y) |
|
|
|
|
| def calc_mhc_rates(Theta, K0, reorg_e): |
| """Compute MHC reduction and oxidation rate constants.""" |
| I_red = _mhc_integrand_red(Theta, reorg_e) |
| I_red0 = _mhc_integrand_red(0.0, reorg_e) |
| I_ox = _mhc_integrand_ox(Theta, reorg_e) |
| I_ox0 = _mhc_integrand_ox(0.0, reorg_e) |
| K_red = K0 * I_red / I_red0 |
| K_ox = K0 * I_ox / I_ox0 |
| return K_red, K_ox |
|
|
|
|
| def calc_matrix(A_matrix, X_grid, Theta, kinetics, n, aA, bA, cA, dA, aB, bB, cB, dB, K0, alpha, beta, **kw): |
| """Build the coefficient matrix for all supported kinetics.""" |
| |
| rows_A = np.arange(n-2, 0, -1) |
| A_matrix[rows_A, rows_A - 1] = cA[1:n-1] |
| A_matrix[rows_A, rows_A] = bA[1:n-1] |
| A_matrix[rows_A, rows_A + 1] = aA[1:n-1] |
|
|
| |
| rows_B = np.arange(n+1, 2*n-1) |
| A_matrix[rows_B, rows_B - 1] = aB[1:n-1] |
| A_matrix[rows_B, rows_B] = bB[1:n-1] |
| A_matrix[rows_B, rows_B + 1] = cB[1:n-1] |
|
|
| X0 = X_grid[1] - X_grid[0] |
|
|
| if kinetics == 'Nernst': |
| A_matrix[n-1, n-2] = 0.0 |
| A_matrix[n-1, n-1] = 1.0 |
| A_matrix[n-1, n] = 0.0 |
| A_matrix[n, n-2] = -dA |
| A_matrix[n, n-1] = dA |
| A_matrix[n, n] = dB |
| A_matrix[n, n+1] = -dB |
|
|
| elif kinetics == 'BV': |
| K_red = K0 * np.exp(-alpha * Theta) |
| K_ox = K0 * np.exp(beta * Theta) |
|
|
| A_matrix[n-1, n-2] = -1.0 |
| A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red |
| A_matrix[n-1, n] = -X0/dA * K_ox |
|
|
| A_matrix[n, n-1] = -X0/dB * K_red |
| A_matrix[n, n] = 1.0 + X0/dB * K_ox |
| A_matrix[n, n+1] = -1.0 |
|
|
| elif kinetics == 'MHC': |
| reorg_e = kw.get('reorg_e', 20.0) |
| K_red, K_ox = calc_mhc_rates(Theta, K0, reorg_e) |
|
|
| A_matrix[n-1, n-2] = -1.0 |
| A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red |
| A_matrix[n-1, n] = -X0/dA * K_ox |
|
|
| A_matrix[n, n-1] = -X0/dB * K_red |
| A_matrix[n, n] = 1.0 + X0/dB * K_ox |
| A_matrix[n, n+1] = -1.0 |
|
|
| elif kinetics == 'Ads': |
| K_red = K0 * np.exp(-alpha * Theta) |
| K_ox = K0 * np.exp(beta * Theta) |
| Gamma_sat = kw.get('Gamma_sat', 1.0) |
|
|
| A_matrix[n-1, n-2] = -1.0 |
| A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red * Gamma_sat |
| A_matrix[n-1, n] = -X0/dA * K_ox * Gamma_sat |
|
|
| A_matrix[n, n-1] = -X0/dB * K_red * Gamma_sat |
| A_matrix[n, n] = 1.0 + X0/dB * K_ox * Gamma_sat |
| A_matrix[n, n+1] = -1.0 |
|
|
| |
| A_matrix[0, 0] = 1.0 |
| A_matrix[0, 1] = 0.0 |
| A_matrix[2*n-1, 2*n-1] = 1.0 |
| A_matrix[2*n-1, 2*n-2] = 0.0 |
|
|
| return A_matrix |
|
|
|
|
| def calc_flux(conc, n, dA, X_grid): |
| """Calculate current flux at electrode surface.""" |
| return -dA * (conc[n-2] - conc[n-1]) / (X_grid[1] - X_grid[0]) |
|
|
|
|
| |
| |
| |
|
|
| def run_cv_simulation( |
| sigma=1.0, |
| K0=1.0, |
| alpha=0.5, |
| beta=None, |
| kinetics='BV', |
| C_A_bulk=1.0, |
| C_B_bulk=0.0, |
| dA=1.0, |
| dB=1.0, |
| theta_i=20.0, |
| theta_v=-20.0, |
| cycles=1, |
| n_spatial_out=64, |
| verbose=False, |
| **extra_kinetics_params, |
| ): |
| """ |
| Run CV simulation and return full concentration history. |
| |
| Parameters |
| ---------- |
| sigma : float |
| Dimensionless scan rate |
| K0 : float |
| Dimensionless electrochemical rate constant |
| alpha : float |
| Cathodic transfer coefficient |
| beta : float, optional |
| Anodic transfer coefficient (default: 1 - alpha) |
| kinetics : str |
| 'BV' for Butler-Volmer or 'Nernst' for reversible |
| C_A_bulk : float |
| Dimensionless bulk concentration of oxidized species |
| C_B_bulk : float |
| Dimensionless bulk concentration of reduced species |
| dA, dB : float |
| Dimensionless diffusion coefficients |
| theta_i, theta_v : float |
| Dimensionless start and vertex potentials |
| cycles : int |
| Number of CV cycles |
| n_spatial_out : int |
| Number of spatial grid points in output (will resample) |
| verbose : bool |
| Show progress bar |
| |
| Returns |
| ------- |
| dict with keys: |
| - 'potential': (T,) array of potentials |
| - 'flux': (T,) array of current flux |
| - 'time': (T,) array of dimensionless time |
| - 'c_ox': (T, X) concentration of oxidized species |
| - 'c_red': (T, X) concentration of reduced species |
| - 'x_grid': (X,) spatial grid (original) |
| - 'x_grid_out': (n_spatial_out,) resampled spatial grid |
| - 'params': dict of simulation parameters |
| """ |
| if beta is None: |
| beta = 1.0 - alpha |
|
|
| deltaT = DELTA_THETA / sigma |
| maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma |
| nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 |
|
|
| Esteps = np.arange(nTimeSteps) |
| E = np.where( |
| Esteps < nTimeSteps / 2.0, |
| theta_i - DELTA_THETA * Esteps, |
| theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) |
| ) |
| E = np.tile(E, cycles) |
| total_steps = len(E) |
|
|
| Xi = 0.0 |
| maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) |
| X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) |
|
|
| conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) |
|
|
| A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) |
| aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) |
| aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) |
|
|
| fluxes = np.zeros(total_steps) |
| conc_A_history = [] |
| conc_B_history = [] |
|
|
| |
| |
| lu_factor = None |
| if kinetics == 'Nernst': |
| A_mat = calc_matrix( |
| A_matrix, X_grid, 0.0, kinetics, n, |
| aA, bA, cA, dA, aB, bB, cB, dB, |
| K0, alpha, beta, **extra_kinetics_params |
| ) |
| lu_factor = scipy.linalg.lu_factor(A_mat) |
|
|
| iterator = range(total_steps) |
| if verbose: |
| iterator = tqdm(iterator, desc="Simulating CV") |
|
|
| for idx in iterator: |
| Theta = E[idx] |
|
|
| conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics) |
|
|
| if lu_factor is not None: |
| conc = scipy.linalg.lu_solve(lu_factor, conc_d) |
| else: |
| A_mat = calc_matrix( |
| A_matrix, X_grid, Theta, kinetics, n, |
| aA, bA, cA, dA, aB, bB, cB, dB, |
| K0, alpha, beta, **extra_kinetics_params |
| ) |
| conc = scipy.linalg.solve(A_mat, conc_d) |
|
|
| flux = calc_flux(conc, n, dA, X_grid) |
| fluxes[idx] = flux |
|
|
| conc_A_history.append(conc[:n].copy()) |
| conc_B_history.append(conc[n:].copy()) |
|
|
| conc_A_history = np.stack(conc_A_history, axis=0) |
| conc_B_history = np.stack(conc_B_history, axis=0) |
|
|
| x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) |
|
|
| c_ox_out = np.zeros((total_steps, n_spatial_out)) |
| c_red_out = np.zeros((total_steps, n_spatial_out)) |
| for t in range(total_steps): |
| c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) |
| c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) |
|
|
| time_array = np.arange(total_steps) * float(deltaT) |
|
|
| params_out = { |
| 'experiment_type': 'CV', |
| 'sigma': float(sigma), |
| 'K0': float(K0), |
| 'alpha': float(alpha), |
| 'beta': float(beta), |
| 'kinetics': kinetics, |
| 'C_A_bulk': float(C_A_bulk), |
| 'C_B_bulk': float(C_B_bulk), |
| 'dA': float(dA), |
| 'dB': float(dB), |
| 'theta_i': float(theta_i), |
| 'theta_v': float(theta_v), |
| 'cycles': cycles, |
| } |
| for k, v in extra_kinetics_params.items(): |
| params_out[k] = float(v) if isinstance(v, (int, float, np.floating)) else v |
|
|
| return { |
| 'potential': E, |
| 'flux': fluxes, |
| 'time': time_array, |
| 'c_ox': c_ox_out, |
| 'c_red': c_red_out, |
| 'x_grid': X_grid, |
| 'x_grid_out': x_grid_out, |
| 'params': params_out, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def run_ads_cv_simulation( |
| sigma=1.0, |
| K0=1.0, |
| alpha=0.5, |
| beta=None, |
| Gamma_sat=1.0, |
| theta_i=20.0, |
| theta_v=-20.0, |
| cycles=1, |
| n_spatial_out=64, |
| verbose=False, |
| **extra_params, |
| ): |
| """ |
| Surface-confined (Laviron) CV: no diffusion, ODE for surface coverage. |
| |
| The redox species is adsorbed on the electrode surface. The fractional |
| coverage of the oxidized form, f_ox, evolves as: |
| |
| df_ox/dTheta = -(1/sigma) * [k_red * f_ox - k_ox * (1 - f_ox)] |
| |
| where k_red = K0*exp(-alpha*Theta), k_ox = K0*exp(beta*Theta). |
| |
| Dimensionless current: i = Gamma_sat * df_ox/dTheta * sigma |
| (= Gamma_sat * [k_red*f_ox - k_ox*(1-f_ox)]) |
| |
| Returns the same dict format as run_cv_simulation for compatibility. |
| Concentration arrays represent uniform surface coverage (no spatial variation). |
| """ |
| if beta is None: |
| beta = 1.0 - alpha |
|
|
| nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 |
|
|
| Esteps = np.arange(nTimeSteps) |
| E = np.where( |
| Esteps < nTimeSteps / 2.0, |
| theta_i - DELTA_THETA * Esteps, |
| theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) |
| ) |
| E = np.tile(E, cycles) |
| total_steps = len(E) |
|
|
| deltaT = DELTA_THETA / sigma |
|
|
| |
| f_ox = 1.0 |
|
|
| fluxes = np.zeros(total_steps) |
| f_ox_history = np.zeros(total_steps) |
|
|
| for idx in range(total_steps): |
| Theta = E[idx] |
| k_red = K0 * np.exp(-alpha * Theta) |
| k_ox = K0 * np.exp(beta * Theta) |
|
|
| |
| |
| |
| k_total = k_red + k_ox |
| f_eq = k_ox / k_total |
| f_ox_new = f_eq + (f_ox - f_eq) * np.exp(-k_total * deltaT) |
| f_ox_new = np.clip(f_ox_new, 0.0, 1.0) |
|
|
| |
| |
| |
| fluxes[idx] = -Gamma_sat * (f_ox - f_ox_new) / deltaT |
|
|
| f_ox_history[idx] = f_ox |
| f_ox = f_ox_new |
|
|
| time_array = np.arange(total_steps) * float(deltaT) |
|
|
| |
| |
| |
| x_grid_out = np.linspace(0.0, 1.0, n_spatial_out) |
| c_ox_out = np.outer(f_ox_history, np.ones(n_spatial_out)) |
| c_red_out = np.outer(1.0 - f_ox_history, np.ones(n_spatial_out)) |
|
|
| params_out = { |
| 'experiment_type': 'CV', |
| 'sigma': float(sigma), |
| 'K0': float(K0), |
| 'alpha': float(alpha), |
| 'beta': float(beta), |
| 'kinetics': 'Ads', |
| 'Gamma_sat': float(Gamma_sat), |
| 'theta_i': float(theta_i), |
| 'theta_v': float(theta_v), |
| 'cycles': cycles, |
| 'C_A_bulk': 0.0, |
| 'C_B_bulk': 0.0, |
| 'dA': 1e-10, |
| 'dB': 1e-10, |
| } |
|
|
| return { |
| 'potential': E, |
| 'flux': fluxes, |
| 'time': time_array, |
| 'c_ox': c_ox_out.astype(np.float64), |
| 'c_red': c_red_out.astype(np.float64), |
| 'x_grid': x_grid_out, |
| 'x_grid_out': x_grid_out, |
| 'params': params_out, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def run_ec_cv_simulation( |
| sigma=1.0, |
| K0=1.0, |
| alpha=0.5, |
| beta=None, |
| kinetics='EC', |
| C_A_bulk=1.0, |
| C_B_bulk=0.0, |
| dA=1.0, |
| dB=1.0, |
| theta_i=20.0, |
| theta_v=-20.0, |
| cycles=1, |
| n_spatial_out=64, |
| verbose=False, |
| kc=1.0, |
| **extra_params, |
| ): |
| """ |
| EC mechanism CV: A + e- -> B (BV kinetics), then B -> Y (first-order chemical). |
| |
| The follow-up chemical reaction consumes B in the bulk with rate constant kc |
| (dimensionless, scaled by scan rate). This modifies the diffusion equation |
| for species B to include a homogeneous reaction sink term. |
| |
| The PDE for B becomes: |
| dc_B/dt = D_B * d²c_B/dx² - kc * c_B |
| |
| We handle this via operator splitting: after solving the diffusion step |
| (same matrix as BV), we apply the chemical decay analytically: |
| c_B(t+dt) = c_B_diffusion * exp(-kc * dt) |
| |
| Parameters |
| ---------- |
| kc : float |
| Dimensionless first-order chemical rate constant for B -> Y. |
| kc = k_chem / (F*v/RT) where k_chem is the dimensional rate constant |
| and v is the scan rate. Large kc means fast follow-up chemistry. |
| """ |
| if beta is None: |
| beta = 1.0 - alpha |
|
|
| deltaT = DELTA_THETA / sigma |
| maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma |
| nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 |
|
|
| Esteps = np.arange(nTimeSteps) |
| E = np.where( |
| Esteps < nTimeSteps / 2.0, |
| theta_i - DELTA_THETA * Esteps, |
| theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) |
| ) |
| E = np.tile(E, cycles) |
| total_steps = len(E) |
|
|
| Xi = 0.0 |
| maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) |
| X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) |
|
|
| conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) |
|
|
| A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) |
| aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) |
| aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) |
|
|
| fluxes = np.zeros(total_steps) |
| conc_A_history = [] |
| conc_B_history = [] |
|
|
| decay_factor = np.exp(-kc * deltaT) |
|
|
| iterator = range(total_steps) |
| if verbose: |
| iterator = tqdm(iterator, desc="Simulating EC CV") |
|
|
| for idx in iterator: |
| Theta = E[idx] |
|
|
| conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, 'BV') |
|
|
| A_mat = calc_matrix( |
| A_matrix, X_grid, Theta, 'BV', n, |
| aA, bA, cA, dA, aB, bB, cB, dB, |
| K0, alpha, beta |
| ) |
| conc = scipy.linalg.solve(A_mat, conc_d) |
|
|
| |
| conc[n:2*n-1] *= decay_factor |
| |
| conc[2*n-1] = C_B_bulk |
|
|
| flux = calc_flux(conc, n, dA, X_grid) |
| fluxes[idx] = flux |
|
|
| conc_A_history.append(conc[:n].copy()) |
| conc_B_history.append(conc[n:].copy()) |
|
|
| conc_A_history = np.stack(conc_A_history, axis=0) |
| conc_B_history = np.stack(conc_B_history, axis=0) |
|
|
| x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) |
|
|
| c_ox_out = np.zeros((total_steps, n_spatial_out)) |
| c_red_out = np.zeros((total_steps, n_spatial_out)) |
| for t in range(total_steps): |
| c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) |
| c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) |
|
|
| time_array = np.arange(total_steps) * float(deltaT) |
|
|
| params_out = { |
| 'experiment_type': 'CV', |
| 'sigma': float(sigma), |
| 'K0': float(K0), |
| 'alpha': float(alpha), |
| 'beta': float(beta), |
| 'kinetics': 'EC', |
| 'C_A_bulk': float(C_A_bulk), |
| 'C_B_bulk': float(C_B_bulk), |
| 'dA': float(dA), |
| 'dB': float(dB), |
| 'theta_i': float(theta_i), |
| 'theta_v': float(theta_v), |
| 'cycles': cycles, |
| 'kc': float(kc), |
| } |
|
|
| return { |
| 'potential': E, |
| 'flux': fluxes, |
| 'time': time_array, |
| 'c_ox': c_ox_out, |
| 'c_red': c_red_out, |
| 'x_grid': X_grid, |
| 'x_grid_out': x_grid_out, |
| 'params': params_out, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def run_lh_cv_simulation( |
| sigma=1.0, |
| K0=1.0, |
| alpha=0.5, |
| beta=None, |
| kinetics='LH', |
| C_A_bulk=1.0, |
| C_B_bulk=0.0, |
| dA=1.0, |
| dB=1.0, |
| theta_i=20.0, |
| theta_v=-20.0, |
| cycles=1, |
| n_spatial_out=64, |
| verbose=False, |
| KA_eq=10.0, |
| KB_eq=1.0, |
| **extra_params, |
| ): |
| """ |
| Langmuir-Hinshelwood CV: diffusion + fast adsorption equilibrium + surface ET. |
| |
| Reaction scheme: |
| A_sol <-> A_ads (fast Langmuir equilibrium, constant K_A) |
| B_sol <-> B_ads (fast Langmuir equilibrium, constant K_B) |
| A_ads + e- <-> B_ads (BV kinetics with K0, alpha) |
| |
| Under the pre-equilibrium approximation, surface coverages are: |
| theta_A = K_A * c_A(0,t) / (1 + K_A*c_A(0,t) + K_B*c_B(0,t)) |
| theta_B = K_B * c_B(0,t) / (1 + K_A*c_A(0,t) + K_B*c_B(0,t)) |
| |
| The effective boundary condition becomes coverage-dependent BV kinetics. |
| The flux at the surface is: |
| j = K0 * [exp(-alpha*Theta) * theta_A - exp(beta*Theta) * theta_B] |
| |
| This is implemented by modifying the boundary condition at each timestep |
| based on the current surface concentrations. |
| |
| Parameters |
| ---------- |
| KA_eq : float |
| Dimensionless adsorption equilibrium constant for oxidized species A. |
| KB_eq : float |
| Dimensionless adsorption equilibrium constant for reduced species B. |
| """ |
| if beta is None: |
| beta = 1.0 - alpha |
|
|
| deltaT = DELTA_THETA / sigma |
| maxT = cycles * 2.0 * np.abs(theta_v - theta_i) / sigma |
| nTimeSteps = int(2 * np.abs(theta_v - theta_i) / DELTA_THETA) + 1 |
|
|
| Esteps = np.arange(nTimeSteps) |
| E = np.where( |
| Esteps < nTimeSteps / 2.0, |
| theta_i - DELTA_THETA * Esteps, |
| theta_v + DELTA_THETA * (Esteps - nTimeSteps / 2.0) |
| ) |
| E = np.tile(E, cycles) |
| total_steps = len(E) |
|
|
| Xi = 0.0 |
| maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(maxT) |
| X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) |
|
|
| conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) |
|
|
| A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) |
| aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) |
| aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) |
|
|
| fluxes = np.zeros(total_steps) |
| conc_A_history = [] |
| conc_B_history = [] |
|
|
| X0 = X_grid[1] - X_grid[0] |
|
|
| iterator = range(total_steps) |
| if verbose: |
| iterator = tqdm(iterator, desc="Simulating LH CV") |
|
|
| for idx in iterator: |
| Theta = E[idx] |
|
|
| conc_d[:] = conc[:] |
| conc_d[n-1] = 0.0 |
| conc_d[n] = 0.0 |
| conc_d[0] = C_A_bulk |
| conc_d[2*n-1] = C_B_bulk |
|
|
| |
| cA_surf = max(conc[n-1], 0.0) |
| cB_surf = max(conc[n], 0.0) |
| denom = 1.0 + KA_eq * cA_surf + KB_eq * cB_surf |
| theta_A = KA_eq * cA_surf / denom |
| theta_B = KB_eq * cB_surf / denom |
|
|
| K_red = K0 * np.exp(-alpha * Theta) |
| K_ox = K0 * np.exp(beta * Theta) |
|
|
| |
| |
| |
| |
| |
| K_red_eff = K_red * KA_eq / denom |
| K_ox_eff = K_ox * KB_eq / denom |
|
|
| |
| |
| rows_A = np.arange(n-2, 0, -1) |
| A_matrix[rows_A, rows_A - 1] = cA[1:n-1] |
| A_matrix[rows_A, rows_A] = bA[1:n-1] |
| A_matrix[rows_A, rows_A + 1] = aA[1:n-1] |
|
|
| rows_B = np.arange(n+1, 2*n-1) |
| A_matrix[rows_B, rows_B - 1] = aB[1:n-1] |
| A_matrix[rows_B, rows_B] = bB[1:n-1] |
| A_matrix[rows_B, rows_B + 1] = cB[1:n-1] |
|
|
| |
| A_matrix[n-1, n-2] = -1.0 |
| A_matrix[n-1, n-1] = 1.0 + X0/dA * K_red_eff |
| A_matrix[n-1, n] = -X0/dA * K_ox_eff |
|
|
| |
| A_matrix[n, n-1] = -X0/dB * K_red_eff |
| A_matrix[n, n] = 1.0 + X0/dB * K_ox_eff |
| A_matrix[n, n+1] = -1.0 |
|
|
| |
| A_matrix[0, 0] = 1.0 |
| A_matrix[0, 1] = 0.0 |
| A_matrix[2*n-1, 2*n-1] = 1.0 |
| A_matrix[2*n-1, 2*n-2] = 0.0 |
|
|
| conc = scipy.linalg.solve(A_matrix, conc_d) |
|
|
| flux = calc_flux(conc, n, dA, X_grid) |
| fluxes[idx] = flux |
|
|
| conc_A_history.append(conc[:n].copy()) |
| conc_B_history.append(conc[n:].copy()) |
|
|
| conc_A_history = np.stack(conc_A_history, axis=0) |
| conc_B_history = np.stack(conc_B_history, axis=0) |
|
|
| x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) |
|
|
| c_ox_out = np.zeros((total_steps, n_spatial_out)) |
| c_red_out = np.zeros((total_steps, n_spatial_out)) |
| for t in range(total_steps): |
| c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) |
| c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) |
|
|
| time_array = np.arange(total_steps) * float(deltaT) |
|
|
| params_out = { |
| 'experiment_type': 'CV', |
| 'sigma': float(sigma), |
| 'K0': float(K0), |
| 'alpha': float(alpha), |
| 'beta': float(beta), |
| 'kinetics': 'LH', |
| 'C_A_bulk': float(C_A_bulk), |
| 'C_B_bulk': float(C_B_bulk), |
| 'dA': float(dA), |
| 'dB': float(dB), |
| 'theta_i': float(theta_i), |
| 'theta_v': float(theta_v), |
| 'cycles': cycles, |
| 'KA_eq': float(KA_eq), |
| 'KB_eq': float(KB_eq), |
| } |
|
|
| return { |
| 'potential': E, |
| 'flux': fluxes, |
| 'time': time_array, |
| 'c_ox': c_ox_out, |
| 'c_red': c_red_out, |
| 'x_grid': X_grid, |
| 'x_grid_out': x_grid_out, |
| 'params': params_out, |
| } |
|
|
|
|
| |
| |
| |
|
|
| def run_ca_simulation( |
| theta_step=-10.0, |
| K0=1.0, |
| alpha=0.5, |
| beta=None, |
| kinetics='BV', |
| C_A_bulk=1.0, |
| C_B_bulk=0.0, |
| dA=1.0, |
| dB=1.0, |
| t_max=10.0, |
| dt=0.01, |
| n_spatial_out=64, |
| verbose=False, |
| ): |
| """ |
| Run chronoamperometry simulation: step potential held constant. |
| |
| Parameters |
| ---------- |
| theta_step : float |
| Dimensionless step potential (held constant for all t > 0) |
| K0 : float |
| Dimensionless electrochemical rate constant |
| alpha : float |
| Cathodic transfer coefficient |
| beta : float, optional |
| Anodic transfer coefficient (default: 1 - alpha) |
| kinetics : str |
| 'BV' for Butler-Volmer |
| C_A_bulk, C_B_bulk : float |
| Dimensionless bulk concentrations |
| dA, dB : float |
| Dimensionless diffusion coefficients |
| t_max : float |
| Dimensionless total experiment time |
| dt : float |
| Dimensionless time step |
| n_spatial_out : int |
| Number of spatial grid points in output |
| verbose : bool |
| Show progress bar |
| |
| Returns |
| ------- |
| dict with same keys as run_cv_simulation, plus experiment_type='CA' |
| """ |
| if beta is None: |
| beta = 1.0 - alpha |
|
|
| total_steps = int(t_max / dt) + 1 |
| deltaT = dt |
|
|
| E = np.full(total_steps, theta_step) |
|
|
| Xi = 0.0 |
| maxX = SIMULATION_SPACE_MULTIPLE * np.sqrt(t_max * max(dA, dB)) |
| X_grid, n = gen_grid(Xi, DELTA_X, maxX, EXPANDING_GRID_FACTOR) |
|
|
| conc, conc_d = ini_conc(n, C_A_bulk, C_B_bulk) |
| A_matrix, aA, bA, cA, aB, bB, cB = ini_coeff(n) |
| aA, bA, cA = calc_abc_linear(n, X_grid, deltaT, aA, bA, cA, dA) |
| aB, bB, cB = calc_abc_linear(n, X_grid, deltaT, aB, bB, cB, dB) |
|
|
| fluxes = np.zeros(total_steps) |
| conc_A_history = [] |
| conc_B_history = [] |
|
|
| iterator = range(total_steps) |
| if verbose: |
| iterator = tqdm(iterator, desc="Simulating CA") |
|
|
| for idx in iterator: |
| Theta = E[idx] |
|
|
| A_mat = calc_matrix( |
| A_matrix, X_grid, Theta, kinetics, n, |
| aA, bA, cA, dA, aB, bB, cB, dB, |
| K0, alpha, beta |
| ) |
| conc_d = update_d(Theta, conc, conc_d, n, C_A_bulk, C_B_bulk, kinetics) |
| conc = scipy.linalg.solve(A_mat, conc_d) |
|
|
| flux = calc_flux(conc, n, dA, X_grid) |
| fluxes[idx] = flux |
|
|
| conc_A_history.append(conc[:n].copy()) |
| conc_B_history.append(conc[n:].copy()) |
|
|
| conc_A_history = np.stack(conc_A_history, axis=0) |
| conc_B_history = np.stack(conc_B_history, axis=0) |
|
|
| x_grid_out = np.linspace(X_grid[0], X_grid[-1], n_spatial_out) |
|
|
| c_ox_out = np.zeros((total_steps, n_spatial_out)) |
| c_red_out = np.zeros((total_steps, n_spatial_out)) |
| for t in range(total_steps): |
| c_ox_out[t] = np.interp(x_grid_out, X_grid, conc_A_history[t]) |
| c_red_out[t] = np.interp(x_grid_out, X_grid, conc_B_history[t]) |
|
|
| time_array = np.arange(total_steps) * float(deltaT) |
|
|
| return { |
| 'potential': E, |
| 'flux': fluxes, |
| 'time': time_array, |
| 'c_ox': c_ox_out, |
| 'c_red': c_red_out, |
| 'x_grid': X_grid, |
| 'x_grid_out': x_grid_out, |
| 'params': { |
| 'experiment_type': 'CA', |
| 'theta_step': float(theta_step), |
| 'K0': float(K0), |
| 'alpha': float(alpha), |
| 'beta': float(beta), |
| 'kinetics': kinetics, |
| 'C_A_bulk': float(C_A_bulk), |
| 'C_B_bulk': float(C_B_bulk), |
| 'dA': float(dA), |
| 'dB': float(dB), |
| 't_max': float(t_max), |
| 'dt': float(dt), |
| } |
| } |
|
|
|
|
| |
| |
| |
|
|
| def sample_ca_params(rng=None): |
| """ |
| Sample random dimensionless parameters for chronoamperometry. |
| |
| CA holds a constant potential and observes the transient current decay. |
| The Cottrell equation predicts j(t) ~ 1/sqrt(t) for diffusion-limited case. |
| """ |
| if rng is None: |
| rng = np.random.default_rng() |
|
|
| |
| theta_step = rng.uniform(-20.0, -2.0) |
|
|
| |
| log_K0 = rng.uniform(-2, 2) |
| K0 = 10 ** log_K0 |
|
|
| |
| alpha = rng.uniform(0.3, 0.7) |
|
|
| |
| d_ratio = 10 ** rng.uniform(-0.3, 0.3) |
| dA = 1.0 |
| dB = dA * d_ratio |
|
|
| |
| log_tmax = rng.uniform(0.5, 2.0) |
| t_max = 10 ** log_tmax |
| dt = t_max / rng.uniform(500, 1500) |
|
|
| return { |
| 'theta_step': theta_step, |
| 'K0': K0, |
| 'alpha': alpha, |
| 'kinetics': 'BV', |
| 'C_A_bulk': 1.0, |
| 'C_B_bulk': 0.0, |
| 'dA': dA, |
| 'dB': dB, |
| 't_max': t_max, |
| 'dt': dt, |
| } |
|
|
|
|
| def _sample_common_cv_params(rng): |
| """Sample scan rate, diffusion ratio, and potential window (shared across mechanisms).""" |
| log_sigma = rng.uniform(-1, 2) |
| sigma = 10 ** log_sigma |
|
|
| d_ratio = 10 ** rng.uniform(-0.3, 0.3) |
| dA = 1.0 |
| dB = dA * d_ratio |
|
|
| theta_center = rng.uniform(-5, 5) |
| theta_range = rng.uniform(15, 25) |
| theta_i = theta_center + theta_range / 2 |
| theta_v = theta_center - theta_range / 2 |
|
|
| return { |
| 'sigma': sigma, |
| 'C_A_bulk': 1.0, |
| 'C_B_bulk': 0.0, |
| 'dA': dA, |
| 'dB': dB, |
| 'theta_i': theta_i, |
| 'theta_v': theta_v, |
| 'cycles': 1, |
| } |
|
|
|
|
| def sample_cv_params_nernst(rng=None): |
| """Sample parameters for Nernstian (reversible) CV.""" |
| if rng is None: |
| rng = np.random.default_rng() |
| p = _sample_common_cv_params(rng) |
| p['kinetics'] = 'Nernst' |
| p['K0'] = 1e6 |
| p['alpha'] = 0.5 |
| p['E0_offset'] = 0.0 |
| return p |
|
|
|
|
| def sample_cv_params_bv(rng=None): |
| """Sample parameters for Butler-Volmer CV.""" |
| if rng is None: |
| rng = np.random.default_rng() |
| p = _sample_common_cv_params(rng) |
| p['kinetics'] = 'BV' |
| p['K0'] = 10 ** rng.uniform(-2, 2) |
| p['alpha'] = rng.uniform(0.3, 0.7) |
| return p |
|
|
|
|
| def sample_cv_params_mhc(rng=None): |
| """Sample parameters for Marcus-Hush-Chidsey CV.""" |
| if rng is None: |
| rng = np.random.default_rng() |
| p = _sample_common_cv_params(rng) |
| p['kinetics'] = 'MHC' |
| p['K0'] = 10 ** rng.uniform(-2, 2) |
| p['alpha'] = 0.5 |
| p['reorg_e'] = 10 ** rng.uniform(0.5, 2.0) |
| return p |
|
|
|
|
| def sample_cv_params_ads(rng=None): |
| """Sample parameters for surface adsorption (Laviron-type) CV. |
| |
| Surface-confined model: no diffusion, purely ODE for surface coverage. |
| Parameters: K0 (rate constant), alpha (transfer coefficient), Gamma_sat (coverage). |
| """ |
| if rng is None: |
| rng = np.random.default_rng() |
|
|
| log_sigma = rng.uniform(-1, 2) |
| sigma = 10 ** log_sigma |
|
|
| theta_center = rng.uniform(-5, 5) |
| theta_range = rng.uniform(15, 25) |
| theta_i = theta_center + theta_range / 2 |
| theta_v = theta_center - theta_range / 2 |
|
|
| return { |
| 'sigma': sigma, |
| 'theta_i': theta_i, |
| 'theta_v': theta_v, |
| 'cycles': 1, |
| 'kinetics': 'Ads', |
| 'K0': 10 ** rng.uniform(-2, 2), |
| 'alpha': rng.uniform(0.3, 0.7), |
| 'Gamma_sat': 10 ** rng.uniform(-1, 1), |
| } |
|
|
|
|
| def sample_cv_params_ec(rng=None): |
| """Sample parameters for EC mechanism CV. |
| |
| EC: A + e- -> B (BV kinetics), then B -> Y (first-order chemical). |
| Parameters: K0, alpha, dB (same as BV) + kc (chemical rate constant). |
| """ |
| if rng is None: |
| rng = np.random.default_rng() |
| p = _sample_common_cv_params(rng) |
| p['kinetics'] = 'EC' |
| p['K0'] = 10 ** rng.uniform(-2, 2) |
| p['alpha'] = rng.uniform(0.3, 0.7) |
| |
| |
| |
| p['kc'] = 10 ** rng.uniform(-2, 2) |
| return p |
|
|
|
|
| def sample_cv_params_lh(rng=None): |
| """Sample parameters for Langmuir-Hinshelwood CV. |
| |
| LH: diffusion + fast Langmuir adsorption equilibrium + surface BV ET. |
| Parameters: K0, alpha, KA_eq, KB_eq, dB. |
| """ |
| if rng is None: |
| rng = np.random.default_rng() |
| p = _sample_common_cv_params(rng) |
| p['kinetics'] = 'LH' |
| p['K0'] = 10 ** rng.uniform(-2, 2) |
| p['alpha'] = rng.uniform(0.3, 0.7) |
| |
| |
| p['KA_eq'] = 10 ** rng.uniform(-1, 2) |
| p['KB_eq'] = 10 ** rng.uniform(-1, 2) |
| return p |
|
|
|
|
| MECHANISM_SAMPLERS = { |
| 'Nernst': sample_cv_params_nernst, |
| 'BV': sample_cv_params_bv, |
| 'MHC': sample_cv_params_mhc, |
| 'Ads': sample_cv_params_ads, |
| 'EC': sample_cv_params_ec, |
| 'LH': sample_cv_params_lh, |
| } |
|
|
|
|
| def sample_cv_params(rng=None, mechanism=None): |
| """ |
| Sample random dimensionless parameters for CV simulation. |
| |
| If mechanism is specified, samples from that mechanism. |
| Otherwise defaults to BV for backward compatibility. |
| """ |
| if rng is None: |
| rng = np.random.default_rng() |
| if mechanism is None: |
| mechanism = 'BV' |
| return MECHANISM_SAMPLERS[mechanism](rng) |
|
|
|
|
| |
| |
| |
|
|
| MECHANISM_LIST = ['Nernst', 'BV', 'MHC', 'Ads', 'EC', 'LH'] |
| MECHANISM_TO_ID = {m: i for i, m in enumerate(MECHANISM_LIST)} |
|
|
|
|
| def _run_single_cv(params, n_spatial): |
| """Run a single CV simulation, dispatching to the correct simulator.""" |
| kin = params.get('kinetics') |
| if kin == 'Ads': |
| return run_ads_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) |
| elif kin == 'EC': |
| return run_ec_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) |
| elif kin == 'LH': |
| return run_lh_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) |
| else: |
| return run_cv_simulation(**params, n_spatial_out=n_spatial, verbose=False) |
|
|
|
|
| def _sample_scan_rates(rng, n_scan_rates, log_sigma_range=(-1, 2)): |
| """Sample log-spaced scan rates spanning the given range.""" |
| lo, hi = log_sigma_range |
| if n_scan_rates == 1: |
| return np.array([10 ** rng.uniform(lo, hi)]) |
| anchors = np.linspace(lo, hi, n_scan_rates) |
| jitter = (hi - lo) / (n_scan_rates - 1) * 0.3 |
| log_sigmas = np.array([rng.uniform(a - jitter, a + jitter) for a in anchors]) |
| log_sigmas = np.clip(log_sigmas, lo, hi) |
| log_sigmas.sort() |
| return 10 ** log_sigmas |
|
|
|
|
| def _add_noise(flux, rng, noise_range=(0.001, 0.02)): |
| """Add Gaussian noise to flux signal. Returns (noisy_flux, sigma_noise).""" |
| sigma_noise = rng.uniform(*noise_range) |
| peak = np.max(np.abs(flux)) + 1e-20 |
| noise = sigma_noise * peak * rng.standard_normal(flux.shape) |
| return flux + noise.astype(flux.dtype), float(sigma_noise) |
|
|
|
|
| def generate_sample_single(idx, outdir, seed, n_spatial=64, experiment_type='CV', |
| mechanism=None, n_scan_rates=1, add_noise=True): |
| """ |
| Generate and save a single sample (single or multi-scan-rate). |
| |
| When n_scan_rates > 1, the same kinetic parameters are simulated at |
| multiple scan rates and saved together. |
| |
| Parameters |
| ---------- |
| n_scan_rates : int |
| Number of scan rates per sample. Use -1 to randomly sample from |
| {1, 2, 3} per sample (drawn from the per-sample RNG for |
| reproducibility). |
| add_noise : bool |
| If True, add Gaussian noise to flux signals (sigma_noise ~ U(0.001, 0.02) |
| as a fraction of peak current). |
| """ |
| rng = np.random.default_rng(seed + idx) |
|
|
| if n_scan_rates == -1: |
| n_scan_rates = int(rng.integers(1, 4)) |
|
|
| if experiment_type == 'mixed': |
| exp = rng.choice(['CV', 'CA']) |
| else: |
| exp = experiment_type |
|
|
| try: |
| if exp == 'CV': |
| params = sample_cv_params(rng, mechanism=mechanism) |
| actual_mechanism = params.get('kinetics', 'BV') |
| mechanism_id = MECHANISM_TO_ID.get(actual_mechanism, -1) |
|
|
| if n_scan_rates <= 1: |
| |
| result = _run_single_cv(params, n_spatial) |
| flux = result['flux'].astype(np.float32) |
|
|
| sigma_noise = 0.0 |
| if add_noise: |
| flux, sigma_noise = _add_noise(flux, rng) |
|
|
| |
| sigma_ref = params['sigma'] |
| kin = actual_mechanism |
| phys_params = dict(result['params']) |
| K0_ref = params.get('K0', 1.0) |
| if kin in ('BV', 'MHC', 'EC', 'LH'): |
| phys_params['K0'] = float(K0_ref * np.sqrt(sigma_ref)) |
| elif kin == 'Ads': |
| phys_params['K0'] = float(K0_ref * sigma_ref) |
| if kin == 'EC': |
| phys_params['kc'] = float(params.get('kc', 1.0) * sigma_ref) |
| phys_params['sigma_noise'] = sigma_noise |
|
|
| np.savez_compressed( |
| os.path.join(outdir, f"sample_{idx:06d}.npz"), |
| potential=result['potential'].astype(np.float32), |
| flux=flux, |
| time=result['time'].astype(np.float32), |
| c_ox=result['c_ox'].astype(np.float32), |
| c_red=result['c_red'].astype(np.float32), |
| x_grid=result['x_grid_out'].astype(np.float32), |
| params=phys_params, |
| mechanism_id=np.int32(mechanism_id), |
| ) |
| meta = { |
| 'idx': idx, 'success': True, 'experiment_type': exp, |
| 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), |
| 'K0': float(phys_params['K0']), |
| 'alpha': float(params.get('alpha', 0.5)), |
| 'n_time': len(result['time']), |
| 'sigma': float(params['sigma']), |
| 'n_scan_rates': 1, |
| 'sigma_noise': sigma_noise, |
| } |
| return meta |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| sigmas = _sample_scan_rates(rng, n_scan_rates) |
| sigma_ref = params['sigma'] |
| kin = actual_mechanism |
|
|
| |
| K0_at_ref = params.get('K0', 1.0) |
| kc_at_ref = params.get('kc', 1.0) |
| if kin in ('BV', 'MHC', 'EC', 'LH'): |
| K0_at_1 = K0_at_ref * np.sqrt(sigma_ref) |
| elif kin == 'Ads': |
| K0_at_1 = K0_at_ref * sigma_ref |
| else: |
| K0_at_1 = K0_at_ref |
| kc_at_1 = kc_at_ref * sigma_ref if kin == 'EC' else kc_at_ref |
|
|
| potentials, fluxes, times = [], [], [] |
|
|
| for sigma in sigmas: |
| p = dict(params) |
| p['sigma'] = float(sigma) |
| if kin in ('BV', 'MHC', 'EC', 'LH'): |
| p['K0'] = K0_at_1 / np.sqrt(sigma) |
| elif kin == 'Ads': |
| p['K0'] = K0_at_1 / sigma |
| if kin == 'EC': |
| p['kc'] = kc_at_1 / sigma |
|
|
| result = _run_single_cv(p, n_spatial) |
| potentials.append(result['potential'].astype(np.float32)) |
| flux_clean = result['flux'].astype(np.float32) |
| if add_noise: |
| flux_clean, _ = _add_noise(flux_clean, rng) |
| fluxes.append(flux_clean) |
| times.append(result['time'].astype(np.float32)) |
|
|
| |
| |
| phys_params = dict(params) |
| phys_params['kinetics'] = actual_mechanism |
| phys_params['K0'] = float(K0_at_1) |
| if kin == 'EC': |
| phys_params['kc'] = float(kc_at_1) |
|
|
| |
| max_t = max(len(p) for p in potentials) |
| n_s = len(sigmas) |
| pot_arr = np.zeros((n_s, max_t), dtype=np.float32) |
| flux_arr = np.zeros((n_s, max_t), dtype=np.float32) |
| time_arr = np.zeros((n_s, max_t), dtype=np.float32) |
| lengths = np.zeros(n_s, dtype=np.int32) |
|
|
| for i in range(n_s): |
| t_len = len(potentials[i]) |
| pot_arr[i, :t_len] = potentials[i] |
| flux_arr[i, :t_len] = fluxes[i] |
| time_arr[i, :t_len] = times[i] |
| lengths[i] = t_len |
|
|
| np.savez_compressed( |
| os.path.join(outdir, f"sample_{idx:06d}.npz"), |
| potential=pot_arr, |
| flux=flux_arr, |
| time=time_arr, |
| sigmas=sigmas.astype(np.float32), |
| lengths=lengths, |
| params=phys_params, |
| mechanism_id=np.int32(mechanism_id), |
| n_scan_rates=np.int32(n_scan_rates), |
| ) |
|
|
| meta = { |
| 'idx': idx, 'success': True, 'experiment_type': exp, |
| 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), |
| 'K0': float(params.get('K0', 0)), |
| 'alpha': float(params.get('alpha', 0.5)), |
| 'n_time_max': int(max_t), |
| 'sigmas': [float(s) for s in sigmas], |
| 'n_scan_rates': n_scan_rates, |
| } |
| return meta |
|
|
| elif exp == 'CA': |
| params = sample_ca_params(rng) |
| result = run_ca_simulation(**params, n_spatial_out=n_spatial, verbose=False) |
| actual_mechanism = params.get('kinetics', 'BV') |
| mechanism_id = MECHANISM_TO_ID.get(actual_mechanism, -1) |
| np.savez_compressed( |
| os.path.join(outdir, f"sample_{idx:06d}.npz"), |
| potential=result['potential'].astype(np.float32), |
| flux=result['flux'].astype(np.float32), |
| time=result['time'].astype(np.float32), |
| c_ox=result['c_ox'].astype(np.float32), |
| c_red=result['c_red'].astype(np.float32), |
| x_grid=result['x_grid_out'].astype(np.float32), |
| params=result['params'], |
| mechanism_id=np.int32(mechanism_id), |
| ) |
| meta = { |
| 'idx': idx, 'success': True, 'experiment_type': exp, |
| 'mechanism': actual_mechanism, 'mechanism_id': int(mechanism_id), |
| 'n_time': len(result['time']), |
| 'theta_step': float(params['theta_step']), |
| 'n_scan_rates': 1, |
| } |
| return meta |
| else: |
| raise ValueError(f"Unknown experiment type: {exp}") |
|
|
| except Exception as e: |
| return { |
| 'idx': idx, |
| 'success': False, |
| 'error': str(e), |
| } |
|
|
|
|
| def _worker_generate(args): |
| """Worker function for multiprocessing (must be at module level).""" |
| import signal |
|
|
| def _timeout_handler(signum, frame): |
| raise TimeoutError("Sample generation timed out") |
|
|
| if len(args) == 8: |
| idx, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates, add_noise = args |
| elif len(args) == 7: |
| idx, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates = args |
| add_noise = True |
| elif len(args) == 6: |
| idx, outdir, seed, n_spatial, experiment_type, mechanism = args |
| n_scan_rates = 1 |
| add_noise = True |
| else: |
| idx, outdir, seed, n_spatial, experiment_type = args |
| mechanism = None |
| n_scan_rates = 1 |
| add_noise = True |
|
|
| |
| outpath = os.path.join(outdir, f"sample_{idx:06d}.npz") |
| if os.path.exists(outpath): |
| return {'idx': idx, 'success': True, 'error': None, 'skipped': True} |
|
|
| old_handler = signal.signal(signal.SIGALRM, _timeout_handler) |
| signal.alarm(180) |
| try: |
| result = generate_sample_single(idx, outdir, seed, n_spatial, experiment_type, |
| mechanism, n_scan_rates, add_noise) |
| except (TimeoutError, Exception) as e: |
| result = {'idx': idx, 'success': False, 'error': f'Timeout or error: {e}'} |
| finally: |
| signal.alarm(0) |
| signal.signal(signal.SIGALRM, old_handler) |
| return result |
|
|
|
|
| def generate_dataset( |
| n_samples=1000, |
| outdir="data/diffec_cv", |
| n_spatial=64, |
| seed=42, |
| n_workers=None, |
| experiment_type='CV', |
| mechanism=None, |
| multi_mechanism=False, |
| n_per_mechanism=None, |
| n_scan_rates=1, |
| add_noise=True, |
| ): |
| """ |
| Generate a dataset of electrochemical simulations. |
| |
| Parameters |
| ---------- |
| n_samples : int |
| Number of samples to generate (used when mechanism is single or None) |
| outdir : str |
| Output directory |
| n_spatial : int |
| Number of spatial grid points in output |
| seed : int |
| Random seed |
| n_workers : int, optional |
| Number of parallel workers. Default: number of CPU cores. |
| experiment_type : str |
| 'CV' for cyclic voltammetry, 'CA' for chronoamperometry, |
| 'mixed' for random mix of both. |
| mechanism : str or None |
| Specific mechanism to use. None defaults to BV. |
| multi_mechanism : bool |
| If True, generate balanced data across all 4 mechanisms. |
| n_per_mechanism : int or None |
| Samples per mechanism when multi_mechanism=True. Defaults to n_samples. |
| n_scan_rates : int |
| Number of scan rates per sample. 1 = legacy single-CV format. |
| >1 = multi-scan-rate format with shared kinetic parameters. |
| -1 = randomly sample from {1, 2, 3} per sample. |
| """ |
| os.makedirs(outdir, exist_ok=True) |
|
|
| if n_workers is None: |
| n_workers = max(1, cpu_count() - 1) |
|
|
| if multi_mechanism: |
| if n_per_mechanism is None: |
| n_per_mechanism = n_samples |
| total = n_per_mechanism * len(MECHANISM_LIST) |
| print(f"Generating multi-mechanism dataset: {n_per_mechanism} per mechanism x {len(MECHANISM_LIST)} = {total}") |
| args_list = [] |
| for mech_idx, mech in enumerate(MECHANISM_LIST): |
| offset = mech_idx * n_per_mechanism |
| for i in range(n_per_mechanism): |
| args_list.append((offset + i, outdir, seed, n_spatial, experiment_type, mech, n_scan_rates, add_noise)) |
| n_samples = total |
| else: |
| args_list = [(i, outdir, seed, n_spatial, experiment_type, mechanism, n_scan_rates, add_noise) for i in range(n_samples)] |
|
|
| n_workers = min(n_workers, n_samples) |
|
|
| print(f"Generating {n_samples} samples...") |
| print(f"Output directory: {outdir}") |
| print(f"Spatial resolution: {n_spatial} points") |
| print(f"Using {n_workers} worker(s)") |
| |
| metadata = [] |
| |
| if n_workers == 1: |
| |
| for args in tqdm(args_list, desc="Generating samples"): |
| meta = _worker_generate(args) |
| metadata.append(meta) |
| if not meta['success']: |
| print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") |
| else: |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| TASK_TIMEOUT = 180 |
| try: |
| with ProcessPoolExecutor(max_workers=n_workers) as executor: |
| futures = {executor.submit(_worker_generate, a): a[0] for a in args_list} |
| pbar = tqdm(total=n_samples, desc="Generating samples") |
| for future in as_completed(futures): |
| idx = futures[future] |
| try: |
| meta = future.result(timeout=TASK_TIMEOUT) |
| except Exception as e: |
| meta = {'idx': idx, 'success': False, 'error': f'Worker error: {e}'} |
| metadata.append(meta) |
| if not meta['success'] and not meta.get('skipped'): |
| tqdm.write(f"Sample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") |
| pbar.update(1) |
| pbar.close() |
| except (PermissionError, OSError) as e: |
| print(f"\nWarning: Multiprocessing failed ({e}). Falling back to sequential processing...") |
| metadata = [] |
| for args in tqdm(args_list, desc="Generating samples (sequential)"): |
| meta = _worker_generate(args) |
| metadata.append(meta) |
| if not meta['success']: |
| print(f"\nSample {meta['idx']} failed: {meta.get('error', 'Unknown error')}") |
| |
| |
| metadata = sorted(metadata, key=lambda x: x['idx']) |
| |
| |
| n_success = sum(1 for m in metadata if m['success']) |
| |
| mech_counts = {} |
| for m in metadata: |
| if m['success']: |
| mech = m.get('mechanism', 'BV') |
| mech_counts[mech] = mech_counts.get(mech, 0) + 1 |
|
|
| summary = { |
| 'n_samples': n_samples, |
| 'n_success': n_success, |
| 'n_spatial': n_spatial, |
| 'n_scan_rates': 'random(1-3)' if n_scan_rates == -1 else n_scan_rates, |
| 'add_noise': add_noise, |
| 'seed': seed, |
| 'n_workers': n_workers, |
| 'experiment_type': experiment_type, |
| 'multi_mechanism': multi_mechanism, |
| 'mechanism_counts': mech_counts, |
| 'samples': metadata, |
| } |
| |
| with open(os.path.join(outdir, "metadata.json"), "w") as f: |
| json.dump(summary, f, indent=2) |
| |
| print(f"\nGeneration complete: {n_success}/{n_samples} successful") |
| print(f"Metadata saved to {os.path.join(outdir, 'metadata.json')}") |
| |
| return metadata |
|
|
|
|
| |
| |
| |
|
|
| def load_sample(filepath): |
| """Load a single sample from npz file.""" |
| data = np.load(filepath, allow_pickle=True) |
| return { |
| 'potential': data['potential'], |
| 'flux': data['flux'], |
| 'time': data['time'], |
| 'c_ox': data['c_ox'], |
| 'c_red': data['c_red'], |
| 'x_grid': data['x_grid'], |
| 'params': data['params'].item(), |
| } |
|
|
|
|
| def load_dataset(datadir, max_samples=None): |
| """Load all samples from a directory.""" |
| import glob |
| |
| files = sorted(glob.glob(os.path.join(datadir, "sample_*.npz"))) |
| if max_samples is not None: |
| files = files[:max_samples] |
| |
| samples = [] |
| for f in tqdm(files, desc="Loading samples"): |
| samples.append(load_sample(f)) |
| |
| return samples |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| parser = argparse.ArgumentParser( |
| description="Generate DiffEC CV dataset for flow matching" |
| ) |
| parser.add_argument( |
| "--n_samples", type=int, default=1000, |
| help="Number of samples to generate" |
| ) |
| parser.add_argument( |
| "--outdir", type=str, default="data/diffec_cv", |
| help="Output directory" |
| ) |
| parser.add_argument( |
| "--n_spatial", type=int, default=64, |
| help="Number of spatial grid points in output" |
| ) |
| parser.add_argument( |
| "--seed", type=int, default=42, |
| help="Random seed" |
| ) |
| parser.add_argument( |
| "--n_workers", type=int, default=None, |
| help="Number of parallel workers (default: num_cpus - 1). Set to 1 for sequential." |
| ) |
| parser.add_argument( |
| "--experiment_type", type=str, default="CV", |
| choices=["CV", "CA", "mixed"], |
| help="Experiment type: CV (cyclic voltammetry), CA (chronoamperometry), or mixed" |
| ) |
| parser.add_argument( |
| "--mechanism", type=str, default=None, |
| choices=["Nernst", "BV", "MHC", "Ads", "EC", "LH"], |
| help="Specific mechanism to generate data for" |
| ) |
| parser.add_argument( |
| "--multi_mechanism", action="store_true", |
| help="Generate balanced data across all 4 mechanisms" |
| ) |
| parser.add_argument( |
| "--n_per_mechanism", type=int, default=None, |
| help="Samples per mechanism when using --multi_mechanism" |
| ) |
| parser.add_argument( |
| "--n_scan_rates", type=str, default="1", |
| help="Number of scan rates per sample. 1 = legacy single-CV. " |
| ">1 = multi-scan-rate with shared kinetic parameters. " |
| "'random' = randomly 1-3 per sample." |
| ) |
| parser.add_argument( |
| "--no_noise", action="store_true", |
| help="Disable noise augmentation (generate clean signals)" |
| ) |
| parser.add_argument( |
| "--test", action="store_true", |
| help="Run a single test simulation and plot results" |
| ) |
|
|
| args = parser.parse_args() |
|
|
| if args.test: |
| |
| print("Running test simulation...") |
| params = sample_cv_params(np.random.default_rng(42)) |
| print(f"Parameters: {params}") |
| |
| result = run_cv_simulation(**params, n_spatial_out=64, verbose=True) |
| |
| print(f"\nResults:") |
| print(f" Time steps: {len(result['time'])}") |
| print(f" Spatial points: {result['c_ox'].shape[1]}") |
| print(f" Potential range: [{result['potential'].min():.2f}, {result['potential'].max():.2f}]") |
| print(f" Flux range: [{result['flux'].min():.4f}, {result['flux'].max():.4f}]") |
| |
| |
| try: |
| import matplotlib.pyplot as plt |
| |
| fig, axes = plt.subplots(2, 2, figsize=(12, 10)) |
| |
| |
| ax = axes[0, 0] |
| ax.plot(result['potential'], result['flux']) |
| ax.set_xlabel('Potential (dimensionless)') |
| ax.set_ylabel('Flux (dimensionless)') |
| ax.set_title('Cyclic Voltammogram') |
| |
| |
| ax = axes[0, 1] |
| ax.plot(result['time'], result['c_ox'][:, -1], label='Ox (surface)') |
| ax.plot(result['time'], result['c_red'][:, 0], label='Red (surface)') |
| ax.set_xlabel('Time (dimensionless)') |
| ax.set_ylabel('Concentration') |
| ax.set_title('Surface Concentration vs Time') |
| ax.legend() |
| |
| |
| ax = axes[1, 0] |
| im = ax.imshow( |
| result['c_ox'].T, aspect='auto', origin='lower', |
| extent=[0, result['time'][-1], 0, result['x_grid_out'][-1]] |
| ) |
| plt.colorbar(im, ax=ax, label='Concentration') |
| ax.set_xlabel('Time') |
| ax.set_ylabel('Distance from electrode') |
| ax.set_title('Oxidized Species Concentration') |
| |
| |
| ax = axes[1, 1] |
| im = ax.imshow( |
| result['c_red'].T, aspect='auto', origin='lower', |
| extent=[0, result['time'][-1], 0, result['x_grid_out'][-1]] |
| ) |
| plt.colorbar(im, ax=ax, label='Concentration') |
| ax.set_xlabel('Time') |
| ax.set_ylabel('Distance from electrode') |
| ax.set_title('Reduced Species Concentration') |
| |
| plt.tight_layout() |
| plt.savefig('test_simulation.png', dpi=150) |
| print("\nPlot saved to test_simulation.png") |
| |
| except ImportError: |
| print("\nmatplotlib not available, skipping plot") |
| |
| else: |
| n_scan_rates = -1 if args.n_scan_rates.lower() == 'random' else int(args.n_scan_rates) |
| generate_dataset( |
| n_samples=args.n_samples, |
| outdir=args.outdir, |
| n_spatial=args.n_spatial, |
| seed=args.seed, |
| n_workers=args.n_workers, |
| experiment_type=args.experiment_type, |
| mechanism=args.mechanism, |
| multi_mechanism=args.multi_mechanism, |
| n_per_mechanism=args.n_per_mechanism, |
| n_scan_rates=n_scan_rates, |
| add_noise=not args.no_noise, |
| ) |
|
|