| | """
|
| | CHG (Covariance-based Hilbert Geometry) Algorithm Implementation
|
| |
|
| | This module contains the core CHG algorithm implementation with multi-head attention
|
| | mechanism for Gaussian Process regression with enhanced covariance computation.
|
| |
|
| | Author: CHG Algorithm Team
|
| | Version: 1.0.0
|
| | """
|
| |
|
| | import numpy as np
|
| | from typing import Tuple, Optional
|
| |
|
| |
|
| | class CHG:
|
| | """
|
| | CHG (Covariance-based Hilbert Geometry) Model
|
| |
|
| | A Gaussian Process model with multi-head attention mechanism for enhanced
|
| | covariance computation, supporting uncertainty quantification and optimization.
|
| |
|
| | Parameters:
|
| | -----------
|
| | input_dim : int
|
| | Dimensionality of input features
|
| | hidden_dim : int
|
| | Hidden dimension for feature transformation
|
| | num_heads : int
|
| | Number of attention heads
|
| | """
|
| |
|
| | def __init__(self, input_dim: int, hidden_dim: int, num_heads: int):
|
| | self.input_dim = input_dim
|
| | self.hidden_dim = hidden_dim
|
| | self.num_heads = num_heads
|
| | self.head_dim = hidden_dim // num_heads
|
| |
|
| | self._init_parameters()
|
| |
|
| | def _init_parameters(self):
|
| | """Initialize model parameters with proper scaling"""
|
| |
|
| | self.W_q = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
|
| | self.W_k = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
|
| | self.W_v = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
|
| |
|
| |
|
| | self.W_ff1 = np.random.normal(0, 0.02, (self.hidden_dim, 2 * self.hidden_dim))
|
| | self.b_ff1 = np.zeros((2 * self.hidden_dim,))
|
| | self.W_ff2 = np.random.normal(0, 0.02, (2 * self.hidden_dim, 1))
|
| | self.b_ff2 = np.zeros((1,))
|
| |
|
| |
|
| | self.gamma = np.ones((self.hidden_dim,))
|
| | self.beta = np.zeros((self.hidden_dim,))
|
| |
|
| |
|
| | self.W_heads = np.random.normal(0, 0.02, (self.num_heads, 1))
|
| | self.scale = np.random.normal(1.0, 0.1, (1,))
|
| |
|
| | def _layer_norm(self, x: np.ndarray, gamma: np.ndarray, beta: np.ndarray,
|
| | eps: float = 1e-6) -> np.ndarray:
|
| | """Apply layer normalization"""
|
| | mean = np.mean(x, axis=-1, keepdims=True)
|
| | var = np.var(x, axis=-1, keepdims=True)
|
| | return gamma * (x - mean) / np.sqrt(var + eps) + beta
|
| |
|
| | def _gelu(self, x: np.ndarray) -> np.ndarray:
|
| | """GELU activation function"""
|
| | return 0.5 * x * (1 + np.tanh(np.sqrt(2/np.pi) * (x + 0.044715 * x**3)))
|
| |
|
| | def _compute_covariance(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
|
| | """
|
| | Compute enhanced covariance matrix using multi-head attention mechanism
|
| |
|
| | Parameters:
|
| | -----------
|
| | X1 : np.ndarray
|
| | First set of input points
|
| | X2 : np.ndarray
|
| | Second set of input points
|
| |
|
| | Returns:
|
| | --------
|
| | np.ndarray
|
| | Covariance matrix between X1 and X2
|
| | """
|
| | n1, n2 = X1.shape[0], X2.shape[0]
|
| |
|
| |
|
| | Q1 = X1 @ self.W_q
|
| | K2 = X2 @ self.W_k
|
| | V2 = X2 @ self.W_v
|
| |
|
| |
|
| | Q1_h = Q1.reshape(n1, self.num_heads, self.head_dim)
|
| | K2_h = K2.reshape(n2, self.num_heads, self.head_dim)
|
| | V2_h = V2.reshape(n2, self.num_heads, self.head_dim)
|
| |
|
| | head_outputs = []
|
| |
|
| | for h in range(self.num_heads):
|
| | Q_h = Q1_h[:, h, :]
|
| | K_h = K2_h[:, h, :]
|
| | V_h = V2_h[:, h, :]
|
| |
|
| |
|
| | attention_scores = Q_h @ K_h.T / np.sqrt(self.head_dim)
|
| |
|
| |
|
| | enhanced_cov = np.zeros((n1, n2))
|
| |
|
| | for i in range(n1):
|
| | for j in range(n2):
|
| | base_sim = attention_scores[i, j]
|
| |
|
| |
|
| | feature_int = Q_h[i] * K_h[j]
|
| |
|
| |
|
| | norm_features = self._layer_norm(
|
| | feature_int.reshape(1, -1),
|
| | self.gamma[:self.head_dim],
|
| | self.beta[:self.head_dim]
|
| | ).flatten()
|
| |
|
| |
|
| | ff_hidden = norm_features @ self.W_ff1[:self.head_dim, :self.head_dim] + self.b_ff1[:self.head_dim]
|
| | ff_hidden = self._gelu(ff_hidden)
|
| | ff_out = ff_hidden @ self.W_ff2[:self.head_dim, :] + self.b_ff2
|
| |
|
| |
|
| | enhanced_cov[i, j] = base_sim + ff_out[0]
|
| |
|
| | head_outputs.append(enhanced_cov)
|
| |
|
| |
|
| | final_cov = np.zeros((n1, n2))
|
| | for h, head_out in enumerate(head_outputs):
|
| | final_cov += self.W_heads[h, 0] * head_out
|
| |
|
| | final_cov = self.scale[0] * final_cov
|
| |
|
| |
|
| | if n1 == n2 and np.allclose(X1, X2):
|
| | final_cov = 0.5 * (final_cov + final_cov.T)
|
| | final_cov += 1e-6 * np.eye(n1)
|
| |
|
| | return final_cov
|
| |
|
| | def fit_predict(self, X_train: np.ndarray, y_train: np.ndarray,
|
| | X_test: np.ndarray, noise_var: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
|
| | """
|
| | Fit the model and make predictions
|
| |
|
| | Parameters:
|
| | -----------
|
| | X_train : np.ndarray
|
| | Training input data
|
| | y_train : np.ndarray
|
| | Training target values
|
| | X_test : np.ndarray
|
| | Test input data
|
| | noise_var : float
|
| | Observation noise variance
|
| |
|
| | Returns:
|
| | --------
|
| | Tuple[np.ndarray, np.ndarray]
|
| | Predictive mean and variance
|
| | """
|
| |
|
| | K_train = self._compute_covariance(X_train, X_train)
|
| | K_test_train = self._compute_covariance(X_test, X_train)
|
| | K_test = self._compute_covariance(X_test, X_test)
|
| |
|
| |
|
| | K_noisy = K_train + noise_var * np.eye(len(X_train))
|
| |
|
| | try:
|
| | L = np.linalg.cholesky(K_noisy)
|
| | alpha = np.linalg.solve(L, y_train)
|
| | alpha = np.linalg.solve(L.T, alpha)
|
| |
|
| |
|
| | mean_pred = K_test_train @ alpha
|
| |
|
| |
|
| | v = np.linalg.solve(L, K_test_train.T)
|
| | var_pred = np.diag(K_test) - np.sum(v**2, axis=0)
|
| |
|
| | except np.linalg.LinAlgError:
|
| | K_inv = np.linalg.pinv(K_noisy)
|
| | mean_pred = K_test_train @ K_inv @ y_train
|
| | var_pred = np.diag(K_test - K_test_train @ K_inv @ K_test_train.T)
|
| |
|
| | var_pred = np.maximum(var_pred, 1e-8)
|
| | return mean_pred, var_pred
|
| |
|
| | def log_marginal_likelihood(self, X: np.ndarray, y: np.ndarray,
|
| | noise_var: float = 1e-6) -> float:
|
| | """
|
| | Compute log marginal likelihood for model selection
|
| |
|
| | Parameters:
|
| | -----------
|
| | X : np.ndarray
|
| | Input data
|
| | y : np.ndarray
|
| | Target values
|
| | noise_var : float
|
| | Observation noise variance
|
| |
|
| | Returns:
|
| | --------
|
| | float
|
| | Log marginal likelihood
|
| | """
|
| | K = self._compute_covariance(X, X)
|
| | K_noisy = K + noise_var * np.eye(len(X))
|
| |
|
| | try:
|
| | L = np.linalg.cholesky(K_noisy)
|
| | alpha = np.linalg.solve(L, y)
|
| |
|
| | data_fit = -0.5 * y.T @ alpha
|
| | complexity = -np.sum(np.log(np.diag(L)))
|
| | normalization = -0.5 * len(y) * np.log(2 * np.pi)
|
| |
|
| | return float(data_fit + complexity + normalization)
|
| |
|
| | except np.linalg.LinAlgError:
|
| | sign, logdet = np.linalg.slogdet(K_noisy)
|
| | K_inv = np.linalg.pinv(K_noisy)
|
| |
|
| | data_fit = -0.5 * y.T @ K_inv @ y
|
| | complexity = -0.5 * logdet if sign > 0 else -1e6
|
| | normalization = -0.5 * len(y) * np.log(2 * np.pi)
|
| |
|
| | return float(data_fit + complexity + normalization)
|
| |
|
| | def get_covariance_matrix(self, X: np.ndarray) -> np.ndarray:
|
| | """Get the covariance matrix for given inputs"""
|
| | return self._compute_covariance(X, X)
|
| |
|
| | def update_parameters(self, gradient_dict: dict, learning_rate: float = 0.001):
|
| | """Update model parameters using computed gradients"""
|
| | for param_name, gradient in gradient_dict.items():
|
| | if hasattr(self, param_name):
|
| | current_param = getattr(self, param_name)
|
| | updated_param = current_param - learning_rate * gradient
|
| | setattr(self, param_name, updated_param)
|
| |
|
| |
|
| | class CHGOptimizer:
|
| | """
|
| | Optimizer for CHG model parameters using numerical gradients
|
| |
|
| | Parameters:
|
| | -----------
|
| | model : CHG
|
| | CHG model instance to optimize
|
| | learning_rate : float
|
| | Learning rate for parameter updates
|
| | """
|
| |
|
| | def __init__(self, model: CHG, learning_rate: float = 0.001):
|
| | self.model = model
|
| | self.lr = learning_rate
|
| |
|
| | def compute_gradients(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
|
| | """Compute numerical gradients for all model parameters"""
|
| | gradients = {}
|
| | eps = 1e-5
|
| |
|
| | base_loss = -self.model.log_marginal_likelihood(X, y, noise_var)
|
| |
|
| | for param_name in ['W_q', 'W_k', 'W_v', 'W_ff1', 'W_ff2', 'W_heads', 'scale']:
|
| | param = getattr(self.model, param_name)
|
| | grad = np.zeros_like(param)
|
| |
|
| | flat_param = param.flatten()
|
| | flat_grad = grad.flatten()
|
| |
|
| | for i in range(len(flat_param)):
|
| | flat_param[i] += eps
|
| | param_plus = flat_param.reshape(param.shape)
|
| | setattr(self.model, param_name, param_plus)
|
| |
|
| | loss_plus = -self.model.log_marginal_likelihood(X, y, noise_var)
|
| |
|
| | flat_param[i] -= 2 * eps
|
| | param_minus = flat_param.reshape(param.shape)
|
| | setattr(self.model, param_name, param_minus)
|
| |
|
| | loss_minus = -self.model.log_marginal_likelihood(X, y, noise_var)
|
| |
|
| | flat_grad[i] = (loss_plus - loss_minus) / (2 * eps)
|
| | flat_param[i] += eps
|
| |
|
| | setattr(self.model, param_name, flat_param.reshape(param.shape))
|
| | gradients[param_name] = flat_grad.reshape(param.shape)
|
| |
|
| | return gradients
|
| |
|
| | def step(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
|
| | """Perform one optimization step"""
|
| | gradients = self.compute_gradients(X, y, noise_var)
|
| | self.model.update_parameters(gradients, self.lr)
|
| |
|
| |
|
| | def run_chg_experiment():
|
| | """
|
| | Run a simple experiment to demonstrate CHG functionality
|
| |
|
| | Returns:
|
| | --------
|
| | Tuple
|
| | Trained model, predictions, and variances
|
| | """
|
| |
|
| | model = CHG(input_dim=3, hidden_dim=24, num_heads=4)
|
| |
|
| |
|
| | np.random.seed(42)
|
| | X_train = np.random.randn(80, 3)
|
| | y_train = np.sum(X_train**2, axis=1) + 0.3 * np.sin(2 * X_train[:, 0]) + 0.1 * np.random.randn(80)
|
| |
|
| | X_test = np.random.randn(25, 3)
|
| | y_test = np.sum(X_test**2, axis=1) + 0.3 * np.sin(2 * X_test[:, 0])
|
| |
|
| |
|
| | pred_mean, pred_var = model.fit_predict(X_train, y_train, X_test)
|
| |
|
| |
|
| | rmse = np.sqrt(np.mean((pred_mean - y_test)**2))
|
| | mae = np.mean(np.abs(pred_mean - y_test))
|
| |
|
| |
|
| | pred_std = np.sqrt(pred_var)
|
| | coverage = np.mean((y_test >= pred_mean - 1.96 * pred_std) &
|
| | (y_test <= pred_mean + 1.96 * pred_std))
|
| |
|
| | print(f"CHG Performance:")
|
| | print(f"RMSE: {rmse:.4f}")
|
| | print(f"MAE: {mae:.4f}")
|
| | print(f"Coverage: {coverage:.4f}")
|
| | print(f"Log Marginal Likelihood: {model.log_marginal_likelihood(X_train, y_train):.4f}")
|
| |
|
| | return model, pred_mean, pred_var |