| import numpy as np |
| import tensorflow as tf |
| from tensorflow import keras |
| from tensorflow_privacy.privacy.optimizers import dp_optimizer_keras |
| from tensorflow_privacy.privacy.analysis import compute_dp_sgd_privacy |
| import time |
| from typing import Dict, List, Any, Union |
| try: |
| from typing import List, Dict |
| except ImportError: |
| pass |
| import logging |
| from .gradient_utils import generate_gradient_norms, generate_clipped_gradients, generate_gradient_info |
|
|
| |
# Silence TensorFlow's verbose INFO/WARNING logging; only errors are shown.
logging.getLogger('tensorflow').setLevel(logging.ERROR)
|
|
class RealTrainer:
    """Train an MNIST classifier with DP-SGD via TensorFlow Privacy.

    MNIST is loaded and preprocessed once at construction; ``train`` runs a
    differentially private training loop and returns per-epoch metrics, the
    privacy budget (epsilon), gradient-clipping visualization data, and
    heuristic hyper-parameter recommendations. If real training fails for
    any reason, it falls back to a mock trainer so callers always get a
    well-formed result dictionary.
    """

    def __init__(self):
        # Fixed seeds so repeated runs with the same parameters are comparable.
        tf.random.set_seed(42)
        np.random.seed(42)

        # Cache the dataset on the instance so repeated train() calls reuse it.
        self.x_train, self.y_train, self.x_test, self.y_test = self._load_mnist()
        self.model = None

    def _load_mnist(self):
        """Load and preprocess MNIST dataset.

        Returns:
            Tuple of (x_train, y_train, x_test, y_test) where images are
            flattened to shape (N, 784) float32 in [0, 1] and labels are
            one-hot encoded over 10 classes.
        """
        print("Loading MNIST dataset...")

        (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

        # Scale pixel values from [0, 255] to [0, 1].
        x_train = x_train.astype('float32') / 255.0
        x_test = x_test.astype('float32') / 255.0

        # Flatten 28x28 images for the dense MLP input layer.
        x_train = x_train.reshape(-1, 28 * 28)
        x_test = x_test.reshape(-1, 28 * 28)

        # One-hot labels to match the categorical_crossentropy loss.
        y_train = keras.utils.to_categorical(y_train, 10)
        y_test = keras.utils.to_categorical(y_test, 10)

        print(f"Training data shape: {x_train.shape}")
        print(f"Test data shape: {x_test.shape}")

        return x_train, y_train, x_test, y_test

    def _create_model(self):
        """Create a simple MLP model for MNIST classification."""
        model = keras.Sequential([
            keras.layers.Dense(128, activation='relu', input_shape=(784,)),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(10, activation='softmax')
        ])
        return model

    def train(self, params):
        """
        Train a model on MNIST using DP-SGD.

        Args:
            params: Dictionary containing training parameters:
                - clipping_norm: float
                - noise_multiplier: float
                - batch_size: int
                - learning_rate: float
                - epochs: int

        Returns:
            Dictionary containing training results and metrics
        """
        try:
            print(f"Starting training with parameters: {params}")

            clipping_norm = params['clipping_norm']
            noise_multiplier = params['noise_multiplier']
            batch_size = params['batch_size']
            learning_rate = params['learning_rate']
            epochs = params['epochs']

            self.model = self._create_model()

            # num_microbatches == batch_size means per-example gradient
            # clipping (the finest granularity the DP optimizer supports).
            # It requires that every batch contain exactly batch_size
            # examples — enforced below with drop_remainder=True.
            optimizer = dp_optimizer_keras.DPKerasAdamOptimizer(
                l2_norm_clip=clipping_norm,
                noise_multiplier=noise_multiplier,
                num_microbatches=batch_size,
                learning_rate=learning_rate
            )

            self.model.compile(
                optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy']
            )

            # BUG FIX: shuffle must come BEFORE batch — the original order
            # shuffled whole batches rather than individual examples.
            # drop_remainder=True guarantees the fixed batch size that
            # num_microbatches requires (a partial final batch would fail
            # the microbatch reshape).
            train_dataset = (
                tf.data.Dataset.from_tensor_slices((self.x_train, self.y_train))
                .shuffle(1000)
                .batch(batch_size, drop_remainder=True)
            )

            test_dataset = tf.data.Dataset.from_tensor_slices((self.x_test, self.y_test))
            test_dataset = test_dataset.batch(batch_size)

            epochs_data = []
            start_time = time.time()

            # One fit() call per epoch so we can log and record metrics
            # incrementally.
            for epoch in range(epochs):
                print(f"Epoch {epoch + 1}/{epochs}")

                history = self.model.fit(
                    train_dataset,
                    epochs=1,
                    verbose=0,  # BUG FIX: was the string '0', which is truthy
                    validation_data=test_dataset
                )

                # history lists have a single entry because epochs=1.
                train_accuracy = history.history['accuracy'][0] * 100
                train_loss = history.history['loss'][0]
                val_accuracy = history.history['val_accuracy'][0] * 100
                val_loss = history.history['val_loss'][0]

                epochs_data.append({
                    'epoch': epoch + 1,
                    'accuracy': val_accuracy,
                    'loss': val_loss,
                    'train_accuracy': train_accuracy,
                    'train_loss': train_loss
                })

                print(f"  Train accuracy: {train_accuracy:.2f}%, Loss: {train_loss:.4f}")
                print(f"  Val accuracy: {val_accuracy:.2f}%, Loss: {val_loss:.4f}")

            training_time = time.time() - start_time

            # Report the last epoch's validation metrics as "final".
            final_metrics = {
                'accuracy': epochs_data[-1]['accuracy'],
                'loss': epochs_data[-1]['loss'],
                'training_time': training_time
            }

            privacy_budget = self._calculate_privacy_budget(params)

            recommendations = self._generate_recommendations(params, final_metrics)

            gradient_info = generate_gradient_info(clipping_norm)

            print(f"Training completed in {training_time:.2f} seconds")
            print(f"Final accuracy: {final_metrics['accuracy']:.2f}%")
            print(f"Privacy budget (ε): {privacy_budget:.2f}")

            return {
                'epochs_data': epochs_data,
                'final_metrics': final_metrics,
                'recommendations': recommendations,
                'gradient_info': gradient_info,
                'privacy_budget': privacy_budget
            }

        except Exception as e:
            # Deliberate broad catch: any failure in real training degrades
            # gracefully to the mock trainer rather than crashing the caller.
            print(f"Training error: {str(e)}")
            return self._fallback_training(params)

    def _calculate_privacy_budget(self, params):
        """Calculate the actual privacy budget using TensorFlow Privacy.

        Returns epsilon at delta=1e-5; on failure, returns a crude
        heuristic estimate inversely proportional to the noise multiplier.
        """
        try:
            dataset_size = len(self.x_train)
            batch_size = params['batch_size']
            epochs = params['epochs']
            noise_multiplier = params['noise_multiplier']

            # BUG FIX: compute_dp_sgd_privacy returns (epsilon, optimal RDP
            # order); the second value was previously mislabelled 'delta'.
            eps, opt_order = compute_dp_sgd_privacy.compute_dp_sgd_privacy(
                n=dataset_size,
                batch_size=batch_size,
                noise_multiplier=noise_multiplier,
                epochs=epochs,
                delta=1e-5
            )

            return eps
        except Exception as e:
            print(f"Privacy calculation error: {str(e)}")
            # Heuristic fallback: more noise => smaller epsilon, floored at 0.1.
            return max(0.1, 10.0 / params['noise_multiplier'])

    def _fallback_training(self, params):
        """Fallback to mock training if real training fails."""
        # Imported lazily so the mock module is only required on failure.
        print("Falling back to mock training...")
        from .mock_trainer import MockTrainer
        mock_trainer = MockTrainer()
        return mock_trainer.train(params)

    def _generate_recommendations(self, params, metrics):
        """Generate recommendations based on real training results.

        Args:
            params: The training parameter dict passed to ``train``.
            metrics: Final metrics dict with an 'accuracy' key (percentage).

        Returns:
            List of {'icon': str, 'text': str} recommendation dicts.
        """
        recommendations = []

        # Clipping norm: too low starves updates, too high weakens privacy.
        if params['clipping_norm'] < 0.5:
            recommendations.append({
                'icon': '⚠️',
                'text': 'Very low clipping norm detected. This might severely limit gradient updates.'
            })
        elif params['clipping_norm'] > 5.0:
            recommendations.append({
                'icon': '🔒',
                'text': 'High clipping norm reduces privacy protection. Consider lowering it.'
            })

        # Noise multiplier: privacy/utility tradeoff at both extremes.
        if params['noise_multiplier'] < 0.8:
            recommendations.append({
                'icon': '🔒',
                'text': 'Low noise multiplier provides weaker privacy guarantees.'
            })
        elif params['noise_multiplier'] > 3.0:
            recommendations.append({
                'icon': '⚠️',
                'text': 'Very high noise is significantly impacting model accuracy.'
            })

        # Final accuracy sanity check (values are percentages).
        if metrics['accuracy'] < 70:
            recommendations.append({
                'icon': '📉',
                'text': 'Low accuracy achieved. Consider reducing noise or increasing epochs.'
            })
        elif metrics['accuracy'] > 95:
            recommendations.append({
                'icon': '✅',
                'text': 'Excellent accuracy! Privacy-utility tradeoff is well balanced.'
            })

        # DP-SGD noise dominates small-batch gradient averages.
        if params['batch_size'] < 32:
            recommendations.append({
                'icon': '⚡',
                'text': 'Small batch size with DP-SGD can lead to poor convergence.'
            })

        # High learning rates amplify injected noise.
        if params['learning_rate'] > 0.1:
            recommendations.append({
                'icon': '⚠️',
                'text': 'High learning rate may cause instability with DP-SGD noise.'
            })

        return recommendations

    def generate_gradient_norms(self, clipping_norm):
        """Generate realistic gradient norms for visualization."""
        # Delegates to the module-level helper of the same name.
        return generate_gradient_norms(clipping_norm)

    def generate_clipped_gradients(self, clipping_norm):
        """Generate clipped versions of the gradient norms."""
        # Delegates to the module-level helper of the same name.
        return generate_clipped_gradients(clipping_norm)