| | import tensorflow as tf |
| | from tensorflow import keras |
| | from tensorflow.keras import layers |
| | import numpy as np |
| | import json |
| | import cv2 |
| | import os |
| | import matplotlib.pyplot as plt |
| | import gc |
| | from pathlib import Path |
| |
|
| | |
# Enable on-demand GPU memory growth so TensorFlow does not reserve the
# entire GPU memory pool at startup.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Memory growth must be configured before any GPU has been
        # initialized; if that already happened, just report and continue.
        print(e)
| |
|
class MemoryEfficientGazeModel:
    """Lightweight CNN gaze estimator.

    Maps webcam frames to a normalized gaze point (x, y) in [0, 1],
    where (1, 1) is the bottom-right corner of a screen of size
    ``screen_width`` x ``screen_height``. Supports two data pipelines:
    pre-saved numpy arrays or serialized tf.data datasets.
    """

    def __init__(self, input_shape=(60, 80, 3), screen_width=1920, screen_height=1080):
        # (height, width, channels) expected by the network.
        self.input_shape = input_shape
        # Screen dimensions used to (de)normalize gaze coordinates.
        self.screen_width = screen_width
        self.screen_height = screen_height
        # Built lazily by build_efficient_model().
        self.model = None

    def build_efficient_model(self):
        """Build a memory-efficient CNN model.

        Three conv/pool/batch-norm stages followed by global average
        pooling keep the parameter count small. Returns the compiled-free
        keras.Model and also stores it on ``self.model``.
        """
        inputs = keras.Input(shape=self.input_shape)

        x = inputs
        # Widening filter counts: 16 -> 32 -> 64, halving spatial size
        # at each stage.
        for filters in (16, 32, 64):
            x = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
            x = layers.MaxPooling2D((2, 2))(x)
            x = layers.BatchNormalization()(x)

        # GAP keeps the head small regardless of input resolution.
        x = layers.GlobalAveragePooling2D()(x)
        x = layers.Dense(64, activation='relu')(x)
        x = layers.Dropout(0.3)(x)

        # Sigmoid bounds predictions to the normalized [0, 1] screen.
        outputs = layers.Dense(2, activation='sigmoid')(x)

        self.model = keras.Model(inputs, outputs)
        return self.model

    def load_numpy_arrays(self, data_dir):
        """Load train/val/test splits from ``data_dir``/arrays/*.npy.

        Reads screen dimensions from metadata.json, resizes images to
        ``self.input_shape`` if needed, scales pixels to [0, 1], and
        normalizes gaze coordinates by the screen size.

        Returns:
            ((train_images, train_gaze), (val_images, val_gaze),
             (test_images, test_gaze)) with gaze arrays as float32.
        """
        arrays_dir = Path(data_dir) / 'arrays'

        with open(arrays_dir / 'metadata.json', 'r') as f:
            metadata = json.load(f)

        # Screen size from the capture session overrides constructor defaults.
        self.screen_width = metadata.get('screen_width', 1920)
        self.screen_height = metadata.get('screen_height', 1080)

        print(f"Screen dimensions: {self.screen_width}x{self.screen_height}")

        print("Loading numpy arrays...")
        # Each split is [images, gaze]; lists so we can rebind in place below.
        splits = {}
        for name in ('train', 'val', 'test'):
            images = np.load(arrays_dir / f'{name}_images.npy')
            # BUGFIX: cast gaze to float up front. The original divided an
            # (possibly integer) array in place, which truncates every
            # normalized coordinate to 0.
            gaze = np.load(arrays_dir / f'{name}_gaze.npy').astype('float32')
            splits[name] = [images, gaze]

        print(f"\nData shapes:")
        print(f"Training: {splits['train'][0].shape}, {splits['train'][1].shape}")
        print(f"Validation: {splits['val'][0].shape}, {splits['val'][1].shape}")
        print(f"Test: {splits['test'][0].shape}, {splits['test'][1].shape}")

        # Resize only when stored resolution differs from the model input.
        if splits['train'][0].shape[1:3] != self.input_shape[:2]:
            print(f"\nResizing images from {splits['train'][0].shape[1:3]} to {self.input_shape[:2]}")
            for split in splits.values():
                split[0] = self.resize_batch(split[0], self.input_shape[:2])

        # Pixel values above 1 imply raw 0-255 images.
        if splits['train'][0].max() > 1:
            print("Normalizing images to [0, 1]")
            for split in splits.values():
                split[0] = split[0].astype('float32') / 255.0

        # Gaze values above 1 imply raw pixel coordinates.
        if splits['train'][1].max() > 1:
            print("Normalizing gaze coordinates")
            for split in splits.values():
                split[1][:, 0] /= self.screen_width
                split[1][:, 1] /= self.screen_height

        return (
            tuple(splits['train']),
            tuple(splits['val']),
            tuple(splits['test']),
        )

    def resize_batch(self, images, target_size):
        """Resize a batch of images to ``target_size`` = (height, width).

        Returns a float32 array (the original preallocated float64,
        doubling the footprint of this "memory-efficient" pipeline).
        """
        n, channels = images.shape[0], images.shape[3]
        resized = np.zeros((n, target_size[0], target_size[1], channels), dtype=np.float32)
        for i in range(n):
            # cv2.resize takes (width, height), i.e. reversed target_size.
            frame = cv2.resize(images[i], (target_size[1], target_size[0]))
            if frame.ndim == 2:
                # cv2 drops the channel axis for single-channel input;
                # restore it so the assignment below broadcasts correctly.
                frame = frame[:, :, np.newaxis]
            resized[i] = frame
        return resized

    def load_tf_datasets(self, data_dir, batch_size=16):
        """Load serialized tf.data datasets from ``data_dir``/tf_datasets.

        Maps resize/normalization preprocessing onto each split. Returns
        (train_ds, val_ds, test_ds), or None when the directory is
        missing so callers can fall back to numpy arrays.
        """
        tf_dir = Path(data_dir) / 'tf_datasets'

        if not tf_dir.exists():
            print("TensorFlow datasets not found, falling back to numpy arrays")
            return None

        print("Loading TensorFlow datasets...")

        train_ds = tf.data.Dataset.load(str(tf_dir / 'train'))
        val_ds = tf.data.Dataset.load(str(tf_dir / 'val'))
        test_ds = tf.data.Dataset.load(str(tf_dir / 'test'))

        # Inspect one sample: rank-4 image tensors mean the saved dataset
        # is already batched; rank-3 means per-example elements.
        is_batched = False
        for sample in train_ds.take(1):
            if isinstance(sample, tuple) and len(sample) == 2:
                image_sample, gaze_sample = sample
                print(f"Sample shapes - Image: {image_sample.shape}, Gaze: {gaze_sample.shape}")

                if len(image_sample.shape) == 4:
                    is_batched = True
                    print("Dataset is already batched")
            else:
                print(f"Unexpected sample format: {type(sample)}")

        def _size(ds):
            # Prefer the recorded cardinality; the original iterated all
            # three datasets end-to-end just to count elements.
            n = int(ds.cardinality().numpy())
            return n if n >= 0 else sum(1 for _ in ds)

        train_size = _size(train_ds)
        val_size = _size(val_ds)
        test_size = _size(test_ds)

        print(f"Dataset sizes - Train: {train_size}, Val: {val_size}, Test: {test_size}")

        if is_batched:
            def preprocess_batch(image_batch, gaze_batch):
                # Resize to the model input only when shapes differ.
                if image_batch.shape[1] != self.input_shape[0] or image_batch.shape[2] != self.input_shape[1]:
                    image_batch = tf.image.resize(image_batch, (self.input_shape[0], self.input_shape[1]))

                # Scale raw 0-255 pixels to [0, 1]; AutoGraph converts
                # this tensor-dependent `if` inside dataset.map.
                image_batch = tf.cast(image_batch, tf.float32)
                if tf.reduce_max(image_batch) > 1:
                    image_batch = image_batch / 255.0

                gaze_batch = tf.cast(gaze_batch, tf.float32)

                # Values above 1 imply raw pixel coordinates.
                if tf.reduce_max(gaze_batch) > 1:
                    gaze_batch = tf.stack([
                        gaze_batch[:, 0] / self.screen_width,
                        gaze_batch[:, 1] / self.screen_height
                    ], axis=1)

                return image_batch, gaze_batch

            train_ds = train_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
        else:
            def preprocess(image, gaze):
                # Per-example variant: unconditional resize, then the same
                # normalization as the batched path.
                image = tf.image.resize(image, (self.input_shape[0], self.input_shape[1]))

                image = tf.cast(image, tf.float32)
                if tf.reduce_max(image) > 1:
                    image = image / 255.0

                gaze = tf.cast(gaze, tf.float32)

                if tf.reduce_max(gaze) > 1:
                    gaze = tf.stack([
                        gaze[0] / self.screen_width,
                        gaze[1] / self.screen_height
                    ])

                return image, gaze

            train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
            val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)
            test_ds = test_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE).batch(batch_size).prefetch(tf.data.AUTOTUNE)

        return train_ds, val_ds, test_ds

    def create_data_generator(self, images, gaze_coords, batch_size=16, augment=False):
        """Create an infinite, shuffled batch generator over numpy arrays.

        When ``augment`` is True, applies random brightness, contrast,
        and horizontal flip. A flipped image mirrors the gaze x
        coordinate (x -> 1 - x) so the label stays consistent.
        Assumes images are already normalized to [0, 1] (clipping uses
        that range).
        """
        def generator():
            indices = np.arange(len(images))
            while True:
                # Reshuffle at every epoch boundary.
                np.random.shuffle(indices)
                for i in range(0, len(indices), batch_size):
                    batch_indices = indices[i:i + batch_size]
                    # Copy so augmentation never mutates the source arrays.
                    batch_images = images[batch_indices].copy()
                    batch_gaze = gaze_coords[batch_indices].copy()

                    if augment:
                        for j in range(len(batch_images)):
                            # Random brightness jitter.
                            if np.random.random() > 0.5:
                                brightness = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip(batch_images[j] * brightness, 0, 1)

                            # Random contrast jitter around mid-gray.
                            if np.random.random() > 0.5:
                                contrast = np.random.uniform(0.8, 1.2)
                                batch_images[j] = np.clip((batch_images[j] - 0.5) * contrast + 0.5, 0, 1)

                            # Horizontal flip mirrors the x coordinate.
                            if np.random.random() > 0.5:
                                batch_images[j] = np.fliplr(batch_images[j])
                                batch_gaze[j, 0] = 1.0 - batch_gaze[j, 0]

                    yield batch_images, batch_gaze

        return generator()

    def train_model(self, train_data, val_data, batch_size=16, epochs=50):
        """Train the model with either numpy arrays or tf.data.

        ``train_data``/``val_data`` are either tf.data.Dataset objects
        (already batched and preprocessed) or (images, gaze) tuples of
        numpy arrays. Returns the keras History object.
        """
        self.model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )

        # Early stopping + LR decay + best-checkpoint saving on val_loss.
        callbacks = [
            keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True,
                verbose=1
            ),
            keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6,
                verbose=1
            ),
            keras.callbacks.ModelCheckpoint(
                'best_gaze_model.keras',
                monitor='val_loss',
                save_best_only=True,
                verbose=1
            )
        ]

        if isinstance(train_data, tf.data.Dataset):
            # tf.data path: datasets carry their own batching.
            history = self.model.fit(
                train_data,
                validation_data=val_data,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )
        else:
            # Numpy path: wrap arrays in infinite generators and bound
            # each epoch with explicit step counts.
            train_images, train_gaze = train_data
            val_images, val_gaze = val_data

            train_gen = self.create_data_generator(train_images, train_gaze, batch_size, augment=True)
            val_gen = self.create_data_generator(val_images, val_gaze, batch_size, augment=False)

            train_steps = len(train_images) // batch_size
            val_steps = len(val_images) // batch_size

            history = self.model.fit(
                train_gen,
                steps_per_epoch=train_steps,
                validation_data=val_gen,
                validation_steps=val_steps,
                epochs=epochs,
                callbacks=callbacks,
                verbose=1
            )

        return history

    def evaluate_model(self, test_data, batch_size=16):
        """Evaluate the best checkpoint and print pixel-space errors.

        Returns:
            (predictions, actuals) as numpy arrays of normalized
            gaze coordinates.
        """
        # Reload the best weights saved by ModelCheckpoint during training.
        self.model = keras.models.load_model('best_gaze_model.keras')

        if isinstance(test_data, tf.data.Dataset):
            results = self.model.evaluate(test_data)

            # Collect per-batch predictions for the detailed analysis below.
            predictions = []
            actuals = []
            for batch in test_data:
                if isinstance(batch, tuple):
                    batch_images, batch_gaze = batch
                else:
                    # Dict-style dataset elements.
                    batch_images = batch['image']
                    batch_gaze = batch['gaze']

                pred = self.model.predict(batch_images, verbose=0)
                predictions.extend(pred)
                actuals.extend(batch_gaze.numpy())
        else:
            test_images, test_gaze = test_data
            results = self.model.evaluate(test_images, test_gaze, batch_size=batch_size)

            predictions = self.model.predict(test_images, batch_size=batch_size)
            actuals = test_gaze

        loss, mae = results

        print(f"\nTest Results:")
        print(f"Loss: {loss:.4f}")
        print(f"MAE (normalized): {mae:.4f}")
        print(f"Approximate pixel error: {mae * np.mean([self.screen_width, self.screen_height]):.2f} pixels")

        predictions = np.array(predictions)
        actuals = np.array(actuals)

        # Denormalize back to screen pixels for interpretable errors.
        pred_x = predictions[:, 0] * self.screen_width
        pred_y = predictions[:, 1] * self.screen_height
        actual_x = actuals[:, 0] * self.screen_width
        actual_y = actuals[:, 1] * self.screen_height

        x_error = np.mean(np.abs(pred_x - actual_x))
        y_error = np.mean(np.abs(pred_y - actual_y))
        euclidean_error = np.mean(np.sqrt((pred_x - actual_x)**2 + (pred_y - actual_y)**2))

        print(f"\nDetailed Error Analysis:")
        print(f"X-axis error: {x_error:.2f} pixels")
        print(f"Y-axis error: {y_error:.2f} pixels")
        print(f"Euclidean error: {euclidean_error:.2f} pixels")

        return predictions, actuals
