Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| data_preparation.py | |
| Prepares training data for MorphGuard's detector and demorpher models. | |
| This script performs: | |
| 1. Dataset validation and splitting | |
| 2. Image preprocessing (resizing, normalization, augmentation) | |
| 3. Creation of training, validation, and test sets | |
| 4. Generation of synthetic morphed images for training if needed | |
| Usage: | |
| python scripts/data_preparation.py --data-dir /path/to/data --output-dir /path/to/output [options] | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import json | |
| import random | |
| import shutil | |
| import hashlib | |
| from typing import Dict, List, Tuple, Optional | |
| from datetime import datetime | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from tqdm import tqdm | |
| from PIL import Image | |
| # Add project root to path for imports | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |
def create_dataset_splits(data_dir: str, output_dir: str, split_ratio: Optional[List[float]] = None):
    """
    Create train/val/test splits from a dataset

    Image files (.jpg/.jpeg/.png, case-insensitive) are listed from
    ``data_dir/real`` and ``data_dir/morph``, shuffled with the global
    ``random`` state, and copied to ``output_dir/<split>/<category>``.
    Per-split counts and totals are written to
    ``output_dir/dataset_stats.json``.

    Args:
        data_dir: Path to data directory with 'real' and 'morph' subdirectories
        output_dir: Path to output directory for processed data
        split_ratio: Train/val/test split ratios (must sum to 1.0).
            Defaults to [0.7, 0.15, 0.15] when omitted.

    Returns:
        Dict with dataset statistics: category -> split name -> image count

    Raises:
        ValueError: If split_ratio does not hold exactly three non-negative
            values summing to 1.0 (within floating-point tolerance).
        FileNotFoundError: If 'real' or 'morph' is missing under data_dir
            (raised by os.listdir, before any output is created).
    """
    import math  # local import: only needed for the tolerance check below

    ratios = [0.7, 0.15, 0.15] if split_ratio is None else list(split_ratio)
    if len(ratios) != 3:
        raise ValueError(f"Expected 3 split ratios (train/val/test), got {len(ratios)}")
    if any(r < 0 for r in ratios):
        raise ValueError(f"Split ratios must be non-negative, got {ratios}")
    # Exact float equality rejects valid inputs such as [0.7, 0.2, 0.1],
    # whose sum evaluates to 0.9999999999999999.
    if not math.isclose(sum(ratios), 1.0, abs_tol=1e-6):
        raise ValueError(f"Split ratios must sum to 1.0, got {sum(ratios)}")

    categories = ('real', 'morph')
    split_names = ('train', 'val', 'test')
    image_exts = ('.jpg', '.jpeg', '.png')

    # Collect files first so a missing input directory fails before any
    # output directory is created.
    files_by_category = {}
    for category in categories:
        names = sorted(
            f for f in os.listdir(os.path.join(data_dir, category))
            if f.lower().endswith(image_exts)
        )
        # os.listdir order is arbitrary; sorting before shuffling makes the
        # split reproducible when the caller seeds `random`.
        random.shuffle(names)
        files_by_category[category] = names

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    for split in split_names:
        for category in categories:
            os.makedirs(os.path.join(output_dir, split, category), exist_ok=True)

    # Calculate splits; sizes are truncated, so any rounding remainder
    # ends up in the test split.
    splits_by_category = {}
    for category, names in files_by_category.items():
        train_end = int(len(names) * ratios[0])
        val_end = train_end + int(len(names) * ratios[1])
        splits_by_category[category] = {
            'train': names[:train_end],
            'val': names[train_end:val_end],
            'test': names[val_end:],
        }

    # Copy files to output directories
    stats = {category: {} for category in categories}
    for category in categories:
        for split, files in splits_by_category[category].items():
            stats[category][split] = len(files)
            print(f"Copying {len(files)} {category} images to {split} set...")
            for file in tqdm(files):
                src = os.path.join(data_dir, category, file)
                dst = os.path.join(output_dir, split, category, file)
                shutil.copy2(src, dst)

    # Save dataset stats (same counts as the return value, plus totals)
    report = {
        category: {**stats[category], 'total': len(files_by_category[category])}
        for category in categories
    }
    report['created_at'] = datetime.now().isoformat()
    stats_file = os.path.join(output_dir, 'dataset_stats.json')
    with open(stats_file, 'w') as f:
        json.dump(report, f, indent=2)

    print(f"Dataset splits created successfully and saved to {output_dir}")
    print(f"Stats file: {stats_file}")
    return stats
