#!/usr/bin/env python3
"""
data_preparation.py

Prepares training data for MorphGuard's detector and demorpher models.

This script performs:
1. Dataset validation and splitting
2. Image preprocessing (resizing, normalization)
3. Creation of training, validation, and test sets
4. Generation of synthetic morphed images for training if needed

Usage:
    python scripts/data_preparation.py --data-dir /path/to/data --output-dir /path/to/output [options]
"""

import os
import sys
import argparse
import json
import math
import random
import shutil
import hashlib
from typing import Dict, List, Optional, Sequence, Tuple
from datetime import datetime
from pathlib import Path

import cv2
import numpy as np
from tqdm import tqdm
from PIL import Image

# Add project root to path for imports
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# File extensions (lower-case) that are treated as images.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
# Names of the dataset splits, in the order the split ratios are given.
SPLIT_NAMES = ('train', 'val', 'test')
# Sub-directories expected inside the raw data directory.
CATEGORIES = ('real', 'morph')


def _list_images(directory: str) -> List[str]:
    """Return the sorted names of the image files directly inside *directory*."""
    # Sorting makes the result independent of the OS listing order, so a
    # seeded shuffle yields the same split on every machine.
    return sorted(
        name for name in os.listdir(directory)
        if name.lower().endswith(IMAGE_EXTENSIONS)
    )


def _make_rng(seed: Optional[int]):
    """Return a seeded ``random.Random`` or, without a seed, the ``random`` module."""
    return random if seed is None else random.Random(seed)


def _partition(files: List[str], ratios: Sequence[float]) -> Dict[str, List[str]]:
    """Slice *files* into train/val/test lists; the remainder goes to 'test'."""
    train_end = int(len(files) * ratios[0])
    val_end = train_end + int(len(files) * ratios[1])
    return {
        'train': files[:train_end],
        'val': files[train_end:val_end],
        'test': files[val_end:],
    }


def create_dataset_splits(data_dir: str, output_dir: str,
                          split_ratio: Sequence[float] = (0.7, 0.15, 0.15),
                          seed: Optional[int] = None):
    """
    Create train/val/test splits from a dataset

    Images are shuffled, partitioned per category and copied to
    ``<output_dir>/<split>/<category>/``. A ``dataset_stats.json`` file with
    the per-split counts is written to ``output_dir``.

    Args:
        data_dir: Path to data directory with 'real' and 'morph' subdirectories
        output_dir: Path to output directory for processed data
        split_ratio: Train/val/test split ratios (three non-negative values
            that sum to 1.0)
        seed: Optional seed for the shuffle; when given, the same input
            produces the same split on every run

    Returns:
        Dict with dataset statistics, mapping category -> split -> file count

    Raises:
        ValueError: If split_ratio does not hold three non-negative values
            summing to 1.0
    """
    ratios = list(split_ratio)
    if len(ratios) != len(SPLIT_NAMES):
        raise ValueError(f"Expected 3 split ratios (train/val/test), got {len(ratios)}")
    if any(ratio < 0 for ratio in ratios):
        raise ValueError(f"Split ratios must be non-negative, got {ratios}")
    # Exact float equality rejects valid inputs such as [0.7, 0.2, 0.1],
    # whose floating-point sum is 0.9999999999999999.
    if not math.isclose(sum(ratios), 1.0, abs_tol=1e-9):
        raise ValueError(f"Split ratios must sum to 1.0, got {sum(ratios)}")

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    for split in SPLIT_NAMES:
        for category in CATEGORIES:
            os.makedirs(os.path.join(output_dir, split, category), exist_ok=True)

    rng = _make_rng(seed)
    stats: Dict[str, Dict[str, int]] = {}
    for category in CATEGORIES:
        files = _list_images(os.path.join(data_dir, category))
        rng.shuffle(files)

        stats[category] = {}
        for split, names in _partition(files, ratios).items():
            stats[category][split] = len(names)
            print(f"Copying {len(names)} {category} images to {split} set...")
            for name in tqdm(names):
                src = os.path.join(data_dir, category, name)
                dst = os.path.join(output_dir, split, category, name)
                shutil.copy2(src, dst)

    # Save dataset stats (per-split counts plus a per-category total)
    summary: Dict[str, object] = {
        category: {**counts, 'total': sum(counts.values())}
        for category, counts in stats.items()
    }
    summary['created_at'] = datetime.now().isoformat()
    stats_file = os.path.join(output_dir, 'dataset_stats.json')
    with open(stats_file, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2)

    print(f"Dataset splits created successfully and saved to {output_dir}")
    print(f"Stats file: {stats_file}")

    return stats


