"""

CHG (Covariance-based Hilbert Geometry) Algorithm Implementation



This module contains the core CHG algorithm implementation with multi-head attention

mechanism for Gaussian Process regression with enhanced covariance computation.



Author: CHG Algorithm Team

Version: 1.0.0

"""

import numpy as np
from typing import Tuple
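
# Minimal usage sketch (see run_chg_experiment() at the end of this module for
# a fuller demo; X_train, y_train, X_test are assumed to be numpy arrays with
# input_dim feature columns):
#
#     model = CHG(input_dim=3, hidden_dim=24, num_heads=4)
#     mean, var = model.fit_predict(X_train, y_train, X_test)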


class CHG:
    """

    CHG (Covariance-based Hilbert Geometry) Model

    

    A Gaussian Process model with multi-head attention mechanism for enhanced

    covariance computation, supporting uncertainty quantification and optimization.

    

    Parameters:

    -----------

    input_dim : int

        Dimensionality of input features

    hidden_dim : int

        Hidden dimension for feature transformation

    num_heads : int

        Number of attention heads

    """
    
    def __init__(self, input_dim: int, hidden_dim: int, num_heads: int):
        if hidden_dim % num_heads != 0:
            raise ValueError("hidden_dim must be divisible by num_heads")
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads
        
        self._init_parameters()
    
    def _init_parameters(self):
        """Initialize model parameters with proper scaling"""
        # QKV projection matrices
        self.W_q = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
        self.W_k = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
        self.W_v = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
        
        # Covariance feedforward network
        self.W_ff1 = np.random.normal(0, 0.02, (self.hidden_dim, 2 * self.hidden_dim))
        self.b_ff1 = np.zeros((2 * self.hidden_dim,))
        self.W_ff2 = np.random.normal(0, 0.02, (2 * self.hidden_dim, 1))
        self.b_ff2 = np.zeros((1,))
        
        # Layer normalization
        self.gamma = np.ones((self.hidden_dim,))
        self.beta = np.zeros((self.hidden_dim,))
        
        # Multi-head fusion
        self.W_heads = np.random.normal(0, 0.02, (self.num_heads, 1))
        self.scale = np.random.normal(1.0, 0.1, (1,))

    def _layer_norm(self, x: np.ndarray, gamma: np.ndarray, beta: np.ndarray,
                    eps: float = 1e-6) -> np.ndarray:
        """Apply layer normalization"""
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        return gamma * (x - mean) / np.sqrt(var + eps) + beta

    def _gelu(self, x: np.ndarray) -> np.ndarray:
        """GELU activation function"""
        return 0.5 * x * (1 + np.tanh(np.sqrt(2/np.pi) * (x + 0.044715 * x**3)))

    def _compute_covariance(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """

        Compute enhanced covariance matrix using multi-head attention mechanism

        

        Parameters:

        -----------

        X1 : np.ndarray

            First set of input points

        X2 : np.ndarray  

            Second set of input points

            

        Returns:

        --------

        np.ndarray

            Covariance matrix between X1 and X2

        """
        n1, n2 = X1.shape[0], X2.shape[0]
        
        # Project to query/key spaces (W_v is initialised but unused by this
        # covariance formulation)
        Q1 = X1 @ self.W_q
        K2 = X2 @ self.W_k
        
        # Reshape for multi-head attention: (n, num_heads, head_dim)
        Q1_h = Q1.reshape(n1, self.num_heads, self.head_dim)
        K2_h = K2.reshape(n2, self.num_heads, self.head_dim)
        
        head_outputs = []
        
        for h in range(self.num_heads):
            Q_h = Q1_h[:, h, :]
            K_h = K2_h[:, h, :]
            
            # Scaled dot-product attention scores as the base similarity
            attention_scores = Q_h @ K_h.T / np.sqrt(self.head_dim)
            
            # Pairwise elementwise feature interactions, shape (n1, n2, head_dim);
            # vectorized form of the per-pair product Q_h[i] * K_h[j]
            feature_int = Q_h[:, None, :] * K_h[None, :, :]
            
            # Layer normalization over the feature axis
            norm_features = self._layer_norm(
                feature_int,
                self.gamma[:self.head_dim],
                self.beta[:self.head_dim],
            )
            
            # Feedforward processing; only the leading head_dim-sized blocks of
            # the FFN weights are used, so the remaining entries get no gradient
            ff_hidden = self._gelu(
                norm_features @ self.W_ff1[:self.head_dim, :self.head_dim]
                + self.b_ff1[:self.head_dim]
            )
            ff_out = ff_hidden @ self.W_ff2[:self.head_dim, :] + self.b_ff2
            
            # Residual connection: base similarity plus learned correction
            head_outputs.append(attention_scores + ff_out[..., 0])
        
        # Fuse multi-head outputs with learned per-head weights
        final_cov = np.zeros((n1, n2))
        for h, head_out in enumerate(head_outputs):
            final_cov += self.W_heads[h, 0] * head_out
        
        final_cov = self.scale[0] * final_cov
        
        # For a self-covariance, symmetrize and add jitter; this improves
        # conditioning but does not by itself guarantee positive definiteness
        if n1 == n2 and np.allclose(X1, X2):
            final_cov = 0.5 * (final_cov + final_cov.T)
            final_cov += 1e-6 * np.eye(n1)
        
        return final_cov

    def fit_predict(self, X_train: np.ndarray, y_train: np.ndarray,
                    X_test: np.ndarray, noise_var: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
        """
        Fit the model and make predictions

        Parameters
        ----------
        X_train : np.ndarray
            Training input data
        y_train : np.ndarray
            Training target values
        X_test : np.ndarray
            Test input data
        noise_var : float
            Observation noise variance

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            Predictive mean and variance
        """
        # Compute covariance matrices
        K_train = self._compute_covariance(X_train, X_train)
        K_test_train = self._compute_covariance(X_test, X_train)
        K_test = self._compute_covariance(X_test, X_test)
        
        # GP inference
        K_noisy = K_train + noise_var * np.eye(len(X_train))
        
        try:
            L = np.linalg.cholesky(K_noisy)
            alpha = np.linalg.solve(L, y_train)
            alpha = np.linalg.solve(L.T, alpha)
            
            # Predictive mean
            mean_pred = K_test_train @ alpha
            
            # Predictive variance
            v = np.linalg.solve(L, K_test_train.T)
            var_pred = np.diag(K_test) - np.sum(v**2, axis=0)
            
        except np.linalg.LinAlgError:
            K_inv = np.linalg.pinv(K_noisy)
            mean_pred = K_test_train @ K_inv @ y_train
            var_pred = np.diag(K_test - K_test_train @ K_inv @ K_test_train.T)
        
        # Clamp tiny negative variances that can arise because the learned
        # covariance is not guaranteed PSD
        var_pred = np.maximum(var_pred, 1e-8)
        return mean_pred, var_pred

    def log_marginal_likelihood(self, X: np.ndarray, y: np.ndarray,
                                noise_var: float = 1e-6) -> float:
        """
        Compute log marginal likelihood for model selection

        Parameters
        ----------
        X : np.ndarray
            Input data
        y : np.ndarray
            Target values
        noise_var : float
            Observation noise variance

        Returns
        -------
        float
            Log marginal likelihood
        """
        K = self._compute_covariance(X, X)
        K_noisy = K + noise_var * np.eye(len(X))
        
        try:
            L = np.linalg.cholesky(K_noisy)
            alpha = np.linalg.solve(L, y)
            
            # y^T K^{-1} y = ||L^{-1} y||^2 and log|K| = 2 * sum(log(diag(L)))
            data_fit = -0.5 * alpha.T @ alpha
            complexity = -np.sum(np.log(np.diag(L)))
            normalization = -0.5 * len(y) * np.log(2 * np.pi)
            
            return float(data_fit + complexity + normalization)
            
        except np.linalg.LinAlgError:
            sign, logdet = np.linalg.slogdet(K_noisy)
            K_inv = np.linalg.pinv(K_noisy)
            
            data_fit = -0.5 * y.T @ K_inv @ y
            complexity = -0.5 * logdet if sign > 0 else -1e6
            normalization = -0.5 * len(y) * np.log(2 * np.pi)
            
            return float(data_fit + complexity + normalization)

    def get_covariance_matrix(self, X: np.ndarray) -> np.ndarray:
        """Get the covariance matrix for given inputs"""
        return self._compute_covariance(X, X)

    def update_parameters(self, gradient_dict: dict, learning_rate: float = 0.001):
        """Update model parameters using computed gradients"""
        for param_name, gradient in gradient_dict.items():
            if hasattr(self, param_name):
                current_param = getattr(self, param_name)
                updated_param = current_param - learning_rate * gradient
                setattr(self, param_name, updated_param)


class CHGOptimizer:
    """

    Optimizer for CHG model parameters using numerical gradients

    

    Parameters:

    -----------

    model : CHG

        CHG model instance to optimize

    learning_rate : float

        Learning rate for parameter updates

    """
    
    def __init__(self, model: CHG, learning_rate: float = 0.001):
        self.model = model
        self.lr = learning_rate
        
    def compute_gradients(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
        """Compute central-difference numerical gradients for the weight parameters"""
        gradients = {}
        eps = 1e-5
        
        # Central differences: dL/dp ~ (L(p + eps) - L(p - eps)) / (2 * eps).
        # Each coordinate costs two marginal-likelihood evaluations, so this
        # scales poorly with parameter count; biases, gamma, and beta are skipped.
        for param_name in ['W_q', 'W_k', 'W_v', 'W_ff1', 'W_ff2', 'W_heads', 'scale']:
            param = getattr(self.model, param_name)
            flat_param = param.flatten()  # flatten() copies, so it is safe to perturb
            flat_grad = np.zeros_like(flat_param)
            
            for i in range(flat_param.size):
                orig_value = flat_param[i]
                
                flat_param[i] = orig_value + eps
                setattr(self.model, param_name, flat_param.reshape(param.shape))
                loss_plus = -self.model.log_marginal_likelihood(X, y, noise_var)
                
                flat_param[i] = orig_value - eps
                setattr(self.model, param_name, flat_param.reshape(param.shape))
                loss_minus = -self.model.log_marginal_likelihood(X, y, noise_var)
                
                flat_grad[i] = (loss_plus - loss_minus) / (2 * eps)
                flat_param[i] = orig_value
            
            # Restore the unperturbed parameter and store its gradient
            setattr(self.model, param_name, flat_param.reshape(param.shape))
            gradients[param_name] = flat_grad.reshape(param.shape)
        
        return gradients
    
    def step(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
        """Perform one optimization step"""
        gradients = self.compute_gradients(X, y, noise_var)
        self.model.update_parameters(gradients, self.lr)
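
# A sketch of a typical training loop (variable names are illustrative;
# numerical gradients make each step expensive, so keep datasets small):
#
#     optimizer = CHGOptimizer(model, learning_rate=0.001)
#     for epoch in range(num_epochs):
#         optimizer.step(X_train, y_train)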


def run_chg_experiment():
    """

    Run a simple experiment to demonstrate CHG functionality

    

    Returns:

    --------

    Tuple

        Trained model, predictions, and variances

    """
    # Initialize CHG model
    model = CHG(input_dim=3, hidden_dim=24, num_heads=4)
    
    # Generate synthetic data
    np.random.seed(42)
    X_train = np.random.randn(80, 3)
    y_train = np.sum(X_train**2, axis=1) + 0.3 * np.sin(2 * X_train[:, 0]) + 0.1 * np.random.randn(80)
    
    X_test = np.random.randn(25, 3)
    y_test = np.sum(X_test**2, axis=1) + 0.3 * np.sin(2 * X_test[:, 0])
    
    # CHG prediction
    pred_mean, pred_var = model.fit_predict(X_train, y_train, X_test)
    
    # Evaluation metrics
    rmse = np.sqrt(np.mean((pred_mean - y_test)**2))
    mae = np.mean(np.abs(pred_mean - y_test))
    
    # Uncertainty quantification
    pred_std = np.sqrt(pred_var)
    coverage = np.mean((y_test >= pred_mean - 1.96 * pred_std) & 
                      (y_test <= pred_mean + 1.96 * pred_std))
    
    print(f"CHG Performance:")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"Coverage: {coverage:.4f}")
    print(f"Log Marginal Likelihood: {model.log_marginal_likelihood(X_train, y_train):.4f}")
    
    return model, pred_mean, pred_var
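

# Run the bundled demo when this module is executed directly.
if __name__ == "__main__":
    run_chg_experiment()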