def preprocess_images(data_dir: str, output_dir: str, image_size: int = 224, normalize: bool = True):
    """
    Preprocess images (resize, normalize) and save to output directory

    The directory tree under data_dir is mirrored under output_dir. Each
    readable .jpg/.jpeg/.png file is converted to RGB, resized to
    image_size x image_size and saved as ``<stem>.npy`` in the mirrored
    location. Unreadable or failing files are reported and skipped.

    If output_dir lies inside data_dir (as when the split output is
    preprocessed into its own 'processed' subfolder), the output subtree is
    excluded from the walk so it is not mirrored into itself.

    Args:
        data_dir: Path to data directory with train/val/test splits
        output_dir: Path to output directory for processed data
        image_size: Target image size for models
        normalize: Whether to normalize pixel values to [-1, 1]

    Raises:
        ValueError: If image_size is not positive.
    """
    if image_size <= 0:
        raise ValueError(f"image_size must be positive, got {image_size}")

    os.makedirs(output_dir, exist_ok=True)
    output_root = os.path.abspath(output_dir)
    image_exts = ('.jpg', '.jpeg', '.png')

    total_images = 0
    processed_images = 0
    for root, dirs, files in os.walk(data_dir):
        # Prune the output directory in place so os.walk never descends into
        # it; otherwise an output nested in the input is mirrored into itself.
        dirs[:] = [d for d in dirs
                   if os.path.abspath(os.path.join(root, d)) != output_root]

        # Mirror the input structure (including directories without images)
        rel_path = os.path.relpath(root, data_dir)
        out_dir = os.path.normpath(os.path.join(output_dir, rel_path))
        os.makedirs(out_dir, exist_ok=True)

        image_files = [f for f in files if f.lower().endswith(image_exts)]
        total_images += len(image_files)
        if not image_files:
            continue

        print(f"Processing {len(image_files)} images in {rel_path}...")
        for file in tqdm(image_files):
            img_path = os.path.join(root, file)
            try:
                # cv2.imread signals failure by returning None, not raising
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Warning: Could not read {img_path}")
                    continue
                # Convert BGR to RGB
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                # Resize to target size
                img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_AREA)
                # Normalize if requested: maps 0..255 onto -1..1 as float32
                if normalize:
                    img = img.astype(np.float32) / 127.5 - 1.0
                # Save as numpy array.
                # NOTE: files that differ only by extension (a.jpg / a.png)
                # map to the same a.npy; the later one overwrites the earlier.
                out_path = os.path.join(out_dir, os.path.splitext(file)[0] + '.npy')
                np.save(out_path, img)
                processed_images += 1
            except Exception as e:
                # Deliberate best-effort: one bad file must not abort the batch
                print(f"Error processing {img_path}: {e}")

    print(f"Preprocessed {processed_images}/{total_images} images")
    print(f"Saved to {output_dir}")
