| |
| |
| |
| |
|
|
| !pip install -q transformers datasets |
|
|
| |
| import os, random, math, time |
| import numpy as np |
| from PIL import Image |
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| from torchvision import transforms |
| from transformers import ResNetConfig, ResNetForImageClassification |
| from datasets import load_dataset |
| from tqdm.auto import tqdm |
|
|
| |
# --- Run configuration -------------------------------------------------------
# Hugging Face access token, passed to login() and load_dataset() further down.
# It is read from the HF_TOKEN environment variable so that a real credential
# never has to be written into this file; the placeholder is only a fallback.
HF_TOKEN = os.environ.get("HF_TOKEN", "hf_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

# Number of images stored on disk per split.
NUM_TRAIN = 50_000
NUM_VAL = 5_000

# Side length of the square crop fed to the network.
IMG_SIZE = 224
BATCH_SIZE = 128

# Optimisation schedule.
EPOCHS = 12
LR = 1e-3
WARMUP_EPOCHS = 1
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1

# Use the GPU when one is visible, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Working directories for the cached images and the saved model.
TRAIN_DIR = "/kaggle/working/data/train"
VAL_DIR = "/kaggle/working/data/val"
MODEL_DIR = "/kaggle/working/rotation_model"
|
|
# Report the selected device; on CUDA also show the GPU name and its memory.
print(f"π₯οΈ Device: {DEVICE}")
if DEVICE.type == "cuda":
    gpu_name = torch.cuda.get_device_name()
    print(f" GPU: {gpu_name}")
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f" VRAM: {vram_gb:.1f} GB")


# Authenticate against the Hugging Face Hub with the configured token.
from huggingface_hub import login

login(token=HF_TOKEN)
|
|
def download_images(split, save_dir, num_images):
    """Stream images of one ImageNet-1k split to disk as numbered JPEGs.

    Files are written as ``0.jpg``, ``1.jpg``, ... inside ``save_dir``. When
    the directory already holds at least ``num_images`` ``.jpg`` files the
    download is skipped entirely.

    Args:
        split: Dataset split name handed to ``load_dataset``.
        save_dir: Target directory; created if it does not exist.
        num_images: Number of images to store.

    Returns:
        None.
    """
    os.makedirs(save_dir, exist_ok=True)
    existing = sum(1 for name in os.listdir(save_dir) if name.endswith(".jpg"))
    if existing >= num_images:
        print(f" β {save_dir}: {existing} images already exist β skipping.")
        return

    ds = load_dataset("ILSVRC/imagenet-1k", split=split,
                      streaming=True, trust_remote_code=True, token=HF_TOKEN)
    count = 0
    skipped = 0
    # The bar is driven manually so it counts saved images (not streamed
    # examples) and is closed properly when the loop exits early.
    with tqdm(total=num_images, desc=f" β {split}") as pbar:
        for ex in ds:
            try:
                img = ex["image"].convert("RGB")
                w, h = img.size
                short_side = min(w, h)
                if short_side > 480:
                    # Shrink so the short side becomes 480 px; round() keeps
                    # float error from truncating it to 479.
                    s = 480 / short_side
                    img = img.resize((round(w * s), round(h * s)), Image.BILINEAR)
                img.save(os.path.join(save_dir, f"{count}.jpg"), quality=90)
            except Exception as err:
                # Best effort: a broken sample must not abort the download,
                # but it is reported instead of being dropped silently.
                skipped += 1
                tqdm.write(f" ! skipped one {split} image: {err!r}")
                continue
            count += 1
            pbar.update(1)
            if count >= num_images:
                # Stop right away instead of fetching one more example.
                break

    if skipped:
        print(f" ! {split}: {skipped} image(s) could not be processed.")
    if count < num_images:
        # Later code opens files 0..num_images-1, so a shortfall matters.
        print(f" ! {split}: stream ended after {count} of {num_images} images.")
    print(f" β {count} Images β {save_dir}")
|
|
# Fetch both splits; each call is a no-op when its directory is already full.
print("\nπ₯ Loading images from ImageNet-1k β¦")
for _split, _target_dir, _wanted in (
    ("train", TRAIN_DIR, NUM_TRAIN),
    ("validation", VAL_DIR, NUM_VAL),
):
    download_images(_split, _target_dir, _wanted)
