RBA / Pseudocode.md
guanwencan's picture
Upload Pseudocode.md
2eecd10 verified

RBA Regression Model Pseudocode

Main Experiment Flow

ALGORITHM: RBA Regression Experiment
INPUT: data_path, random_state
OUTPUT: comprehensive_results, visualizations

PROCEDURE main_experiment():
    1. INITIALIZE experiment environment
       SET random seeds for reproducibility
       CONFIGURE matplotlib for publication quality (Times New Roman, 14pt)
    
    2. LOAD and PREPROCESS data
       LOAD California Housing dataset from data_path
       HANDLE missing values and outliers
       SPLIT data into train/test sets (80/20)
       APPLY StandardScaler to features and targets
    
    3. CREATE model architectures
       INITIALIZE RBA(input_dim, hidden_dim=128, heads=8, layers=3)
       INITIALIZE Transformer(input_dim, hidden_dim=128, heads=8, layers=3)
    
    4. TRAIN models
       FOR each model:
           TRAIN using Adam optimizer with early stopping
           APPLY learning rate scheduling
           MONITOR validation loss for convergence
    
    5. EVALUATE performance
       COMPUTE regression metrics (RMSE, MAE, R², CV [coefficient of variation of RMSE, in %], MAPE)
       ANALYZE uncertainty quantification (RBA only)
       PERFORM cross-validation analysis
    
    6. CONDUCT ablation study
       TRAIN ablation variants (NoGP, NoResidual, NoUncertainty, NoLayerNorm)
       COMPARE component importance
    
    7. GENERATE results
       PRINT comprehensive statistical analysis
       CREATE geographic visualizations
       SAVE publication-quality figures

Core Model Architecture

CLASS ResidualBayesianAttention:
    INPUTS: input_dim, hidden_dim, num_heads, num_layers, dropout, gp_kernel_type
    
    COMPONENTS:
        input_embedding: Linear(input_dim → hidden_dim)
        attention_layers: List[BayesianMultiHeadAttention]
        layer_norms: List[LayerNorm]
        ff_layer_norms: List[LayerNorm]
        feedforward_layers: List[BayesianFeedForward]
        residual_weights: List[Parameter([0.5, 0.5])]
        output_projection: Linear(hidden_dim → 1)
        uncertainty_head: Linear(hidden_dim → 1) + Softplus
    
    FORWARD(x):
        h = input_embedding(x)
        attention_uncertainties = []
        
        FOR i in range(num_layers):
            residual_input = h
            h_norm = layer_norms[i](h)
            attention_output, uncertainty = attention_layers[i](h_norm)
            attention_uncertainties.append(uncertainty)
            
            alpha, beta = softmax(residual_weights[i])
            h = alpha * residual_input + beta * attention_output
            
            residual_input = h
            h_norm = ff_layer_norms[i](h)
            ff_output = feedforward_layers[i](h_norm)
            h = alpha * residual_input + beta * ff_output
        
        h_pooled = mean(h, dim=sequence)
        prediction = output_projection(h_pooled)
        uncertainty = uncertainty_head(h_pooled)
        
        total_uncertainty = uncertainty + mean(attention_uncertainties)
        
        RETURN prediction, total_uncertainty

Bayesian Multi-Head Attention

