# Justify / Kps_function / read_Kps_model.py
# Author: dzs
# Commit message: initial upload
# Commit: 81e15fe
import json
import numpy as np
from pathlib import Path
class QuantileGridFromCoeffs:
    """Evaluate a grid of quantile-regression models from exported coefficients.

    The export directory must contain:

    * ``meta.json`` with the keys ``"features"`` (names of the polynomial
      features in use), ``"taus"`` (the quantile levels) and, optionally,
      ``"has_intercept"``.
    * ``coeffs.npz`` holding one coefficient vector per quantile level under
      the key ``"tau_<value>"``.

    Attributes
    ----------
    export_dir : pathlib.Path
        Directory the model was loaded from.
    features : list
        Feature names read from ``meta.json``.
    taus : ndarray, shape (m_taus,)
        Quantile levels. The interpolation code uses ``np.searchsorted`` on
        this array, so it is assumed to be sorted in ascending order.
    has_intercept : bool
        Value read from ``meta.json`` (default ``False``). It is stored only;
        nothing in this class consults it.
    coef_matrix : ndarray, shape (m_taus, n_coef)
        One row of coefficients per entry of ``taus``.
    """

    # Shift applied to y by the "_m" (y - A) and "_p" (y + A) feature variants.
    _Y_SHIFT = 2

    # Recognised feature names and the column each one contributes.
    # The ORDER of this table fixes the column order of the design matrix;
    # the order of names inside ``self.features`` is not consulted, so the
    # exported coefficient vectors must follow this same order.
    _FEATURE_BUILDERS = (
        # Constant term (intercept)
        ("c", lambda x, y, a: np.ones_like(x)),
        # Linear terms
        ("x", lambda x, y, a: x),
        ("y", lambda x, y, a: y),
        ("y_m", lambda x, y, a: y - a),
        ("y_p", lambda x, y, a: y + a),
        # Interaction terms
        ("xy", lambda x, y, a: x * y),
        ("xy_m", lambda x, y, a: x * (y - a)),
        ("xy_p", lambda x, y, a: x * (y + a)),
        ("xy2", lambda x, y, a: x * y**2),
        ("xy2_m", lambda x, y, a: x * (y - a) ** 2),
        ("xy2_p", lambda x, y, a: x * (y + a) ** 2),
        ("x2y", lambda x, y, a: x**2 * y),
        ("xy3", lambda x, y, a: x * y**3),
        ("xy4", lambda x, y, a: x * y**4),
        ("xy3_m", lambda x, y, a: x * (y - a) ** 3),
        ("xy3_p", lambda x, y, a: x * (y + a) ** 3),
        ("x3y", lambda x, y, a: x**3 * y),
        # Higher order terms
        ("x2", lambda x, y, a: x**2),
        ("x3", lambda x, y, a: x**3),
        ("y2", lambda x, y, a: y**2),
        ("y3", lambda x, y, a: y**3),
        ("y4", lambda x, y, a: y**4),
        ("y2_m", lambda x, y, a: (y - a) ** 2),
        ("y3_m", lambda x, y, a: (y - a) ** 3),
        ("y4_m", lambda x, y, a: (y - a) ** 4),
        ("y2_p", lambda x, y, a: (y + a) ** 2),
        ("y3_p", lambda x, y, a: (y + a) ** 3),
        ("y4_p", lambda x, y, a: (y + a) ** 4),
    )

    def __init__(self, export_dir):
        """Load metadata and per-quantile coefficients from ``export_dir``.

        Parameters
        ----------
        export_dir : str or path-like
            Directory containing ``meta.json`` and ``coeffs.npz``.

        Raises
        ------
        KeyError
            If ``meta.json`` lacks ``"features"``/``"taus"``, or if no
            coefficient array can be matched to one of the taus.
        """
        self.export_dir = Path(export_dir)
        meta_text = (self.export_dir / "meta.json").read_text(encoding="utf-8")
        meta = json.loads(meta_text)
        self.features = meta["features"]
        self.taus = np.array(meta["taus"], dtype=float)
        self.has_intercept = meta.get("has_intercept", False)

        # NpzFile keeps the archive open until closed; the context manager
        # releases the handle once every array has been read into memory.
        with np.load(self.export_dir / "coeffs.npz") as coeffs:
            rows = [coeffs[self._find_tau_key(coeffs, t)] for t in self.taus]
        self.coef_matrix = np.vstack(rows)  # shape (m_taus, n_coef)

    @staticmethod
    def _find_tau_key(coeffs, tau):
        """Return the archive key that stores the coefficients for ``tau``.

        The exact key ``f"tau_{tau}"`` is tried first. If it is absent, every
        ``tau_*`` key is parsed as a float and the first one within 1e-12 of
        ``tau`` is used, which tolerates different number formatting.
        Keys whose suffix is not a number are skipped.

        Raises
        ------
        KeyError
            If no key matches ``tau``.
        """
        key = f"tau_{tau}"
        if key in coeffs:
            return key
        for name in coeffs.files:
            if not name.startswith("tau_"):
                continue
            try:
                value = float(name.split("_")[1])
            except ValueError:
                # Not a numeric tau key (e.g. "tau_info"); ignore it.
                continue
            if abs(value - tau) < 1e-12:
                return name
        raise KeyError(f"Coefficient for tau={tau} not found in {coeffs.files}")

    def _create_polynomial_features(self, X):
        """Build the design matrix for the features listed in ``self.features``.

        Parameters
        ----------
        X : ndarray, shape (n_points, >=2)
            Column 0 is used as ``x`` and column 1 as ``y``.

        Returns
        -------
        ndarray, shape (n_points, n_selected_features)
            One column per recognised feature, ordered as in
            ``_FEATURE_BUILDERS``. Names in ``self.features`` that are not in
            that table are ignored.

        Raises
        ------
        ValueError
            If none of the names in ``self.features`` is recognised.
        """
        x = X[:, 0]
        y = X[:, 1]
        shift = self._Y_SHIFT

        columns = [
            build(x, y, shift)
            for name, build in self._FEATURE_BUILDERS
            if name in self.features
        ]
        if not columns:
            raise ValueError(
                f"None of the features {self.features!r} is a recognised "
                "polynomial feature name."
            )
        return np.column_stack(columns)

    def predict_quantiles(self, X_new):
        """Predict every quantile of the grid at each point.

        Parameters
        ----------
        X_new : array-like, shape (n_points, >=2)
            Points ``[x, y]``; converted to a float array.

        Returns
        -------
        taus : ndarray, shape (m_taus,)
            The quantile levels (``self.taus``).
        Q : ndarray, shape (n_points, m_taus)
            Predicted quantiles, sorted along the tau axis so that each row is
            non-decreasing (removes quantile crossing).
        """
        X_new = np.asarray(X_new, dtype=float)
        design = self._create_polynomial_features(X_new)  # shape (n, p)
        # (m_taus, p) @ (p, n) -> (m_taus, n), then transpose to (n, m_taus)
        Q = (self.coef_matrix @ design.T).T
        # Enforce monotonicity in tau by sorting each row.
        Q_sorted = np.sort(Q, axis=1)
        return self.taus, Q_sorted

    def predict_tau(self, X_new, tau_star):
        """Predict the ``tau_star`` quantile by linear interpolation in tau.

        Parameters
        ----------
        X_new : array-like, shape (n_points, >=2)
            Points ``[x, y]``.
        tau_star : float
            Scalar quantile level. Values outside ``[taus[0], taus[-1]]`` are
            linearly extrapolated from the nearest grid interval.

        Returns
        -------
        ndarray, shape (n_points,)
            Interpolated quantile at each point.

        Raises
        ------
        RuntimeError
            If the grid has fewer than two taus, since no interval exists to
            interpolate over.
        """
        taus, Q = self.predict_quantiles(X_new)
        n_taus = len(taus)
        if n_taus < 2:
            raise RuntimeError("Need at least two taus to interpolate.")

        # Index of the interval [taus[j], taus[j+1]] containing tau_star,
        # clipped so that out-of-range values use the first/last interval.
        left = np.searchsorted(taus, tau_star, side="right") - 1
        j = int(np.clip(left, 0, n_taus - 2))

        t0, t1 = taus[j], taus[j + 1]
        q0, q1 = Q[:, j], Q[:, j + 1]
        w = (tau_star - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)

    def sample(self, X_new, n_samples=1, rng=None):
        """Draw samples from the approximate conditional distribution at X_new.

        Uses inverse-CDF sampling: uniform levels are drawn inside the tau
        range covered by the grid and mapped through the linearly
        interpolated quantile function.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            New points [x, y] in your domain (e.g., LogP, PolarityIndex).
        n_samples : int
            Number of samples per point.
        rng : None, int, or np.random.Generator
            Random seed or Generator for reproducibility.

        Returns
        -------
        samples : ndarray, shape (n_points, n_samples)
            Samples drawn from the interpolated quantile function.

        Raises
        ------
        ValueError
            If ``n_samples < 1`` or ``X_new`` is not 2D with two columns.
        RuntimeError
            If the grid has fewer than two taus.
        """
        if n_samples < 1:
            raise ValueError("n_samples must be >= 1")

        # Reuse a caller-supplied Generator; otherwise seed a fresh one.
        if isinstance(rng, np.random.Generator):
            gen = rng
        else:
            gen = np.random.default_rng(rng)

        X_new = np.asarray(X_new, dtype=float)
        if X_new.ndim != 2 or X_new.shape[1] != 2:
            raise ValueError("X_new must be a 2D array with exactly two columns [x, y]")

        # Quantile grid predictions: Q has shape (n_points, n_taus)
        taus, Q = self.predict_quantiles(X_new)
        taus = np.asarray(taus, dtype=float)
        Q = np.asarray(Q, dtype=float)

        n_points, m = Q.shape
        if m < 2:
            raise RuntimeError("Need at least two taus to sample with interpolation.")

        # Sample u only inside the tau range the grid supports, so the
        # interpolation below never extrapolates.
        u = gen.uniform(taus[0], taus[-1], size=(n_points, n_samples))

        # For each u, locate the interval [taus[j], taus[j+1]] containing it.
        j = np.searchsorted(taus, u, side="right") - 1
        j = np.clip(j, 0, m - 2)

        # Gather interval endpoints; row_idx broadcasts against j so each
        # point reads from its own row of Q.
        t0 = taus[j]
        t1 = taus[j + 1]
        row_idx = np.arange(n_points)[:, None]
        q0 = Q[row_idx, j]
        q1 = Q[row_idx, j + 1]

        # Linear interpolation between the bracketing quantiles.
        w = (u - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)
# Example usage
# -------------
# Read a saved model:
# model = QuantileGridFromCoeffs(export_dir='Kps_model')
# Example points, one row per point: [(LogP, Polarity_Index), ...]
# X_new = np.array([[2.34665198, 10.2], ])
# Sample the conditional distribution at each row of X_new:
# samples = model.sample(X_new, n_samples=50, rng=0)
# print(samples[0])