def preprocess_images(data_dir: str, output_dir: str, image_size: int = 224,
                      normalize: bool = True):
    """
    Preprocess images (resize, normalize) and save to output directory

    Every image below ``data_dir`` is read with OpenCV, converted from BGR to
    RGB, resized to ``image_size`` x ``image_size`` and stored as a ``.npy``
    array at the same relative path below ``output_dir``. Unreadable images
    are reported and skipped.

    Args:
        data_dir: Path to data directory with train/val/test splits
        output_dir: Path to output directory for processed data
        image_size: Target image size for models
        normalize: Whether to normalize pixel values to [-1, 1] (stored as
            float32); when False the resized array is saved without scaling
    """
    os.makedirs(output_dir, exist_ok=True)
    output_abs = os.path.abspath(output_dir)

    total_images = 0
    processed_images = 0

    for root, dirs, files in os.walk(data_dir):
        # output_dir may live inside data_dir (main() uses <out> and
        # <out>/processed). Prune it so the walk never mirrors or re-reads
        # its own output, which would nest 'processed/processed/...'.
        dirs[:] = [
            d for d in dirs
            if os.path.abspath(os.path.join(root, d)) != output_abs
        ]

        # Mirror the input directory structure in the output tree
        rel_path = os.path.relpath(root, data_dir)
        out_dir = os.path.join(output_dir, rel_path)
        if rel_path != '.':
            os.makedirs(out_dir, exist_ok=True)

        image_files = sorted(
            f for f in files if f.lower().endswith(IMAGE_EXTENSIONS)
        )
        total_images += len(image_files)
        if not image_files:
            continue

        print(f"Processing {len(image_files)} images in {rel_path}...")

        for file in tqdm(image_files):
            img_path = os.path.join(root, file)
            try:
                # cv2.imread signals failure by returning None, not raising
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Warning: Could not read {img_path}")
                    continue

                # Convert BGR to RGB
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                # Resize to target size
                img = cv2.resize(img, (image_size, image_size),
                                 interpolation=cv2.INTER_AREA)

                # Normalize if requested: maps 0..255 to -1..1
                if normalize:
                    img = img.astype(np.float32) / 127.5 - 1.0

                # Save as numpy array
                out_path = os.path.join(out_dir, os.path.splitext(file)[0] + '.npy')
                np.save(out_path, img)

                processed_images += 1
            except (cv2.error, OSError, ValueError) as e:
                # Best effort: report the failing file and keep going
                print(f"Error processing {img_path}: {e}")

    print(f"Preprocessed {processed_images}/{total_images} images")
    print(f"Saved to {output_dir}")


def _write_morphs(pairs: Sequence[Tuple[str, str]], source_dir: str,
                  dest_dir: str, name_template: str, split: str,
                  alpha: float, image_size: int) -> List[dict]:
    """Blend each image pair into *dest_dir* and return metadata for the morphs written."""
    records = []
    target_size = (image_size, image_size)
    for index, (img1_file, img2_file) in enumerate(tqdm(pairs)):
        try:
            # Load images
            img1 = cv2.imread(os.path.join(source_dir, img1_file))
            img2 = cv2.imread(os.path.join(source_dir, img2_file))
            if img1 is None or img2 is None:
                print(f"Warning: Could not read {img1_file} or {img2_file}, skipping pair")
                continue

            # Ensure same size
            img1 = cv2.resize(img1, target_size)
            img2 = cv2.resize(img2, target_size)

            # Simple alpha blending for demonstration
            # In a real system, you'd use face landmarks and warping
            morphed = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0)

            # Save morphed image; the index follows the pair position, so
            # skipped pairs leave gaps in the numbering
            morph_file = name_template.format(index)
            morph_path = os.path.join(dest_dir, morph_file)
            if not cv2.imwrite(morph_path, morphed):
                # imwrite reports failure through its return value; do not
                # record metadata for a file that was never written
                print(f"Warning: Could not write {morph_path}")
                continue

            records.append({
                'morph_file': morph_file,
                'source_files': [img1_file, img2_file],
                'alpha': alpha,
                'split': split
            })
        except (cv2.error, OSError) as e:
            print(f"Error generating morph from {img1_file} and {img2_file}: {e}")
    return records