CLASS BayesianMultiHeadAttention:
    INPUTS: hidden_dim, num_heads, dropout, gp_kernel_type
    
    COMPONENTS:
        q_proj, k_proj, v_proj, o_proj: Linear projections
        length_scale: Parameter(ones(num_heads))
        signal_variance: Parameter(ones(num_heads))
        dropout: Dropout(dropout)
    
    COMPUTE_GP_KERNEL(x):
        batch_size, seq_len, hidden_dim = x.shape
        x_expanded = unsqueeze(x, dim=2)
        x_tiled = unsqueeze(x, dim=1)
        distances = norm(x_expanded - x_tiled, dim=-1)
        
        kernel_matrices = []
        FOR h in range(num_heads):
            kernel = signal_variance[h]² * exp(-distances² / (2 * length_scale[h]²))
            kernel_matrices.append(kernel)
        
        RETURN stack(kernel_matrices, dim=1)
    
    FORWARD(x):
        batch_size, seq_len, _ = x.shape
        
        Q = reshape_multihead(q_proj(x))
        K = reshape_multihead(k_proj(x))
        V = reshape_multihead(v_proj(x))
        
        attention_scores = matmul(Q, transpose(K, -2, -1)) * scale
        gp_kernel = compute_gp_kernel(x)
        enhanced_scores = attention_scores + gp_kernel
        
        attention_weights = softmax(enhanced_scores, dim=-1)
        attention_weights = dropout(attention_weights)
        
        attention_output = matmul(attention_weights, V)
        output = o_proj(reshape_back(attention_output))
        
        attention_entropy = -sum(attention_weights * log(attention_weights + ε), dim=-1)
        uncertainty = mean(attention_entropy, dim=(1, 2))
        
        RETURN output, uncertainty

Training Procedure