| |
|
def visualize_predictions(model, test_images, test_gaze, predictions, num_samples=5):
    """Render a grid comparing actual vs predicted gaze points.

    For each sampled example: the webcam frame, a 1/10-scale screen with
    both gaze points drawn, and the Euclidean pixel error. The figure is
    saved to 'prediction_examples.png'.
    """
    plt.figure(figsize=(15, 3 * num_samples))

    # Pick distinct random examples from the test split.
    chosen = np.random.choice(len(test_images), num_samples, replace=False)

    for row, sample_idx in enumerate(chosen):
        image = test_images[sample_idx]
        gaze_point = test_gaze[sample_idx]
        pred_point = predictions[sample_idx]

        # Convert normalized coordinates back to screen pixels.
        actual_px = int(gaze_point[0] * model.screen_width)
        actual_py = int(gaze_point[1] * model.screen_height)
        pred_px = int(pred_point[0] * model.screen_width)
        pred_py = int(pred_point[1] * model.screen_height)

        base = row * 3

        # Column 1: the raw webcam frame.
        plt.subplot(num_samples, 3, base + 1)
        plt.imshow(image)
        plt.title('Webcam Image')
        plt.axis('off')

        # Column 2: white canvas at 1/10 screen scale with the actual
        # point in green and the prediction in red.
        canvas = np.ones((model.screen_height // 10, model.screen_width // 10, 3))
        cv2.circle(canvas, (actual_px // 10, actual_py // 10), 5, (0, 1, 0), -1)
        cv2.circle(canvas, (pred_px // 10, pred_py // 10), 5, (1, 0, 0), -1)

        plt.subplot(num_samples, 3, base + 2)
        plt.imshow(canvas)
        plt.title(f'Gaze Points (Green: Actual, Red: Predicted)')
        plt.axis('off')

        # Column 3: the Euclidean pixel error as text.
        error = np.sqrt((pred_px - actual_px)**2 + (pred_py - actual_py)**2)
        plt.subplot(num_samples, 3, base + 3)
        plt.text(0.5, 0.5, f'Error: {error:.1f} pixels',
                 horizontalalignment='center', verticalalignment='center',
                 transform=plt.gca().transAxes, fontsize=14)
        plt.axis('off')

    plt.tight_layout()
    plt.savefig('prediction_examples.png', dpi=150)
    plt.close()
| |
|
def main():
    """End-to-end training run: load data, train, evaluate, visualize,
    and export the model to TFLite plus a JSON config."""
    # Configuration.
    data_dir = "my_dataset"
    batch_size = 16
    use_tf_datasets = True

    # Build the model and print its architecture.
    model = MemoryEfficientGazeModel()
    model.build_efficient_model()
    model.model.summary()

    # Prefer the tf.data pipeline; fall back to numpy arrays when the
    # serialized datasets are missing (load_tf_datasets returns None).
    if use_tf_datasets:
        datasets = model.load_tf_datasets(data_dir, batch_size)
        if datasets:
            train_data, val_data, test_data = datasets
        else:
            use_tf_datasets = False

    if not use_tf_datasets:
        (train_data, val_data, test_data) = model.load_numpy_arrays(data_dir)

    # Train.
    print("\nTraining model...")
    history = model.train_model(train_data, val_data, batch_size=batch_size, epochs=50)

    # Plot loss and MAE curves side by side.
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Model Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.title('Model MAE')
    plt.legend()

    plt.tight_layout()
    plt.savefig('training_history_efficient.png')
    plt.close()

    # Evaluate on the held-out test split.
    print("\nEvaluating model...")
    predictions, actuals = model.evaluate_model(test_data, batch_size=batch_size)

    # Visualization needs raw arrays, so it only runs on the numpy path.
    if not use_tf_datasets:
        test_images, test_gaze = test_data
        print("\nGenerating prediction visualizations...")
        visualize_predictions(model, test_images, test_gaze, predictions)

    # Export a TFLite model for on-device inference.
    print("\nConverting to TFLite...")
    converter = tf.lite.TFLiteConverter.from_keras_model(model.model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Int8 calibration requires raw training images, which are only
    # available on the numpy path.
    if not use_tf_datasets:
        train_images, _ = train_data
        def representative_dataset():
            # Yield up to 100 single-image float32 batches for calibration.
            for i in range(min(100, len(train_images))):
                yield [np.expand_dims(train_images[i], axis=0).astype(np.float32)]

        converter.representative_dataset = representative_dataset
        converter.target_spec.supported_ops = [
            tf.lite.OpsSet.TFLITE_BUILTINS_INT8,
            tf.lite.OpsSet.TFLITE_BUILTINS
        ]

    tflite_model = converter.convert()

    with open('gaze_model_efficient.tflite', 'wb') as f:
        f.write(tflite_model)
    print("TFLite model saved")

    # Persist deployment metadata alongside the exported model.
    model_config = {
        'input_shape': model.input_shape,
        'screen_width': model.screen_width,
        'screen_height': model.screen_height,
        'model_version': '1.0',
        'training_date': str(np.datetime64('today'))
    }

    with open('model_config.json', 'w') as f:
        json.dump(model_config, f, indent=2)

    print("\nTraining complete!")

    # Drop the model reference and force a collection to release memory.
    del model
    gc.collect()
| |
|
# Run the full training pipeline only when executed as a script.
if __name__ == "__main__":
    main()