MorphGuard / scripts /data_preparation.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
14.2 kB
#!/usr/bin/env python3
"""
data_preparation.py
Prepares training data for MorphGuard's detector and demorpher models.
This script performs:
1. Dataset validation and splitting
2. Image preprocessing (resizing, optional normalization to [-1, 1])
3. Creation of training, validation, and test sets
4. Generation of synthetic morphed images for training if needed
Usage:
python scripts/data_preparation.py --data-dir /path/to/data --output-dir /path/to/output [options]
"""
import argparse
import hashlib
import json
import math
import os
import random
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import cv2
import numpy as np
from PIL import Image
from tqdm import tqdm
# Add project root to path for imports: the parent of this script's directory
# is resolved to an absolute path and placed at the front of sys.path.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
def create_dataset_splits(data_dir: str, output_dir: str,
                          split_ratio: Optional[List[float]] = None,
                          seed: Optional[int] = None) -> Dict[str, Dict[str, int]]:
    """
    Create train/val/test splits from a dataset.

    Image files (.jpg/.jpeg/.png, case-insensitive) found directly inside
    ``data_dir/real`` and ``data_dir/morph`` are shuffled and copied into
    ``output_dir/<split>/<category>``. A summary with per-split counts,
    per-category totals and a timestamp is written to
    ``output_dir/dataset_stats.json``.

    Args:
        data_dir: Path to data directory with 'real' and 'morph' subdirectories
        output_dir: Path to output directory for processed data
        split_ratio: Train/val/test split ratios (three non-negative values
            that sum to 1.0). Defaults to 0.7/0.15/0.15 when omitted.
        seed: Optional seed for a private random generator, making the split
            reproducible. When None, the module-level ``random`` generator
            is used.

    Returns:
        Dict with dataset statistics, mapping category ('real', 'morph') to a
        dict of split name ('train', 'val', 'test') -> number of files.

    Raises:
        ValueError: If split_ratio is not three non-negative values or the
            values do not sum to 1.0 (within a small tolerance).
        FileNotFoundError: If 'real' or 'morph' is missing from data_dir.
    """
    if split_ratio is None:
        split_ratio = [0.7, 0.15, 0.15]
    if len(split_ratio) != 3 or any(ratio < 0 for ratio in split_ratio):
        raise ValueError(
            f"Split ratios must be three non-negative values, got {list(split_ratio)}")
    ratio_sum = sum(split_ratio)
    # Exact float equality rejects valid inputs such as 0.7 + 0.2 + 0.1,
    # so compare with a tolerance instead.
    if not math.isclose(ratio_sum, 1.0, rel_tol=0.0, abs_tol=1e-9):
        raise ValueError(f"Split ratios must sum to 1.0, got {ratio_sum}")

    image_exts = ('.jpg', '.jpeg', '.png')
    categories = ('real', 'morph')
    split_names = ('train', 'val', 'test')

    # Collect files before touching the output tree so a missing input
    # directory fails without leaving empty output directories behind.
    # Sorting removes the dependence on os.listdir's arbitrary ordering.
    files_by_category = {
        category: sorted(
            name for name in os.listdir(os.path.join(data_dir, category))
            if name.lower().endswith(image_exts)
        )
        for category in categories
    }

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    for split in split_names:
        for category in categories:
            os.makedirs(os.path.join(output_dir, split, category), exist_ok=True)

    # Shuffle files ('real' first, then 'morph')
    rng = random if seed is None else random.Random(seed)
    for category in categories:
        rng.shuffle(files_by_category[category])

    # Split files; whatever is left after train and val goes to test
    splits = {}
    for category, files in files_by_category.items():
        train_end = int(len(files) * split_ratio[0])
        val_end = train_end + int(len(files) * split_ratio[1])
        splits[category] = {
            'train': files[:train_end],
            'val': files[train_end:val_end],
            'test': files[val_end:],
        }

    # Copy files to output directories
    stats = {category: {} for category in categories}
    for category in categories:
        for split, files in splits[category].items():
            stats[category][split] = len(files)
            print(f"Copying {len(files)} {category} images to {split} set...")
            for file in tqdm(files):
                src = os.path.join(data_dir, category, file)
                dst = os.path.join(output_dir, split, category, file)
                shutil.copy2(src, dst)

    # Save dataset stats (per-split counts plus the per-category total)
    summary = {
        category: {**stats[category], 'total': len(files_by_category[category])}
        for category in categories
    }
    summary['created_at'] = datetime.now().isoformat()
    stats_file = os.path.join(output_dir, 'dataset_stats.json')
    with open(stats_file, 'w') as f:
        json.dump(summary, f, indent=2)

    print(f"Dataset splits created successfully and saved to {output_dir}")
    print(f"Stats file: {stats_file}")
    return stats
