Spaces:
Sleeping
Sleeping
| import json | |
| import numpy as np | |
| from pathlib import Path | |
class QuantileGridFromCoeffs:
    """Predict and sample from a quantile grid stored as linear coefficients.

    The export directory must contain:

    * ``meta.json`` with the keys ``"features"`` (names of the design-matrix
      columns), ``"taus"`` (quantile levels) and, optionally,
      ``"has_intercept"``.
    * ``coeffs.npz`` with one coefficient vector per quantile level, stored
      under the key ``"tau_<value>"``.

    Attributes
    ----------
    export_dir : pathlib.Path
        Directory the model was loaded from.
    features : list
        Feature names read from ``meta.json``.
    taus : ndarray, shape (m_taus,)
        Quantile levels, sorted in ascending order.
    has_intercept : bool
        Value of ``meta["has_intercept"]`` (``False`` when absent). It is
        stored for callers; this class does not use it.
    coef_matrix : ndarray, shape (m_taus, n_coef)
        Coefficient rows, in the same (ascending tau) order as ``taus``.
    """

    # Offset used by the "_m" (y - shift) and "_p" (y + shift) features.
    _SHIFT = 2

    # Supported feature names with the expression that builds each column.
    # The ORDER of this table defines the column order of the design matrix
    # (independent of the order in ``self.features``), so it must not be
    # rearranged: the stored coefficients are matched to columns by position.
    _FEATURE_BUILDERS = (
        # Constant term (intercept)
        ("c", lambda x, y, a: np.ones_like(x)),
        # Linear terms
        ("x", lambda x, y, a: x),
        ("y", lambda x, y, a: y),
        ("y_m", lambda x, y, a: y - a),
        ("y_p", lambda x, y, a: y + a),
        # Interaction terms
        ("xy", lambda x, y, a: x * y),
        ("xy_m", lambda x, y, a: x * (y - a)),
        ("xy_p", lambda x, y, a: x * (y + a)),
        ("xy2", lambda x, y, a: x * y**2),
        ("xy2_m", lambda x, y, a: x * (y - a) ** 2),
        ("xy2_p", lambda x, y, a: x * (y + a) ** 2),
        ("x2y", lambda x, y, a: x**2 * y),
        ("xy3", lambda x, y, a: x * y**3),
        ("xy4", lambda x, y, a: x * y**4),
        ("xy3_m", lambda x, y, a: x * (y - a) ** 3),
        ("xy3_p", lambda x, y, a: x * (y + a) ** 3),
        ("x3y", lambda x, y, a: x**3 * y),
        # Higher order terms
        ("x2", lambda x, y, a: x**2),
        ("x3", lambda x, y, a: x**3),
        ("y2", lambda x, y, a: y**2),
        ("y3", lambda x, y, a: y**3),
        ("y4", lambda x, y, a: y**4),
        ("y2_m", lambda x, y, a: (y - a) ** 2),
        ("y3_m", lambda x, y, a: (y - a) ** 3),
        ("y4_m", lambda x, y, a: (y - a) ** 4),
        ("y2_p", lambda x, y, a: (y + a) ** 2),
        ("y3_p", lambda x, y, a: (y + a) ** 3),
        ("y4_p", lambda x, y, a: (y + a) ** 4),
    )

    def __init__(self, export_dir):
        """Load ``meta.json`` and ``coeffs.npz`` from *export_dir*.

        Parameters
        ----------
        export_dir : str or path-like
            Directory containing the exported model files.

        Raises
        ------
        KeyError
            If ``meta.json`` lacks a required key, or if no coefficient
            vector can be found for one of the taus.
        """
        self.export_dir = Path(export_dir)
        meta = json.loads(
            (self.export_dir / "meta.json").read_text(encoding="utf-8")
        )
        self.features = meta["features"]
        self.has_intercept = meta.get("has_intercept", False)
        taus = np.array(meta["taus"], dtype=float)

        # NpzFile keeps the archive open; the context manager closes it once
        # every coefficient vector has been read into memory.
        with np.load(self.export_dir / "coeffs.npz") as coeffs:
            available = list(coeffs.files)
            rows = [
                np.asarray(coeffs[self._find_coeff_key(available, t)])
                for t in taus
            ]

        # The prediction code sorts quantiles along the tau axis and uses
        # searchsorted on the taus, both of which require ascending taus.
        # Sort taus and coefficient rows together (a no-op when the grid is
        # already ascending).
        order = np.argsort(taus, kind="stable")
        self.taus = taus[order]
        self.coef_matrix = np.vstack([rows[i] for i in order])  # (m_taus, n_coef)

    @staticmethod
    def _find_coeff_key(available, tau):
        """Return the key in *available* that holds the coefficients for *tau*.

        The exact key ``f"tau_{tau}"`` is tried first. Otherwise every
        ``tau_<number>`` key is parsed and compared numerically (tolerance
        1e-12), which copes with differently formatted numbers such as
        ``tau_0.50``. Keys whose suffix is not a number are skipped.

        Raises
        ------
        KeyError
            If no key matches.
        """
        key = f"tau_{tau}"
        if key in available:
            return key
        for name in available:
            if not name.startswith("tau_"):
                continue
            try:
                value = float(name.split("_")[1])
            except ValueError:
                continue
            if abs(value - tau) < 1e-12:
                return name
        raise KeyError(f"Coefficient for tau={tau} not found in {available}")

    def _create_polynomial_features(self, X):
        """Build the design matrix for the features listed in ``self.features``.

        Parameters
        ----------
        X : ndarray, shape (n_points, >=2)
            Column 0 is ``x`` and column 1 is ``y``.

        Returns
        -------
        ndarray, shape (n_points, n_selected_features)
            One column per supported feature present in ``self.features``,
            in the fixed order of ``_FEATURE_BUILDERS``. Names in
            ``self.features`` that are not in that table are ignored.

        Raises
        ------
        ValueError
            If none of the names in ``self.features`` is supported.
        """
        x = X[:, 0]
        y = X[:, 1]
        columns = [
            build(x, y, self._SHIFT)
            for name, build in self._FEATURE_BUILDERS
            if name in self.features
        ]
        if not columns:
            raise ValueError(
                f"No supported feature names found in {self.features!r}"
            )
        return np.column_stack(columns)

    def predict_quantiles(self, X_new):
        """Predict the full quantile grid at each point.

        Parameters
        ----------
        X_new : array-like, shape (n_points, >=2)
            Points ``[x, y]``.

        Returns
        -------
        taus : ndarray, shape (m_taus,)
            The quantile levels (ascending).
        Q : ndarray, shape (n_points, m_taus)
            Predicted quantiles, sorted along axis 1 so that they are
            non-decreasing in tau (removes quantile crossing).

        Raises
        ------
        ValueError
            If the number of design-matrix columns differs from the number
            of stored coefficients.
        """
        X_new = np.asarray(X_new, dtype=float)
        Xd = self._create_polynomial_features(X_new)  # shape (n, p)
        n_coef = self.coef_matrix.shape[1]
        if Xd.shape[1] != n_coef:
            raise ValueError(
                f"Design matrix has {Xd.shape[1]} column(s) but the model "
                f"stores {n_coef} coefficient(s) per tau; check the feature "
                f"names {self.features!r}"
            )
        # (m_taus, p) @ (p, n) -> (m_taus, n), then transpose to (n, m_taus)
        Q = (self.coef_matrix @ Xd.T).T
        return self.taus, np.sort(Q, axis=1)

    def predict_tau(self, X_new, tau_star):
        """Predict the ``tau_star`` quantile at each point by interpolation.

        The value is linearly interpolated between the two grid taus that
        bracket ``tau_star``. A ``tau_star`` outside the grid range is
        linearly extrapolated from the nearest grid interval.

        Parameters
        ----------
        X_new : array-like, shape (n_points, >=2)
            Points ``[x, y]``.
        tau_star : float
            Scalar quantile level.

        Returns
        -------
        ndarray, shape (n_points,)
            Interpolated quantile values.

        Raises
        ------
        RuntimeError
            If the grid has fewer than two taus.
        """
        taus, Q = self.predict_quantiles(X_new)
        m = len(taus)
        if m < 2:
            raise RuntimeError("Need at least two taus to interpolate.")
        # Index of the grid interval [taus[j], taus[j+1]] used for tau_star.
        j = int(np.clip(np.searchsorted(taus, tau_star, side="right") - 1, 0, m - 2))
        t0, t1 = taus[j], taus[j + 1]
        q0, q1 = Q[:, j], Q[:, j + 1]
        w = (tau_star - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)

    def sample(self, X_new, n_samples=1, rng=None):
        """
        Draw samples from the approximate conditional distribution at X_new
        using inverse-CDF sampling based on the saved quantile grid.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            New points [x, y] in your domain (e.g., LogP, PolarityIndex).
        n_samples : int
            Number of samples per point.
        rng : None, int, or np.random.Generator
            Random seed or Generator for reproducibility.

        Returns
        -------
        samples : ndarray, shape (n_points, n_samples)
            Samples drawn from the interpolated quantile function.

        Raises
        ------
        ValueError
            If ``n_samples < 1`` or ``X_new`` is not 2D with two columns.
        RuntimeError
            If the grid has fewer than two taus.
        """
        if n_samples < 1:
            raise ValueError("n_samples must be >= 1")

        # Setup RNG: reuse a Generator as-is, otherwise treat rng as a seed.
        if isinstance(rng, np.random.Generator):
            gen = rng
        else:
            gen = np.random.default_rng(rng)

        X_new = np.asarray(X_new, dtype=float)
        if X_new.ndim != 2 or X_new.shape[1] != 2:
            raise ValueError("X_new must be a 2D array with exactly two columns [x, y]")

        # Quantile grid predictions: Q has shape (n_points, n_taus)
        taus, Q = self.predict_quantiles(X_new)
        taus = np.asarray(taus, dtype=float)
        Q = np.asarray(Q, dtype=float)
        n_points, m = Q.shape
        if m < 2:
            raise RuntimeError("Need at least two taus to sample with interpolation.")

        # Sample u only within the tau range covered by the grid, so the
        # tails beyond taus[0] / taus[-1] are never extrapolated.
        u = gen.uniform(taus[0], taus[-1], size=(n_points, n_samples))

        # For each u, find the interval [taus[j], taus[j+1]] that contains it.
        j = np.searchsorted(taus, u, side="right") - 1
        j = np.clip(j, 0, m - 2)

        # Gather interval endpoints (row_idx broadcasts against j).
        t0 = taus[j]
        t1 = taus[j + 1]
        row_idx = np.arange(n_points)[:, None]
        q0 = Q[row_idx, j]
        q1 = Q[row_idx, j + 1]

        # Linear interpolation of the quantile function at u
        w = (u - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)
# Example usage:
# model = QuantileGridFromCoeffs(export_dir='Kps_model')  # read the saved model
# X_new = np.array([[2.34665198, 10.2]])  # example points: [(LogP, Polarity_Index), ...]
# samples = model.sample(X_new, n_samples=50, rng=0)  # sample the distribution at each X
# print(samples[0])