""" CHG (Covariance-based Hilbert Geometry) Algorithm Implementation This module contains the core CHG algorithm implementation with multi-head attention mechanism for Gaussian Process regression with enhanced covariance computation. Author: CHG Algorithm Team Version: 1.0.0 """ import numpy as np from typing import Tuple, Optional class CHG: """ CHG (Covariance-based Hilbert Geometry) Model A Gaussian Process model with multi-head attention mechanism for enhanced covariance computation, supporting uncertainty quantification and optimization. Parameters: ----------- input_dim : int Dimensionality of input features hidden_dim : int Hidden dimension for feature transformation num_heads : int Number of attention heads """ def __init__(self, input_dim: int, hidden_dim: int, num_heads: int): self.input_dim = input_dim self.hidden_dim = hidden_dim self.num_heads = num_heads self.head_dim = hidden_dim // num_heads self._init_parameters() def _init_parameters(self): """Initialize model parameters with proper scaling""" # QKV projection matrices self.W_q = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim)) self.W_k = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim)) self.W_v = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim)) # Covariance feedforward network self.W_ff1 = np.random.normal(0, 0.02, (self.hidden_dim, 2 * self.hidden_dim)) self.b_ff1 = np.zeros((2 * self.hidden_dim,)) self.W_ff2 = np.random.normal(0, 0.02, (2 * self.hidden_dim, 1)) self.b_ff2 = np.zeros((1,)) # Layer normalization self.gamma = np.ones((self.hidden_dim,)) self.beta = np.zeros((self.hidden_dim,)) # Multi-head fusion self.W_heads = np.random.normal(0, 0.02, (self.num_heads, 1)) self.scale = np.random.normal(1.0, 0.1, (1,)) def _layer_norm(self, x: np.ndarray, gamma: np.ndarray, beta: np.ndarray, eps: float = 1e-6) -> np.ndarray: """Apply layer normalization""" mean = np.mean(x, axis=-1, keepdims=True) var = np.var(x, axis=-1, keepdims=True) return gamma * (x - mean) / np.sqrt(var + eps) + beta def _gelu(self, x: np.ndarray) -> np.ndarray: """GELU activation function""" return 0.5 * x * (1 + np.tanh(np.sqrt(2/np.pi) * (x + 0.044715 * x**3))) def _compute_covariance(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray: """ Compute enhanced covariance matrix using multi-head attention mechanism Parameters: ----------- X1 : np.ndarray First set of input points X2 : np.ndarray Second set of input points Returns: -------- np.ndarray Covariance matrix between X1 and X2 """ n1, n2 = X1.shape[0], X2.shape[0] # Project to QKV spaces Q1 = X1 @ self.W_q K2 = X2 @ self.W_k V2 = X2 @ self.W_v # Reshape for multi-head attention Q1_h = Q1.reshape(n1, self.num_heads, self.head_dim) K2_h = K2.reshape(n2, self.num_heads, self.head_dim) V2_h = V2.reshape(n2, self.num_heads, self.head_dim) head_outputs = [] for h in range(self.num_heads): Q_h = Q1_h[:, h, :] K_h = K2_h[:, h, :] V_h = V2_h[:, h, :] # Attention scores as base similarity attention_scores = Q_h @ K_h.T / np.sqrt(self.head_dim) # Enhanced covariance computation enhanced_cov = np.zeros((n1, n2)) for i in range(n1): for j in range(n2): base_sim = attention_scores[i, j] # Feature interaction feature_int = Q_h[i] * K_h[j] # Layer normalization norm_features = self._layer_norm( feature_int.reshape(1, -1), self.gamma[:self.head_dim], self.beta[:self.head_dim] ).flatten() # Feedforward processing ff_hidden = norm_features @ self.W_ff1[:self.head_dim, :self.head_dim] + self.b_ff1[:self.head_dim] ff_hidden = self._gelu(ff_hidden) ff_out = ff_hidden @ self.W_ff2[:self.head_dim, :] + self.b_ff2 # Residual connection enhanced_cov[i, j] = base_sim + ff_out[0] head_outputs.append(enhanced_cov) # Fuse multi-head outputs final_cov = np.zeros((n1, n2)) for h, head_out in enumerate(head_outputs): final_cov += self.W_heads[h, 0] * head_out final_cov = self.scale[0] * final_cov # Ensure positive definiteness for diagonal case if n1 == n2 and np.allclose(X1, X2): final_cov = 0.5 * (final_cov + final_cov.T) final_cov += 1e-6 * np.eye(n1) return final_cov def fit_predict(self, X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, noise_var: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]: """ Fit the model and make predictions Parameters: ----------- X_train : np.ndarray Training input data y_train : np.ndarray Training target values X_test : np.ndarray Test input data noise_var : float Observation noise variance Returns: -------- Tuple[np.ndarray, np.ndarray] Predictive mean and variance """ # Compute covariance matrices K_train = self._compute_covariance(X_train, X_train) K_test_train = self._compute_covariance(X_test, X_train) K_test = self._compute_covariance(X_test, X_test) # GP inference K_noisy = K_train + noise_var * np.eye(len(X_train)) try: L = np.linalg.cholesky(K_noisy) alpha = np.linalg.solve(L, y_train) alpha = np.linalg.solve(L.T, alpha) # Predictive mean mean_pred = K_test_train @ alpha # Predictive variance v = np.linalg.solve(L, K_test_train.T) var_pred = np.diag(K_test) - np.sum(v**2, axis=0) except np.linalg.LinAlgError: K_inv = np.linalg.pinv(K_noisy) mean_pred = K_test_train @ K_inv @ y_train var_pred = np.diag(K_test - K_test_train @ K_inv @ K_test_train.T) var_pred = np.maximum(var_pred, 1e-8) return mean_pred, var_pred def log_marginal_likelihood(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6) -> float: """ Compute log marginal likelihood for model selection Parameters: ----------- X : np.ndarray Input data y : np.ndarray Target values noise_var : float Observation noise variance Returns: -------- float Log marginal likelihood """ K = self._compute_covariance(X, X) K_noisy = K + noise_var * np.eye(len(X)) try: L = np.linalg.cholesky(K_noisy) alpha = np.linalg.solve(L, y) data_fit = -0.5 * y.T @ alpha complexity = -np.sum(np.log(np.diag(L))) normalization = -0.5 * len(y) * np.log(2 * np.pi) return float(data_fit + complexity + normalization) except np.linalg.LinAlgError: sign, logdet = np.linalg.slogdet(K_noisy) K_inv = np.linalg.pinv(K_noisy) data_fit = -0.5 * y.T @ K_inv @ y complexity = -0.5 * logdet if sign > 0 else -1e6 normalization = -0.5 * len(y) * np.log(2 * np.pi) return float(data_fit + complexity + normalization) def get_covariance_matrix(self, X: np.ndarray) -> np.ndarray: """Get the covariance matrix for given inputs""" return self._compute_covariance(X, X) def update_parameters(self, gradient_dict: dict, learning_rate: float = 0.001): """Update model parameters using computed gradients""" for param_name, gradient in gradient_dict.items(): if hasattr(self, param_name): current_param = getattr(self, param_name) updated_param = current_param - learning_rate * gradient setattr(self, param_name, updated_param) class CHGOptimizer: """ Optimizer for CHG model parameters using numerical gradients Parameters: ----------- model : CHG CHG model instance to optimize learning_rate : float Learning rate for parameter updates """ def __init__(self, model: CHG, learning_rate: float = 0.001): self.model = model self.lr = learning_rate def compute_gradients(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6): """Compute numerical gradients for all model parameters""" gradients = {} eps = 1e-5 base_loss = -self.model.log_marginal_likelihood(X, y, noise_var) for param_name in ['W_q', 'W_k', 'W_v', 'W_ff1', 'W_ff2', 'W_heads', 'scale']: param = getattr(self.model, param_name) grad = np.zeros_like(param) flat_param = param.flatten() flat_grad = grad.flatten() for i in range(len(flat_param)): flat_param[i] += eps param_plus = flat_param.reshape(param.shape) setattr(self.model, param_name, param_plus) loss_plus = -self.model.log_marginal_likelihood(X, y, noise_var) flat_param[i] -= 2 * eps param_minus = flat_param.reshape(param.shape) setattr(self.model, param_name, param_minus) loss_minus = -self.model.log_marginal_likelihood(X, y, noise_var) flat_grad[i] = (loss_plus - loss_minus) / (2 * eps) flat_param[i] += eps setattr(self.model, param_name, flat_param.reshape(param.shape)) gradients[param_name] = flat_grad.reshape(param.shape) return gradients def step(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6): """Perform one optimization step""" gradients = self.compute_gradients(X, y, noise_var) self.model.update_parameters(gradients, self.lr) def run_chg_experiment(): """ Run a simple experiment to demonstrate CHG functionality Returns: -------- Tuple Trained model, predictions, and variances """ # Initialize CHG model model = CHG(input_dim=3, hidden_dim=24, num_heads=4) # Generate synthetic data np.random.seed(42) X_train = np.random.randn(80, 3) y_train = np.sum(X_train**2, axis=1) + 0.3 * np.sin(2 * X_train[:, 0]) + 0.1 * np.random.randn(80) X_test = np.random.randn(25, 3) y_test = np.sum(X_test**2, axis=1) + 0.3 * np.sin(2 * X_test[:, 0]) # CHG prediction pred_mean, pred_var = model.fit_predict(X_train, y_train, X_test) # Evaluation metrics rmse = np.sqrt(np.mean((pred_mean - y_test)**2)) mae = np.mean(np.abs(pred_mean - y_test)) # Uncertainty quantification pred_std = np.sqrt(pred_var) coverage = np.mean((y_test >= pred_mean - 1.96 * pred_std) & (y_test <= pred_mean + 1.96 * pred_std)) print(f"CHG Performance:") print(f"RMSE: {rmse:.4f}") print(f"MAE: {mae:.4f}") print(f"Coverage: {coverage:.4f}") print(f"Log Marginal Likelihood: {model.log_marginal_likelihood(X_train, y_train):.4f}") return model, pred_mean, pred_var