def preprocess_images(data_dir: str, output_dir: str, image_size: int = 224, normalize: bool = True):
    """
    Preprocess images (resize, normalize) and save to output directory.

    Walks ``data_dir`` recursively, mirrors its directory structure under
    ``output_dir`` and, for every .jpg/.jpeg/.png file (case-insensitive),
    writes a ``.npy`` array holding the image converted from BGR to RGB and
    resized to ``image_size`` x ``image_size``.

    ``output_dir`` may live inside ``data_dir``; it is excluded from the walk
    so the output tree is never mirrored into itself.

    Args:
        data_dir: Path to data directory with train/val/test splits
        output_dir: Path to output directory for processed data
        image_size: Target image size for models
        normalize: Whether to normalize pixel values to [-1, 1]
            (computed as ``pixel / 127.5 - 1.0`` in float32)
    """
    os.makedirs(output_dir, exist_ok=True)
    image_exts = ('.jpg', '.jpeg', '.png')
    output_abs = os.path.abspath(output_dir)

    total_images = 0
    processed_images = 0
    for root, dirs, files in os.walk(data_dir):
        # Prune the output directory in place so os.walk does not descend
        # into it; otherwise 'processed/processed/...' would be created and
        # would grow one level deeper on every run.
        dirs[:] = [d for d in dirs
                   if os.path.abspath(os.path.join(root, d)) != output_abs]

        # Mirror the input structure in the output directory
        rel_path = os.path.relpath(root, data_dir)
        out_dir = os.path.join(output_dir, rel_path)
        if rel_path != '.':
            os.makedirs(out_dir, exist_ok=True)

        image_files = [f for f in files if f.lower().endswith(image_exts)]
        total_images += len(image_files)
        if not image_files:
            continue

        print(f"Processing {len(image_files)} images in {rel_path}...")
        for file in tqdm(image_files):
            # Bound before the try block so the error message can always use it
            img_path = os.path.join(root, file)
            try:
                # Read image
                img = cv2.imread(img_path)
                if img is None:
                    print(f"Warning: Could not read {img_path}")
                    continue
                # Convert BGR to RGB
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                # Resize to target size
                img = cv2.resize(img, (image_size, image_size), interpolation=cv2.INTER_AREA)
                # Normalize if requested
                if normalize:
                    img = img.astype(np.float32) / 127.5 - 1.0
                # Save as numpy array.
                # NOTE: files that differ only by extension (a.jpg / a.png)
                # map to the same a.npy and the later one overwrites the earlier.
                out_path = os.path.join(out_dir, os.path.splitext(file)[0] + '.npy')
                np.save(out_path, img)
                processed_images += 1
            except Exception as e:
                # Best effort: report the failing file and keep going
                print(f"Error processing {img_path}: {e}")

    print(f"Preprocessed {processed_images}/{total_images} images")
    print(f"Saved to {output_dir}")
