# gaze_test / training.py
# Author: Olof Astrand
# Added training (commit 3b4813c)
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import json
import cv2
import os
import matplotlib.pyplot as plt
import gc # Garbage collection
from pathlib import Path
# Set memory growth for GPU if available: TensorFlow otherwise reserves all
# GPU memory at startup; with memory growth it allocates on demand instead.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be configured before the GPUs are initialized;
        # if TF was already initialized this raises RuntimeError — report and
        # continue with default allocation.
        print(e)
class MemoryEfficientGazeModel:
    """Small CNN that maps webcam frames to normalized screen-gaze coordinates.

    The network outputs sigmoid-activated (x, y) in [0, 1]; multiply by
    screen_width / screen_height to recover pixel positions. Data can be fed
    either from numpy arrays (``load_numpy_arrays``) or from pre-built
    tf.data datasets (``load_tf_datasets``).
    """

    def __init__(self, input_shape=(60, 80, 3), screen_width=1920, screen_height=1080):
        # input_shape is (height, width, channels) of the network input.
        self.input_shape = input_shape
        self.screen_width = screen_width
        self.screen_height = screen_height
        self.model = None  # built lazily by build_efficient_model()

    def build_efficient_model(self):
        """Build a memory-efficient CNN model.

        Three small conv blocks followed by global average pooling keep the
        parameter count low. Stores the model on ``self.model`` and returns it.
        """
        inputs = keras.Input(shape=self.input_shape)
        # Smaller model with fewer parameters
        x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)
        x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)
        # Global pooling avoids a large Flatten -> Dense weight matrix.
        x = layers.GlobalAveragePooling2D()(x)
        x = layers.Dense(64, activation='relu')(x)
        x = layers.Dropout(0.3)(x)
        # Sigmoid keeps both outputs in [0, 1] (normalized screen coordinates).
        outputs = layers.Dense(2, activation='sigmoid')(x)
        self.model = keras.Model(inputs, outputs)
        return self.model

    def load_numpy_arrays(self, data_dir):
        """Load data from numpy arrays.

        Expects ``<data_dir>/arrays/`` to contain ``metadata.json`` plus
        ``{train,val,test}_{images,gaze}.npy``. Images are resized to
        ``self.input_shape`` and scaled to [0, 1] when needed; gaze
        coordinates are normalized by the screen dimensions when needed.

        Returns:
            ((train_images, train_gaze), (val_images, val_gaze),
             (test_images, test_gaze))
        """
        data_dir = Path(data_dir)
        arrays_dir = data_dir / 'arrays'
        # Load metadata
        with open(arrays_dir / 'metadata.json', 'r') as f:
            metadata = json.load(f)
        # Get screen dimensions from metadata (fall back to constructor defaults)
        self.screen_width = metadata.get('screen_width', 1920)
        self.screen_height = metadata.get('screen_height', 1080)
        print(f"Screen dimensions: {self.screen_width}x{self.screen_height}")
        # Load arrays
        print("Loading numpy arrays...")
        train_images = np.load(arrays_dir / 'train_images.npy')
        train_gaze = np.load(arrays_dir / 'train_gaze.npy')
        val_images = np.load(arrays_dir / 'val_images.npy')
        val_gaze = np.load(arrays_dir / 'val_gaze.npy')
        test_images = np.load(arrays_dir / 'test_images.npy')
        test_gaze = np.load(arrays_dir / 'test_gaze.npy')
        print(f"\nData shapes:")
        print(f"Training: {train_images.shape}, {train_gaze.shape}")
        print(f"Validation: {val_images.shape}, {val_gaze.shape}")
        print(f"Test: {test_images.shape}, {test_gaze.shape}")
        # Check if images need resizing (train split is taken as representative)
        if train_images.shape[1:3] != self.input_shape[:2]:
            print(f"\nResizing images from {train_images.shape[1:3]} to {self.input_shape[:2]}")
            train_images = self.resize_batch(train_images, self.input_shape[:2])
            val_images = self.resize_batch(val_images, self.input_shape[:2])
            test_images = self.resize_batch(test_images, self.input_shape[:2])
        # Normalize images to [0, 1] if needed (max > 1 implies 0-255 range)
        if train_images.max() > 1:
            print("Normalizing images to [0, 1]")
            train_images = train_images.astype('float32') / 255.0
            val_images = val_images.astype('float32') / 255.0
            test_images = test_images.astype('float32') / 255.0
        # BUGFIX: cast gaze arrays to float32 BEFORE any in-place division.
        # If the .npy files hold integer pixel coordinates, assigning the
        # quotient back into an integer array truncates every normalized
        # coordinate to 0.
        train_gaze = train_gaze.astype('float32')
        val_gaze = val_gaze.astype('float32')
        test_gaze = test_gaze.astype('float32')
        # Normalize gaze coordinates to [0, 1] if needed
        if train_gaze.max() > 1:
            print("Normalizing gaze coordinates")
            train_gaze[:, 0] = train_gaze[:, 0] / self.screen_width
            train_gaze[:, 1] = train_gaze[:, 1] / self.screen_height
            val_gaze[:, 0] = val_gaze[:, 0] / self.screen_width
            val_gaze[:, 1] = val_gaze[:, 1] / self.screen_height
            test_gaze[:, 0] = test_gaze[:, 0] / self.screen_width
            test_gaze[:, 1] = test_gaze[:, 1] / self.screen_height
        return (train_images, train_gaze), (val_images, val_gaze), (test_images, test_gaze)

    def resize_batch(self, images, target_size):
        """Resize a batch of images to target_size == (height, width).

        Preserves the input dtype (the original always allocated float64) and
        keeps the channel axis: cv2.resize drops the trailing axis for
        single-channel input, which would otherwise break the assignment.
        """
        num, channels = images.shape[0], images.shape[3]
        resized = np.zeros((num, target_size[0], target_size[1], channels),
                           dtype=images.dtype)
        for i in range(num):
            # cv2.resize takes dsize as (width, height)
            out = cv2.resize(images[i], (target_size[1], target_size[0]))
            if out.ndim == 2:  # single-channel result lost its axis
                out = out[:, :, np.newaxis]
            resized[i] = out
        return resized

    def load_tf_datasets(self, data_dir, batch_size=16):
        """Load TensorFlow datasets if available.

        Returns (train_ds, val_ds, test_ds) with preprocessing (resize,
        [0, 1] scaling, gaze normalization) applied, or None when
        ``<data_dir>/tf_datasets`` does not exist.
        """
        data_dir = Path(data_dir)
        tf_dir = data_dir / 'tf_datasets'
        if not tf_dir.exists():
            print("TensorFlow datasets not found, falling back to numpy arrays")
            return None
        print("Loading TensorFlow datasets...")
        # Load datasets
        train_ds = tf.data.Dataset.load(str(tf_dir / 'train'))
        val_ds = tf.data.Dataset.load(str(tf_dir / 'val'))
        test_ds = tf.data.Dataset.load(str(tf_dir / 'test'))
        # Get dataset info by examining first sample
        is_batched = False
        for sample in train_ds.take(1):
            if isinstance(sample, tuple) and len(sample) == 2:
                image_sample, gaze_sample = sample
                print(f"Sample shapes - Image: {image_sample.shape}, Gaze: {gaze_sample.shape}")
                # A rank-4 image element means the stored dataset is pre-batched
                if len(image_sample.shape) == 4:  # (batch, H, W, C)
                    is_batched = True
                    print("Dataset is already batched")
            else:
                print(f"Unexpected sample format: {type(sample)}")
        # Count samples - this might count batches if already batched
        train_size = sum(1 for _ in train_ds)
        val_size = sum(1 for _ in val_ds)
        test_size = sum(1 for _ in test_ds)
        print(f"Dataset sizes - Train: {train_size}, Val: {val_size}, Test: {test_size}")
        if is_batched:
            # Dataset is already batched, just preprocess
            def preprocess_batch(image_batch, gaze_batch):
                # Resize if the static shape differs from the model input
                if image_batch.shape[1] != self.input_shape[0] or image_batch.shape[2] != self.input_shape[1]:
                    image_batch = tf.image.resize(image_batch, (self.input_shape[0], self.input_shape[1]))
                image_batch = tf.cast(image_batch, tf.float32)
                # BUGFIX: use tf.cond instead of a Python `if` on a symbolic
                # tensor — Dataset.map traces this function, and evaluating a
                # tensor as a Python bool is invalid in graph mode.
                image_batch = tf.cond(
                    tf.reduce_max(image_batch) > 1.0,
                    lambda: image_batch / 255.0,
                    lambda: image_batch,
                )
                gaze_batch = tf.cast(gaze_batch, tf.float32)
                # Normalize gaze coordinates only when they look like pixels
                gaze_batch = tf.cond(
                    tf.reduce_max(gaze_batch) > 1.0,
                    lambda: tf.stack([
                        gaze_batch[:, 0] / self.screen_width,
                        gaze_batch[:, 1] / self.screen_height
                    ], axis=1),
                    lambda: gaze_batch,
                )
                return image_batch, gaze_batch
            # Apply preprocessing
            train_ds = train_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
        else:
            # Dataset is not batched, preprocess individual samples
            def preprocess(image, gaze):
                # Resize unconditionally; single samples carry no batch axis
                image = tf.image.resize(image, (self.input_shape[0], self.input_shape[1]))
                image = tf.cast(image, tf.float32)
                # BUGFIX: tf.cond instead of Python `if` on a traced tensor
                image = tf.cond(
                    tf.reduce_max(image) > 1.0,
                    lambda: image / 255.0,
                    lambda: image,
                )
                gaze = tf.cast(gaze, tf.float32)
                gaze = tf.cond(
                    tf.reduce_max(gaze) > 1.0,
                    lambda: tf.stack([
                        gaze[0] / self.screen_width,
                        gaze[1] / self.screen_height
                    ]),
                    lambda: gaze,
                )
                return image, gaze
            # Apply preprocessing and batching
            train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
        return train_ds, val_ds, test_ds

    def create_data_generator(self, images, gaze_coords, batch_size=16, augment=False):
        """Create an endless, shuffling batch generator from numpy arrays.

        Args:
            images: (N, H, W, C) array, assumed scaled to [0, 1] when
                augment=True (brightness/contrast clip to that range).
            gaze_coords: (N, 2) normalized gaze targets.
            batch_size: samples per yielded batch (last batch may be smaller).
            augment: apply random brightness/contrast/horizontal-flip; a flip
                also mirrors the x gaze coordinate.
        """
        def generator():
            indices = np.arange(len(images))
            while True:
                # Reshuffle at every pass over the data
                np.random.shuffle(indices)
                for i in range(0, len(indices), batch_size):
                    batch_indices = indices[i:i + batch_size]
                    # Copy so augmentation never mutates the source arrays
                    batch_images = images[batch_indices].copy()
                    batch_gaze = gaze_coords[batch_indices].copy()
                    if augment:
                        for j in range(len(batch_images)):
                            # Random brightness
                            if np.random.random() > 0.5:
                                brightness = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip(batch_images[j] * brightness, 0, 1)
                            # Random contrast (pivot around mid-gray 0.5)
                            if np.random.random() > 0.5:
                                contrast = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip((batch_images[j] - 0.5) * contrast + 0.5, 0, 1)
                            # Horizontal flip: mirror the x gaze target too
                            if np.random.random() > 0.5:
                                batch_images[j] = np.fliplr(batch_images[j])
                                batch_gaze[j, 0] = 1.0 - batch_gaze[j, 0]
                    yield batch_images, batch_gaze
        return generator()

    def train_model(self, train_data, val_data, batch_size=16, epochs=50):
        """Train the model with either numpy arrays or tf.data.

        Compiles with Adam/MSE, uses early stopping, LR reduction, and
        checkpoints the best weights to 'best_gaze_model.keras'.

        Args:
            train_data / val_data: tf.data.Dataset objects, or
                (images, gaze) numpy tuples.

        Returns:
            The keras History object from fit().
        """
        # Compile model
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )
        # Callbacks: stop early, decay LR on plateau, keep the best checkpoint
        callbacks = [
            keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6,
                verbose=1
            ),
            keras.callbacks.ModelCheckpoint(
                'best_gaze_model.keras',
                monitor='val_loss',
                save_best_only=True,
                verbose=1
            )
        ]
        # Check if we have tf.data.Dataset or numpy arrays
        if isinstance(train_data, tf.data.Dataset):
            # Use tf.data directly
            history = self.model.fit(
                train_data,
                validation_data=val_data,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )
        else:
            # Use numpy arrays with generators (augment training split only)
            train_images, train_gaze = train_data
            val_images, val_gaze = val_data
            train_gen = self.create_data_generator(train_images, train_gaze, batch_size, augment=True)
            val_gen = self.create_data_generator(val_images, val_gaze, batch_size, augment=False)
            # Generators are endless, so steps per epoch must be given explicitly
            train_steps = len(train_images) // batch_size
            val_steps = len(val_images) // batch_size
            history = self.model.fit(
                train_gen,
                steps_per_epoch=train_steps,
                validation_data=val_gen,
                validation_steps=val_steps,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )
        return history

    def evaluate_model(self, test_data, batch_size=16):
        """Evaluate the best checkpoint and print pixel-error statistics.

        Reloads 'best_gaze_model.keras' from disk (written during training)
        before evaluating.

        Args:
            test_data: tf.data.Dataset of (images, gaze) batches, or a
                (test_images, test_gaze) numpy tuple with normalized gaze.
            batch_size: batch size for the numpy-array path.

        Returns:
            (predictions, actuals) as numpy arrays of normalized coordinates.
        """
        # Load best model
        self.model = keras.models.load_model('best_gaze_model.keras')
        if isinstance(test_data, tf.data.Dataset):
            # Evaluate with tf.data
            results = self.model.evaluate(test_data)
            # Get predictions for detailed analysis
            predictions = []
            actuals = []
            for batch in test_data:
                if isinstance(batch, tuple):
                    batch_images, batch_gaze = batch
                else:
                    # Dict-style elements: assumes 'image'/'gaze' keys — TODO
                    # confirm against the dataset builder
                    batch_images = batch['image']
                    batch_gaze = batch['gaze']
                pred = self.model.predict(batch_images, verbose=0)
                predictions.extend(pred)
                actuals.extend(batch_gaze.numpy())
        else:
            # Evaluate with numpy arrays
            test_images, test_gaze = test_data
            results = self.model.evaluate(test_images, test_gaze, batch_size=batch_size)
            predictions = self.model.predict(test_images, batch_size=batch_size)
            actuals = test_gaze
        # results is [loss, mae] per the compile() metrics
        loss, mae = results
        print(f"\nTest Results:")
        print(f"Loss: {loss:.4f}")
        print(f"MAE (normalized): {mae:.4f}")
        print(f"Approximate pixel error: {mae * np.mean([self.screen_width, self.screen_height]):.2f} pixels")
        # Detailed error analysis
        predictions = np.array(predictions)
        actuals = np.array(actuals)
        # Denormalize back to pixel coordinates
        pred_x = predictions[:, 0] * self.screen_width
        pred_y = predictions[:, 1] * self.screen_height
        actual_x = actuals[:, 0] * self.screen_width
        actual_y = actuals[:, 1] * self.screen_height
        # Calculate per-axis and Euclidean errors
        x_error = np.mean(np.abs(pred_x - actual_x))
        y_error = np.mean(np.abs(pred_y - actual_y))
        euclidean_error = np.mean(np.sqrt((pred_x - actual_x)**2 + (pred_y - actual_y)**2))
        print(f"\nDetailed Error Analysis:")
        print(f"X-axis error: {x_error:.2f} pixels")
        print(f"Y-axis error: {y_error:.2f} pixels")
        print(f"Euclidean error: {euclidean_error:.2f} pixels")
        return predictions, actuals