def _blend_pairs_to_dir(pairs, source_dir, dest_dir, file_prefix, split, alpha, image_size):
    """Alpha-blend each image pair from source_dir into dest_dir and return metadata for the files written."""
    records = []
    for i, (img1_file, img2_file) in enumerate(tqdm(pairs)):
        try:
            # Load images (cv2.imread returns None when a file cannot be read)
            img1 = cv2.imread(os.path.join(source_dir, img1_file))
            img2 = cv2.imread(os.path.join(source_dir, img2_file))
            if img1 is None or img2 is None:
                print(f"Warning: Could not read {img1_file} or {img2_file}, skipping pair")
                continue
            # Ensure same size
            img1 = cv2.resize(img1, (image_size, image_size))
            img2 = cv2.resize(img2, (image_size, image_size))
            # Simple alpha blending for demonstration
            # In a real system, you'd use face landmarks and warping
            morphed = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0)
            # Save morphed image; the index is the pair's position, so skipped
            # pairs leave gaps in the numbering.
            morph_file = f"{file_prefix}{i:04d}.jpg"
            morph_path = os.path.join(dest_dir, morph_file)
            if not cv2.imwrite(morph_path, morphed):
                # imwrite reports failure through its return value; do not
                # record metadata for a file that was never written.
                print(f"Warning: Could not write {morph_path}")
                continue
            records.append({
                'morph_file': morph_file,
                'source_files': [img1_file, img2_file],
                'alpha': alpha,
                'split': split
            })
        except Exception as e:
            # Deliberate best-effort: one bad pair must not abort the run
            print(f"Error generating morph from {img1_file} and {img2_file}: {e}")
    return records


def generate_synthetic_morphs(data_dir: str, output_dir: str, num_pairs: int = 1000, alpha: float = 0.5,
                              image_size: int = 224):
    """
    Generate synthetic morphed face images for training

    Random pairs of distinct images from ``data_dir/train/real`` are
    alpha-blended and written as JPEGs: the first 90% of pairs to
    ``output_dir/train/morph``, the rest to ``output_dir/val/morph``. A
    description of every written morph is saved to
    ``output_dir/morph_metadata.json``.

    NOTE(review): validation morphs are blended from the same train/real
    images as the training morphs - confirm this overlap is intended.

    Args:
        data_dir: Path to data directory with train/val/test splits and 'real' subdirectories
        output_dir: Path to output directory for synthetic morphs
        num_pairs: Number of morphed pairs to generate
        alpha: Blending factor (0.5 for equal blending), must be within [0, 1]
        image_size: Side length both source images are resized to before blending

    Raises:
        ValueError: If alpha is outside [0, 1], image_size is not positive,
            or fewer than 10 real training images are available.
        FileNotFoundError: If ``data_dir/train/real`` does not exist
            (raised by os.listdir).
    """
    if not 0.0 <= alpha <= 1.0:
        raise ValueError(f"alpha must be between 0 and 1, got {alpha}")
    if image_size <= 0:
        raise ValueError(f"image_size must be positive, got {image_size}")

    # Get list of real images from training set. Sorted so that a seeded
    # `random` yields the same pairs on every run.
    train_real_dir = os.path.join(data_dir, 'train', 'real')
    real_files = sorted(f for f in os.listdir(train_real_dir)
                        if f.lower().endswith(('.jpg', '.jpeg', '.png')))
    if len(real_files) < 10:
        raise ValueError(f"Not enough real images (found {len(real_files)}, need at least 10)")

    # Create output directories only after the inputs have been validated
    train_dir = os.path.join(output_dir, 'train', 'morph')
    val_dir = os.path.join(output_dir, 'val', 'morph')
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)

    # Generate pairs: random.sample guarantees two different files per pair
    pairs = [tuple(random.sample(real_files, 2)) for _ in range(num_pairs)]

    # Split into train/val (90/10)
    train_size = int(len(pairs) * 0.9)
    train_pairs = pairs[:train_size]
    val_pairs = pairs[train_size:]

    # Generate morphs
    print(f"Generating {len(train_pairs)} training morphs...")
    morph_metadata = _blend_pairs_to_dir(
        train_pairs, train_real_dir, train_dir, 'synth_morph_', 'train', alpha, image_size)
    print(f"Generating {len(val_pairs)} validation morphs...")
    morph_metadata += _blend_pairs_to_dir(
        val_pairs, train_real_dir, val_dir, 'synth_morph_val_', 'val', alpha, image_size)

    # Save metadata
    meta_file = os.path.join(output_dir, 'morph_metadata.json')
    with open(meta_file, 'w') as f:
        json.dump({
            'morphs': morph_metadata,
            'created_at': datetime.now().isoformat(),
            'parameters': {
                'alpha': alpha,
                'num_pairs': num_pairs,
                'image_size': image_size
            }
        }, f, indent=2)

    print(f"Generated {len(morph_metadata)} synthetic morphs")
    print(f"Metadata saved to {meta_file}")
