# VERIDEX.V1 / trainer/train_gpu.py
# (last commit: 81f9dfe by shadow55gh — "fix: remove node_modules and cache from tracking")
"""
VERIDEX β€” GPU Trainer v4.0 (RTX 3050 Optimized)
=================================================
Trainer specially optimized for the RTX 3050.
Supports training on 70,000 images.
DATASET STRUCTURE (mandatory):
trainer/
dataset/
train/
real/ <- Training real images (70% = ~24,500)
fake/ <- Training fake images (70% = ~24,500)
val/
real/ <- Validation real images (15% = ~5,250)
fake/ <- Validation fake images (15% = ~5,250)
test/
real/ <- Final test real images (15% = ~5,250)
fake/ <- Final test fake images (15% = ~5,250)
Quick split for 70k images (35k real + 35k fake):
train/ = 49,000 images (24,500 real + 24,500 fake)
val/ = 10,500 images ( 5,250 real + 5,250 fake)
test/ = 10,500 images ( 5,250 real + 5,250 fake)
USAGE (Windows CMD, from the VERIDEX/trainer folder):
venv_gpu\\Scripts\\activate
python train_gpu.py
OUTPUT:
weights/efficientnet_deepfake.pth <- model weights
weights/efficientnet_b4_meta.json <- metadata + test results
After training complete:
Copy both files to VERIDEX/backend/weights/
"""
import os, json, time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
from PIL import Image
import timm
from tqdm import tqdm
# ── RTX 3050 Config (4GB VRAM optimized) ──────────────────────
# Central hyperparameter/config dict. NOTE: train() mutates it at runtime
# (batch_size lowered on <3.5GB VRAM GPUs, use_amp disabled on CPU).
CFG = {
    "dataset_dir": "dataset",            # root folder with train/ val/ test/ splits
    "weights_dir": "weights",            # where .pth weights + meta JSON are written
    "output_name": "efficientnet_deepfake",
    "img_size": 256,    # better clarity than 224, faster than 320
    "batch_size": 24,   # batch of 24 at 256px is safe on an RTX 3050
    "epochs": 25,
    "lr": 2e-4,
    "weight_decay": 1e-4,
    "num_workers": 0,   # MUST be 0 on Windows (DataLoader worker spawn issues)
    "fake_label": 0,    # 0=fake, 1=real (do not mix these up!)
    "early_stop": 6,    # stop after this many epochs without val-acc improvement
    "use_amp": True,    # Mixed Precision (supported by the RTX 3050)
    "grad_accum": 1,    # gradient accumulation steps; effective batch = 24
}
# Image file extensions accepted when scanning the dataset folders.
SUPPORTED = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".tiff"}
os.makedirs(CFG["weights_dir"], exist_ok=True)
os.makedirs("training_logs", exist_ok=True)
# ── Dataset Class ─────────────────────────────────────────────
class DeepfakeDataset(Dataset):
    """Map-style dataset over pre-listed (image_path, int_label) pairs.

    Unreadable/corrupt files are silently replaced by an all-black RGB
    image (label kept as-is) so a bad file never crashes an epoch.
    """

    def __init__(self, samples, transform):
        # samples: list of (path, label) tuples; transform: callable on PIL images
        self.samples = samples
        self.transform = transform

    def __len__(self):
        return len(self.samples)

    @staticmethod
    def _load_rgb(path):
        """Open *path* as RGB; fall back to a black placeholder on failure."""
        try:
            return Image.open(path).convert("RGB")
        except Exception:
            side = CFG["img_size"]
            return Image.new("RGB", (side, side))

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        return self.transform(self._load_rgb(path)), label
# ── Load split folder ─────────────────────────────────────────
def load_split(split_name):
    """Load one dataset split ('train', 'val', or 'test').

    Scans dataset/<split_name>/real/ and dataset/<split_name>/fake/ and
    returns a list of (path, label) tuples: real images get label 1, fake
    images get CFG["fake_label"]. Prints a per-split summary line.

    Raises SystemExit(1) when a split folder is missing or contains no
    images; warns when the real/fake class ratio exceeds 1.5x.
    """
    real_dir = os.path.join(CFG["dataset_dir"], split_name, "real")
    fake_dir = os.path.join(CFG["dataset_dir"], split_name, "fake")
    for d in [real_dir, fake_dir]:
        if not os.path.exists(d):
            print(f"\n ERROR: Folder not found: {d}")
            print(f" Create cheyyi: trainer/dataset/{split_name}/real/")
            print(f" trainer/dataset/{split_name}/fake/")
            raise SystemExit(1)
    samples = []
    for label_int, folder in [(1, real_dir), (CFG["fake_label"], fake_dir)]:
        files = [
            f for f in os.listdir(folder)
            if os.path.splitext(f)[1].lower() in SUPPORTED
            and not f.startswith(".")  # skip .gitkeep etc.
        ]
        for f in files:
            samples.append((os.path.join(folder, f), label_int))
    # BUG FIX: count the fake class via CFG["fake_label"] instead of a
    # hard-coded 0, so the summary and imbalance warning stay correct if the
    # label mapping is ever changed in CFG.
    real_c = sum(1 for _, l in samples if l == 1)
    fake_c = sum(1 for _, l in samples if l == CFG["fake_label"])
    print(f" {split_name:5s} | real: {real_c:7,} fake: {fake_c:7,} total: {len(samples):7,}")
    if len(samples) == 0:
        print(f"\n ERROR: {split_name}/ lo images lev! Images add chesii malli run cheyyi.")
        raise SystemExit(1)
    if real_c > 0 and fake_c > 0:
        ratio = max(real_c, fake_c) / min(real_c, fake_c)
        if ratio > 1.5:
            print(f" WARNING: {split_name} imbalanced ({ratio:.1f}x) — real:fake 1:1 better!")
    return samples