PROCEDURE train_model(model, train_loader, val_loader, epochs, lr):
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    scheduler = ReduceLROnPlateau(optimizer, patience=10, factor=0.5)
    
    best_val_loss = infinity
    patience_counter = 0
    patience = 20
    train_losses = []
    val_losses = []
    
    FOR epoch in range(epochs):
        model.train()
        train_loss = 0
        
        FOR batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            
            IF isinstance(model, ResidualBayesianAttention):
                prediction, uncertainty = model(batch_x)
                loss = MSE(squeeze(prediction), batch_y)
                uncertainty_loss = mean(uncertainty)
                loss = loss + 0.01 * uncertainty_loss
            ELSE:
                prediction = model(batch_x)
                loss = MSE(squeeze(prediction), batch_y)
            
            loss.backward()
            clip_grad_norm(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()
        
        model.eval()
        val_loss = 0
        
        WITH no_grad():
            FOR batch_x, batch_y in val_loader:
                IF isinstance(model, ResidualBayesianAttention):
                    prediction, _ = model(batch_x)
                ELSE:
                    prediction = model(batch_x)
                loss = MSE(squeeze(prediction), batch_y)
                val_loss += loss.item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        scheduler.step(val_loss)
        
        IF val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            save_model(model, 'best_model.pth')
        ELSE:
            patience_counter += 1
            IF patience_counter >= patience:
                BREAK
    
    load_model(model, 'best_model.pth')
    RETURN train_losses, val_losses

Evaluation and Analysis

PROCEDURE evaluate_comprehensive(model, test_loader, X_test, y_test_original):
    model.eval()
    predictions = []
    uncertainties = []
    
    WITH no_grad():
        FOR batch_x, _ in test_loader:
            IF isinstance(model, ResidualBayesianAttention):
                pred, uncertainty = model(batch_x)
                IF len(uncertainty.shape) > 1:
                    uncertainty = squeeze(uncertainty)
                uncertainties.extend(uncertainty.cpu().numpy())
            ELSE:
                pred = model(batch_x)
            predictions.extend(squeeze(pred).cpu().numpy())
    
    predictions = array(predictions)
    predictions_original = target_scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
    
    metrics = {
        'MSE': mean_squared_error(y_test_original, predictions_original),
        'RMSE': sqrt(MSE),
        'MAE': mean_absolute_error(y_test_original, predictions_original),
        'R²': r2_score(y_test_original, predictions_original),
        'MAPE': mean(abs((y_test_original - predictions_original) / y_test_original)) * 100,
        'CV': (RMSE / mean(y_test_original)) * 100,
        'Explained_Variance': 1 - (var(y_test_original - predictions_original) / var(y_test_original)),
        'residuals': y_test_original - predictions_original
    }
    
    IF len(uncertainties) > 0:
        uncertainties = array(uncertainties)
        uncertainties_scaled = uncertainties * target_scaler.scale_[0]
        prediction_intervals = {
            'lower_95': predictions_original - 1.96 * uncertainties_scaled,
            'upper_95': predictions_original + 1.96 * uncertainties_scaled,
            'mean_interval_width': mean(3.92 * uncertainties_scaled)
        }
        metrics['prediction_intervals'] = prediction_intervals
    
    RETURN metrics

Cross-Validation Analysis

PROCEDURE cross_validation_analysis(X, y):
    kf = KFold(n_splits=5, shuffle=True, random_state=random_state)
    
    rba_cv_scores = []
    transformer_cv_scores = []
    rba_cv_values = []
    transformer_cv_values = []
    
    fold = 1
    FOR train_idx, val_idx in kf.split(X):
        X_train_cv, X_val_cv = X[train_idx], X[val_idx]
        y_train_cv, y_val_cv = y[train_idx], y[val_idx]
        
        scaler_X = StandardScaler()
        scaler_y = StandardScaler()
        
        X_train_cv_scaled = scaler_X.fit_transform(X_train_cv)
        X_val_cv_scaled = scaler_X.transform(X_val_cv)
        y_train_cv_scaled = scaler_y.fit_transform(y_train_cv.reshape(-1, 1)).flatten()
        y_val_cv_scaled = scaler_y.transform(y_val_cv.reshape(-1, 1)).flatten()
        
        train_loader_cv, val_loader_cv = create_torch_datasets(
            X_train_cv_scaled, X_val_cv_scaled, y_train_cv_scaled, y_val_cv_scaled)
        
        rba_model = ResidualBayesianAttention(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        train_model(rba_model, train_loader_cv, val_loader_cv, epochs=50)
        
        transformer_model = StandardTransformer(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        train_model(transformer_model, train_loader_cv, val_loader_cv, epochs=50)
        
        rba_metrics = evaluate_comprehensive(rba_model, val_loader_cv, X_val_cv, y_val_cv)
        transformer_metrics = evaluate_comprehensive(transformer_model, val_loader_cv, X_val_cv, y_val_cv)
        
        rba_cv_scores.append(rba_metrics['R²'])
        transformer_cv_scores.append(transformer_metrics['R²'])
        rba_cv_values.append(rba_metrics['CV'])
        transformer_cv_values.append(transformer_metrics['CV'])
        
        fold += 1
    
    RETURN {
        'rba_r2_scores': rba_cv_scores,
        'transformer_r2_scores': transformer_cv_scores,
        'rba_cv_values': rba_cv_values,
        'transformer_cv_values': transformer_cv_values
    }

Ablation Study Analysis

PROCEDURE ablation_study_analysis(X, y):
    ablation_models = {
        'Full RBA': ResidualBayesianAttention,
        'No GP Kernel': RBA_NoGPKernel,
        'No Residual': RBA_NoResidual,
        'No Uncertainty': RBA_NoUncertainty,
        'No LayerNorm': RBA_NoLayerNorm,
        'Transformer': StandardTransformer
    }
    
    results = {}
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=random_state)
    
    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    
    X_train_scaled = scaler_X.fit_transform(X_train)
    X_test_scaled = scaler_X.transform(X_test)
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).flatten()
    y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1)).flatten()
    
    train_loader, test_loader = create_torch_datasets(X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaled)
    
    FOR model_name, model_class in ablation_models.items():
        model = model_class(input_dim=X.shape[1], hidden_dim=128, num_heads=8, num_layers=3)
        
        train_losses, val_losses = train_model(model, train_loader, test_loader, epochs=50)
        
        IF model_name == 'No Uncertainty':
            predictions = []
            WITH no_grad():
                FOR batch_x, _ in test_loader:
                    pred = model(batch_x)
                    predictions.extend(squeeze(pred).cpu().numpy())
            
            predictions = array(predictions)
            predictions_original = scaler_y.inverse_transform(predictions.reshape(-1, 1)).flatten()
            
            metrics = {
                'MSE': mean_squared_error(y_test, predictions_original),
                'RMSE': sqrt(MSE),
                'MAE': mean_absolute_error(y_test, predictions_original),
                'R²': r2_score(y_test, predictions_original),
                'MAPE': mean(abs((y_test - predictions_original) / y_test)) * 100,
                'CV': (RMSE / mean(y_test)) * 100,
                'predictions': predictions_original
            }
        ELSE:
            metrics = evaluate_comprehensive(model, test_loader, X_test_scaled, y_test)
        
        results[model_name] = metrics
    
    RETURN results