|
|
| |
# Rotation classes: a label is an index into ANGLES, and ANGLE_NAMES holds the
# matching display text used in the per-class accuracy report.
ANGLES = [
    0,
    90,
    180,
    270,
]
ANGLE_NAMES = [
    "0Β° (original)",
    "90Β° CCW",
    "180Β°",
    "270Β° CCW (=90Β° CW)",
]
|
|
class RotationDataset(Dataset):
    """Images from a directory of numbered JPEGs, labelled by applied rotation.

    Each sample is ``(transform(rotated_image), label)`` where ``label`` is an
    index into ``ANGLES``.

    Args:
        img_dir: Directory containing ``0.jpg`` ... ``<num_imgs - 1>.jpg``.
        num_imgs: Number of source images to use.
        transform: Callable applied to the rotated PIL image.
        all_rotations: When true, every image appears four times, once per
            rotation (``idx // 4`` picks the image, ``idx % 4`` the label).
            Otherwise each image appears once with a randomly drawn rotation.
    """

    def __init__(self, img_dir, num_imgs, transform, all_rotations=False):
        self.img_dir = img_dir
        self.num_imgs = num_imgs
        self.transform = transform
        self.all_rot = all_rotations

    def __len__(self):
        return self.num_imgs * 4 if self.all_rot else self.num_imgs

    def __getitem__(self, idx):
        """Return ``(transformed image, rotation label)`` for sample ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``.
        """
        # Fail with IndexError (not a missing-file error) so that plain
        # iteration over the dataset terminates correctly.
        if not 0 <= idx < len(self):
            raise IndexError(f"index {idx} out of range for {len(self)} samples")

        if self.all_rot:
            img_idx, label = divmod(idx, 4)
        else:
            img_idx, label = idx, random.randint(0, 3)

        # Open inside a context manager so the file handle is released as soon
        # as the pixel data has been copied by convert().
        path = os.path.join(self.img_dir, f"{img_idx}.jpg")
        with Image.open(path) as src:
            img = src.convert("RGB")

        # Lossless quarter-turn rotations; 0 degrees leaves the image as is.
        transpose_ops = {
            90: Image.ROTATE_90,
            180: Image.ROTATE_180,
            270: Image.ROTATE_270,
        }
        op = transpose_ops.get(ANGLES[label])
        if op is not None:
            img = img.transpose(op)

        return self.transform(img), label
|
|
# Per-channel statistics used for normalisation (the commonly used ImageNet
# values); both pipelines end with the same tensor conversion + normalisation.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]

# Training: random crop plus light photometric augmentation, then random
# erasing on the normalised tensor.
train_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(IMG_SIZE),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2, hue=0.05),
    transforms.RandomGrayscale(p=0.05),
    *_to_normalized_tensor,
    transforms.RandomErasing(p=0.1),
])

# Validation: deterministic resize + centre crop, no augmentation.
val_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(IMG_SIZE),
    *_to_normalized_tensor,
])
|
|
# Both splits enumerate all four rotations of every stored image.
train_ds = RotationDataset(TRAIN_DIR, NUM_TRAIN, train_tf, all_rotations=True)
val_ds = RotationDataset(VAL_DIR, NUM_VAL, val_tf, all_rotations=True)

# Settings shared by both loaders; only shuffling and drop_last differ.
_loader_opts = dict(batch_size=BATCH_SIZE, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_ds, shuffle=True, drop_last=True, **_loader_opts)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_opts)

print(f"\nπ Dataset size:")
print(f" Train: {len(train_ds):>8,} ({NUM_TRAIN:,} images Γ 4 rotations)")
print(f" Val: {len(val_ds):>8,} ({NUM_VAL:,} images Γ 4 rotations)")
|
|
| |
# ResNet-18 layout (basic blocks, two per stage) with a 4-way output head,
# built from a fresh config, i.e. randomly initialised.
_resnet18_layout = dict(
    num_channels=3,
    embedding_size=64,
    hidden_sizes=[64, 128, 256, 512],
    depths=[2, 2, 2, 2],
    layer_type="basic",
    hidden_act="relu",
    num_labels=4,
)
config = ResNetConfig(**_resnet18_layout)
model = ResNetForImageClassification(config)
model = model.to(DEVICE)

n_params = sum(param.numel() for param in model.parameters())
print(f"\nποΈ Model: ResNet-18 from scratch β {n_params:,} parameters")
|
|
| |
# AdamW over all model parameters.
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=LR,
    weight_decay=WEIGHT_DECAY,
)