def _render_morph_pairs(pairs: List[Tuple[str, str]], source_dir: str, dest_dir: str,
                        file_prefix: str, split: str, alpha: float,
                        image_size: int) -> List[dict]:
    """Blend each image pair from source_dir into dest_dir; return metadata for the morphs written."""
    records = []
    for i, (img1_file, img2_file) in enumerate(tqdm(pairs)):
        try:
            # Load images
            img1 = cv2.imread(os.path.join(source_dir, img1_file))
            img2 = cv2.imread(os.path.join(source_dir, img2_file))
            if img1 is None or img2 is None:
                print(f"Warning: Could not read {img1_file} or {img2_file}, skipping pair")
                continue
            # Ensure same size
            img1 = cv2.resize(img1, (image_size, image_size))
            img2 = cv2.resize(img2, (image_size, image_size))
            # Simple alpha blending for demonstration
            # In a real system, you'd use face landmarks and warping
            morphed = cv2.addWeighted(img1, alpha, img2, 1 - alpha, 0)
            # Save morphed image; the index is the pair's position, so a
            # skipped pair leaves a gap in the numbering.
            morph_file = f"{file_prefix}{i:04d}.jpg"
            morph_path = os.path.join(dest_dir, morph_file)
            # imwrite reports failure through its return value; do not
            # record metadata for a file that was never written.
            if not cv2.imwrite(morph_path, morphed):
                print(f"Warning: Could not write {morph_path}")
                continue
            records.append({
                'morph_file': morph_file,
                'source_files': [img1_file, img2_file],
                'alpha': alpha,
                'split': split
            })
        except Exception as e:
            # Best effort: report the failing pair and keep going
            print(f"Error generating morph from {img1_file} and {img2_file}: {e}")
    return records


def generate_synthetic_morphs(data_dir: str, output_dir: str, num_pairs: int = 1000,
                              alpha: float = 0.5, image_size: int = 224):
    """
    Generate synthetic morphed face images for training.

    Random pairs of images from ``data_dir/train/real`` are alpha-blended.
    The first 90% of the pairs are written to ``output_dir/train/morph`` as
    ``synth_morph_NNNN.jpg`` and the rest to ``output_dir/val/morph`` as
    ``synth_morph_val_NNNN.jpg``. A description of every morph written is
    saved to ``output_dir/morph_metadata.json``.

    Args:
        data_dir: Path to data directory with train/val/test splits and 'real' subdirectories
        output_dir: Path to output directory for synthetic morphs
        num_pairs: Number of morphed pairs to generate
        alpha: Blending factor (0.5 for equal blending); the first image is
            weighted by alpha and the second by 1 - alpha
        image_size: Side length both source images are resized to before blending

    Raises:
        ValueError: If fewer than 10 real training images are available.
    """
    # Get list of real images from training set. Done before creating any
    # output so a failed run does not leave empty directories behind.
    train_real_dir = os.path.join(data_dir, 'train', 'real')
    real_files = sorted(f for f in os.listdir(train_real_dir)
                        if f.lower().endswith(('.jpg', '.jpeg', '.png')))
    if len(real_files) < 10:
        raise ValueError(f"Not enough real images (found {len(real_files)}, need at least 10)")

    # Create output directories
    os.makedirs(output_dir, exist_ok=True)
    train_dir = os.path.join(output_dir, 'train', 'morph')
    val_dir = os.path.join(output_dir, 'val', 'morph')
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(val_dir, exist_ok=True)

    # Generate pairs: each pair is two different random images
    pairs = [tuple(random.sample(real_files, 2)) for _ in range(num_pairs)]

    # Split into train/val (90/10)
    train_size = int(len(pairs) * 0.9)
    train_pairs = pairs[:train_size]
    val_pairs = pairs[train_size:]

    # Generate morphs
    print(f"Generating {len(train_pairs)} training morphs...")
    morph_metadata = _render_morph_pairs(
        train_pairs, train_real_dir, train_dir, 'synth_morph_', 'train', alpha, image_size)
    print(f"Generating {len(val_pairs)} validation morphs...")
    morph_metadata += _render_morph_pairs(
        val_pairs, train_real_dir, val_dir, 'synth_morph_val_', 'val', alpha, image_size)

    # Save metadata
    meta_file = os.path.join(output_dir, 'morph_metadata.json')
    with open(meta_file, 'w') as f:
        json.dump({
            'morphs': morph_metadata,
            'created_at': datetime.now().isoformat(),
            'parameters': {
                'alpha': alpha,
                'num_pairs': num_pairs
            }
        }, f, indent=2)
    print(f"Generated {len(morph_metadata)} synthetic morphs")
    print(f"Metadata saved to {meta_file}")