def generate_synthetic_morphs(data_dir: str, output_dir: str, num_pairs: int = 1000,
                              alpha: float = 0.5, image_size: int = 224,
                              seed: Optional[int] = None):
    """
    Generate synthetic morphed face images for training

    Random pairs of images from ``<data_dir>/train/real`` are alpha-blended.
    The first 90% of the pairs are written to ``<output_dir>/train/morph`` and
    the rest to ``<output_dir>/val/morph``; a ``morph_metadata.json`` file
    describing every written morph is saved to ``output_dir``.

    Args:
        data_dir: Path to data directory with train/val/test splits and 'real' subdirectories
        output_dir: Path to output directory for synthetic morphs
        num_pairs: Number of morphed pairs to generate
        alpha: Blending factor (0.5 for equal blending)
        image_size: Edge length both source images are resized to before blending
        seed: Optional seed for the pair selection, for reproducible runs

    Raises:
        FileNotFoundError: If ``<data_dir>/train/real`` does not exist
        ValueError: If fewer than 10 real training images are available
    """
    # Get list of real images from training set
    train_real_dir = os.path.join(data_dir, 'train', 'real')
    if not os.path.isdir(train_real_dir):
        raise FileNotFoundError(
            f"Training directory with real images not found: {train_real_dir}"
        )
    real_files = _list_images(train_real_dir)

    if len(real_files) < 10:
        raise ValueError(f"Not enough real images (found {len(real_files)}, need at least 10)")

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    train_dir = os.path.join(output_dir, 'train', 'morph')
    val_dir = os.path.join(output_dir, 'val', 'morph')
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)

    # Generate pairs: each pair holds two different random images
    rng = _make_rng(seed)
    pairs = [tuple(rng.sample(real_files, 2)) for _ in range(num_pairs)]

    # Split into train/val (90/10)
    train_size = int(len(pairs) * 0.9)
    train_pairs = pairs[:train_size]
    val_pairs = pairs[train_size:]

    # Generate morphs
    print(f"Generating {len(train_pairs)} training morphs...")
    morph_metadata = _write_morphs(
        train_pairs, train_real_dir, train_dir,
        'synth_morph_{:04d}.jpg', 'train', alpha, image_size,
    )

    print(f"Generating {len(val_pairs)} validation morphs...")
    morph_metadata += _write_morphs(
        val_pairs, train_real_dir, val_dir,
        'synth_morph_val_{:04d}.jpg', 'val', alpha, image_size,
    )

    # Save metadata
    meta_file = os.path.join(output_dir, 'morph_metadata.json')
    with open(meta_file, 'w', encoding='utf-8') as f:
        json.dump({
            'morphs': morph_metadata,
            'created_at': datetime.now().isoformat(),
            'parameters': {
                'alpha': alpha,
                'num_pairs': num_pairs,
                'image_size': image_size,
                'seed': seed
            }
        }, f, indent=2)

    print(f"Generated {len(morph_metadata)} synthetic morphs")
    print(f"Metadata saved to {meta_file}")


def main():
    """Parse the command line and run the requested data preparation steps."""
    parser = argparse.ArgumentParser(description='Prepare data for MorphGuard training')
    parser.add_argument('--data-dir', required=True, help='Path to input data directory')
    parser.add_argument('--output-dir', required=True, help='Path to output data directory')
    parser.add_argument('--image-size', type=int, default=224,
                        help='Target image size (default: 224)')
    parser.add_argument('--normalize', action='store_true',
                        help='Normalize pixel values to [-1, 1]')
    parser.add_argument('--gen-morphs', action='store_true', help='Generate synthetic morphs')
    parser.add_argument('--num-morphs', type=int, default=1000,
                        help='Number of synthetic morphs to generate')
    parser.add_argument('--train-val-test', type=float, nargs=3, default=[0.7, 0.15, 0.15],
                        help='Train/val/test split ratios (default: 0.7 0.15 0.15)')
    parser.add_argument('--skip-splits', action='store_true',
                        help='Skip dataset splitting step')
    parser.add_argument('--skip-preprocess', action='store_true',
                        help='Skip image preprocessing step')
    parser.add_argument('--seed', type=int, default=None,
                        help='Random seed for reproducible splits and morph pairs')

    args = parser.parse_args()

    # Validate arguments
    if not os.path.isdir(args.data_dir):
        print(f"Error: Data directory {args.data_dir} does not exist")
        sys.exit(1)

    # Check if the data directory has the expected structure
    if not args.skip_splits:
        for d in CATEGORIES:
            if not os.path.isdir(os.path.join(args.data_dir, d)):
                print(f"Error: Expected directory {d} not found in {args.data_dir}")
                print("Data directory should contain 'real' and 'morph' subdirectories")
                sys.exit(1)

    # Create output directories
    os.makedirs(args.output_dir, exist_ok=True)

    # Save run configuration
    config_file = os.path.join(args.output_dir, 'data_prep_config.json')
    with open(config_file, 'w', encoding='utf-8') as f:
        json.dump(vars(args), f, indent=2)

    # Process steps
    if not args.skip_splits:
        print("\n--- Creating dataset splits ---")
        create_dataset_splits(args.data_dir, args.output_dir,
                              args.train_val_test, seed=args.seed)

    if not args.skip_preprocess:
        print("\n--- Preprocessing images ---")
        # With --skip-splits the data directory is read as-is
        preprocess_images(
            args.data_dir if args.skip_splits else args.output_dir,
            os.path.join(args.output_dir, 'processed'),
            args.image_size,
            args.normalize
        )

    if args.gen_morphs:
        print("\n--- Generating synthetic morphs ---")
        # Prefer the freshly written splits; with --skip-splits they may
        # only exist in the data directory.
        split_root = next(
            (candidate for candidate in (args.output_dir, args.data_dir)
             if os.path.isdir(os.path.join(candidate, 'train', 'real'))),
            None,
        )
        if split_root is None:
            print("Error: No 'train/real' directory found in "
                  f"{args.output_dir} or {args.data_dir}")
            sys.exit(1)
        generate_synthetic_morphs(
            split_root,
            os.path.join(args.output_dir, 'synthetic'),
            args.num_morphs,
            image_size=args.image_size,
            seed=args.seed
        )

    print("\nData preparation completed successfully.")


if __name__ == "__main__":
    main()