def visualize_predictions(model, test_images, test_gaze, predictions, num_samples=5):
    """Render a grid comparing actual vs. predicted gaze points.

    Each row shows the webcam frame, a 1/10-scale screen with both gaze
    points drawn on it, and the Euclidean pixel error. The figure is saved
    to 'prediction_examples.png' rather than displayed.
    """
    plt.figure(figsize=(15, 3 * num_samples))
    # Pick distinct random samples to display
    chosen = np.random.choice(len(test_images), num_samples, replace=False)
    for row, sample_idx in enumerate(chosen):
        frame = test_images[sample_idx]
        truth = test_gaze[sample_idx]
        estimate = predictions[sample_idx]
        # Denormalize [0, 1] coordinates back to integer pixel positions
        truth_x = int(truth[0] * model.screen_width)
        truth_y = int(truth[1] * model.screen_height)
        est_x = int(estimate[0] * model.screen_width)
        est_y = int(estimate[1] * model.screen_height)
        # Column 1: the raw webcam frame
        plt.subplot(num_samples, 3, row * 3 + 1)
        plt.imshow(frame)
        plt.title('Webcam Image')
        plt.axis('off')
        # Column 2: downscaled screen canvas with both gaze markers
        screen = np.ones((model.screen_height // 10, model.screen_width // 10, 3))
        cv2.circle(screen, (truth_x // 10, truth_y // 10), 5, (0, 1, 0), -1)  # actual: green
        cv2.circle(screen, (est_x // 10, est_y // 10), 5, (1, 0, 0), -1)      # predicted: red
        plt.subplot(num_samples, 3, row * 3 + 2)
        plt.imshow(screen)
        plt.title(f'Gaze Points (Green: Actual, Red: Predicted)')
        plt.axis('off')
        # Column 3: Euclidean pixel error rendered as centered text
        pixel_error = np.sqrt((est_x - truth_x)**2 + (est_y - truth_y)**2)
        plt.subplot(num_samples, 3, row * 3 + 3)
        plt.text(0.5, 0.5, f'Error: {pixel_error:.1f} pixels',
                 horizontalalignment='center', verticalalignment='center',
                 transform=plt.gca().transAxes, fontsize=14)
        plt.axis('off')
    plt.tight_layout()
    plt.savefig('prediction_examples.png', dpi=150)
    plt.close()
def main():
    """End-to-end pipeline: build, train, evaluate, visualize, and export.

    Side effects (files written to the working directory):
    'best_gaze_model.keras' (via training callbacks),
    'training_history_efficient.png', 'prediction_examples.png' (numpy-array
    path only), 'gaze_model_efficient.tflite', and 'model_config.json'.
    """
    # Configuration
    data_dir = "my_dataset"  # Your dataset directory
    batch_size = 16  # Small batch size to save memory
    use_tf_datasets = True  # Try to use TF datasets if available
    # Create model
    model = MemoryEfficientGazeModel()
    model.build_efficient_model()
    model.model.summary()
    # Try to load TF datasets first; load_tf_datasets returns None when the
    # tf_datasets directory is missing, in which case we fall back below.
    if use_tf_datasets:
        datasets = model.load_tf_datasets(data_dir, batch_size)
        if datasets:
            train_data, val_data, test_data = datasets
        else:
            use_tf_datasets = False
    # Fall back to numpy arrays
    if not use_tf_datasets:
        (train_data, val_data, test_data) = model.load_numpy_arrays(data_dir)
    # Train model
    print("\nTraining model...")
    history = model.train_model(train_data, val_data, batch_size=batch_size, epochs=50)
    # Plot training history: loss (left) and MAE (right) curves
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Model Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.title('Model MAE')
    plt.legend()
    plt.tight_layout()
    plt.savefig('training_history_efficient.png')
    plt.close()
    # Evaluate model (reloads the best checkpoint internally)
    print("\nEvaluating model...")
    predictions, actuals = model.evaluate_model(test_data, batch_size=batch_size)
    # Visualize predictions — only possible on the numpy path, where the raw
    # test arrays are available as a tuple
    if not use_tf_datasets:
        test_images, test_gaze = test_data
        print("\nGenerating prediction visualizations...")
        visualize_predictions(model, test_images, test_gaze, predictions)
    # Convert to TFLite
    print("\nConverting to TFLite...")
    converter = tf.lite.TFLiteConverter.from_keras_model(model.model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Representative dataset for quantization (numpy path only: int8
    # calibration needs raw training samples)
    if not use_tf_datasets:
        train_images, _ = train_data
        def representative_dataset():
            # Up to 100 single-sample batches for calibration
            for i in range(min(100, len(train_images))):
                yield [np.expand_dims(train_images[i], axis=0).astype(np.float32)]
        converter.representative_dataset = representative_dataset
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
            tf.lite.OpsSet.TFLITE_BUILTINS
        ]
    tflite_model = converter.convert()
    with open('gaze_model_efficient.tflite', 'wb') as f:
        f.write(tflite_model)
    print("TFLite model saved")
    # Save model config for the inference side (tuple serializes as a list)
    model_config = {
        'input_shape': model.input_shape,
        'screen_width': model.screen_width,
        'screen_height': model.screen_height,
        'model_version': '1.0',
        'training_date': str(np.datetime64('today'))
    }
    with open('model_config.json', 'w') as f:
        json.dump(model_config, f, indent=2)
    print("\nTraining complete!")
    # Clear memory
    del model
    gc.collect()
if __name__ == "__main__":
main()