def main(argv: Optional[List[str]] = None):
    """
    Command-line entry point: parse options and run the requested steps.

    Steps, each optional: dataset splitting (skipped with --skip-splits),
    image preprocessing (skipped with --skip-preprocess) and synthetic morph
    generation (enabled with --gen-morphs). The parsed options are saved to
    ``<output-dir>/data_prep_config.json`` before any step runs.

    Args:
        argv: Argument list to parse; when None, argparse reads sys.argv[1:].

    Exits with status 1 if the data directory does not exist or, when
    splitting is requested, lacks the 'real' / 'morph' subdirectories.
    """
    parser = argparse.ArgumentParser(description='Prepare data for MorphGuard training')
    parser.add_argument('--data-dir', required=True, help='Path to input data directory')
    parser.add_argument('--output-dir', required=True, help='Path to output data directory')
    parser.add_argument('--image-size', type=int, default=224, help='Target image size (default: 224)')
    parser.add_argument('--normalize', action='store_true', help='Normalize pixel values to [-1, 1]')
    parser.add_argument('--gen-morphs', action='store_true', help='Generate synthetic morphs')
    parser.add_argument('--num-morphs', type=int, default=1000, help='Number of synthetic morphs to generate')
    parser.add_argument('--train-val-test', type=float, nargs=3, default=[0.7, 0.15, 0.15],
                        help='Train/val/test split ratios (default: 0.7 0.15 0.15)')
    parser.add_argument('--skip-splits', action='store_true', help='Skip dataset splitting step')
    parser.add_argument('--skip-preprocess', action='store_true', help='Skip image preprocessing step')
    args = parser.parse_args(argv)

    # Validate arguments
    if not os.path.exists(args.data_dir):
        print(f"Error: Data directory {args.data_dir} does not exist")
        sys.exit(1)

    # Check if the data directory has the expected structure
    required_dirs = ['real', 'morph']
    if not args.skip_splits:
        for d in required_dirs:
            if not os.path.exists(os.path.join(args.data_dir, d)):
                print(f"Error: Expected directory {d} not found in {args.data_dir}")
                print("Data directory should contain 'real' and 'morph' subdirectories")
                sys.exit(1)

    # Create output directories
    os.makedirs(args.output_dir, exist_ok=True)

    # Save run configuration
    config_file = os.path.join(args.output_dir, 'data_prep_config.json')
    with open(config_file, 'w') as f:
        json.dump(vars(args), f, indent=2)

    # Directory holding the train/val/test splits: freshly written to the
    # output directory, or taken from the input directory when splitting is
    # skipped. Preprocessing and morph generation must read the same place.
    split_root = args.data_dir if args.skip_splits else args.output_dir

    # Process steps
    if not args.skip_splits:
        print("\n--- Creating dataset splits ---")
        create_dataset_splits(args.data_dir, args.output_dir, args.train_val_test)

    if not args.skip_preprocess:
        print("\n--- Preprocessing images ---")
        preprocess_images(
            split_root,
            os.path.join(args.output_dir, 'processed'),
            args.image_size,
            args.normalize
        )

    if args.gen_morphs:
        print("\n--- Generating synthetic morphs ---")
        generate_synthetic_morphs(
            split_root,
            os.path.join(args.output_dir, 'synthetic'),
            args.num_morphs
        )

    print("\nData preparation completed successfully.")


if __name__ == "__main__":
    main()