# ── Transforms ────────────────────────────────────────────────
def get_transforms():
    """Build and return the (train, eval) torchvision transform pipelines.

    Train: oversize-resize + random crop plus light geometric/photometric
    augmentation. Eval: plain resize only. Both pipelines convert to
    tensors and normalize with ImageNet channel statistics.
    """
    imagenet_mean = [0.485, 0.456, 0.406]
    imagenet_std = [0.229, 0.224, 0.225]
    side = CFG["img_size"]
    augmented_steps = [
        T.Resize((side + 40, side + 40)),   # resize larger, then crop for variety
        T.RandomCrop((side, side)),
        T.RandomHorizontalFlip(0.5),
        T.RandomVerticalFlip(0.05),
        T.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.15, hue=0.08),
        T.RandomGrayscale(p=0.05),
        T.RandomRotation(10),
        T.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0)),
        T.ToTensor(),
        T.Normalize(imagenet_mean, imagenet_std),
    ]
    plain_steps = [
        T.Resize((side, side)),
        T.ToTensor(),
        T.Normalize(imagenet_mean, imagenet_std),
    ]
    return T.Compose(augmented_steps), T.Compose(plain_steps)
# ── Model ─────────────────────────────────────────────────────
def build_model(device):
    """Create an EfficientNet-B4 binary classifier and move it to *device*.

    The stock timm classifier head is swapped for a small MLP:
    dropout -> Linear(num_features, 512) -> GELU -> BatchNorm -> dropout
    -> Linear(512, 2). Also enables cuDNN autotuning (fixed input size).
    """
    print("\n Building EfficientNet-B4...")
    import torch.backends.cudnn as cudnn
    # Autotune conv algorithms for our fixed input size; determinism not needed.
    cudnn.benchmark = True
    cudnn.deterministic = False
    net = timm.create_model("efficientnet_b4", pretrained=True)
    net.classifier = nn.Sequential(
        nn.Dropout(0.4),
        nn.Linear(net.num_features, 512),
        nn.GELU(),
        nn.BatchNorm1d(512),
        nn.Dropout(0.3),
        nn.Linear(512, 2),
    )
    net = net.to(device)
    trainable = sum(p.numel() for p in net.parameters() if p.requires_grad)
    print(f" Trainable params: {trainable:,}")
    return net
