# RBA Regression Model Pseudocode

## Main Experiment Flow

```
ALGORITHM: RBA Regression Experiment
INPUT:  data_path, random_state
OUTPUT: comprehensive_results, visualizations

PROCEDURE main_experiment():
    1. INITIALIZE experiment environment
       SET random seeds for reproducibility
       CONFIGURE matplotlib for publication quality (Times New Roman, 14pt)
    2. LOAD and PREPROCESS data
       LOAD California Housing dataset from data_path
       HANDLE missing values and outliers
       SPLIT data into train/test sets (80/20)
       APPLY StandardScaler to features and targets
    3. CREATE model architectures
       INITIALIZE RBA(input_dim, hidden_dim=128, heads=8, layers=3)
       INITIALIZE Transformer(input_dim, hidden_dim=128, heads=8, layers=3)
    4. TRAIN models
       FOR each model:
           TRAIN using Adam optimizer with early stopping
           APPLY learning rate scheduling
           MONITOR validation loss for convergence
    5. EVALUATE performance
       COMPUTE regression metrics (RMSE, MAE, R², CV, MAPE)
       ANALYZE uncertainty quantification (RBA only)
       PERFORM cross-validation analysis
    6. CONDUCT ablation study
       TRAIN ablation variants (NoGP, NoResidual, NoUncertainty, NoLayerNorm)
       COMPARE component importance
    7. GENERATE results
       PRINT comprehensive statistical analysis
       CREATE geographic visualizations
       SAVE publication-quality figures
```
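The split-and-scale step above can be sketched in plain Python. The `train_test_split_80_20` and `standardize` helpers below are hand-rolled stand-ins for scikit-learn's `train_test_split` and `StandardScaler`, and the data is synthetic rather than the California Housing set:

```python
import numpy as np

def train_test_split_80_20(X, y, seed=42):
    """Shuffle indices with a fixed seed, then split 80% train / 20% test."""
    rng = np.random.default_rng(seed)
    idx = rng.permutation(len(X))
    cut = int(0.8 * len(X))
    return X[idx[:cut]], X[idx[cut:]], y[idx[:cut]], y[idx[cut:]]

def standardize(train, test):
    """Fit mean/std on the training set only, apply to both splits."""
    mu, sigma = train.mean(axis=0), train.std(axis=0)
    return (train - mu) / sigma, (test - mu) / sigma

# Synthetic stand-in: 100 samples, 8 features (like California Housing)
X = np.random.default_rng(0).normal(size=(100, 8))
y = X @ np.ones(8) + 0.1

X_tr, X_te, y_tr, y_te = train_test_split_80_20(X, y)
X_tr_s, X_te_s = standardize(X_tr, X_te)
```

Fitting the scaler on the training split only, as in the pseudocode, avoids leaking test-set statistics into training.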
## Core Model Architecture

```
CLASS ResidualBayesianAttention:
    INPUTS: input_dim, hidden_dim, num_heads, num_layers, dropout, gp_kernel_type
    COMPONENTS:
        input_embedding: Linear(input_dim → hidden_dim)
        attention_layers: List[BayesianMultiHeadAttention]
        layer_norms: List[LayerNorm]          # pre-norm for attention sub-layers
        ff_layer_norms: List[LayerNorm]       # pre-norm for feed-forward sub-layers
        feedforward_layers: List[BayesianFeedForward]
        residual_weights: List[Parameter([0.5, 0.5])]   # learnable residual gate per layer
        output_projection: Linear(hidden_dim → 1)
        uncertainty_head: Linear(hidden_dim → 1) + Softplus

    FORWARD(x):
        h = input_embedding(x)
        attention_uncertainties = []
        FOR i in range(num_layers):
            # Attention sub-layer with learnable residual gate
            residual_input = h
            h_norm = layer_norms[i](h)
            attention_output, uncertainty = attention_layers[i](h_norm)
            attention_uncertainties.append(uncertainty)
            alpha, beta = softmax(residual_weights[i])
            h = alpha * residual_input + beta * attention_output
            # Feed-forward sub-layer, reusing the same gate weights
            residual_input = h
            h_norm = ff_layer_norms[i](h)
            ff_output = feedforward_layers[i](h_norm)
            h = alpha * residual_input + beta * ff_output
        h_pooled = mean(h, dim=sequence)
        prediction = output_projection(h_pooled)
        uncertainty = uncertainty_head(h_pooled)
        total_uncertainty = uncertainty + mean(attention_uncertainties)
        RETURN prediction, total_uncertainty
```
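The learnable residual gate in the forward pass is just a two-way convex combination whose weights come from a softmax over two scalars. A minimal NumPy sketch (the helper names are illustrative, not the model's actual code):

```python
import numpy as np

def softmax(w):
    """Numerically stable softmax over a 1-D weight vector."""
    e = np.exp(w - w.max())
    return e / e.sum()

def gated_residual(residual_input, sublayer_output, residual_weights):
    """Convex combination of the residual path and the sub-layer output,
    with mixing weights alpha, beta = softmax(residual_weights)."""
    alpha, beta = softmax(residual_weights)
    return alpha * residual_input + beta * sublayer_output

h = np.array([1.0, 2.0])
out = np.array([3.0, 4.0])
# At initialization ([0.5, 0.5]) the gate reduces to a plain average.
mixed = gated_residual(h, out, np.array([0.5, 0.5]))  # → [2.0, 3.0]
```

Because alpha and beta always sum to 1, the gate can learn to favor either the identity path or the sub-layer without changing the activation scale.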
## Bayesian Multi-Head Attention

```
CLASS BayesianMultiHeadAttention:
    INPUTS: hidden_dim, num_heads, dropout, gp_kernel_type
    COMPONENTS:
        q_proj, k_proj, v_proj, o_proj: Linear projections
        length_scale: Parameter(ones(num_heads))      # learnable RBF length scale per head
        signal_variance: Parameter(ones(num_heads))   # learnable RBF signal variance per head
        dropout: Dropout(dropout)

    COMPUTE_GP_KERNEL(x):
        batch_size, seq_len, hidden_dim = x.shape
        x_expanded = unsqueeze(x, dim=2)
        x_tiled = unsqueeze(x, dim=1)
        distances = norm(x_expanded - x_tiled, dim=-1)   # pairwise token distances
        kernel_matrices = []
        FOR h in range(num_heads):
            kernel = signal_variance[h]² * exp(-distances² / (2 * length_scale[h]²))
            kernel_matrices.append(kernel)
        RETURN stack(kernel_matrices, dim=1)

    FORWARD(x):
        batch_size, seq_len, _ = x.shape
        Q = reshape_multihead(q_proj(x))
        K = reshape_multihead(k_proj(x))
        V = reshape_multihead(v_proj(x))
        attention_scores = matmul(Q, transpose(K, -2, -1)) * scale
        gp_kernel = compute_gp_kernel(x)
        enhanced_scores = attention_scores + gp_kernel   # GP prior biases the scores
        attention_weights = softmax(enhanced_scores, dim=-1)
        attention_weights = dropout(attention_weights)
        attention_output = matmul(attention_weights, V)
        output = o_proj(reshape_back(attention_output))
        # Attention entropy serves as a per-sample uncertainty signal
        attention_entropy = -sum(attention_weights * log(attention_weights + ε), dim=-1)
        uncertainty = mean(attention_entropy, dim=(1, 2))
        RETURN output, uncertainty
```
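The kernel-biased attention and its entropy-based uncertainty can be illustrated in NumPy for a single head. To keep the sketch short it uses identity Q/K/V projections and a fixed RBF kernel; the real model learns per-head projections, length scales, and signal variances:

```python
import numpy as np

def rbf_kernel(x, length_scale=1.0, signal_variance=1.0):
    """Squared-exponential (RBF) kernel over pairwise token distances."""
    d = np.linalg.norm(x[:, None, :] - x[None, :, :], axis=-1)
    return signal_variance**2 * np.exp(-d**2 / (2 * length_scale**2))

def softmax(a, axis=-1):
    e = np.exp(a - a.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)

def gp_attention(x):
    """Single-head attention whose score matrix is biased by an RBF kernel;
    returns the output and a mean attention-entropy uncertainty score."""
    d = x.shape[-1]
    scores = (x @ x.T) / np.sqrt(d)     # identity projections for brevity
    scores = scores + rbf_kernel(x)     # GP kernel added to the raw scores
    weights = softmax(scores)
    output = weights @ x
    entropy = -(weights * np.log(weights + 1e-9)).sum(axis=-1)
    return output, entropy.mean()

x = np.random.default_rng(1).normal(size=(5, 4))   # seq_len=5, hidden=4
out, uncertainty = gp_attention(x)
```

The entropy of a row of attention weights is bounded by log(seq_len), so sharply peaked (confident) attention yields a low uncertainty score and diffuse attention a high one.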
## Training Procedure

```
PROCEDURE train_model(model, train_loader, val_loader, epochs, lr):
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, patience=10, factor=0.5)
    best_val_loss = infinity
    patience_counter = 0
    patience = 20
    train_losses, val_losses = [], []

    FOR epoch in range(epochs):
        model.train()
        train_loss = 0
        FOR batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            IF isinstance(model, ResidualBayesianAttention):
                prediction, uncertainty = model(batch_x)
                loss = MSE(squeeze(prediction), batch_y)
                # Small penalty discourages inflated uncertainty estimates
                loss = loss + 0.01 * mean(uncertainty)
            ELSE:
                prediction = model(batch_x)
                loss = MSE(squeeze(prediction), batch_y)
            loss.backward()
            clip_grad_norm(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        val_loss = 0
        WITH no_grad():
            FOR batch_x, batch_y in val_loader:
                IF isinstance(model, ResidualBayesianAttention):
                    prediction, _ = model(batch_x)
                ELSE:
                    prediction = model(batch_x)
                val_loss += MSE(squeeze(prediction), batch_y).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        scheduler.step(val_loss)

        IF val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            save_model(model, 'best_model.pth')
        ELSE:
            patience_counter += 1
            IF patience_counter >= patience:
                BREAK

    load_model(model, 'best_model.pth')
    RETURN train_losses, val_losses
```
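The early-stopping rule above (reset a counter on every improvement, stop once it reaches the patience threshold) can be isolated into a few lines of plain Python; this is a sketch of the rule itself, not the training loop:

```python
def early_stopping_epoch(val_losses, patience=20):
    """Return the epoch index at which training would stop, given a
    sequence of validation losses and a patience threshold."""
    best = float("inf")
    counter = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best:
            best, counter = loss, 0      # improvement: reset the counter
        else:
            counter += 1                 # no improvement this epoch
            if counter >= patience:
                return epoch             # stop: patience exhausted
    return len(val_losses) - 1           # ran out of epochs first

# Loss improves for 5 epochs, then plateaus; with patience=3 training
# stops 3 epochs after the best epoch (epoch 4), i.e. at epoch 7.
losses = [1.0, 0.8, 0.6, 0.5, 0.4] + [0.45] * 10
stop_epoch = early_stopping_epoch(losses, patience=3)  # → 7
```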
## Evaluation and Analysis

```
PROCEDURE evaluate_comprehensive(model, test_loader, X_test, y_test_original):
    model.eval()
    predictions = []
    uncertainties = []
    WITH no_grad():
        FOR batch_x, _ in test_loader:
            IF isinstance(model, ResidualBayesianAttention):
                pred, uncertainty = model(batch_x)
                IF len(uncertainty.shape) > 1:
                    uncertainty = squeeze(uncertainty)
                uncertainties.extend(uncertainty.cpu().numpy())
            ELSE:
                pred = model(batch_x)
            predictions.extend(squeeze(pred).cpu().numpy())

    predictions = array(predictions)
    predictions_original = target_scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()

    metrics = {
        'MSE': mean_squared_error(y_test_original, predictions_original),
        'MAE': mean_absolute_error(y_test_original, predictions_original),
        'R²': r2_score(y_test_original, predictions_original),
        'MAPE': mean(abs((y_test_original - predictions_original) / y_test_original)) * 100,
        'Explained_Variance': 1 - (var(y_test_original - predictions_original) / var(y_test_original))
    }
    metrics['RMSE'] = sqrt(metrics['MSE'])
    metrics['CV'] = (metrics['RMSE'] / mean(y_test_original)) * 100

    IF len(uncertainties) > 0:
        uncertainties = array(uncertainties)
        # Rescale uncertainties back to the original target units
        uncertainties_scaled = uncertainties * target_scaler.scale_[0]
        metrics['prediction_intervals'] = {
            'lower_95': predictions_original - 1.96 * uncertainties_scaled,
            'upper_95': predictions_original + 1.96 * uncertainties_scaled,
            'mean_interval_width': mean(3.92 * uncertainties_scaled)
        }
    RETURN metrics
```
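The 95% interval construction above assumes Gaussian predictive noise: the bounds sit 1.96 standard deviations either side of the prediction, so each interval is 3.92 sigma wide. A NumPy sketch with made-up numbers:

```python
import numpy as np

def prediction_intervals(preds, sigma, z=1.96):
    """Gaussian 95% prediction intervals from per-point uncertainty sigma."""
    return preds - z * sigma, preds + z * sigma

def coverage(y_true, lower, upper):
    """Percentage of targets that fall inside their intervals."""
    return np.mean((y_true >= lower) & (y_true <= upper)) * 100

preds = np.array([2.0, 3.0, 4.0])
sigma = np.array([0.5, 0.5, 0.5])
lo, hi = prediction_intervals(preds, sigma)

# One of three targets (4.5) falls outside its interval [2.02, 3.98]
y_true = np.array([2.1, 4.5, 4.0])
cov = coverage(y_true, lo, hi)
```

A well-calibrated model should show empirical coverage close to the nominal 95%; the statistical-analysis procedure below checks exactly this quantity.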
## Cross-Validation Analysis

```
PROCEDURE cross_validation_analysis(X, y):
    kf = KFold(n_splits=5, shuffle=True, random_state=random_state)
    rba_cv_scores, transformer_cv_scores = [], []
    rba_cv_values, transformer_cv_values = [], []

    FOR train_idx, val_idx in kf.split(X):
        X_train_cv, X_val_cv = X[train_idx], X[val_idx]
        y_train_cv, y_val_cv = y[train_idx], y[val_idx]

        # Fit scalers on the training fold only to avoid leakage
        scaler_X = StandardScaler()
        scaler_y = StandardScaler()
        X_train_cv_scaled = scaler_X.fit_transform(X_train_cv)
        X_val_cv_scaled = scaler_X.transform(X_val_cv)
        y_train_cv_scaled = scaler_y.fit_transform(y_train_cv.reshape(-1, 1)).flatten()
        y_val_cv_scaled = scaler_y.transform(y_val_cv.reshape(-1, 1)).flatten()

        train_loader_cv, val_loader_cv = create_torch_datasets(
            X_train_cv_scaled, X_val_cv_scaled, y_train_cv_scaled, y_val_cv_scaled)

        rba_model = ResidualBayesianAttention(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        train_model(rba_model, train_loader_cv, val_loader_cv, epochs=50)
        transformer_model = StandardTransformer(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        train_model(transformer_model, train_loader_cv, val_loader_cv, epochs=50)

        rba_metrics = evaluate_comprehensive(rba_model, val_loader_cv, X_val_cv, y_val_cv)
        transformer_metrics = evaluate_comprehensive(transformer_model, val_loader_cv, X_val_cv, y_val_cv)
        rba_cv_scores.append(rba_metrics['R²'])
        transformer_cv_scores.append(transformer_metrics['R²'])
        rba_cv_values.append(rba_metrics['CV'])
        transformer_cv_values.append(transformer_metrics['CV'])

    RETURN {
        'rba_r2_scores': rba_cv_scores,
        'transformer_r2_scores': transformer_cv_scores,
        'rba_cv_values': rba_cv_values,
        'transformer_cv_values': transformer_cv_values
    }
```
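The shuffled 5-fold index generation that `KFold` performs can be sketched by hand in NumPy; `kfold_indices` below is an illustrative stand-in for scikit-learn's `KFold.split`:

```python
import numpy as np

def kfold_indices(n, n_splits=5, seed=42):
    """Yield (train_idx, val_idx) pairs for shuffled k-fold cross-validation.
    Each sample appears in exactly one validation fold."""
    rng = np.random.default_rng(seed)
    idx = rng.permutation(n)
    for fold in np.array_split(idx, n_splits):
        train = np.setdiff1d(idx, fold)   # everything not in this fold
        yield train, fold

folds = list(kfold_indices(100, n_splits=5))
```

With 100 samples and 5 splits, each fold holds 20 validation samples and 80 training samples, and the validation folds partition the full index set.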
## Ablation Study Analysis

```
PROCEDURE ablation_study_analysis(X, y):
    ablation_models = {
        'Full RBA': ResidualBayesianAttention,
        'No GP Kernel': RBA_NoGPKernel,
        'No Residual': RBA_NoResidual,
        'No Uncertainty': RBA_NoUncertainty,
        'No LayerNorm': RBA_NoLayerNorm,
        'Transformer': StandardTransformer
    }
    results = {}

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state)
    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    X_train_scaled = scaler_X.fit_transform(X_train)
    X_test_scaled = scaler_X.transform(X_test)
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).flatten()
    y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1)).flatten()
    train_loader, test_loader = create_torch_datasets(X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaled)

    FOR model_name, model_class in ablation_models.items():
        model = model_class(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        train_losses, val_losses = train_model(model, train_loader, test_loader, epochs=50)

        IF model_name == 'No Uncertainty':
            # This variant returns predictions only, so evaluate it manually
            predictions = []
            WITH no_grad():
                FOR batch_x, _ in test_loader:
                    pred = model(batch_x)
                    predictions.extend(squeeze(pred).cpu().numpy())
            predictions = array(predictions)
            predictions_original = scaler_y.inverse_transform(predictions.reshape(-1, 1)).flatten()
            metrics = {
                'MSE': mean_squared_error(y_test, predictions_original),
                'MAE': mean_absolute_error(y_test, predictions_original),
                'R²': r2_score(y_test, predictions_original),
                'MAPE': mean(abs((y_test - predictions_original) / y_test)) * 100,
                'predictions': predictions_original
            }
            metrics['RMSE'] = sqrt(metrics['MSE'])
            metrics['CV'] = (metrics['RMSE'] / mean(y_test)) * 100
        ELSE:
            metrics = evaluate_comprehensive(model, test_loader, X_test_scaled, y_test)
        results[model_name] = metrics
    RETURN results
```
## Statistical Analysis

```
PROCEDURE statistical_analysis(rba_metrics, transformer_metrics):
    rba_errors = abs(rba_metrics['residuals'])
    trans_errors = abs(transformer_metrics['residuals'])
    t_statistic, p_value = paired_t_test(trans_errors, rba_errors)
    effect_size = t_statistic / sqrt(len(rba_errors))     # Cohen's d for paired samples
    significance_level = IF p_value < 0.001 THEN "***"
                         ELSE IF p_value < 0.01 THEN "**"
                         ELSE IF p_value < 0.05 THEN "*"
                         ELSE "ns"

    coverage_95 = None
    IF 'prediction_intervals' in rba_metrics:
        actual = y_test_original
        intervals = rba_metrics['prediction_intervals']
        coverage_95 = mean((actual >= intervals['lower_95']) & (actual <= intervals['upper_95'])) * 100

    RETURN {
        't_statistic': t_statistic,
        'p_value': p_value,
        'effect_size': effect_size,
        'significance': significance_level,
        'coverage': coverage_95
    }
```
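The paired t statistic and the effect size used above can be computed directly in NumPy. This sketch omits the p-value (which needs a Student-t CDF, e.g. from SciPy) and uses made-up error values:

```python
import numpy as np

def paired_t_statistic(a, b):
    """Paired t statistic for the mean of the differences a - b."""
    d = np.asarray(a) - np.asarray(b)
    n = len(d)
    return d.mean() / (d.std(ddof=1) / np.sqrt(n))

def effect_size_from_t(t, n):
    """Effect size as used in the procedure above: t / sqrt(n)."""
    return t / np.sqrt(n)

# Hypothetical per-sample absolute errors for the two models
trans_errors = np.array([1.2, 0.9, 1.1, 1.3, 1.0])
rba_errors = np.array([1.0, 0.8, 0.9, 1.1, 0.9])

t = paired_t_statistic(trans_errors, rba_errors)
d = effect_size_from_t(t, len(rba_errors))
```

A positive t here means the transformer's errors are systematically larger than RBA's on the paired samples; the pseudocode's star labels then come from thresholding the associated p-value.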
## Visualization Generation

```
PROCEDURE plot_focused_analysis():
    figure = create_figure(size=(20, 12))

    # CV Comparison Box Plot
    subplot1 = subplot(2, 3, 1)
    cv_data = [cv_results['rba_cv_values'], cv_results['transformer_cv_values']]
    boxplot(cv_data, labels=['RBA', 'Transformer'])
    title('CV Comparison Across 5-Fold Cross-Validation')

    # Geographic Distribution Plots
    subplot2 = subplot(2, 3, 2)
    scatter(coordinates[:, 0], coordinates[:, 1], c=y_true, cmap='viridis')
    title('True House Values (Geographic Distribution)')

    subplot3 = subplot(2, 3, 3)
    scatter(coordinates[:, 0], coordinates[:, 1], c=rba_predictions, cmap='viridis')
    title('RBA Predictions (Geographic Distribution)')

    subplot4 = subplot(2, 3, 4)
    scatter(coordinates[:, 0], coordinates[:, 1], c=transformer_predictions, cmap='viridis')
    title('Transformer Predictions (Geographic Distribution)')

    # Error Analysis
    subplot5 = subplot(2, 3, 5)
    error_difference = transformer_errors - rba_errors
    scatter(coordinates[:, 0], coordinates[:, 1], c=error_difference, cmap='RdBu_r')
    title('Error Improvement (Transformer Error - RBA Error)')

    # Performance Summary Table
    subplot6 = subplot(2, 3, 6)
    create_performance_table(rba_metrics, transformer_metrics)

    save_figure('Focused_RBA_vs_Transformer_Analysis', formats=['png', 'pdf'])
    show()
```
## Component Importance Analysis

```
PROCEDURE analyze_component_importance(ablation_results):
    full_rba_metrics = ablation_results['Full RBA']
    component_analysis = []

    FOR model_name, metrics in ablation_results.items():
        IF model_name != 'Full RBA':
            rmse_change = ((metrics['RMSE'] - full_rba_metrics['RMSE']) / full_rba_metrics['RMSE']) * 100
            r2_change = ((metrics['R²'] - full_rba_metrics['R²']) / full_rba_metrics['R²']) * 100
            cv_change = ((metrics['CV'] - full_rba_metrics['CV']) / full_rba_metrics['CV']) * 100
            impact = IF abs(rmse_change) > 10 THEN "Very High"
                     ELSE IF abs(rmse_change) > 5 THEN "High"
                     ELSE IF abs(rmse_change) > 2 THEN "Medium"
                     ELSE "Low"
            component_analysis.append({
                'component': model_name,
                'rmse_change': rmse_change,
                'r2_change': r2_change,
                'cv_change': cv_change,
                'impact': impact,
                'abs_impact': abs(rmse_change)
            })

    sort(component_analysis, key=lambda x: x['abs_impact'], reverse=True)
    RETURN component_analysis
```
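The percentage-change and impact-level rules above are simple enough to show concretely; the RMSE values below are hypothetical, for illustration only:

```python
def percent_change(variant, baseline):
    """Signed percentage change of a variant's metric vs. the full model."""
    return (variant - baseline) / baseline * 100

def impact_level(rmse_change):
    """Map |RMSE change| to the qualitative levels used above."""
    c = abs(rmse_change)
    if c > 10:
        return "Very High"
    if c > 5:
        return "High"
    if c > 2:
        return "Medium"
    return "Low"

# Hypothetical ablation result: removing the GP kernel raises RMSE
# from 0.50 to 0.56, a 12% degradation → "Very High" impact.
full_rmse, no_gp_rmse = 0.50, 0.56
change = percent_change(no_gp_rmse, full_rmse)
level = impact_level(change)
```

Ranking components by the absolute RMSE change, as the procedure does, orders them by how much the model degrades when each one is removed.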