import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import json
import cv2
import os
import matplotlib.pyplot as plt
import gc  # Garbage collection
from pathlib import Path

# Enable memory growth so TensorFlow allocates GPU memory on demand instead of
# grabbing the whole device up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be configured before the GPUs are initialized.
        print(e)


class MemoryEfficientGazeModel:
    """Small CNN that regresses normalized (x, y) gaze coordinates from webcam frames."""

    def __init__(self, input_shape=(60, 80, 3), screen_width=1920, screen_height=1080):
        self.input_shape = input_shape      # (height, width, channels) fed to the CNN
        self.screen_width = screen_width    # pixels; used to (de)normalize gaze x
        self.screen_height = screen_height  # pixels; used to (de)normalize gaze y
        self.model = None

    def build_efficient_model(self):
        """Build a memory-efficient CNN: 3 small conv stages + GAP regression head."""
        inputs = keras.Input(shape=self.input_shape)

        # Smaller model with fewer parameters
        x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(inputs)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)

        x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
        x = layers.MaxPooling2D((2, 2))(x)
        x = layers.BatchNormalization()(x)

        x = layers.GlobalAveragePooling2D()(x)
        x = layers.Dense(64, activation='relu')(x)
        x = layers.Dropout(0.3)(x)

        # Sigmoid keeps both outputs in [0, 1] — normalized screen coordinates.
        outputs = layers.Dense(2, activation='sigmoid')(x)

        self.model = keras.Model(inputs, outputs)
        return self.model

    def load_numpy_arrays(self, data_dir):
        """Load train/val/test splits from .npy files under ``data_dir/arrays``.

        Returns ((train_images, train_gaze), (val_images, val_gaze),
        (test_images, test_gaze)) with images resized to ``self.input_shape``
        and both images and gaze coordinates normalized to [0, 1].
        """
        data_dir = Path(data_dir)
        arrays_dir = data_dir / 'arrays'

        # Load metadata
        with open(arrays_dir / 'metadata.json', 'r') as f:
            metadata = json.load(f)

        # Get screen dimensions from metadata
        self.screen_width = metadata.get('screen_width', 1920)
        self.screen_height = metadata.get('screen_height', 1080)
        print(f"Screen dimensions: {self.screen_width}x{self.screen_height}")

        # Load arrays
        print("Loading numpy arrays...")
        train_images = np.load(arrays_dir / 'train_images.npy')
        train_gaze = np.load(arrays_dir / 'train_gaze.npy')
        val_images = np.load(arrays_dir / 'val_images.npy')
        val_gaze = np.load(arrays_dir / 'val_gaze.npy')
        test_images = np.load(arrays_dir / 'test_images.npy')
        test_gaze = np.load(arrays_dir / 'test_gaze.npy')

        print(f"\nData shapes:")
        print(f"Training: {train_images.shape}, {train_gaze.shape}")
        print(f"Validation: {val_images.shape}, {val_gaze.shape}")
        print(f"Test: {test_images.shape}, {test_gaze.shape}")

        # Check if images need resizing
        if train_images.shape[1:3] != self.input_shape[:2]:
            print(f"\nResizing images from {train_images.shape[1:3]} to {self.input_shape[:2]}")
            train_images = self.resize_batch(train_images, self.input_shape[:2])
            val_images = self.resize_batch(val_images, self.input_shape[:2])
            test_images = self.resize_batch(test_images, self.input_shape[:2])

        # Normalize images to [0, 1] if needed
        if train_images.max() > 1:
            print("Normalizing images to [0, 1]")
            train_images = train_images.astype('float32') / 255.0
            val_images = val_images.astype('float32') / 255.0
            test_images = test_images.astype('float32') / 255.0

        # Normalize gaze coordinates to [0, 1] if needed.
        # BUGFIX: cast to float BEFORE dividing — the original divided in place
        # (`gaze[:, 0] = gaze[:, 0] / w`) which, on integer-dtype arrays, casts
        # the quotient back to int and silently truncates every coordinate to 0.
        if train_gaze.max() > 1:
            print("Normalizing gaze coordinates")
            scale = np.array([self.screen_width, self.screen_height], dtype='float32')
            train_gaze = train_gaze.astype('float32') / scale
            val_gaze = val_gaze.astype('float32') / scale
            test_gaze = test_gaze.astype('float32') / scale

        return (train_images, train_gaze), (val_images, val_gaze), (test_images, test_gaze)

    def resize_batch(self, images, target_size):
        """Resize a batch of (N, H, W, C) images to ``target_size`` == (height, width)."""
        n, channels = images.shape[0], images.shape[3]
        resized = np.zeros((n, target_size[0], target_size[1], channels), dtype='float32')
        for i in range(n):
            # cv2.resize expects (width, height) and drops a trailing singleton
            # channel axis; the reshape restores (H, W, C) so grayscale batches
            # (C == 1) no longer crash on assignment.
            resized[i] = cv2.resize(
                images[i], (target_size[1], target_size[0])
            ).reshape(target_size[0], target_size[1], channels)
        return resized

    def load_tf_datasets(self, data_dir, batch_size=16):
        """Load pre-saved tf.data datasets if available.

        Returns (train_ds, val_ds, test_ds) ready for model.fit, or None when
        ``data_dir/tf_datasets`` does not exist (caller falls back to numpy).
        """
        data_dir = Path(data_dir)
        tf_dir = data_dir / 'tf_datasets'

        if not tf_dir.exists():
            print("TensorFlow datasets not found, falling back to numpy arrays")
            return None

        print("Loading TensorFlow datasets...")

        # Load datasets
        train_ds = tf.data.Dataset.load(str(tf_dir / 'train'))
        val_ds = tf.data.Dataset.load(str(tf_dir / 'val'))
        test_ds = tf.data.Dataset.load(str(tf_dir / 'test'))

        # Probe one sample EAGERLY to learn the layout and decide normalization.
        # BUGFIX: the original tested `if tf.reduce_max(x) > 1:` inside the
        # Dataset.map function; map functions are graph-traced, so using a
        # tensor as a Python bool raises OperatorNotAllowedInGraphError.  The
        # decision is made here once and captured as plain Python bools, which
        # are constant-folded at trace time.
        is_batched = False
        needs_image_norm = False
        needs_gaze_norm = False
        for sample in train_ds.take(1):
            if isinstance(sample, tuple) and len(sample) == 2:
                image_sample, gaze_sample = sample
                print(f"Sample shapes - Image: {image_sample.shape}, Gaze: {gaze_sample.shape}")
                # Check if already batched
                if len(image_sample.shape) == 4:  # (batch, H, W, C)
                    is_batched = True
                    print("Dataset is already batched")
                needs_image_norm = float(tf.reduce_max(tf.cast(image_sample, tf.float32))) > 1
                needs_gaze_norm = float(tf.reduce_max(tf.cast(gaze_sample, tf.float32))) > 1
            else:
                print(f"Unexpected sample format: {type(sample)}")

        # Count samples - this counts batches if the dataset is already batched
        train_size = sum(1 for _ in train_ds)
        val_size = sum(1 for _ in val_ds)
        test_size = sum(1 for _ in test_ds)
        print(f"Dataset sizes - Train: {train_size}, Val: {val_size}, Test: {test_size}")

        if is_batched:
            # Dataset is already batched, just preprocess
            def preprocess_batch(image_batch, gaze_batch):
                # Image batch shape: (batch, H, W, C); resize if needed
                if (image_batch.shape[1] != self.input_shape[0]
                        or image_batch.shape[2] != self.input_shape[1]):
                    image_batch = tf.image.resize(
                        image_batch, (self.input_shape[0], self.input_shape[1]))
                image_batch = tf.cast(image_batch, tf.float32)
                if needs_image_norm:  # Python bool — safe inside traced code
                    image_batch = image_batch / 255.0
                gaze_batch = tf.cast(gaze_batch, tf.float32)
                if needs_gaze_norm:
                    gaze_batch = tf.stack([
                        gaze_batch[:, 0] / self.screen_width,
                        gaze_batch[:, 1] / self.screen_height
                    ], axis=1)
                return image_batch, gaze_batch

            train_ds = train_ds.map(preprocess_batch,
                                    num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess_batch,
                                num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess_batch,
                                  num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
        else:
            # Dataset is not batched, preprocess individual samples
            def preprocess(image, gaze):
                image = tf.image.resize(image, (self.input_shape[0], self.input_shape[1]))
                image = tf.cast(image, tf.float32)
                if needs_image_norm:
                    image = image / 255.0
                gaze = tf.cast(gaze, tf.float32)
                if needs_gaze_norm:
                    gaze = tf.stack([
                        gaze[0] / self.screen_width,
                        gaze[1] / self.screen_height
                    ])
                return image, gaze

            # Apply preprocessing and batching
            train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) \
                               .batch(batch_size).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) \
                           .batch(batch_size).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE) \
                             .batch(batch_size).prefetch(tf.data.AUTOTUNE)

        return train_ds, val_ds, test_ds

    def create_data_generator(self, images, gaze_coords, batch_size=16, augment=False):
        """Create an infinite shuffled batch generator from numpy arrays.

        When ``augment`` is True, applies random brightness/contrast jitter and
        horizontal flips (flipping also mirrors the gaze x-coordinate).
        Intended for use with ``model.fit(..., steps_per_epoch=...)``.
        """
        def generator():
            indices = np.arange(len(images))
            while True:
                np.random.shuffle(indices)
                for i in range(0, len(indices), batch_size):
                    batch_indices = indices[i:i + batch_size]
                    # Copy so augmentation never mutates the source arrays.
                    batch_images = images[batch_indices].copy()
                    batch_gaze = gaze_coords[batch_indices].copy()

                    if augment:
                        for j in range(len(batch_images)):
                            # Random brightness
                            if np.random.random() > 0.5:
                                brightness = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip(batch_images[j] * brightness, 0, 1)
                            # Random contrast
                            if np.random.random() > 0.5:
                                contrast = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip(
                                    (batch_images[j] - 0.5) * contrast + 0.5, 0, 1)
                            # Horizontal flip — gaze x must be mirrored too
                            if np.random.random() > 0.5:
                                batch_images[j] = np.fliplr(batch_images[j])
                                batch_gaze[j, 0] = 1.0 - batch_gaze[j, 0]

                    yield batch_images, batch_gaze

        return generator()

    def train_model(self, train_data, val_data, batch_size=16, epochs=50):
        """Train the model with either numpy-array tuples or tf.data datasets.

        Saves the best weights (by val_loss) to 'best_gaze_model.keras' and
        returns the Keras History object.
        """
        # Compile model
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )

        # Callbacks
        callbacks = [
            keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6,
                verbose=1
            ),
            keras.callbacks.ModelCheckpoint(
                'best_gaze_model.keras',
                monitor='val_loss',
                save_best_only=True,
                verbose=1
            )
        ]

        # Check if we have tf.data.Dataset or numpy arrays
        if isinstance(train_data, tf.data.Dataset):
            # Use tf.data directly
            history = self.model.fit(
                train_data,
                validation_data=val_data,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )
        else:
            # Use numpy arrays with generators (infinite, so steps are required)
            train_images, train_gaze = train_data
            val_images, val_gaze = val_data

            train_gen = self.create_data_generator(train_images, train_gaze,
                                                   batch_size, augment=True)
            val_gen = self.create_data_generator(val_images, val_gaze,
                                                 batch_size, augment=False)

            train_steps = len(train_images) // batch_size
            val_steps = len(val_images) // batch_size

            history = self.model.fit(
                train_gen,
                steps_per_epoch=train_steps,
                validation_data=val_gen,
                validation_steps=val_steps,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )

        return history

    def evaluate_model(self, test_data, batch_size=16):
        """Evaluate the best checkpoint on test data and print error analysis.

        Returns (predictions, actuals) as numpy arrays of normalized coords.
        """
        # Load best model (saved by ModelCheckpoint during training)
        self.model = keras.models.load_model('best_gaze_model.keras')

        if isinstance(test_data, tf.data.Dataset):
            # Evaluate with tf.data
            results = self.model.evaluate(test_data)

            # Get predictions for detailed analysis
            predictions = []
            actuals = []
            for batch in test_data:
                if isinstance(batch, tuple):
                    batch_images, batch_gaze = batch
                else:
                    batch_images = batch['image']
                    batch_gaze = batch['gaze']
                pred = self.model.predict(batch_images, verbose=0)
                predictions.extend(pred)
                actuals.extend(batch_gaze.numpy())
        else:
            # Evaluate with numpy arrays
            test_images, test_gaze = test_data
            results = self.model.evaluate(test_images, test_gaze, batch_size=batch_size)
            predictions = self.model.predict(test_images, batch_size=batch_size)
            actuals = test_gaze

        loss, mae = results
        print(f"\nTest Results:")
        print(f"Loss: {loss:.4f}")
        print(f"MAE (normalized): {mae:.4f}")
        print(f"Approximate pixel error: "
              f"{mae * np.mean([self.screen_width, self.screen_height]):.2f} pixels")

        # Detailed error analysis
        predictions = np.array(predictions)
        actuals = np.array(actuals)

        # Denormalize back to pixel coordinates
        pred_x = predictions[:, 0] * self.screen_width
        pred_y = predictions[:, 1] * self.screen_height
        actual_x = actuals[:, 0] * self.screen_width
        actual_y = actuals[:, 1] * self.screen_height

        # Calculate errors
        x_error = np.mean(np.abs(pred_x - actual_x))
        y_error = np.mean(np.abs(pred_y - actual_y))
        euclidean_error = np.mean(np.sqrt((pred_x - actual_x)**2 + (pred_y - actual_y)**2))

        print(f"\nDetailed Error Analysis:")
        print(f"X-axis error: {x_error:.2f} pixels")
        print(f"Y-axis error: {y_error:.2f} pixels")
        print(f"Euclidean error: {euclidean_error:.2f} pixels")

        return predictions, actuals