# ── Test Set Evaluation ───────────────────────────────────────
def evaluate_test(model, test_dl, device, scaler):
    """Evaluate *model* on *test_dl*; return accuracy + confusion-matrix stats.

    Class 1 (real) is treated as the positive class for precision/recall/F1.
    Inference runs under autocast when *scaler* is truthy, mirroring the
    AMP setting used during training. Returns a dict of rounded metrics.
    """
    model.eval()
    correct = 0
    total = 0
    tp = fp = tn = fn = 0
    with torch.no_grad():
        for imgs, labels in tqdm(test_dl, desc=" Test eval", ncols=65, leave=False):
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            if scaler:
                with torch.cuda.amp.autocast():
                    out = model(imgs)
            else:
                out = model(imgs)
            preds = out.argmax(1)
            correct += (preds == labels).sum().item()
            total += len(labels)
            # Tally the confusion matrix per sample (positive class = 1 = real).
            for p, l in zip(preds.cpu().tolist(), labels.cpu().tolist()):
                if l == 1:
                    if p == 1:
                        tp += 1
                    else:
                        fn += 1
                else:
                    if p == 1:
                        fp += 1
                    else:
                        tn += 1
    # max(..., 1) guards against division by zero on degenerate splits.
    acc = correct / max(total, 1)
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)
    return {
        "test_accuracy": round(acc, 4),
        "precision": round(precision, 4),
        "recall": round(recall, 4),
        "f1_score": round(f1, 4),
        "true_positive": tp,
        "false_positive": fp,
        "true_negative": tn,
        "false_negative": fn,
    }
