""" VERIDEX — GPU Trainer v4.0 (RTX 3050 Optimized) ================================================= RTX 3050 ki specially optimize chesina trainer. 70,000 images support chestundi. DATASET STRUCTURE (mandatory): trainer/ dataset/ train/ real/ <- Training real images (70% = ~24,500) fake/ <- Training fake images (70% = ~24,500) val/ real/ <- Validation real images (15% = ~5,250) fake/ <- Validation fake images (15% = ~5,250) test/ real/ <- Final test real images (15% = ~5,250) fake/ <- Final test fake images (15% = ~5,250) Quick split for 70k images (35k real + 35k fake): train/ = 49,000 images (24,500 real + 24,500 fake) val/ = 10,500 images ( 5,250 real + 5,250 fake) test/ = 10,500 images ( 5,250 real + 5,250 fake) USAGE (Windows CMD - VERIDEX/trainer folder lo): venv_gpu\\Scripts\\activate python train_gpu.py OUTPUT: weights/efficientnet_deepfake.pth <- model weights weights/efficientnet_b4_meta.json <- metadata + test results After training complete: Copy both files to VERIDEX/backend/weights/ """ import os, json, time import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, Dataset import torchvision.transforms as T from PIL import Image import timm from tqdm import tqdm # ── RTX 3050 Config (4GB VRAM optimized) ────────────────────── CFG = { "dataset_dir": "dataset", "weights_dir": "weights", "output_name": "efficientnet_deepfake", "img_size": 256, # 224 kante better clarity, 320 kante faster "batch_size": 24, # 256px tho 24 RTX 3050 ki safe "epochs": 25, "lr": 2e-4, "weight_decay": 1e-4, "num_workers": 0, # Windows lo MUST be 0 "fake_label": 0, # 0=fake, 1=real (marchipoyoddu!) "early_stop": 6, "use_amp": True, # Mixed Precision (RTX 3050 support) "grad_accum": 1, # effective batch = 24 } SUPPORTED = {".jpg", ".jpeg", ".png", ".webp", ".bmp", ".tiff"} os.makedirs(CFG["weights_dir"], exist_ok=True) os.makedirs("training_logs", exist_ok=True) # ── Dataset Class ───────────────────────────────────────────── class DeepfakeDataset(Dataset): def __init__(self, samples, transform): self.samples = samples self.transform = transform def __len__(self): return len(self.samples) def __getitem__(self, idx): path, label = self.samples[idx] try: img = Image.open(path).convert("RGB") except Exception: img = Image.new("RGB", (CFG["img_size"], CFG["img_size"])) return self.transform(img), label # ── Load split folder ───────────────────────────────────────── def load_split(split_name): """ split_name: 'train', 'val', or 'test' dataset//real/ and dataset//fake/ nundi load """ real_dir = os.path.join(CFG["dataset_dir"], split_name, "real") fake_dir = os.path.join(CFG["dataset_dir"], split_name, "fake") for d in [real_dir, fake_dir]: if not os.path.exists(d): print(f"\n ERROR: Folder not found: {d}") print(f" Create cheyyi: trainer/dataset/{split_name}/real/") print(f" trainer/dataset/{split_name}/fake/") raise SystemExit(1) samples = [] for label_int, folder in [(1, real_dir), (CFG["fake_label"], fake_dir)]: files = [ f for f in os.listdir(folder) if os.path.splitext(f)[1].lower() in SUPPORTED and not f.startswith(".") # .gitkeep etc skip ] for f in files: samples.append((os.path.join(folder, f), label_int)) real_c = sum(1 for _, l in samples if l == 1) fake_c = sum(1 for _, l in samples if l == 0) print(f" {split_name:5s} | real: {real_c:7,} fake: {fake_c:7,} total: {len(samples):7,}") if len(samples) == 0: print(f"\n ERROR: {split_name}/ lo images lev! Images add chesii malli run cheyyi.") raise SystemExit(1) if real_c > 0 and fake_c > 0: ratio = max(real_c, fake_c) / min(real_c, fake_c) if ratio > 1.5: print(f" WARNING: {split_name} imbalanced ({ratio:.1f}x) — real:fake 1:1 better!") return samples # ── Transforms ──────────────────────────────────────────────── def get_transforms(): mean = [0.485, 0.456, 0.406] std = [0.229, 0.224, 0.225] sz = CFG["img_size"] train_tf = T.Compose([ T.Resize((sz + 40, sz + 40)), T.RandomCrop((sz, sz)), T.RandomHorizontalFlip(0.5), T.RandomVerticalFlip(0.05), T.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.15, hue=0.08), T.RandomGrayscale(p=0.05), T.RandomRotation(10), T.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0)), T.ToTensor(), T.Normalize(mean, std), ]) eval_tf = T.Compose([ T.Resize((sz, sz)), T.ToTensor(), T.Normalize(mean, std), ]) return train_tf, eval_tf # ── Model ───────────────────────────────────────────────────── def build_model(device): print("\n Building EfficientNet-B4...") import torch.backends.cudnn as cudnn cudnn.benchmark = True cudnn.deterministic = False model = timm.create_model("efficientnet_b4", pretrained=True) model.classifier = nn.Sequential( nn.Dropout(0.4), nn.Linear(model.num_features, 512), nn.GELU(), nn.BatchNorm1d(512), nn.Dropout(0.3), nn.Linear(512, 2), ) model = model.to(device) params = sum(p.numel() for p in model.parameters() if p.requires_grad) print(f" Trainable params: {params:,}") return model # ── Test Set Evaluation ─────────────────────────────────────── def evaluate_test(model, test_dl, device, scaler): model.eval() correct = total = 0 tp = fp = tn = fn = 0 with torch.no_grad(): for imgs, labels in tqdm(test_dl, desc=" Test eval", ncols=65, leave=False): imgs = imgs.to(device, non_blocking=True) labels = labels.to(device, non_blocking=True) if scaler: with torch.cuda.amp.autocast(): out = model(imgs) else: out = model(imgs) preds = out.argmax(1) correct += (preds == labels).sum().item() total += len(labels) for p, l in zip(preds.cpu(), labels.cpu()): p, l = int(p), int(l) if p == 1 and l == 1: tp += 1 if p == 1 and l == 0: fp += 1 if p == 0 and l == 0: tn += 1 if p == 0 and l == 1: fn += 1 acc = correct / max(total, 1) precision = tp / max(tp + fp, 1) recall = tp / max(tp + fn, 1) f1 = 2 * precision * recall / max(precision + recall, 1e-9) return { "test_accuracy": round(acc, 4), "precision": round(precision, 4), "recall": round(recall, 4), "f1_score": round(f1, 4), "true_positive": tp, "false_positive": fp, "true_negative": tn, "false_negative": fn, } # ── Main Training ───────────────────────────────────────────── def train(): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print("=" * 65) print(" VERIDEX Trainer v4.0 | RTX 3050 GPU Optimized") print("=" * 65) if device.type == "cuda": gpu_name = torch.cuda.get_device_name(0) vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9 print(f"\n GPU : {gpu_name}") print(f" VRAM : {vram_gb:.1f} GB") print(f" AMP : {'ON (faster!)' if CFG['use_amp'] else 'OFF'}") if vram_gb < 3.5: print(f"\n Low VRAM! batch_size 12 -> 6 reduce chestunna...") CFG["batch_size"] = 6 else: print(f"\n WARNING: GPU not found! CPU mode (slow)") print(f" nvidia-smi run chesii CUDA check cheyyi") CFG["use_amp"] = False print(f"\n Config:") print(f" Image size : {CFG['img_size']}x{CFG['img_size']}") print(f" Batch size : {CFG['batch_size']} (effective: {CFG['batch_size'] * CFG['grad_accum']})") print(f" Epochs : {CFG['epochs']}") print(f" Device : {device}") # Load datasets print(f"\n Dataset loading...") print(f" {'split':5s} | {'real':>12} {'fake':>12} {'total':>12}") print(f" {'-'*55}") train_samples = load_split("train") val_samples = load_split("val") test_samples = load_split("test") grand_total = len(train_samples) + len(val_samples) + len(test_samples) print(f" {'-'*55}") print(f" TOTAL {grand_total:>12,}") # DataLoaders train_tf, eval_tf = get_transforms() train_ds = DeepfakeDataset(train_samples, train_tf) val_ds = DeepfakeDataset(val_samples, eval_tf) test_ds = DeepfakeDataset(test_samples, eval_tf) train_dl = DataLoader(train_ds, batch_size=CFG["batch_size"], shuffle=True, num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda")) val_dl = DataLoader(val_ds, batch_size=CFG["batch_size"], shuffle=False, num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda")) test_dl = DataLoader(test_ds, batch_size=CFG["batch_size"], shuffle=False, num_workers=CFG["num_workers"], pin_memory=(device.type == "cuda")) # Model + optimizer model = build_model(device) optimizer = optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"]) scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG["epochs"], eta_min=1e-6) criterion = nn.CrossEntropyLoss(label_smoothing=0.05) scaler = torch.cuda.amp.GradScaler() if CFG["use_amp"] and device.type == "cuda" else None best_val_acc = 0.0 no_improve = 0 log_path = f"training_logs/train_{int(time.time())}.csv" log_lines = ["epoch,train_loss,train_acc,val_loss,val_acc,lr"] start_time = time.time() print(f"\n Training started!\n") print(f" {'Epoch':>5} {'TrnLoss':>8} {'TrnAcc':>7} {'ValLoss':>8} {'ValAcc':>7} {'Note'}") print(f" {'-'*65}") for epoch in range(1, CFG["epochs"] + 1): ep_start = time.time() # TRAIN model.train() t_loss = t_correct = t_total = 0 optimizer.zero_grad() for step, (imgs, labels) in enumerate( tqdm(train_dl, desc=f" Ep{epoch:02d} TRAIN", leave=False, ncols=60) ): imgs = imgs.to(device, non_blocking=True) labels = labels.to(device, non_blocking=True) if scaler: with torch.cuda.amp.autocast(): out = model(imgs) loss = criterion(out, labels) / CFG["grad_accum"] scaler.scale(loss).backward() if (step + 1) % CFG["grad_accum"] == 0: scaler.unscale_(optimizer) torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) scaler.step(optimizer) scaler.update() optimizer.zero_grad() else: out = model(imgs) loss = criterion(out, labels) / CFG["grad_accum"] loss.backward() if (step + 1) % CFG["grad_accum"] == 0: torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() optimizer.zero_grad() t_loss += loss.item() * CFG["grad_accum"] * len(imgs) t_correct += (out.argmax(1) == labels).sum().item() t_total += len(imgs) # VALIDATION model.eval() v_loss = v_correct = v_total = 0 with torch.no_grad(): for imgs, labels in tqdm( val_dl, desc=f" Ep{epoch:02d} VAL ", leave=False, ncols=60 ): imgs = imgs.to(device, non_blocking=True) labels = labels.to(device, non_blocking=True) if scaler: with torch.cuda.amp.autocast(): out = model(imgs) loss = criterion(out, labels) else: out = model(imgs) loss = criterion(out, labels) v_loss += loss.item() * len(imgs) v_correct += (out.argmax(1) == labels).sum().item() v_total += len(imgs) scheduler.step() current_lr = scheduler.get_last_lr()[0] tl = t_loss / max(t_total, 1) ta = t_correct / max(t_total, 1) vl = v_loss / max(v_total, 1) va = v_correct / max(v_total, 1) elapsed = time.time() - ep_start eta_min = elapsed * (CFG["epochs"] - epoch) / 60 log_lines.append(f"{epoch},{tl:.4f},{ta:.4f},{vl:.4f},{va:.4f},{current_lr:.6f}") if va > best_val_acc: best_val_acc = va no_improve = 0 wpath = os.path.join(CFG["weights_dir"], f"{CFG['output_name']}.pth") torch.save(model.state_dict(), wpath) note = f"BEST! saved (ETA {eta_min:.0f}m)" else: no_improve += 1 note = f"no improve {no_improve}/{CFG['early_stop']} (ETA {eta_min:.0f}m)" if no_improve >= CFG["early_stop"]: print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} Early stop!") break print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} {note}") # Save log with open(log_path, "w") as f: f.write("\n".join(log_lines)) # FINAL TEST EVALUATION print(f"\n {'='*65}") print(f" Final TEST set evaluation (unseen data)...") wpath = os.path.join(CFG["weights_dir"], f"{CFG['output_name']}.pth") model.load_state_dict(torch.load(wpath, map_location=device)) res = evaluate_test(model, test_dl, device, scaler) print(f"\n Test Accuracy : {res['test_accuracy']:.1%}") print(f" Precision : {res['precision']:.1%}") print(f" Recall : {res['recall']:.1%}") print(f" F1 Score : {res['f1_score']:.1%}") print(f"\n Confusion Matrix:") print(f" True Positive : {res['true_positive']:,}") print(f" False Positive : {res['false_positive']:,}") print(f" True Negative : {res['true_negative']:,}") print(f" False Negative : {res['false_negative']:,}") # Save metadata meta = { "model": "EfficientNet-B4", "trained_on": "RTX 3050 GPU", "fake_label": CFG["fake_label"], "img_size": CFG["img_size"], "best_val_acc": round(best_val_acc, 4), "epochs_trained": epoch, "dataset_size": { "train": len(train_samples), "val": len(val_samples), "test": len(test_samples), "total": grand_total, }, "test_results": res, } with open(os.path.join(CFG["weights_dir"], "efficientnet_b4_meta.json"), "w") as f: json.dump(meta, f, indent=2) total_time = (time.time() - start_time) / 60 print(f"\n {'='*65}") print(f" TRAINING COMPLETE!") print(f" Best val accuracy : {best_val_acc:.1%}") print(f" Final test accuracy : {res['test_accuracy']:.1%}") print(f" F1 Score : {res['f1_score']:.1%}") print(f" Total time : {total_time:.0f} minutes") print(f"\n NEXT STEP:") print(f" 1. trainer\\weights\\efficientnet_deepfake.pth --> copy") print(f" 2. trainer\\weights\\efficientnet_b4_meta.json --> copy") print(f" 3. VERIDEX\\backend\\weights\\ lo paste cheyyi") print(f" 4. Backend restart: uvicorn main:app --reload --port 8000") print(f" {'='*65}") if __name__ == "__main__": train()