# The scheduler is stepped once per batch, so the schedule is sized in steps.
_steps_per_epoch = len(train_loader)
total_steps = _steps_per_epoch * EPOCHS
warmup_steps = _steps_per_epoch * WARMUP_EPOCHS
|
|
def lr_lambda(step, warmup=None, total=None):
    """Return the LR multiplier for ``step``: linear warm-up, then cosine decay.

    Args:
        step: Zero-based scheduler step.
        warmup: Number of warm-up steps; defaults to the module-level
            ``warmup_steps``.
        total: Total number of steps; defaults to the module-level
            ``total_steps``.

    Returns:
        A factor in ``[0, 1]`` that rises linearly from 0 to 1 during warm-up
        and then follows half a cosine wave down to 0 at ``total``.
    """
    warmup = warmup_steps if warmup is None else warmup
    total = total_steps if total is None else total
    if step < warmup:
        return step / max(warmup, 1)
    progress = (step - warmup) / max(total - warmup, 1)
    # Clamp so that stepping past `total` keeps the factor at 0 instead of
    # letting the cosine swing the learning rate back up.
    progress = min(progress, 1.0)
    return 0.5 * (1.0 + math.cos(math.pi * progress))
|
|
# Per-step LR schedule driven by lr_lambda (warm-up followed by cosine decay).
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
# Loss scaling is only meaningful for CUDA mixed precision; disable it
# explicitly on the CPU fallback instead of relying on a runtime warning.
scaler = torch.cuda.amp.GradScaler(enabled=DEVICE.type == "cuda")
criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
|
|
| |
# --- Training ----------------------------------------------------------------
# Each epoch runs one pass over train_loader (mixed precision on CUDA, gradient
# clipping, per-step LR schedule) followed by a full validation pass. The model
# is saved whenever validation accuracy improves.
best_val_acc = 0.0
use_amp = DEVICE.type == "cuda"  # autocast only helps (and only applies) on CUDA
num_classes = len(ANGLE_NAMES)
print(f"\nπ Starting training: {EPOCHS} epochs, {total_steps:,} steps\n")

for epoch in range(EPOCHS):
    t0 = time.time()

    # ---- training pass ----
    model.train()
    run_loss = correct = total = 0

    pbar = tqdm(train_loader, desc=f"Ep {epoch+1:2d}/{EPOCHS} [Train]", leave=False)
    for imgs, labels in pbar:
        imgs = imgs.to(DEVICE, non_blocking=True)
        labels = labels.to(DEVICE, non_blocking=True)

        with torch.cuda.amp.autocast(enabled=use_amp):
            logits = model(pixel_values=imgs).logits
            loss = criterion(logits, labels)

        optimizer.zero_grad(set_to_none=True)
        scaler.scale(loss).backward()
        # Gradients must be unscaled before clipping so the norm threshold
        # applies to their true magnitude.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()

        # Running, sample-weighted statistics shown on the progress bar.
        bs = labels.size(0)
        run_loss += loss.item() * bs
        correct += (logits.argmax(1) == labels).sum().item()
        total += bs
        pbar.set_postfix(loss=f"{run_loss/total:.4f}", acc=f"{100*correct/total:.1f}%")

    train_acc = 100 * correct / total

    # ---- validation pass ----
    model.eval()
    v_correct = v_total = 0
    cls_correct = [0] * num_classes
    cls_total = [0] * num_classes

    with torch.no_grad():
        for imgs, labels in tqdm(val_loader, desc=f"Ep {epoch+1:2d}/{EPOCHS} [Val] ", leave=False):
            imgs = imgs.to(DEVICE, non_blocking=True)
            labels = labels.to(DEVICE, non_blocking=True)
            with torch.cuda.amp.autocast(enabled=use_amp):
                logits = model(pixel_values=imgs).logits
            preds = logits.argmax(1)
            hits = preds == labels
            v_correct += hits.sum().item()
            v_total += labels.size(0)
            # Per-rotation tallies for the class-wise accuracy report.
            for c in range(num_classes):
                mask = labels == c
                cls_correct[c] += (hits & mask).sum().item()
                cls_total[c] += mask.sum().item()

    val_acc = 100 * v_correct / v_total
    dt = time.time() - t0

    print(f"Epoch {epoch+1:2d}/{EPOCHS} β "
          f"Train {train_acc:.1f}% β Val {val_acc:.2f}% β "
          f"LR {scheduler.get_last_lr()[0]:.6f} β {dt:.0f}s")
    for c in range(num_classes):
        ca = 100 * cls_correct[c] / max(cls_total[c], 1)
        print(f" {ANGLE_NAMES[c]:>25s}: {ca:.1f}%")

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        model.save_pretrained(MODEL_DIR)
        # The check mark is written as a real character: its mis-encoded form
        # contained a line break that split this string literal in two.
        print(f" ✅ New best model saved β {MODEL_DIR}")
    print()
|
|
| |
# Closing summary: separator rule, best validation accuracy, model location.
_summary_lines = (
    "=" * 60,
    f"π Training finished! Best Val-Accuracy: {best_val_acc:.2f}%",
    f" Model: {MODEL_DIR}",
)
print(*_summary_lines, sep="\n")