def visualize_predictions(model, test_images, test_gaze, predictions, num_samples=5):
    """Save a figure comparing actual vs predicted gaze points for random samples."""
    plt.figure(figsize=(15, 3 * num_samples))

    # Random sample indices
    indices = np.random.choice(len(test_images), num_samples, replace=False)

    for i, idx in enumerate(indices):
        img = test_images[idx]
        actual = test_gaze[idx]
        pred = predictions[idx]

        # Denormalize coordinates
        actual_x = int(actual[0] * model.screen_width)
        actual_y = int(actual[1] * model.screen_height)
        pred_x = int(pred[0] * model.screen_width)
        pred_y = int(pred[1] * model.screen_height)

        # Webcam image panel
        plt.subplot(num_samples, 3, i*3 + 1)
        plt.imshow(img)
        plt.title('Webcam Image')
        plt.axis('off')

        # Screen visualization at 1/10 scale
        screen = np.ones((model.screen_height // 10, model.screen_width // 10, 3))
        # Draw actual gaze point (green)
        cv2.circle(screen, (actual_x // 10, actual_y // 10), 5, (0, 1, 0), -1)
        # Draw predicted gaze point (red)
        cv2.circle(screen, (pred_x // 10, pred_y // 10), 5, (1, 0, 0), -1)

        plt.subplot(num_samples, 3, i*3 + 2)
        plt.imshow(screen)
        plt.title(f'Gaze Points (Green: Actual, Red: Predicted)')
        plt.axis('off')

        # Error panel
        error = np.sqrt((pred_x - actual_x)**2 + (pred_y - actual_y)**2)
        plt.subplot(num_samples, 3, i*3 + 3)
        plt.text(0.5, 0.5, f'Error: {error:.1f} pixels',
                 horizontalalignment='center', verticalalignment='center',
                 transform=plt.gca().transAxes, fontsize=14)
        plt.axis('off')

    plt.tight_layout()
    plt.savefig('prediction_examples.png', dpi=150)
    plt.close()


def main():
    # Configuration
    data_dir = "my_dataset"  # Your dataset directory
    batch_size = 16          # Small batch size to save memory
    use_tf_datasets = True   # Try to use TF datasets if available

    # Create model
    model = MemoryEfficientGazeModel()
    model.build_efficient_model()
    model.model.summary()

    # Try to load TF datasets first
    if use_tf_datasets:
        datasets = model.load_tf_datasets(data_dir, batch_size)
        if datasets is not None:
            train_data, val_data, test_data = datasets
        else:
            use_tf_datasets = False

    # Fall back to numpy arrays
    if not use_tf_datasets:
        (train_data, val_data, test_data) = model.load_numpy_arrays(data_dir)

    # Train model
    print("\nTraining model...")
    history = model.train_model(train_data, val_data, batch_size=batch_size, epochs=50)

    # Plot training history
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Model Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.title('Model MAE')
    plt.legend()

    plt.tight_layout()
    plt.savefig('training_history_efficient.png')
    plt.close()

    # Evaluate model
    print("\nEvaluating model...")
    predictions, actuals = model.evaluate_model(test_data, batch_size=batch_size)

    # Visualize predictions (only possible with raw numpy test images)
    if not use_tf_datasets:
        test_images, test_gaze = test_data
        print("\nGenerating prediction visualizations...")
        visualize_predictions(model, test_images, test_gaze, predictions)

    # Convert to TFLite
    print("\nConverting to TFLite...")
    converter = tf.lite.TFLiteConverter.from_keras_model(model.model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # BUGFIX: only request INT8 ops when a representative dataset is supplied;
    # requesting TFLITE_BUILTINS_INT8 without one makes convert() fail, which
    # is exactly what happened on the tf.data path.
    if not use_tf_datasets:
        train_images, _ = train_data

        def representative_dataset():
            # 100 calibration samples are plenty for post-training quantization.
            for i in range(min(100, len(train_images))):
                yield [np.expand_dims(train_images[i], axis=0).astype(np.float32)]

        converter.representative_dataset = representative_dataset
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
            tf.lite.OpsSet.TFLITE_BUILTINS
        ]

    tflite_model = converter.convert()
    with open('gaze_model_efficient.tflite', 'wb') as f:
        f.write(tflite_model)
    print("TFLite model saved")

    # Save model config so the inference side knows the expected input layout
    model_config = {
        'input_shape': model.input_shape,
        'screen_width': model.screen_width,
        'screen_height': model.screen_height,
        'model_version': '1.0',
        'training_date': str(np.datetime64('today'))
    }
    with open('model_config.json', 'w') as f:
        json.dump(model_config, f, indent=2)

    print("\nTraining complete!")

    # Clear memory
    del model
    gc.collect()


if __name__ == "__main__":
    main()