# ── Main Training ─────────────────────────────────────────────
def train():
    """Run the full training pipeline.

    Loads the train/val/test splits, trains EfficientNet-B4 with AMP,
    gradient clipping, cosine LR decay and early stopping, checkpoints the
    best-val-accuracy weights, then evaluates the best checkpoint on the
    held-out test set. Writes: weights .pth, a CSV training log, and a
    JSON metadata file with dataset sizes and test metrics.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("=" * 65)
    print(" VERIDEX Trainer v4.0 | RTX 3050 GPU Optimized")
    print("=" * 65)
    if device.type == "cuda":
        gpu_name = torch.cuda.get_device_name(0)
        vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"\n GPU : {gpu_name}")
        print(f" VRAM : {vram_gb:.1f} GB")
        print(f" AMP : {'ON (faster!)' if CFG['use_amp'] else 'OFF'}")
        if vram_gb < 3.5:
            # BUG FIX: the old message claimed "batch_size 12 -> 6" although the
            # configured batch size is 24 — report the actual transition.
            print(f"\n Low VRAM! batch_size {CFG['batch_size']} -> 6 reduce chestunna...")
            CFG["batch_size"] = 6
    else:
        print(f"\n WARNING: GPU not found! CPU mode (slow)")
        print(f" nvidia-smi run chesii CUDA check cheyyi")
        CFG["use_amp"] = False
    print(f"\n Config:")
    print(f" Image size : {CFG['img_size']}x{CFG['img_size']}")
    print(f" Batch size : {CFG['batch_size']} (effective: {CFG['batch_size'] * CFG['grad_accum']})")
    print(f" Epochs : {CFG['epochs']}")
    print(f" Device : {device}")
    # Load datasets (each call exits the process if its folders are missing/empty)
    print(f"\n Dataset loading...")
    print(f" {'split':5s} | {'real':>12} {'fake':>12} {'total':>12}")
    print(f" {'-'*55}")
    train_samples = load_split("train")
    val_samples = load_split("val")
    test_samples = load_split("test")
    grand_total = len(train_samples) + len(val_samples) + len(test_samples)
    print(f" {'-'*55}")
    print(f" TOTAL {grand_total:>12,}")
    # DataLoaders: pin_memory speeds host->GPU copies; num_workers=0 for Windows
    train_tf, eval_tf = get_transforms()
    train_ds = DeepfakeDataset(train_samples, train_tf)
    val_ds = DeepfakeDataset(val_samples, eval_tf)
    test_ds = DeepfakeDataset(test_samples, eval_tf)
    train_dl = DataLoader(train_ds, batch_size=CFG["batch_size"], shuffle=True,
                          num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda"))
    val_dl = DataLoader(val_ds, batch_size=CFG["batch_size"], shuffle=False,
                        num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda"))
    test_dl = DataLoader(test_ds, batch_size=CFG["batch_size"], shuffle=False,
                         num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda"))
    # Model + optimizer + cosine LR schedule + label-smoothed cross-entropy
    model = build_model(device)
    optimizer = optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"])
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG["epochs"], eta_min=1e-6)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
    # GradScaler only when AMP is enabled AND we actually run on CUDA
    scaler = torch.cuda.amp.GradScaler() if CFG["use_amp"] and device.type == "cuda" else None
    best_val_acc = 0.0
    no_improve = 0
    log_path = f"training_logs/train_{int(time.time())}.csv"
    log_lines = ["epoch,train_loss,train_acc,val_loss,val_acc,lr"]
    start_time = time.time()
    print(f"\n Training started!\n")
    print(f" {'Epoch':>5} {'TrnLoss':>8} {'TrnAcc':>7} {'ValLoss':>8} {'ValAcc':>7} {'Note'}")
    print(f" {'-'*65}")
    for epoch in range(1, CFG["epochs"] + 1):
        ep_start = time.time()
        # ---- TRAIN phase ----
        model.train()
        t_loss = t_correct = t_total = 0
        optimizer.zero_grad()
        for step, (imgs, labels) in enumerate(
            tqdm(train_dl, desc=f" Ep{epoch:02d} TRAIN", leave=False, ncols=60)
        ):
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            if scaler:
                with torch.cuda.amp.autocast():
                    out = model(imgs)
                    # divide so accumulated gradients average to the true loss
                    loss = criterion(out, labels) / CFG["grad_accum"]
                scaler.scale(loss).backward()
                if (step + 1) % CFG["grad_accum"] == 0:
                    scaler.unscale_(optimizer)  # unscale before clipping
                    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                    scaler.step(optimizer)
                    scaler.update()
                    optimizer.zero_grad()
            else:
                out = model(imgs)
                loss = criterion(out, labels) / CFG["grad_accum"]
                loss.backward()
                if (step + 1) % CFG["grad_accum"] == 0:
                    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                    optimizer.step()
                    optimizer.zero_grad()
            # undo the grad_accum division so the logged loss is the true value
            t_loss += loss.item() * CFG["grad_accum"] * len(imgs)
            t_correct += (out.argmax(1) == labels).sum().item()
            t_total += len(imgs)
        # ---- VALIDATION phase ----
        model.eval()
        v_loss = v_correct = v_total = 0
        with torch.no_grad():
            for imgs, labels in tqdm(
                val_dl, desc=f" Ep{epoch:02d} VAL ", leave=False, ncols=60
            ):
                imgs = imgs.to(device, non_blocking=True)
                labels = labels.to(device, non_blocking=True)
                if scaler:
                    with torch.cuda.amp.autocast():
                        out = model(imgs)
                        loss = criterion(out, labels)
                else:
                    out = model(imgs)
                    loss = criterion(out, labels)
                v_loss += loss.item() * len(imgs)
                v_correct += (out.argmax(1) == labels).sum().item()
                v_total += len(imgs)
        scheduler.step()
        current_lr = scheduler.get_last_lr()[0]
        # Per-sample averages; max(..., 1) guards empty-loader division
        tl = t_loss / max(t_total, 1)
        ta = t_correct / max(t_total, 1)
        vl = v_loss / max(v_total, 1)
        va = v_correct / max(v_total, 1)
        elapsed = time.time() - ep_start
        eta_min = elapsed * (CFG["epochs"] - epoch) / 60
        log_lines.append(f"{epoch},{tl:.4f},{ta:.4f},{vl:.4f},{va:.4f},{current_lr:.6f}")
        if va > best_val_acc:
            # New best: checkpoint immediately so a later crash never loses it
            best_val_acc = va
            no_improve = 0
            wpath = os.path.join(CFG["weights_dir"], f"{CFG['output_name']}.pth")
            torch.save(model.state_dict(), wpath)
            note = f"BEST! saved (ETA {eta_min:.0f}m)"
        else:
            no_improve += 1
            note = f"no improve {no_improve}/{CFG['early_stop']} (ETA {eta_min:.0f}m)"
            if no_improve >= CFG["early_stop"]:
                print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} Early stop!")
                break
        print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} {note}")
    # Save CSV training log
    with open(log_path, "w") as f:
        f.write("\n".join(log_lines))
    # ---- FINAL TEST EVALUATION (best checkpoint, unseen data) ----
    print(f"\n {'='*65}")
    print(f" Final TEST set evaluation (unseen data)...")
    wpath = os.path.join(CFG["weights_dir"], f"{CFG['output_name']}.pth")
    model.load_state_dict(torch.load(wpath, map_location=device))
    res = evaluate_test(model, test_dl, device, scaler)
    print(f"\n Test Accuracy : {res['test_accuracy']:.1%}")
    print(f" Precision : {res['precision']:.1%}")
    print(f" Recall : {res['recall']:.1%}")
    print(f" F1 Score : {res['f1_score']:.1%}")
    print(f"\n Confusion Matrix:")
    print(f" True Positive : {res['true_positive']:,}")
    print(f" False Positive : {res['false_positive']:,}")
    print(f" True Negative : {res['true_negative']:,}")
    print(f" False Negative : {res['false_negative']:,}")
    # Save metadata JSON next to the weights (consumed by the backend)
    meta = {
        "model": "EfficientNet-B4",
        "trained_on": "RTX 3050 GPU",
        "fake_label": CFG["fake_label"],
        "img_size": CFG["img_size"],
        "best_val_acc": round(best_val_acc, 4),
        "epochs_trained": epoch,
        "dataset_size": {
            "train": len(train_samples),
            "val": len(val_samples),
            "test": len(test_samples),
            "total": grand_total,
        },
        "test_results": res,
    }
    with open(os.path.join(CFG["weights_dir"], "efficientnet_b4_meta.json"), "w") as f:
        json.dump(meta, f, indent=2)
    total_time = (time.time() - start_time) / 60
    print(f"\n {'='*65}")
    print(f" TRAINING COMPLETE!")
    print(f" Best val accuracy : {best_val_acc:.1%}")
    print(f" Final test accuracy : {res['test_accuracy']:.1%}")
    print(f" F1 Score : {res['f1_score']:.1%}")
    print(f" Total time : {total_time:.0f} minutes")
    print(f"\n NEXT STEP:")
    print(f" 1. trainer\\weights\\efficientnet_deepfake.pth --> copy")
    print(f" 2. trainer\\weights\\efficientnet_b4_meta.json --> copy")
    print(f" 3. VERIDEX\\backend\\weights\\ lo paste cheyyi")
    print(f" 4. Backend restart: uvicorn main:app --reload --port 8000")
    print(f" {'='*65}")
if __name__ == "__main__":
    # Script entry point: run the full training pipeline.
    train()