def main(argv: Optional[List[str]] = None):
    """
    Command-line entry point: split, preprocess and optionally morph a dataset

    Steps, each controlled by a flag:
      1. Split ``--data-dir`` into train/val/test under ``--output-dir``
         (skipped with ``--skip-splits``).
      2. Preprocess the split data into ``<output-dir>/processed``
         (skipped with ``--skip-preprocess``). With ``--skip-splits`` the
         input is read from ``--data-dir`` instead.
      3. Generate synthetic morphs into ``<output-dir>/synthetic``
         (only with ``--gen-morphs``).

    The parsed arguments are saved to ``<output-dir>/data_prep_config.json``.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]`` when None.

    Exits with status 1 if the data directory is missing or lacks the
    expected 'real'/'morph' subdirectories.
    """
    parser = argparse.ArgumentParser(description='Prepare data for MorphGuard training')
    parser.add_argument('--data-dir', required=True, help='Path to input data directory')
    parser.add_argument('--output-dir', required=True, help='Path to output data directory')
    parser.add_argument('--image-size', type=int, default=224, help='Target image size (default: 224)')
    parser.add_argument('--normalize', action='store_true', help='Normalize pixel values to [-1, 1]')
    parser.add_argument('--gen-morphs', action='store_true', help='Generate synthetic morphs')
    parser.add_argument('--num-morphs', type=int, default=1000, help='Number of synthetic morphs to generate')
    parser.add_argument('--train-val-test', type=float, nargs=3, default=[0.7, 0.15, 0.15],
                        help='Train/val/test split ratios (default: 0.7 0.15 0.15)')
    parser.add_argument('--skip-splits', action='store_true', help='Skip dataset splitting step')
    parser.add_argument('--skip-preprocess', action='store_true', help='Skip image preprocessing step')
    args = parser.parse_args(argv)

    # Validate arguments; isdir also rejects a regular file passed by mistake
    if not os.path.isdir(args.data_dir):
        print(f"Error: Data directory {args.data_dir} does not exist or is not a directory")
        sys.exit(1)

    # Check if the data directory has the expected structure
    if not args.skip_splits:
        for d in ('real', 'morph'):
            if not os.path.isdir(os.path.join(args.data_dir, d)):
                print(f"Error: Expected directory {d} not found in {args.data_dir}")
                print("Data directory should contain 'real' and 'morph' subdirectories")
                sys.exit(1)

    # Create output directories
    os.makedirs(args.output_dir, exist_ok=True)

    # Save run configuration
    config_file = os.path.join(args.output_dir, 'data_prep_config.json')
    with open(config_file, 'w') as f:
        json.dump(vars(args), f, indent=2)

    # Process steps
    if not args.skip_splits:
        print("\n--- Creating dataset splits ---")
        create_dataset_splits(args.data_dir, args.output_dir, args.train_val_test)

    if not args.skip_preprocess:
        print("\n--- Preprocessing images ---")
        # Without a fresh split, the data directory is taken as already split
        preprocess_images(
            args.data_dir if args.skip_splits else args.output_dir,
            os.path.join(args.output_dir, 'processed'),
            args.image_size,
            args.normalize
        )

    if args.gen_morphs:
        print("\n--- Generating synthetic morphs ---")
        # Morphs are built from <output-dir>/train/real. When splitting was
        # skipped and that folder does not exist, fall back to the
        # (presumably pre-split) data directory instead of failing.
        morph_source = args.output_dir
        if args.skip_splits and not os.path.isdir(os.path.join(args.output_dir, 'train', 'real')):
            morph_source = args.data_dir
        generate_synthetic_morphs(
            morph_source,
            os.path.join(args.output_dir, 'synthetic'),
            args.num_morphs
        )

    print("\nData preparation completed successfully.")


if __name__ == "__main__":
    main()