GyroScope / train.py
LH-Tech-AI's picture
Create train.py
4f4e7db verified
# ================================================================
# πŸ”„ IMAGE ROTATION PREDICTION β€” From-Scratch ResNet-18
# Dataset: ImageNet-1k Β· Hardware: Kaggle T4 GPU
# ================================================================
!pip install -q transformers datasets
# ────────────────────── Imports ──────────────────────
import os, random, math, time
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import ResNetConfig, ResNetForImageClassification
from datasets import load_dataset
from tqdm.auto import tqdm
# ────────────────────── Config ───────────────────────
HF_TOKEN = "hf_XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
NUM_TRAIN = 50_000
NUM_VAL = 5_000
IMG_SIZE = 224
BATCH_SIZE = 128
EPOCHS = 12
LR = 1e-3
WARMUP_EPOCHS = 1
WEIGHT_DECAY = 0.05
LABEL_SMOOTHING = 0.1
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
TRAIN_DIR = "/kaggle/working/data/train"
VAL_DIR = "/kaggle/working/data/val"
MODEL_DIR = "/kaggle/working/rotation_model"
print(f"πŸ–₯️ Device: {DEVICE}")
if DEVICE.type == "cuda":
print(f" GPU: {torch.cuda.get_device_name()}")
print(f" VRAM: {torch.cuda.get_device_properties(0).total_memory/1e9:.1f} GB")
# ──────────── Download ImageNet-1k (Streaming) ──────────────
from huggingface_hub import login
login(token=HF_TOKEN)
def download_images(split, save_dir, num_images):
os.makedirs(save_dir, exist_ok=True)
existing = len([f for f in os.listdir(save_dir) if f.endswith(".jpg")])
if existing >= num_images:
print(f" βœ“ {save_dir}: {existing} images already exist β†’ skipping.")
return
ds = load_dataset("ILSVRC/imagenet-1k", split=split,
streaming=True, trust_remote_code=True, token=HF_TOKEN)
count = 0
for ex in tqdm(ds, total=num_images, desc=f" ↓ {split}"):
if count >= num_images:
break
try:
img = ex["image"].convert("RGB")
w, h = img.size
if min(w, h) > 480:
s = 480 / min(w, h)
img = img.resize((int(w*s), int(h*s)), Image.BILINEAR)
img.save(os.path.join(save_dir, f"{count}.jpg"), quality=90)
count += 1
except Exception:
continue
print(f" βœ“ {count} Images β†’ {save_dir}")
print("\nπŸ“₯ Loading images from ImageNet-1k …")
download_images("train", TRAIN_DIR, NUM_TRAIN)
download_images("validation", VAL_DIR, NUM_VAL)
# ──────────────────── Rotation-Dataset ───────────────────────
ANGLES = [0, 90, 180, 270]
ANGLE_NAMES = ["0Β° (original)", "90Β° CCW", "180Β°", "270Β° CCW (=90Β° CW)"]
class RotationDataset(Dataset):
def __init__(self, img_dir, num_imgs, transform, all_rotations=False):
self.img_dir = img_dir
self.num_imgs = num_imgs
self.transform = transform
self.all_rot = all_rotations
def __len__(self):
return self.num_imgs * 4 if self.all_rot else self.num_imgs
def __getitem__(self, idx):
if self.all_rot:
img_idx, label = idx // 4, idx % 4
else:
img_idx, label = idx, random.randint(0, 3)
img = Image.open(os.path.join(self.img_dir, f"{img_idx}.jpg")).convert("RGB")
angle = ANGLES[label]
if angle == 90: img = img.transpose(Image.ROTATE_90)
elif angle == 180: img = img.transpose(Image.ROTATE_180)
elif angle == 270: img = img.transpose(Image.ROTATE_270)
return self.transform(img), label
train_tf = transforms.Compose([
transforms.Resize(256),
transforms.RandomCrop(IMG_SIZE),
transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2, hue=0.05),
transforms.RandomGrayscale(p=0.05),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
transforms.RandomErasing(p=0.1),
])
val_tf = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(IMG_SIZE),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
train_ds = RotationDataset(TRAIN_DIR, NUM_TRAIN, train_tf, all_rotations=True)
val_ds = RotationDataset(VAL_DIR, NUM_VAL, val_tf, all_rotations=True)
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
num_workers=2, pin_memory=True, drop_last=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False,
num_workers=2, pin_memory=True)
print(f"\nπŸ“Š Dataset size:")
print(f" Train: {len(train_ds):>8,} ({NUM_TRAIN:,} images Γ— 4 rotations)")
print(f" Val: {len(val_ds):>8,} ({NUM_VAL:,} images Γ— 4 rotations)")
# ────────────────── Modell: ResNet-18 from scratch ───────────────
config = ResNetConfig(
num_channels=3,
embedding_size=64,
hidden_sizes=[64, 128, 256, 512], # 4 Stages
depths=[2, 2, 2, 2], # β†’ ResNet-18
layer_type="basic",
hidden_act="relu",
num_labels=4, # 0Β°, 90Β°, 180Β°, 270Β°
)
model = ResNetForImageClassification(config).to(DEVICE)
n_params = sum(p.numel() for p in model.parameters())
print(f"\nπŸ—οΈ Model: ResNet-18 from scratch β€” {n_params:,} parameters")
# ────────────────────── Training-Setup ───────────────────────
optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
total_steps = len(train_loader) * EPOCHS
warmup_steps = len(train_loader) * WARMUP_EPOCHS
def lr_lambda(step):
if step < warmup_steps:
return step / max(warmup_steps, 1)
progress = (step - warmup_steps) / max(total_steps - warmup_steps, 1)
return 0.5 * (1.0 + math.cos(math.pi * progress))
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
scaler = torch.cuda.amp.GradScaler()
criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)
# ────────────────────── Training-Loop ────────────────────────
best_val_acc = 0.0
print(f"\nπŸš€ Starting training: {EPOCHS} epochs, {total_steps:,} steps\n")
for epoch in range(EPOCHS):
t0 = time.time()
# ---- Train ----
model.train()
run_loss = correct = total = 0
pbar = tqdm(train_loader, desc=f"Ep {epoch+1:2d}/{EPOCHS} [Train]", leave=False)
for imgs, labels in pbar:
imgs = imgs.to(DEVICE, non_blocking=True)
labels = labels.to(DEVICE, non_blocking=True)
with torch.cuda.amp.autocast():
logits = model(pixel_values=imgs).logits
loss = criterion(logits, labels)
optimizer.zero_grad(set_to_none=True)
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
scheduler.step()
bs = labels.size(0)
run_loss += loss.item() * bs
correct += (logits.argmax(1) == labels).sum().item()
total += bs
pbar.set_postfix(loss=f"{run_loss/total:.4f}", acc=f"{100*correct/total:.1f}%")
train_acc = 100 * correct / total
# ---- Validate ----
model.eval()
v_correct = v_total = 0
v_loss = 0.0
cls_correct = [0]*4
cls_total = [0]*4
with torch.no_grad():
for imgs, labels in tqdm(val_loader, desc=f"Ep {epoch+1:2d}/{EPOCHS} [Val] ", leave=False):
imgs = imgs.to(DEVICE, non_blocking=True)
labels = labels.to(DEVICE, non_blocking=True)
with torch.cuda.amp.autocast():
logits = model(pixel_values=imgs).logits
loss = criterion(logits, labels)
preds = logits.argmax(1)
bs = labels.size(0)
v_loss += loss.item() * bs
v_correct += (preds == labels).sum().item()
v_total += bs
for c in range(4):
mask = (labels == c)
cls_correct[c] += (preds[mask] == labels[mask]).sum().item()
cls_total[c] += mask.sum().item()
val_acc = 100 * v_correct / v_total
dt = time.time() - t0
print(f"Epoch {epoch+1:2d}/{EPOCHS} β”‚ "
f"Train {train_acc:.1f}% β”‚ Val {val_acc:.2f}% β”‚ "
f"LR {scheduler.get_last_lr()[0]:.6f} β”‚ {dt:.0f}s")
for c in range(4):
ca = 100*cls_correct[c]/max(cls_total[c],1)
print(f" {ANGLE_NAMES[c]:>25s}: {ca:.1f}%")
if val_acc > best_val_acc:
best_val_acc = val_acc
model.save_pretrained(MODEL_DIR)
print(f" βœ… New best model saved β†’ {MODEL_DIR}")
print()
# ── Fertig ──
print("=" * 60)
print(f"πŸ† Training finished! Best Val-Accuracy: {best_val_acc:.2f}%")
print(f" Model: {MODEL_DIR}")