shashidharak99's picture
Upload 16 files
0b3dd07 verified
"""
============================================================
Rangoli Classification Training Pipeline
============================================================
Full training loop with:
- Mixed precision training (AMP)
- Cosine annealing with warm restarts
- Learning rate warmup
- Gradient clipping
- MixUp / CutMix augmentation
- Early stopping
- TensorBoard logging
- Checkpoint management
- Progressive unfreezing
Usage:
python scripts/train.py --config configs/config.yaml --model resnet50
python scripts/train.py --config configs/config.yaml --model efficientnet_b3 --gpu 0
python scripts/train.py --config configs/config.yaml --model all # Train all models
============================================================
"""
import os
import sys
import json
import yaml
import time
import argparse
import numpy as np
from datetime import datetime
from pathlib import Path
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dataset.rangoli_dataset import create_dataloaders, MixUpCutMix
from models.classifier import build_model, build_loss_function
class AverageMeter:
"""Track running averages."""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
class EarlyStopping:
"""Early stopping with patience."""
def __init__(self, patience=10, min_delta=0.001, mode="max"):
self.patience = patience
self.min_delta = min_delta
self.mode = mode
self.counter = 0
self.best_score = None
self.should_stop = False
def __call__(self, score):
if self.best_score is None:
self.best_score = score
return False
if self.mode == "max":
improved = score > self.best_score + self.min_delta
else:
improved = score < self.best_score - self.min_delta
if improved:
self.best_score = score
self.counter = 0
else:
self.counter += 1
if self.counter >= self.patience:
self.should_stop = True
return self.should_stop
def get_optimizer(model, config):
"""Create optimizer with layer-wise learning rates."""
training_cfg = config["training"]
base_lr = training_cfg["learning_rate"]
# Discriminative learning rates
layer_groups = model.get_layer_groups()
param_groups = [
{"params": g["params"], "lr": base_lr * g["lr_scale"]}
for g in layer_groups
]
if training_cfg["optimizer"] == "adamw":
optimizer = optim.AdamW(
param_groups,
lr=base_lr,
weight_decay=training_cfg["weight_decay"],
)
elif training_cfg["optimizer"] == "sgd":
optimizer = optim.SGD(
param_groups,
lr=base_lr,
momentum=0.9,
weight_decay=training_cfg["weight_decay"],
nesterov=True,
)
return optimizer
def get_scheduler(optimizer, config):
"""Create learning rate scheduler."""
training_cfg = config["training"]
if training_cfg["scheduler"] == "cosine_annealing_warm_restarts":
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
optimizer,
T_0=training_cfg["T_0"],
T_mult=training_cfg["T_mult"],
eta_min=training_cfg["eta_min"],
)
elif training_cfg["scheduler"] == "cosine_annealing":
scheduler = optim.lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=training_cfg["num_epochs"],
eta_min=training_cfg["eta_min"],
)
elif training_cfg["scheduler"] == "one_cycle":
scheduler = optim.lr_scheduler.OneCycleLR(
optimizer,
max_lr=training_cfg["learning_rate"],
epochs=training_cfg["num_epochs"],
steps_per_epoch=100, # Will be updated
)
return scheduler
def warmup_lr(optimizer, epoch, warmup_epochs, warmup_lr_val, base_lr):
"""Linear warmup."""
if epoch < warmup_epochs:
lr = warmup_lr_val + (base_lr - warmup_lr_val) * epoch / warmup_epochs
for param_group in optimizer.param_groups:
param_group["lr"] = lr * param_group.get("lr_scale", 1.0) if "lr_scale" in str(param_group) else lr
def train_one_epoch(model, train_loader, criterion, optimizer, scheduler,
scaler, mixup_cutmix, device, epoch, config):
"""Train for one epoch."""
model.train()
loss_meter = AverageMeter()
acc_meter = AverageMeter()
training_cfg = config["training"]
use_amp = training_cfg.get("use_amp", True) and device.type == "cuda"
pbar = tqdm(train_loader, desc=f" Train Epoch {epoch+1}", leave=False)
for batch_idx, (images, targets) in enumerate(pbar):
images = images.to(device, non_blocking=True)
targets = targets.to(device, non_blocking=True)
# Apply MixUp/CutMix
use_mixup = mixup_cutmix is not None and np.random.random() < 0.5
if use_mixup:
images, targets_mixed = mixup_cutmix(images, targets)
# Forward pass with mixed precision
with autocast(enabled=use_amp):
outputs = model(images)
if use_mixup:
loss = criterion(outputs, targets_mixed)
else:
loss = criterion(outputs, targets)
# Backward pass
optimizer.zero_grad()
if use_amp:
scaler.scale(loss).backward()
if training_cfg.get("max_grad_norm"):
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(
model.parameters(), training_cfg["max_grad_norm"]
)
scaler.step(optimizer)
scaler.update()
else:
loss.backward()
if training_cfg.get("max_grad_norm"):
torch.nn.utils.clip_grad_norm_(
model.parameters(), training_cfg["max_grad_norm"]
)
optimizer.step()
# Accuracy (for non-mixup samples)
if not use_mixup:
_, predicted = outputs.max(1)
correct = predicted.eq(targets).sum().item()
acc_meter.update(correct / targets.size(0), targets.size(0))
loss_meter.update(loss.item(), images.size(0))
pbar.set_postfix({
"loss": f"{loss_meter.avg:.4f}",
"acc": f"{acc_meter.avg:.4f}" if acc_meter.count > 0 else "N/A",
"lr": f"{optimizer.param_groups[-1]['lr']:.6f}",
})
if scheduler is not None:
scheduler.step()
return loss_meter.avg, acc_meter.avg
@torch.no_grad()
def validate(model, val_loader, criterion, device, use_amp=True):
"""Validate the model."""
model.eval()
loss_meter = AverageMeter()
acc_meter = AverageMeter()
all_preds = []
all_targets = []
for images, targets in tqdm(val_loader, desc=" Validate", leave=False):
images = images.to(device, non_blocking=True)
targets = targets.to(device, non_blocking=True)
with autocast(enabled=use_amp and device.type == "cuda"):
outputs = model(images)
loss = criterion(outputs, targets)
_, predicted = outputs.max(1)
correct = predicted.eq(targets).sum().item()
loss_meter.update(loss.item(), images.size(0))
acc_meter.update(correct / targets.size(0), targets.size(0))
all_preds.extend(predicted.cpu().numpy())
all_targets.extend(targets.cpu().numpy())
return loss_meter.avg, acc_meter.avg, np.array(all_preds), np.array(all_targets)
def save_checkpoint(model, optimizer, scheduler, epoch, val_acc, val_loss,
config, model_name, save_dir, is_best=False):
"""Save model checkpoint."""
os.makedirs(save_dir, exist_ok=True)
checkpoint = {
"epoch": epoch,
"model_name": model_name,
"architecture": config["models"][model_name]["architecture"],
"num_classes": config["num_classes"],
"state_dict": model.state_dict(),
"optimizer": optimizer.state_dict(),
"scheduler": scheduler.state_dict() if scheduler else None,
"val_acc": val_acc,
"val_loss": val_loss,
"config": config,
}
# Save latest
torch.save(checkpoint, os.path.join(save_dir, f"{model_name}_latest.pth"))
# Save best
if is_best:
torch.save(checkpoint, os.path.join(save_dir, f"{model_name}_best.pth"))
print(f" >> Saved new best model: val_acc={val_acc:.4f}")
def train_model(model_name, config, device):
"""Full training pipeline for a single model."""
print(f"\n{'#'*60}")
print(f" TRAINING: {model_name.upper()}")
print(f"{'#'*60}")
training_cfg = config["training"]
# Create output directories
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
run_name = f"{model_name}_{timestamp}"
checkpoint_dir = os.path.join(config["paths"]["checkpoints"], run_name)
log_dir = os.path.join(config["paths"]["logs"], run_name)
os.makedirs(checkpoint_dir, exist_ok=True)
os.makedirs(log_dir, exist_ok=True)
# TensorBoard
writer = SummaryWriter(log_dir)
# Data
manifest_path = os.path.join(config["paths"]["processed_data"], "dataset_manifest.json")
train_loader, val_loader, test_loader, class_to_idx = create_dataloaders(config, manifest_path)
# Load class weights
class_weights = None
if os.path.exists(manifest_path):
with open(manifest_path) as f:
manifest = json.load(f)
class_weights = manifest.get("class_weights")
# Model
model = build_model(model_name, config).to(device)
# Loss
criterion = build_loss_function(config, class_weights, device)
# Optimizer & Scheduler
optimizer = get_optimizer(model, config)
scheduler = get_scheduler(optimizer, config)
# Mixed Precision
scaler = GradScaler(enabled=training_cfg.get("use_amp", True) and device.type == "cuda")
# MixUp/CutMix
mixup_cutmix = MixUpCutMix(
mixup_alpha=config["augmentation"].get("mixup_alpha", 0.2),
cutmix_alpha=config["augmentation"].get("cutmix_alpha", 1.0),
num_classes=config["num_classes"],
)
# Early Stopping
early_stopping = EarlyStopping(
patience=training_cfg["early_stopping_patience"], mode="max"
)
# ========== Phase 1: Frozen Backbone ==========
print("\n --- Phase 1: Training classifier head (backbone frozen) ---")
model.freeze_backbone()
frozen_epochs = min(5, training_cfg["num_epochs"] // 5)
best_val_acc = 0.0
history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}
for epoch in range(frozen_epochs):
train_loss, train_acc = train_one_epoch(
model, train_loader, criterion, optimizer, scheduler,
scaler, None, device, epoch, config # No mixup for frozen phase
)
val_loss, val_acc, _, _ = validate(model, val_loader, criterion, device)
print(f" Epoch {epoch+1}/{frozen_epochs} | "
f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | "
f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")
writer.add_scalars("Phase1/Loss", {"train": train_loss, "val": val_loss}, epoch)
writer.add_scalars("Phase1/Accuracy", {"train": train_acc, "val": val_acc}, epoch)
# ========== Phase 2: Gradual Unfreezing ==========
print("\n --- Phase 2: Fine-tuning (progressive unfreezing) ---")
model.unfreeze_backbone(unfreeze_from=0.5)
# Reset optimizer with discriminative LR
optimizer = get_optimizer(model, config)
scheduler = get_scheduler(optimizer, config)
total_epochs = training_cfg["num_epochs"]
for epoch in range(total_epochs):
# Warmup
warmup_lr(optimizer, epoch,
training_cfg.get("warmup_epochs", 5),
training_cfg.get("warmup_lr", 1e-5),
training_cfg["learning_rate"])
# Progressive unfreezing at epoch milestones
if epoch == total_epochs // 4:
model.unfreeze_backbone(unfreeze_from=0.25)
elif epoch == total_epochs // 2:
model.unfreeze_backbone(unfreeze_from=0.0) # Fully unfreeze
# Train
train_loss, train_acc = train_one_epoch(
model, train_loader, criterion, optimizer, scheduler,
scaler, mixup_cutmix, device, epoch, config
)
# Validate
val_loss, val_acc, val_preds, val_targets = validate(
model, val_loader, criterion, device
)
# History
history["train_loss"].append(train_loss)
history["train_acc"].append(train_acc)
history["val_loss"].append(val_loss)
history["val_acc"].append(val_acc)
# TensorBoard
writer.add_scalars("Phase2/Loss", {"train": train_loss, "val": val_loss}, epoch)
writer.add_scalars("Phase2/Accuracy", {"train": train_acc, "val": val_acc}, epoch)
writer.add_scalar("LR", optimizer.param_groups[-1]["lr"], epoch)
# Save checkpoint
is_best = val_acc > best_val_acc
if is_best:
best_val_acc = val_acc
save_checkpoint(
model, optimizer, scheduler, epoch, val_acc, val_loss,
config, model_name, checkpoint_dir, is_best
)
print(f" Epoch {epoch+1}/{total_epochs} | "
f"Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | "
f"Val Loss: {val_loss:.4f} Acc: {val_acc:.4f} | "
f"Best: {best_val_acc:.4f} {'*' if is_best else ''}")
# Early Stopping
if early_stopping(val_acc):
print(f"\n >> Early stopping at epoch {epoch+1}")
break
# Save training history
history_path = os.path.join(checkpoint_dir, "training_history.json")
with open(history_path, "w") as f:
json.dump(history, f, indent=2)
writer.close()
print(f"\n {'='*50}")
print(f" TRAINING COMPLETE: {model_name}")
print(f" Best Validation Accuracy: {best_val_acc:.4f}")
print(f" Checkpoints: {checkpoint_dir}")
print(f" TensorBoard: {log_dir}")
print(f" {'='*50}")
return best_val_acc, history
def main():
parser = argparse.ArgumentParser(description="Train Rangoli Classifier")
parser.add_argument("--config", type=str, default="configs/config.yaml")
parser.add_argument("--model", type=str, default="resnet50",
choices=["resnet50", "efficientnet_b3", "vit_base",
"convnext_small", "mobilenet_v3", "swin_transformer", "all"])
parser.add_argument("--gpu", type=int, default=0)
parser.add_argument("--resume", type=str, default=None, help="Path to checkpoint")
args = parser.parse_args()
# Load config
with open(args.config, "r") as f:
config = yaml.safe_load(f)
# Device
if torch.cuda.is_available():
device = torch.device(f"cuda:{args.gpu}")
print(f" Using GPU: {torch.cuda.get_device_name(args.gpu)}")
elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
device = torch.device("mps")
print(" Using Apple MPS")
else:
device = torch.device("cpu")
print(" Using CPU (training will be slow)")
# Train
if args.model == "all":
results = {}
model_names = list(config["models"].keys())
for model_name in model_names:
best_acc, history = train_model(model_name, config, device)
results[model_name] = {"best_val_acc": best_acc, "epochs": len(history["val_acc"])}
# Summary
print("\n" + "="*60)
print(" COMPARATIVE RESULTS")
print("="*60)
for name, res in sorted(results.items(), key=lambda x: x[1]["best_val_acc"], reverse=True):
print(f" {name:25s} : {res['best_val_acc']:.4f} ({res['epochs']} epochs)")
# Save results
results_path = os.path.join(config["paths"]["reports"], "comparative_results.json")
os.makedirs(os.path.dirname(results_path), exist_ok=True)
with open(results_path, "w") as f:
json.dump(results, f, indent=2)
else:
train_model(args.model, config, device)
if __name__ == "__main__":
main()