import json
from pathlib import Path

import numpy as np


class QuantileGridFromCoeffs:
    """Evaluate and sample an exported grid of linear quantile models.

    The export directory must contain:

    * ``meta.json`` with the keys ``"features"`` (names of the design-matrix
      terms), ``"taus"`` (quantile levels) and optionally ``"has_intercept"``.
    * ``coeffs.npz`` with one coefficient vector per quantile level, stored
      under the key ``tau_<level>``.

    Attributes set by ``__init__``:

    * ``export_dir``: the export directory as a ``Path``.
    * ``features``: the feature names read from ``meta.json``.
    * ``has_intercept``: the flag read from ``meta.json`` (default ``False``);
      it is stored but not used by any method of this class.
    * ``taus``: quantile levels as a float array, in ascending order.
    * ``coef_matrix``: array of shape ``(n_taus, n_coef)``; row ``i`` holds
      the coefficients for ``taus[i]``.
    """

    def __init__(self, export_dir):
        """Load metadata and coefficients from *export_dir*.

        Raises
        ------
        KeyError
            If ``meta.json`` lacks a required key, or if no coefficient
            vector can be found for one of the quantile levels.
        """
        self.export_dir = Path(export_dir)
        meta = json.loads((self.export_dir / "meta.json").read_text())
        self.features = meta["features"]
        self.has_intercept = meta.get("has_intercept", False)

        # Interpolation uses np.searchsorted and predict_quantiles sorts the
        # predictions along the tau axis; both are only meaningful when the
        # tau grid is ascending, so normalise the order once here.
        self.taus = np.sort(np.array(meta["taus"], dtype=float), kind="stable")

        # NpzFile keeps the archive open until closed; arrays are read
        # eagerly on item access, so they stay valid after the block exits.
        with np.load(self.export_dir / "coeffs.npz") as coeffs:
            files = coeffs.files
            rows = [coeffs[self._find_coeff_key(files, t)] for t in self.taus]
        self.coef_matrix = np.vstack(rows)  # shape (n_taus, n_coef)

    @staticmethod
    def _find_coeff_key(files, tau):
        """Return the archive key that stores the coefficients for *tau*.

        The exact key ``tau_<tau>`` is tried first. Otherwise every key of
        the form ``tau_<number>`` is parsed and the first one within 1e-12 of
        *tau* is used, which tolerates differences in float formatting.

        Raises
        ------
        KeyError
            If no key matches.
        """
        exact = f"tau_{tau}"
        if exact in files:
            return exact
        for name in files:
            if not name.startswith("tau_"):
                continue
            try:
                level = float(name.split("_")[1])
            except ValueError:
                # Not a numeric tau key; it cannot match any level.
                continue
            if abs(level - tau) < 1e-12:
                return name
        raise KeyError(f"Coefficient for tau={tau} not found in {files}")

    def _create_polynomial_features(self, X):
        """Build the design matrix for the terms listed in ``self.features``.

        Parameters
        ----------
        X : ndarray, shape (n_points, 2)
            Array with columns [x, y].

        Returns
        -------
        ndarray, shape (n_points, n_selected_terms)
            One column per recognised term that appears in ``self.features``.

        Notes
        -----
        Columns are emitted in the fixed order of the table below, not in the
        order of ``self.features``. Names in ``self.features`` that are not in
        the table contribute no column. The stored coefficient vectors are
        presumably laid out in this same order -- TODO confirm against the
        code that produced the export.
        """
        x = X[:, 0]
        y = X[:, 1]
        shift = 2  # offset used by the "_m" (y - shift) and "_p" (y + shift) terms
        y_m = y - shift
        y_p = y + shift

        # (feature name, lazy column builder), in design-matrix column order.
        terms = (
            # Constant term (intercept)
            ("c", lambda: np.ones_like(x)),
            # Linear terms
            ("x", lambda: x),
            ("y", lambda: y),
            ("y_m", lambda: y_m),
            ("y_p", lambda: y_p),
            # Interaction terms
            ("xy", lambda: x * y),
            ("xy_m", lambda: x * y_m),
            ("xy_p", lambda: x * y_p),
            ("xy2", lambda: x * y**2),
            ("xy2_m", lambda: x * y_m**2),
            ("xy2_p", lambda: x * y_p**2),
            ("x2y", lambda: x**2 * y),
            ("xy3", lambda: x * y**3),
            ("xy4", lambda: x * y**4),
            ("xy3_m", lambda: x * y_m**3),
            ("xy3_p", lambda: x * y_p**3),
            ("x3y", lambda: x**3 * y),
            # Higher order terms
            ("x2", lambda: x**2),
            ("x3", lambda: x**3),
            ("y2", lambda: y**2),
            ("y3", lambda: y**3),
            ("y4", lambda: y**4),
            ("y2_m", lambda: y_m**2),
            ("y3_m", lambda: y_m**3),
            ("y4_m", lambda: y_m**4),
            ("y2_p", lambda: y_p**2),
            ("y3_p", lambda: y_p**3),
            ("y4_p", lambda: y_p**4),
        )
        columns = [build() for name, build in terms if name in self.features]
        return np.column_stack(columns)

    def predict_quantiles(self, X_new):
        """Predict every quantile of the grid at each point.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            Points [x, y].

        Returns
        -------
        taus : ndarray, shape (n_taus,)
            The quantile levels (``self.taus``).
        Q : ndarray, shape (n_points, n_taus)
            Predicted quantiles, sorted along the tau axis so that each row
            is non-decreasing.

        Raises
        ------
        ValueError
            If the number of design-matrix columns differs from the number
            of stored coefficients per quantile level.
        """
        X_new = np.asarray(X_new, dtype=float)
        Xd = self._create_polynomial_features(X_new)  # shape (n, p)

        n_coef = self.coef_matrix.shape[1]
        if Xd.shape[1] != n_coef:
            raise ValueError(
                f"Design matrix has {Xd.shape[1]} columns but the stored "
                f"coefficients have {n_coef} entries per tau; check that "
                f"every name in features={self.features} is supported."
            )

        # (n_taus, p) @ (p, n) -> (n_taus, n), then transpose to (n, n_taus).
        Q = (self.coef_matrix @ Xd.T).T
        # Sorting each row enforces monotonicity in tau (removes crossings).
        return self.taus, np.sort(Q, axis=1)

    def predict_tau(self, X_new, tau_star):
        """Predict the quantile at a single level by linear interpolation.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            Points [x, y].
        tau_star : float
            Scalar quantile level. A level outside the grid is extrapolated
            linearly from the nearest pair of grid levels.

        Returns
        -------
        ndarray, shape (n_points,)
            Interpolated quantile at each point.

        Raises
        ------
        RuntimeError
            If the grid has fewer than two quantile levels.
        """
        taus, Q = self.predict_quantiles(X_new)
        if len(taus) < 2:
            # A single level would give a zero-width interval and NaN output.
            raise RuntimeError("Need at least two taus to interpolate.")

        # Index of the grid interval [taus[j], taus[j + 1]] used for tau_star.
        left = np.searchsorted(taus, tau_star, side="right") - 1
        j = int(np.clip(left, 0, len(taus) - 2))

        t0, t1 = taus[j], taus[j + 1]
        q0, q1 = Q[:, j], Q[:, j + 1]
        w = (tau_star - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)

    def sample(self, X_new, n_samples=1, rng=None):
        """
        Draw samples from the approximate conditional distribution at X_new
        using inverse-CDF sampling based on the saved quantile grid.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            New points [x, y] in your domain (e.g., LogP, PolarityIndex).
        n_samples : int
            Number of samples per point.
        rng : None, int, or np.random.Generator
            Random seed or Generator for reproducibility.

        Returns
        -------
        samples : ndarray, shape (n_points, n_samples)
            Samples drawn from the interpolated quantile function.

        Raises
        ------
        ValueError
            If ``n_samples < 1`` or X_new is not 2D with two columns.
        RuntimeError
            If the grid has fewer than two quantile levels.
        """
        if n_samples < 1:
            raise ValueError("n_samples must be >= 1")

        # Reuse a caller-supplied Generator; otherwise seed a fresh one.
        gen = rng if isinstance(rng, np.random.Generator) else np.random.default_rng(rng)

        X_new = np.asarray(X_new, dtype=float)
        if X_new.ndim != 2 or X_new.shape[1] != 2:
            raise ValueError("X_new must be a 2D array with exactly two columns [x, y]")

        # Quantile grid predictions: Q has shape (n_points, n_taus).
        taus, Q = self.predict_quantiles(X_new)
        taus = np.asarray(taus, dtype=float)
        Q = np.asarray(Q, dtype=float)

        n_points, m = Q.shape
        if m < 2:
            raise RuntimeError("Need at least two taus to sample with interpolation.")

        # Draw u only inside the tau range covered by the grid, so the
        # quantile function is interpolated and never extrapolated.
        u = gen.uniform(taus[0], taus[-1], size=(n_points, n_samples))

        # For each u, locate the interval [taus[j], taus[j + 1]].
        j = np.clip(np.searchsorted(taus, u, side="right") - 1, 0, m - 2)

        # Gather interval endpoints; rows broadcasts against j to pick, for
        # every point, the quantiles that belong to that point.
        t0 = taus[j]
        t1 = taus[j + 1]
        rows = np.arange(n_points)[:, None]
        q0 = Q[rows, j]
        q1 = Q[rows, j + 1]

        # Linear interpolation inside the interval.
        w = (u - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)


## read saved model
#model = QuantileGridFromCoeffs(export_dir='Kps_model')
## example points: [(LogP, Polarity_Index), ...]
#X_new = np.array([[2.34665198, 10.2], ])
## sample the distribution at each X
#samples = model.sample(X_new, n_samples=50, rng=0)
#print(samples[0])