Statistical Analysis

PROCEDURE statistical_analysis(rba_metrics, transformer_metrics):
    rba_errors = abs(rba_metrics['residuals'])
    trans_errors = abs(transformer_metrics['residuals'])
    
    t_statistic, p_value = paired_t_test(trans_errors, rba_errors)
    effect_size = t_statistic / sqrt(len(rba_errors))
    
    significance_level = IF p_value < 0.001 THEN "***"
                        ELSE IF p_value < 0.01 THEN "**"
                        ELSE IF p_value < 0.05 THEN "*"
                        ELSE "ns"
    
    coverage_95 = None
    IF 'prediction_intervals' in rba_metrics:
        actual = y_test_original
        intervals = rba_metrics['prediction_intervals']
        coverage_95 = mean((actual >= intervals['lower_95']) & (actual <= intervals['upper_95'])) * 100
    
    RETURN {
        't_statistic': t_statistic,
        'p_value': p_value,
        'effect_size': effect_size,
        'significance': significance_level,
        'coverage': coverage_95
    }

Visualization Generation

PROCEDURE plot_focused_analysis():
    figure = create_figure(size=(20, 12))
    
    # CV Comparison Box Plot
    subplot1 = subplot(2, 3, 1)
    cv_data = [cv_results['rba_cv_values'], cv_results['transformer_cv_values']]
    boxplot(cv_data, labels=['RBA', 'Transformer'])
    title('CV Comparison Across 5-Fold Cross-Validation')
    
    # Geographic Distribution Plots
    subplot2 = subplot(2, 3, 2)
    scatter(coordinates[:, 0], coordinates[:, 1], c=y_true, cmap='viridis')
    title('True House Values (Geographic Distribution)')
    
    subplot3 = subplot(2, 3, 3)
    scatter(coordinates[:, 0], coordinates[:, 1], c=rba_predictions, cmap='viridis')
    title('RBA Predictions (Geographic Distribution)')
    
    subplot4 = subplot(2, 3, 4)
    scatter(coordinates[:, 0], coordinates[:, 1], c=transformer_predictions, cmap='viridis')
    title('Transformer Predictions (Geographic Distribution)')
    
    # Error Analysis
    subplot5 = subplot(2, 3, 5)
    error_difference = transformer_errors - rba_errors
    scatter(coordinates[:, 0], coordinates[:, 1], c=error_difference, cmap='RdBu_r')
    title('Error Improvement (Transformer Error - RBA Error)')
    
    # Performance Summary Table
    subplot6 = subplot(2, 3, 6)
    create_performance_table(rba_metrics, transformer_metrics)
    
    save_figure('Focused_RBA_vs_Transformer_Analysis', formats=['png', 'pdf'])
    show()

Component Importance Analysis

PROCEDURE analyze_component_importance(ablation_results):
    full_rba_metrics = ablation_results['Full RBA']
    component_analysis = []
    
    FOR model_name, metrics in ablation_results.items():
        IF model_name != 'Full RBA':
            rmse_change = ((metrics['RMSE'] - full_rba_metrics['RMSE']) / full_rba_metrics['RMSE']) * 100
            r2_change = ((metrics['R²'] - full_rba_metrics['R²']) / full_rba_metrics['R²']) * 100
            cv_change = ((metrics['CV'] - full_rba_metrics['CV']) / full_rba_metrics['CV']) * 100
            
            impact = IF abs(rmse_change) > 10 THEN "极高"      # "very high"
                    ELSE IF abs(rmse_change) > 5 THEN "高"    # "high"
                    ELSE IF abs(rmse_change) > 2 THEN "中等"  # "medium"
                    ELSE "低"                                 # "low"
            
            component_analysis.append({
                'component': model_name,
                'rmse_change': rmse_change,
                'r2_change': r2_change,
                'cv_change': cv_change,
                'impact': impact,
                'abs_impact': abs(rmse_change)
            })
    
    sort(component_analysis, key=lambda x: x['abs_impact'], reverse=True)
    
    RETURN component_analysis