# DPSGDTool/app/training/real_trainer.py
# Author: Emily
# Change: Fix DP-SGD implementation and add real-time training progress
# Commit: dbbc4dd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow_privacy.privacy.optimizers import dp_optimizer_keras
from tensorflow_privacy.privacy.analysis import compute_dp_sgd_privacy
import time
from typing import Dict, List, Any, Union
try:
from typing import List, Dict
except ImportError:
pass
import logging
from .gradient_utils import generate_gradient_norms, generate_clipped_gradients, generate_gradient_info
# Set up logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
class RealTrainer:
    """Train an MNIST classifier with DP-SGD (TensorFlow Privacy).

    MNIST is loaded and preprocessed once at construction time. Each call to
    :meth:`train` builds a fresh MLP, trains it with a differentially private
    Adam optimizer, and returns per-epoch metrics, a privacy budget (epsilon),
    recommendations, and gradient-visualization data. If real training fails
    for any reason, the trainer falls back to the mock trainer.
    """

    def __init__(self):
        # Fixed seeds so repeated runs with the same parameters are comparable.
        tf.random.set_seed(42)
        np.random.seed(42)
        # Preprocessed MNIST arrays, cached for the lifetime of the trainer.
        self.x_train, self.y_train, self.x_test, self.y_test = self._load_mnist()
        # Set by train(); None until a model has been built.
        self.model = None

    def _load_mnist(self):
        """Load MNIST and return ``(x_train, y_train, x_test, y_test)``.

        Images are flattened to 784-dim float32 vectors scaled to [0, 1];
        labels are one-hot encoded over the 10 digit classes.
        """
        print("Loading MNIST dataset...")
        (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
        # Scale pixel intensities from [0, 255] to [0, 1].
        x_train = x_train.astype('float32') / 255.0
        x_test = x_test.astype('float32') / 255.0
        # Flatten 28x28 images for the dense (MLP) model.
        x_train = x_train.reshape(-1, 28 * 28)
        x_test = x_test.reshape(-1, 28 * 28)
        # One-hot labels to match the categorical cross-entropy loss.
        y_train = keras.utils.to_categorical(y_train, 10)
        y_test = keras.utils.to_categorical(y_test, 10)
        print(f"Training data shape: {x_train.shape}")
        print(f"Test data shape: {x_test.shape}")
        return x_train, y_train, x_test, y_test

    def _create_model(self):
        """Create a simple MLP (784 -> 128 -> 64 -> 10) for MNIST classification."""
        return keras.Sequential([
            keras.layers.Dense(128, activation='relu', input_shape=(784,)),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(10, activation='softmax')
        ])

    def train(self, params):
        """
        Train a model on MNIST using DP-SGD.

        Args:
            params: Dictionary containing training parameters:
                - clipping_norm: float
                - noise_multiplier: float
                - batch_size: int
                - learning_rate: float
                - epochs: int

        Returns:
            Dictionary containing training results and metrics. On any
            failure, the result of the mock-trainer fallback instead.
        """
        try:
            print(f"Starting training with parameters: {params}")
            clipping_norm = params['clipping_norm']
            noise_multiplier = params['noise_multiplier']
            batch_size = params['batch_size']
            learning_rate = params['learning_rate']
            epochs = params['epochs']

            self.model = self._create_model()

            # One microbatch per example gives true per-example gradient
            # clipping, the standard DP-SGD formulation.
            optimizer = dp_optimizer_keras.DPKerasAdamOptimizer(
                l2_norm_clip=clipping_norm,
                noise_multiplier=noise_multiplier,
                num_microbatches=batch_size,
                learning_rate=learning_rate
            )

            # DP optimizers need a per-example loss vector so they can split
            # it into microbatches, so the loss reduction must be NONE
            # (the string alias 'categorical_crossentropy' reduces to a scalar).
            self.model.compile(
                optimizer=optimizer,
                loss=tf.keras.losses.CategoricalCrossentropy(
                    reduction=tf.keras.losses.Reduction.NONE),
                metrics=['accuracy']
            )

            # Shuffle BEFORE batching so individual examples (not whole
            # batches) are permuted each epoch. drop_remainder=True keeps
            # every training batch exactly batch_size examples, which the
            # microbatch split above requires.
            train_dataset = tf.data.Dataset.from_tensor_slices((self.x_train, self.y_train))
            train_dataset = train_dataset.shuffle(1000).batch(batch_size, drop_remainder=True)
            test_dataset = tf.data.Dataset.from_tensor_slices((self.x_test, self.y_test))
            test_dataset = test_dataset.batch(batch_size)

            epochs_data = []
            start_time = time.time()

            # Train one epoch at a time so per-epoch progress can be
            # reported to the caller/UI.
            for epoch in range(epochs):
                print(f"Epoch {epoch + 1}/{epochs}")
                history = self.model.fit(
                    train_dataset,
                    epochs=1,
                    verbose=0,  # must be an int (or 'auto'), not the string '0'
                    validation_data=test_dataset
                )
                train_accuracy = history.history['accuracy'][0] * 100
                train_loss = history.history['loss'][0]
                val_accuracy = history.history['val_accuracy'][0] * 100
                val_loss = history.history['val_loss'][0]
                epochs_data.append({
                    'epoch': epoch + 1,
                    'accuracy': val_accuracy,  # Use validation accuracy for display
                    'loss': val_loss,
                    'train_accuracy': train_accuracy,
                    'train_loss': train_loss
                })
                print(f"  Train accuracy: {train_accuracy:.2f}%, Loss: {train_loss:.4f}")
                print(f"  Val accuracy: {val_accuracy:.2f}%, Loss: {val_loss:.4f}")

            training_time = time.time() - start_time
            final_metrics = {
                'accuracy': epochs_data[-1]['accuracy'],
                'loss': epochs_data[-1]['loss'],
                'training_time': training_time
            }
            privacy_budget = self._calculate_privacy_budget(params)
            recommendations = self._generate_recommendations(params, final_metrics)
            # Gradient visualization data comes from the shared utility so
            # mock and real trainers render identically.
            gradient_info = generate_gradient_info(clipping_norm)

            print(f"Training completed in {training_time:.2f} seconds")
            print(f"Final accuracy: {final_metrics['accuracy']:.2f}%")
            print(f"Privacy budget (ε): {privacy_budget:.2f}")
            return {
                'epochs_data': epochs_data,
                'final_metrics': final_metrics,
                'recommendations': recommendations,
                'gradient_info': gradient_info,
                'privacy_budget': privacy_budget
            }
        except Exception as e:
            print(f"Training error: {str(e)}")
            # Fall back to mock training if real training fails
            return self._fallback_training(params)

    def _calculate_privacy_budget(self, params):
        """Calculate the actual privacy budget using TensorFlow Privacy.

        Returns epsilon at delta=1e-5. Note compute_dp_sgd_privacy returns
        ``(epsilon, optimal_rdp_order)`` — the second value is NOT delta.
        """
        try:
            dataset_size = len(self.x_train)
            eps, _opt_order = compute_dp_sgd_privacy.compute_dp_sgd_privacy(
                n=dataset_size,
                batch_size=params['batch_size'],
                noise_multiplier=params['noise_multiplier'],
                epochs=params['epochs'],
                delta=1e-5
            )
            return eps
        except Exception as e:
            print(f"Privacy calculation error: {str(e)}")
            # Heuristic fallback: epsilon shrinks as noise grows. Guard the
            # division — zero noise means effectively no privacy (large eps).
            noise = params['noise_multiplier']
            return max(0.1, 10.0 / noise) if noise else 10.0

    def _fallback_training(self, params):
        """Fallback to mock training if real training fails."""
        print("Falling back to mock training...")
        from .mock_trainer import MockTrainer
        mock_trainer = MockTrainer()
        return mock_trainer.train(params)

    def _generate_recommendations(self, params, metrics):
        """Generate recommendations based on real training results.

        Each recommendation is a dict with 'icon' and 'text' keys; checks
        are independent, so several can fire for one run.
        """
        recommendations = []
        # Clipping norm: too low starves updates, too high weakens privacy.
        if params['clipping_norm'] < 0.5:
            recommendations.append({
                'icon': '⚠️',
                'text': 'Very low clipping norm detected. This might severely limit gradient updates.'
            })
        elif params['clipping_norm'] > 5.0:
            recommendations.append({
                'icon': '🔒',
                'text': 'High clipping norm reduces privacy protection. Consider lowering it.'
            })
        # Noise multiplier: privacy/utility tradeoff at both extremes.
        if params['noise_multiplier'] < 0.8:
            recommendations.append({
                'icon': '🔒',
                'text': 'Low noise multiplier provides weaker privacy guarantees.'
            })
        elif params['noise_multiplier'] > 3.0:
            recommendations.append({
                'icon': '⚠️',
                'text': 'Very high noise is significantly impacting model accuracy.'
            })
        # Accuracy thresholds are percentages (0-100), not fractions.
        if metrics['accuracy'] < 70:
            recommendations.append({
                'icon': '📉',
                'text': 'Low accuracy achieved. Consider reducing noise or increasing epochs.'
            })
        elif metrics['accuracy'] > 95:
            recommendations.append({
                'icon': '✅',
                'text': 'Excellent accuracy! Privacy-utility tradeoff is well balanced.'
            })
        # Small batches amplify DP noise per update.
        if params['batch_size'] < 32:
            recommendations.append({
                'icon': '⚡',
                'text': 'Small batch size with DP-SGD can lead to poor convergence.'
            })
        # High LR compounds the injected gradient noise.
        if params['learning_rate'] > 0.1:
            recommendations.append({
                'icon': '⚠️',
                'text': 'High learning rate may cause instability with DP-SGD noise.'
            })
        return recommendations

    # Gradient visualization methods now use shared utilities from gradient_utils.py.
    # Kept for backward compatibility; they delegate to the shared functions.
    def generate_gradient_norms(self, clipping_norm):
        """Generate realistic gradient norms for visualization."""
        return generate_gradient_norms(clipping_norm)

    def generate_clipped_gradients(self, clipping_norm):
        """Generate clipped versions of the gradient norms."""
        return generate_clipped_gradients(clipping_norm)