File size: 7,050 Bytes
81e15fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import json
import numpy as np
from pathlib import Path

class QuantileGridFromCoeffs:
    """Evaluate a quantile-regression model exported as per-tau coefficient vectors.

    The export directory must contain:
      * ``meta.json`` — keys ``features`` (ordered list of feature names),
        ``taus`` (list of quantile levels), optional ``has_intercept`` and
        optional ``offset`` (see below).
      * ``coeffs.npz`` — one 1-D coefficient array per tau, keyed ``tau_<level>``.
    """

    def __init__(self, export_dir, offset=2.0):
        """
        Parameters
        ----------
        export_dir : str or Path
            Directory containing ``meta.json`` and ``coeffs.npz``.
        offset : float, optional
            Shift used by the ``*_m`` / ``*_p`` feature families
            (``y - offset`` / ``y + offset``).  Defaults to 2.0, the value
            that was previously hard-coded; ``meta.json`` may override it
            with an ``offset`` entry.

        Raises
        ------
        KeyError
            If a coefficient vector for one of the taus cannot be found.
        """
        self.export_dir = Path(export_dir)
        meta = json.loads((self.export_dir / "meta.json").read_text())
        self.features = meta["features"]
        self.taus = np.array(meta["taus"], dtype=float)
        self.has_intercept = meta.get("has_intercept", False)
        # meta.json wins over the constructor argument so exported models
        # carry their own constant; default preserves the old behavior.
        self.offset = float(meta.get("offset", offset))

        # Load one coefficient vector per tau and stack into (m_taus, n_coef).
        coeffs = np.load(self.export_dir / "coeffs.npz")
        rows = []
        for t in self.taus:
            key = f"tau_{t}"
            if key not in coeffs:
                # The archive key may use different float formatting
                # (e.g. "tau_0.50"); fall back to a tolerant numeric match.
                found = [
                    k for k in coeffs.files
                    if k.startswith("tau_") and abs(float(k.split("_")[1]) - t) < 1e-12
                ]
                if not found:
                    raise KeyError(f"Coefficient for tau={t} not found in {coeffs.files}")
                key = found[0]
            rows.append(coeffs[key])
        self.coef_matrix = np.vstack(rows)  # shape (m_taus, n_coef)

    def _create_polynomial_features(self, X):
        """
        Build the polynomial/interaction design matrix for points ``X``.

        Parameters
        ----------
        X : ndarray, shape (n_points, 2)
            Columns are [x, y].

        Returns
        -------
        ndarray, shape (n_points, n_selected_features)
            One column per name in ``self.features``, in the fixed order of
            the builder table below (which must match the coefficient order
            used at export time).
        """
        X = np.asarray(X, dtype=float)
        x = X[:, 0]
        y = X[:, 1]
        A = self.offset  # shift for the *_m / *_p feature families

        # Name -> column builder.  Insertion order is significant: columns
        # are emitted in this order, matching the original if/append chain.
        builders = {
            # constant / linear terms
            'c': lambda: np.ones_like(x),
            'x': lambda: x,
            'y': lambda: y,
            'y_m': lambda: y - A,
            'y_p': lambda: y + A,
            # interaction terms
            'xy': lambda: x * y,
            'xy_m': lambda: x * (y - A),
            'xy_p': lambda: x * (y + A),
            'xy2': lambda: x * y**2,
            'xy2_m': lambda: x * (y - A)**2,
            'xy2_p': lambda: x * (y + A)**2,
            'x2y': lambda: x**2 * y,
            'xy3': lambda: x * y**3,
            'xy4': lambda: x * y**4,
            'xy3_m': lambda: x * (y - A)**3,
            'xy3_p': lambda: x * (y + A)**3,
            'x3y': lambda: x**3 * y,
            # higher-order terms
            'x2': lambda: x**2,
            'x3': lambda: x**3,
            'y2': lambda: y**2,
            'y3': lambda: y**3,
            'y4': lambda: y**4,
            'y2_m': lambda: (y - A)**2,
            'y3_m': lambda: (y - A)**3,
            'y4_m': lambda: (y - A)**4,
            'y2_p': lambda: (y + A)**2,
            'y3_p': lambda: (y + A)**3,
            'y4_p': lambda: (y + A)**4,
        }
        cols = [build() for name, build in builders.items() if name in self.features]
        if not cols:
            raise ValueError(f"No recognized feature names in {self.features}")
        return np.column_stack(cols)

    def predict_quantiles(self, X_new):
        """
        Predict the full quantile grid at new points.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            Points [x, y]; a single 1-D point of shape (2,) is also accepted.

        Returns
        -------
        taus : ndarray, shape (m_taus,)
            The quantile levels of the grid.
        Q : ndarray, shape (n_points, m_taus)
            Predicted quantiles, sorted along the tau axis so the grid is
            monotone (independent per-tau fits can produce quantile
            crossings; sorting repairs them).
        """
        X_new = np.atleast_2d(np.asarray(X_new, dtype=float))
        if X_new.shape[1] != 2:
            raise ValueError("X_new must have exactly two columns [x, y]")
        Xd = self._create_polynomial_features(X_new)  # (n, p)
        Q = Xd @ self.coef_matrix.T                   # (n, m_taus)
        return self.taus, np.sort(Q, axis=1)

    def predict_tau(self, X_new, tau_star):
        """
        Predict the ``tau_star`` quantile at each point by linear
        interpolation on the saved tau grid.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            Points [x, y].
        tau_star : float
            Target quantile level.  Values outside ``[taus[0], taus[-1]]``
            are linearly extrapolated from the nearest grid segment.

        Returns
        -------
        ndarray, shape (n_points,)
            Interpolated quantile at each point.
        """
        taus, Q = self.predict_quantiles(X_new)
        tau_star = float(tau_star)
        # Index of the grid segment [taus[j], taus[j+1]] containing tau_star,
        # clipped so the edge segments handle out-of-range values.
        j = int(np.clip(np.searchsorted(taus, tau_star, side='right') - 1,
                        0, len(taus) - 2))
        t0, t1 = taus[j], taus[j + 1]
        w = (tau_star - t0) / (t1 - t0)
        return Q[:, j] + w * (Q[:, j + 1] - Q[:, j])

    def sample(self, X_new, n_samples=1, rng=None):
        """
        Draw samples from the approximate conditional distribution at X_new
        using inverse-CDF sampling based on the saved quantile grid.

        Parameters
        ----------
        X_new : array-like, shape (n_points, 2)
            New points [x, y] in your domain (e.g., LogP, PolarityIndex).
        n_samples : int
            Number of samples per point (must be >= 1).
        rng : None, int, or np.random.Generator
            Random seed or Generator for reproducibility.

        Returns
        -------
        samples : ndarray, shape (n_points, n_samples)
            Samples drawn from the piecewise-linear interpolated quantile
            function.  Note: u is drawn uniformly on [taus[0], taus[-1]],
            so the tails outside the saved grid are never sampled.
        """
        if n_samples < 1:
            raise ValueError("n_samples must be >= 1")

        # Accept a Generator directly, or treat rng as a seed (int or None).
        gen = rng if isinstance(rng, np.random.Generator) else np.random.default_rng(rng)

        X_new = np.asarray(X_new, dtype=float)
        if X_new.ndim != 2 or X_new.shape[1] != 2:
            raise ValueError("X_new must be a 2D array with exactly two columns [x, y]")

        # Quantile grid predictions: Q has shape (n_points, m_taus).
        taus, Q = self.predict_quantiles(X_new)
        n_points, m = Q.shape
        if m < 2:
            raise RuntimeError("Need at least two taus to sample with interpolation.")

        # Inverse-CDF sampling: u ~ Uniform over the supported tau range,
        # then evaluate the quantile function at u via linear interpolation
        # on the segment [taus[j], taus[j+1]] that contains each u.
        u = gen.uniform(taus[0], taus[-1], size=(n_points, n_samples))
        j = np.clip(np.searchsorted(taus, u, side="right") - 1, 0, m - 2)
        t0, t1 = taus[j], taus[j + 1]

        row = np.arange(n_points)[:, None]
        q0, q1 = Q[row, j], Q[row, j + 1]

        w = (u - t0) / (t1 - t0)
        return q0 + w * (q1 - q0)

## read saved model
#model = QuantileGridFromCoeffs(export_dir='Kps_model')
## example points: [(LogP, Polarity_Index), ...]
#X_new = np.array([[2.34665198, 10.2], ])
## sample the distribution at each X
#samples = model.sample(X_new, n_samples=50, rng=0)
#print(samples[0])