"""
CHG (Covariance-based Hilbert Geometry) Algorithm Implementation
This module contains the core CHG algorithm implementation with multi-head attention
mechanism for Gaussian Process regression with enhanced covariance computation.
Author: CHG Algorithm Team
Version: 1.0.0
"""
import numpy as np
from typing import Tuple
class CHG:
"""
CHG (Covariance-based Hilbert Geometry) Model
    A Gaussian process model with a multi-head attention mechanism for enhanced
    covariance computation, supporting uncertainty quantification and optimization.
Parameters:
-----------
input_dim : int
Dimensionality of input features
hidden_dim : int
Hidden dimension for feature transformation
num_heads : int
Number of attention heads
"""
    def __init__(self, input_dim: int, hidden_dim: int, num_heads: int):
        if hidden_dim % num_heads != 0:
            raise ValueError("hidden_dim must be divisible by num_heads")
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads
        self._init_parameters()
def _init_parameters(self):
"""Initialize model parameters with proper scaling"""
# QKV projection matrices
self.W_q = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
self.W_k = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
self.W_v = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
# Covariance feedforward network
self.W_ff1 = np.random.normal(0, 0.02, (self.hidden_dim, 2 * self.hidden_dim))
self.b_ff1 = np.zeros((2 * self.hidden_dim,))
self.W_ff2 = np.random.normal(0, 0.02, (2 * self.hidden_dim, 1))
self.b_ff2 = np.zeros((1,))
# Layer normalization
self.gamma = np.ones((self.hidden_dim,))
self.beta = np.zeros((self.hidden_dim,))
# Multi-head fusion
self.W_heads = np.random.normal(0, 0.02, (self.num_heads, 1))
self.scale = np.random.normal(1.0, 0.1, (1,))
def _layer_norm(self, x: np.ndarray, gamma: np.ndarray, beta: np.ndarray,
eps: float = 1e-6) -> np.ndarray:
"""Apply layer normalization"""
mean = np.mean(x, axis=-1, keepdims=True)
var = np.var(x, axis=-1, keepdims=True)
return gamma * (x - mean) / np.sqrt(var + eps) + beta
    def _gelu(self, x: np.ndarray) -> np.ndarray:
        """GELU activation function (tanh approximation)"""
        return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))
def _compute_covariance(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
"""
Compute enhanced covariance matrix using multi-head attention mechanism
Parameters:
-----------
X1 : np.ndarray
First set of input points
X2 : np.ndarray
Second set of input points
Returns:
--------
np.ndarray
Covariance matrix between X1 and X2
"""
n1, n2 = X1.shape[0], X2.shape[0]
# Project to QKV spaces
Q1 = X1 @ self.W_q
K2 = X2 @ self.W_k
V2 = X2 @ self.W_v
# Reshape for multi-head attention
Q1_h = Q1.reshape(n1, self.num_heads, self.head_dim)
K2_h = K2.reshape(n2, self.num_heads, self.head_dim)
V2_h = V2.reshape(n2, self.num_heads, self.head_dim)
head_outputs = []
for h in range(self.num_heads):
Q_h = Q1_h[:, h, :]
K_h = K2_h[:, h, :]
V_h = V2_h[:, h, :]
# Attention scores as base similarity
attention_scores = Q_h @ K_h.T / np.sqrt(self.head_dim)
# Enhanced covariance computation
enhanced_cov = np.zeros((n1, n2))
for i in range(n1):
for j in range(n2):
base_sim = attention_scores[i, j]
# Feature interaction
feature_int = Q_h[i] * K_h[j]
# Layer normalization
norm_features = self._layer_norm(
feature_int.reshape(1, -1),
self.gamma[:self.head_dim],
self.beta[:self.head_dim]
).flatten()
                    # Feedforward processing (only the head_dim slices of the
                    # FFN weights and biases are used for each head)
                    ff_hidden = (norm_features @ self.W_ff1[:self.head_dim, :self.head_dim]
                                 + self.b_ff1[:self.head_dim])
ff_hidden = self._gelu(ff_hidden)
ff_out = ff_hidden @ self.W_ff2[:self.head_dim, :] + self.b_ff2
# Residual connection
enhanced_cov[i, j] = base_sim + ff_out[0]
head_outputs.append(enhanced_cov)
# Fuse multi-head outputs
final_cov = np.zeros((n1, n2))
for h, head_out in enumerate(head_outputs):
final_cov += self.W_heads[h, 0] * head_out
final_cov = self.scale[0] * final_cov
        # Symmetrize and add jitter when computing K(X, X); the attention
        # construction is not symmetric, so this improves conditioning but
        # does not by itself guarantee positive definiteness
        if n1 == n2 and np.allclose(X1, X2):
            final_cov = 0.5 * (final_cov + final_cov.T)
            final_cov += 1e-6 * np.eye(n1)
return final_cov
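    # Shape sanity check for _compute_covariance (illustrative; the sizes are
    # arbitrary): with X1 of shape (n1, input_dim) and X2 of shape
    # (n2, input_dim), the result is an (n1, n2) matrix.
    #
    #     K = model._compute_covariance(np.random.randn(5, 3), np.random.randn(7, 3))
    #     assert K.shape == (5, 7)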
def fit_predict(self, X_train: np.ndarray, y_train: np.ndarray,
X_test: np.ndarray, noise_var: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
"""
Fit the model and make predictions
Parameters:
-----------
X_train : np.ndarray
Training input data
y_train : np.ndarray
Training target values
X_test : np.ndarray
Test input data
noise_var : float
Observation noise variance
Returns:
--------
Tuple[np.ndarray, np.ndarray]
Predictive mean and variance
"""
# Compute covariance matrices
K_train = self._compute_covariance(X_train, X_train)
K_test_train = self._compute_covariance(X_test, X_train)
K_test = self._compute_covariance(X_test, X_test)
# GP inference
K_noisy = K_train + noise_var * np.eye(len(X_train))
        try:
            # Solve K_noisy @ alpha = y_train via the Cholesky factor
            L = np.linalg.cholesky(K_noisy)
            alpha = np.linalg.solve(L, y_train)
            alpha = np.linalg.solve(L.T, alpha)
            # Predictive mean
            mean_pred = K_test_train @ alpha
            # Predictive variance
            v = np.linalg.solve(L, K_test_train.T)
            var_pred = np.diag(K_test) - np.sum(v**2, axis=0)
        except np.linalg.LinAlgError:
            # Fall back to the pseudo-inverse if the factorization fails
            K_inv = np.linalg.pinv(K_noisy)
            mean_pred = K_test_train @ K_inv @ y_train
            var_pred = np.diag(K_test - K_test_train @ K_inv @ K_test_train.T)
        # Clamp on both paths: the learned kernel is not guaranteed positive
        # semi-definite, so small negative variances can occur even when the
        # Cholesky factorization succeeds
        var_pred = np.maximum(var_pred, 1e-8)
        return mean_pred, var_pred
def log_marginal_likelihood(self, X: np.ndarray, y: np.ndarray,
noise_var: float = 1e-6) -> float:
"""
Compute log marginal likelihood for model selection
Parameters:
-----------
X : np.ndarray
Input data
y : np.ndarray
Target values
noise_var : float
Observation noise variance
Returns:
--------
float
Log marginal likelihood
"""
        # log p(y|X) = -0.5 * y^T K^{-1} y - 0.5 * log|K| - (n/2) * log(2*pi)
        K = self._compute_covariance(X, X)
        K_noisy = K + noise_var * np.eye(len(X))
        try:
            L = np.linalg.cholesky(K_noisy)
            alpha = np.linalg.solve(L, y)
            # y^T K^{-1} y = ||L^{-1} y||^2, so the data-fit term uses
            # alpha^T alpha rather than y^T alpha
            data_fit = -0.5 * alpha.T @ alpha
            # -0.5 * log|K_noisy| = -sum(log(diag(L)))
            complexity = -np.sum(np.log(np.diag(L)))
            normalization = -0.5 * len(y) * np.log(2 * np.pi)
            return float(data_fit + complexity + normalization)
        except np.linalg.LinAlgError:
            sign, logdet = np.linalg.slogdet(K_noisy)
            K_inv = np.linalg.pinv(K_noisy)
            data_fit = -0.5 * y.T @ K_inv @ y
            complexity = -0.5 * logdet if sign > 0 else -1e6
            normalization = -0.5 * len(y) * np.log(2 * np.pi)
            return float(data_fit + complexity + normalization)
def get_covariance_matrix(self, X: np.ndarray) -> np.ndarray:
"""Get the covariance matrix for given inputs"""
return self._compute_covariance(X, X)
def update_parameters(self, gradient_dict: dict, learning_rate: float = 0.001):
"""Update model parameters using computed gradients"""
for param_name, gradient in gradient_dict.items():
if hasattr(self, param_name):
current_param = getattr(self, param_name)
updated_param = current_param - learning_rate * gradient
setattr(self, param_name, updated_param)
class CHGOptimizer:
"""
Optimizer for CHG model parameters using numerical gradients
Parameters:
-----------
model : CHG
CHG model instance to optimize
learning_rate : float
Learning rate for parameter updates
"""
def __init__(self, model: CHG, learning_rate: float = 0.001):
self.model = model
self.lr = learning_rate
    def compute_gradients(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
        """Compute central-difference numerical gradients for all model parameters"""
        gradients = {}
        eps = 1e-5
        for param_name in ['W_q', 'W_k', 'W_v', 'W_ff1', 'W_ff2', 'W_heads', 'scale']:
            param = getattr(self.model, param_name)
            original = param.copy()
            flat_param = param.flatten()  # independent copy of the parameter values
            flat_grad = np.zeros_like(flat_param)
            for i in range(flat_param.size):
                # Perturb one entry at a time; .copy() ensures the model never
                # aliases the working buffer
                flat_param[i] += eps
                setattr(self.model, param_name, flat_param.reshape(param.shape).copy())
                loss_plus = -self.model.log_marginal_likelihood(X, y, noise_var)
                flat_param[i] -= 2 * eps
                setattr(self.model, param_name, flat_param.reshape(param.shape).copy())
                loss_minus = -self.model.log_marginal_likelihood(X, y, noise_var)
                flat_grad[i] = (loss_plus - loss_minus) / (2 * eps)
                flat_param[i] += eps  # restore the original value
            setattr(self.model, param_name, original)
            gradients[param_name] = flat_grad.reshape(param.shape)
        return gradients
def step(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
"""Perform one optimization step"""
gradients = self.compute_gradients(X, y, noise_var)
self.model.update_parameters(gradients, self.lr)
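# A minimal training-loop sketch showing how CHGOptimizer is meant to be used.
# This helper is illustrative and not part of the original API; the epoch count
# and learning rate below are assumptions, not tuned values.
def train_chg(model: CHG, X: np.ndarray, y: np.ndarray,
              epochs: int = 10, learning_rate: float = 0.001) -> list:
    """Run a few optimization steps and return the log marginal likelihood trace"""
    optimizer = CHGOptimizer(model, learning_rate=learning_rate)
    history = []
    for _ in range(epochs):
        optimizer.step(X, y)
        history.append(model.log_marginal_likelihood(X, y))
    return history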
def run_chg_experiment():
"""
Run a simple experiment to demonstrate CHG functionality
Returns:
--------
Tuple
Trained model, predictions, and variances
"""
# Initialize CHG model
model = CHG(input_dim=3, hidden_dim=24, num_heads=4)
# Generate synthetic data
np.random.seed(42)
X_train = np.random.randn(80, 3)
y_train = np.sum(X_train**2, axis=1) + 0.3 * np.sin(2 * X_train[:, 0]) + 0.1 * np.random.randn(80)
X_test = np.random.randn(25, 3)
y_test = np.sum(X_test**2, axis=1) + 0.3 * np.sin(2 * X_test[:, 0])
# CHG prediction
pred_mean, pred_var = model.fit_predict(X_train, y_train, X_test)
# Evaluation metrics
rmse = np.sqrt(np.mean((pred_mean - y_test)**2))
mae = np.mean(np.abs(pred_mean - y_test))
# Uncertainty quantification
pred_std = np.sqrt(pred_var)
coverage = np.mean((y_test >= pred_mean - 1.96 * pred_std) &
(y_test <= pred_mean + 1.96 * pred_std))
print(f"CHG Performance:")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"Coverage: {coverage:.4f}")
print(f"Log Marginal Likelihood: {model.log_marginal_likelihood(X_train, y_train):.4f}")
return model, pred_mean, pred_var
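if __name__ == "__main__":
    # Run the demonstration experiment when the module is executed directly
    # (a small convenience guard; behavior when imported is unchanged)
    run_chg_experiment()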