Spaces:
Sleeping
Sleeping
"""
VERIDEX — GPU Trainer v4.0 (RTX 3050 Optimized)
=================================================
Trainer specially optimized for the RTX 3050.
Supports datasets of 70,000 images.

DATASET STRUCTURE (mandatory):
    trainer/
        dataset/
            train/
                real/   <- Training real images (70% = ~24,500)
                fake/   <- Training fake images (70% = ~24,500)
            val/
                real/   <- Validation real images (15% = ~5,250)
                fake/   <- Validation fake images (15% = ~5,250)
            test/
                real/   <- Final test real images (15% = ~5,250)
                fake/   <- Final test fake images (15% = ~5,250)

Quick split for 70k images (35k real + 35k fake):
    train/ = 49,000 images (24,500 real + 24,500 fake)
    val/   = 10,500 images ( 5,250 real +  5,250 fake)
    test/  = 10,500 images ( 5,250 real +  5,250 fake)

USAGE (Windows CMD, run from the VERIDEX/trainer folder):
    venv_gpu\\Scripts\\activate
    python train_gpu.py

OUTPUT:
    weights/efficientnet_deepfake.pth   <- model weights
    weights/efficientnet_b4_meta.json   <- metadata + test results

After training completes:
    Copy both files to VERIDEX/backend/weights/
"""
| import os, json, time | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, Dataset | |
| import torchvision.transforms as T | |
| from PIL import Image | |
| import timm | |
| from tqdm import tqdm | |
# ── RTX 3050 Config (4GB VRAM optimized) ──────────────────────
# Central training configuration; every function in this script reads it.
CFG = dict(
    dataset_dir="dataset",
    weights_dir="weights",
    output_name="efficientnet_deepfake",
    img_size=256,       # better clarity than 224, faster than 320
    batch_size=24,      # with 256px inputs, 24 is safe for the RTX 3050
    epochs=25,
    lr=2e-4,
    weight_decay=1e-4,
    num_workers=0,      # MUST be 0 on Windows
    fake_label=0,       # 0=fake, 1=real (do not forget!)
    early_stop=6,
    use_amp=True,       # mixed precision (supported by the RTX 3050)
    grad_accum=1,       # effective batch = 24
)

# Lower-cased file extensions accepted as images when scanning a split folder.
SUPPORTED = set(".jpg .jpeg .png .webp .bmp .tiff".split())

# Output folders are created up front, at import time.
for _out_dir in (CFG["weights_dir"], "training_logs"):
    os.makedirs(_out_dir, exist_ok=True)
# ── Dataset Class ─────────────────────────────────────────────
class DeepfakeDataset(Dataset):
    """Dataset over ``(image_path, label)`` pairs that loads images lazily.

    Args:
        samples: Sequence of ``(path, label)`` tuples.
        transform: Callable applied to the loaded RGB image; its result is
            returned as the first element of every item.
    """

    def __init__(self, samples, transform):
        self.samples = samples
        self.transform = transform
        # Paths already reported as unreadable, so each one is announced
        # once instead of once per epoch.
        self._reported = set()

    def __len__(self):
        """Return the number of ``(path, label)`` pairs."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(transform(image), label)`` for the sample at ``idx``.

        An image that cannot be opened or decoded is replaced by a black
        ``img_size`` x ``img_size`` placeholder so a single corrupt file
        does not abort a long training run; the label is kept unchanged.
        """
        path, label = self.samples[idx]
        try:
            # The context manager closes the underlying file as soon as the
            # pixel data has been decoded into the converted copy.
            with Image.open(path) as src:
                img = src.convert("RGB")
        except Exception as exc:
            # Deliberate best effort: keep going, but make the substitution
            # visible. tqdm.write prints without breaking progress bars.
            if path not in self._reported:
                self._reported.add(path)
                tqdm.write(
                    f" WARNING: unreadable image replaced by blank placeholder: "
                    f"{path} ({exc})"
                )
            img = Image.new("RGB", (CFG["img_size"], CFG["img_size"]))
        return self.transform(img), label
# ── Load split folder ─────────────────────────────────────────
def load_split(split_name):
    """Collect labelled image paths for one dataset split.

    Images are read from ``<dataset_dir>/<split_name>/real/`` and
    ``<dataset_dir>/<split_name>/fake/``. Real images get label ``1`` and
    fake images get ``CFG["fake_label"]``.

    Args:
        split_name: ``'train'``, ``'val'`` or ``'test'``.

    Returns:
        list[tuple[str, int]]: ``(path, label)`` pairs, real images first,
        each folder in sorted filename order.

    Raises:
        SystemExit: If either class folder is missing or the split contains
            no supported images at all.
    """
    split_root = os.path.join(CFG["dataset_dir"], split_name)
    real_dir = os.path.join(split_root, "real")
    fake_dir = os.path.join(split_root, "fake")

    for d in (real_dir, fake_dir):
        # isdir (not exists): a stray *file* named "real" must also be rejected.
        if not os.path.isdir(d):
            print(f"\n ERROR: Folder not found: {d}")
            print(f" Create cheyyi: trainer/dataset/{split_name}/real/")
            print(f" trainer/dataset/{split_name}/fake/")
            raise SystemExit(1)

    samples = []
    counts = {}
    class_dirs = (("real", 1, real_dir), ("fake", CFG["fake_label"], fake_dir))
    for kind, label_int, folder in class_dirs:
        # Sorted for a reproducible sample order; os.listdir order is arbitrary.
        files = sorted(
            f for f in os.listdir(folder)
            if not f.startswith(".")  # skip .gitkeep and other dotfiles
            and os.path.splitext(f)[1].lower() in SUPPORTED
            and os.path.isfile(os.path.join(folder, f))
        )
        # Count per folder rather than per label value, so the numbers stay
        # right whatever CFG["fake_label"] is set to.
        counts[kind] = len(files)
        samples.extend((os.path.join(folder, f), label_int) for f in files)

    real_c, fake_c = counts["real"], counts["fake"]
    print(f" {split_name:5s} | real: {real_c:7,} fake: {fake_c:7,} total: {len(samples):7,}")

    if not samples:
        print(f"\n ERROR: {split_name}/ lo images lev! Images add chesii malli run cheyyi.")
        raise SystemExit(1)

    if real_c == 0 or fake_c == 0:
        # Previously this case passed silently; a one-class split makes the
        # resulting metrics meaningless.
        missing = "real" if real_c == 0 else "fake"
        print(f" WARNING: {split_name} has no {missing} images - metrics will be meaningless!")
    else:
        ratio = max(real_c, fake_c) / min(real_c, fake_c)
        if ratio > 1.5:
            # ASCII dash replaces a mis-encoded character in the original message.
            print(f" WARNING: {split_name} imbalanced ({ratio:.1f}x) - real:fake 1:1 better!")
    return samples
# ── Transforms ────────────────────────────────────────────────
def get_transforms():
    """Build the image pipelines for training and for evaluation.

    Returns:
        tuple: ``(train_tf, eval_tf)``. ``train_tf`` resizes to
        ``img_size + 40``, random-crops back to ``img_size`` and applies
        random augmentations; ``eval_tf`` only resizes to ``img_size``.
        Both end with tensor conversion and channel normalization.
    """
    side = CFG["img_size"]
    oversize = side + 40

    def tensor_tail():
        # Shared final steps; the constants are the commonly used ImageNet
        # channel mean / std.
        return [
            T.ToTensor(),
            T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]

    augmentations = [
        T.Resize((oversize, oversize)),
        T.RandomCrop((side, side)),
        T.RandomHorizontalFlip(0.5),
        T.RandomVerticalFlip(0.05),
        T.ColorJitter(brightness=0.25, contrast=0.25, saturation=0.15, hue=0.08),
        T.RandomGrayscale(p=0.05),
        T.RandomRotation(10),
        T.GaussianBlur(kernel_size=3, sigma=(0.1, 1.0)),
    ]

    train_pipeline = T.Compose(augmentations + tensor_tail())
    eval_pipeline = T.Compose([T.Resize((side, side))] + tensor_tail())
    return train_pipeline, eval_pipeline
# ── Model ─────────────────────────────────────────────────────
def build_model(device):
    """Create the pretrained EfficientNet-B4 with a 2-class head.

    Args:
        device: ``torch.device`` the model is moved to.

    Returns:
        The timm model with its ``classifier`` replaced, on ``device``.
    """
    print("\n Building EfficientNet-B4...")

    # Let cuDNN auto-tune its kernels; determinism is explicitly switched off.
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False

    net = timm.create_model("efficientnet_b4", pretrained=True)

    # Layer order fixes the positional state_dict keys (classifier.1.*,
    # classifier.3.*, classifier.5.*), so it must stay as is.
    head_layers = [
        nn.Dropout(0.4),
        nn.Linear(net.num_features, 512),
        nn.GELU(),
        nn.BatchNorm1d(512),
        nn.Dropout(0.3),
        nn.Linear(512, 2),
    ]
    net.classifier = nn.Sequential(*head_layers)
    net = net.to(device)

    trainable = 0
    for param in net.parameters():
        if param.requires_grad:
            trainable += param.numel()
    print(f" Trainable params: {trainable:,}")
    return net
# ── Test Set Evaluation ───────────────────────────────────────
def evaluate_test(model, test_dl, device, scaler):
    """Evaluate ``model`` on ``test_dl`` and return summary metrics.

    Label ``1`` is treated as the positive class for the confusion counts,
    precision, recall and F1.

    Args:
        model: Network returning per-class logits.
        test_dl: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.
        scaler: Truthy when mixed precision is active; the forward pass
            then runs under CUDA autocast.

    Returns:
        dict: ``test_accuracy``, ``precision``, ``recall``, ``f1_score``
        (rounded to 4 decimals) plus the four integer confusion counts.
    """
    model.eval()
    hits = seen = 0
    counts = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}

    progress = tqdm(test_dl, desc=" Test eval", ncols=65, leave=False)
    with torch.no_grad():
        for imgs, labels in progress:
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            if scaler:
                with torch.cuda.amp.autocast():
                    logits = model(imgs)
            else:
                logits = model(imgs)

            preds = logits.argmax(1)
            hits += (preds == labels).sum().item()
            seen += len(labels)

            # Boolean masks give the same four counts as a per-sample loop.
            pred_pos, pred_neg = preds == 1, preds == 0
            true_pos, true_neg = labels == 1, labels == 0
            counts["tp"] += (pred_pos & true_pos).sum().item()
            counts["fp"] += (pred_pos & true_neg).sum().item()
            counts["tn"] += (pred_neg & true_neg).sum().item()
            counts["fn"] += (pred_neg & true_pos).sum().item()

    tp, fp, tn, fn = counts["tp"], counts["fp"], counts["tn"], counts["fn"]

    # max(..., 1) / 1e-9 keep the ratios defined when a denominator is zero.
    accuracy = hits / max(seen, 1)
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-9)

    summary = {}
    summary["test_accuracy"] = round(accuracy, 4)
    summary["precision"] = round(precision, 4)
    summary["recall"] = round(recall, 4)
    summary["f1_score"] = round(f1, 4)
    summary["true_positive"] = tp
    summary["false_positive"] = fp
    summary["true_negative"] = tn
    summary["false_negative"] = fn
    return summary
# ── Main Training ─────────────────────────────────────────────
def _write_log(log_path, log_lines):
    """Write the accumulated CSV lines to ``log_path``, replacing the file."""
    with open(log_path, "w") as f:
        f.write("\n".join(log_lines))


def _forward_loss(model, criterion, imgs, labels, scaler):
    """Run the forward pass (under autocast when AMP is on); return (logits, loss)."""
    if scaler is not None:
        with torch.cuda.amp.autocast():
            out = model(imgs)
            loss = criterion(out, labels)
    else:
        out = model(imgs)
        loss = criterion(out, labels)
    return out, loss


def _train_one_epoch(model, train_dl, criterion, optimizer, scaler, device, epoch):
    """Train for one epoch; return (summed sample loss, correct count, sample count)."""
    accum = max(int(CFG["grad_accum"]), 1)
    n_batches = len(train_dl)
    model.train()
    t_loss = t_correct = t_total = 0
    optimizer.zero_grad()
    for step, (imgs, labels) in enumerate(
        tqdm(train_dl, desc=f" Ep{epoch:02d} TRAIN", leave=False, ncols=60)
    ):
        imgs = imgs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        out, full_loss = _forward_loss(model, criterion, imgs, labels, scaler)
        loss = full_loss / accum
        if scaler is not None:
            scaler.scale(loss).backward()
        else:
            loss.backward()
        # Also step on the last batch: otherwise the gradients of a partial
        # accumulation group are thrown away by the next zero_grad().
        if (step + 1) % accum == 0 or (step + 1) == n_batches:
            if scaler is not None:
                # Unscale first so clipping sees the true gradient norms.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                scaler.step(optimizer)
                scaler.update()
            else:
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
            optimizer.zero_grad()
        t_loss += full_loss.item() * len(imgs)
        t_correct += (out.argmax(1) == labels).sum().item()
        t_total += len(imgs)
    return t_loss, t_correct, t_total


def _validate_one_epoch(model, val_dl, criterion, scaler, device, epoch):
    """Evaluate on the validation loader; return (summed loss, correct, count)."""
    model.eval()
    v_loss = v_correct = v_total = 0
    with torch.no_grad():
        for imgs, labels in tqdm(
            val_dl, desc=f" Ep{epoch:02d} VAL ", leave=False, ncols=60
        ):
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            out, loss = _forward_loss(model, criterion, imgs, labels, scaler)
            v_loss += loss.item() * len(imgs)
            v_correct += (out.argmax(1) == labels).sum().item()
            v_total += len(imgs)
    return v_loss, v_correct, v_total


def train():
    """Run the full pipeline: load data, train, evaluate on test, save outputs.

    Steps:
        1. Pick the device. On CUDA with less than 3.5 GB of VRAM the batch
           size is lowered to 6; without CUDA, AMP is switched off. Both
           changes are written back into ``CFG``.
        2. Load the train/val/test splits and build the data loaders.
        3. Train with AdamW, cosine LR annealing, label smoothing, gradient
           clipping and early stopping on validation accuracy. The best
           weights go to ``<weights_dir>/<output_name>.pth``.
        4. Reload the best weights, evaluate on the test split, and write
           ``<weights_dir>/efficientnet_b4_meta.json``.

    A per-epoch CSV log is written to ``training_logs/`` after every epoch,
    so it survives an interrupted run.

    Raises:
        SystemExit: Propagated from ``load_split`` when a split folder is
            missing or empty.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("=" * 65)
    print(" VERIDEX Trainer v4.0 | RTX 3050 GPU Optimized")
    print("=" * 65)
    if device.type == "cuda":
        gpu_name = torch.cuda.get_device_name(0)
        vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
        print(f"\n GPU : {gpu_name}")
        print(f" VRAM : {vram_gb:.1f} GB")
        print(f" AMP : {'ON (faster!)' if CFG['use_amp'] else 'OFF'}")
        # Only ever lower the batch size, and report the real starting value.
        if vram_gb < 3.5 and CFG["batch_size"] > 6:
            print(f"\n Low VRAM! batch_size {CFG['batch_size']} -> 6 reduce chestunna...")
            CFG["batch_size"] = 6
    else:
        print("\n WARNING: GPU not found! CPU mode (slow)")
        print(" nvidia-smi run chesii CUDA check cheyyi")
        CFG["use_amp"] = False

    print("\n Config:")
    print(f" Image size : {CFG['img_size']}x{CFG['img_size']}")
    print(f" Batch size : {CFG['batch_size']} (effective: {CFG['batch_size'] * CFG['grad_accum']})")
    print(f" Epochs : {CFG['epochs']}")
    print(f" Device : {device}")

    # Load datasets
    print("\n Dataset loading...")
    print(f" {'split':5s} | {'real':>12} {'fake':>12} {'total':>12}")
    print(f" {'-'*55}")
    train_samples = load_split("train")
    val_samples = load_split("val")
    test_samples = load_split("test")
    grand_total = len(train_samples) + len(val_samples) + len(test_samples)
    print(f" {'-'*55}")
    print(f" TOTAL {grand_total:>12,}")

    # DataLoaders
    train_tf, eval_tf = get_transforms()
    train_ds = DeepfakeDataset(train_samples, train_tf)
    val_ds = DeepfakeDataset(val_samples, eval_tf)
    test_ds = DeepfakeDataset(test_samples, eval_tf)
    shared = dict(
        batch_size=CFG["batch_size"],
        num_workers=CFG["num_workers"],
        pin_memory=(device.type == "cuda"),
    )
    # BatchNorm1d in the classifier head raises in train mode on a batch of
    # one sample, so drop the trailing batch only when it would be that size.
    lone_tail = len(train_ds) % CFG["batch_size"] == 1
    train_dl = DataLoader(train_ds, shuffle=True, drop_last=lone_tail, **shared)
    val_dl = DataLoader(val_ds, shuffle=False, **shared)
    test_dl = DataLoader(test_ds, shuffle=False, **shared)

    # Model + optimizer
    model = build_model(device)
    optimizer = optim.AdamW(model.parameters(), lr=CFG["lr"], weight_decay=CFG["weight_decay"])
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CFG["epochs"], eta_min=1e-6)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.05)
    scaler = torch.cuda.amp.GradScaler() if CFG["use_amp"] and device.type == "cuda" else None

    best_val_acc = 0.0
    no_improve = 0
    epochs_trained = 0
    saved_this_run = False
    wpath = os.path.join(CFG["weights_dir"], f"{CFG['output_name']}.pth")
    log_path = f"training_logs/train_{int(time.time())}.csv"
    log_lines = ["epoch,train_loss,train_acc,val_loss,val_acc,lr"]
    start_time = time.time()

    print("\n Training started!\n")
    print(f" {'Epoch':>5} {'TrnLoss':>8} {'TrnAcc':>7} {'ValLoss':>8} {'ValAcc':>7} {'Note'}")
    print(f" {'-'*65}")

    for epoch in range(1, CFG["epochs"] + 1):
        ep_start = time.time()
        # Read the LR before scheduler.step() so the log shows the rate this
        # epoch actually trained with, not the next epoch's.
        current_lr = optimizer.param_groups[0]["lr"]

        t_loss, t_correct, t_total = _train_one_epoch(
            model, train_dl, criterion, optimizer, scaler, device, epoch
        )
        v_loss, v_correct, v_total = _validate_one_epoch(
            model, val_dl, criterion, scaler, device, epoch
        )
        scheduler.step()
        epochs_trained = epoch

        tl = t_loss / max(t_total, 1)
        ta = t_correct / max(t_total, 1)
        vl = v_loss / max(v_total, 1)
        va = v_correct / max(v_total, 1)
        elapsed = time.time() - ep_start
        eta_min = elapsed * (CFG["epochs"] - epoch) / 60

        log_lines.append(f"{epoch},{tl:.4f},{ta:.4f},{vl:.4f},{va:.4f},{current_lr:.6f}")
        # Flush every epoch so a crash or Ctrl+C does not lose the history.
        _write_log(log_path, log_lines)

        # The first epoch always saves, so the final evaluation can never
        # pick up a stale checkpoint left over from an earlier run.
        if va > best_val_acc or not saved_this_run:
            best_val_acc = va
            no_improve = 0
            torch.save(model.state_dict(), wpath)
            saved_this_run = True
            note = f"BEST! saved (ETA {eta_min:.0f}m)"
        else:
            no_improve += 1
            note = f"no improve {no_improve}/{CFG['early_stop']} (ETA {eta_min:.0f}m)"
            if no_improve >= CFG["early_stop"]:
                print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} Early stop!")
                break
        print(f" {epoch:5d} {tl:8.4f} {ta:7.1%} {vl:8.4f} {va:7.1%} {note}")

    # Save log (also covers the case where no epoch ran at all)
    _write_log(log_path, log_lines)

    if not saved_this_run:
        # No epoch ran (CFG["epochs"] < 1): store the current weights so the
        # reload below succeeds and reflects this run.
        torch.save(model.state_dict(), wpath)

    # FINAL TEST EVALUATION
    print(f"\n {'='*65}")
    print(" Final TEST set evaluation (unseen data)...")
    model.load_state_dict(torch.load(wpath, map_location=device))
    res = evaluate_test(model, test_dl, device, scaler)
    print(f"\n Test Accuracy : {res['test_accuracy']:.1%}")
    print(f" Precision : {res['precision']:.1%}")
    print(f" Recall : {res['recall']:.1%}")
    print(f" F1 Score : {res['f1_score']:.1%}")
    print("\n Confusion Matrix:")
    print(f" True Positive : {res['true_positive']:,}")
    print(f" False Positive : {res['false_positive']:,}")
    print(f" True Negative : {res['true_negative']:,}")
    print(f" False Negative : {res['false_negative']:,}")

    # Save metadata
    meta = {
        "model": "EfficientNet-B4",
        "trained_on": "RTX 3050 GPU",
        "fake_label": CFG["fake_label"],
        "img_size": CFG["img_size"],
        "best_val_acc": round(best_val_acc, 4),
        "epochs_trained": epochs_trained,
        "dataset_size": {
            "train": len(train_samples),
            "val": len(val_samples),
            "test": len(test_samples),
            "total": grand_total,
        },
        "test_results": res,
    }
    with open(os.path.join(CFG["weights_dir"], "efficientnet_b4_meta.json"), "w") as f:
        json.dump(meta, f, indent=2)

    total_time = (time.time() - start_time) / 60
    print(f"\n {'='*65}")
    print(" TRAINING COMPLETE!")
    print(f" Best val accuracy : {best_val_acc:.1%}")
    print(f" Final test accuracy : {res['test_accuracy']:.1%}")
    print(f" F1 Score : {res['f1_score']:.1%}")
    print(f" Total time : {total_time:.0f} minutes")
    print("\n NEXT STEP:")
    print(" 1. trainer\\weights\\efficientnet_deepfake.pth --> copy")
    print(" 2. trainer\\weights\\efficientnet_b4_meta.json --> copy")
    print(" 3. VERIDEX\\backend\\weights\\ lo paste cheyyi")
    print(" 4. Backend restart: uvicorn main:app --reload --port 8000")
    print(f" {'='*65}")
# Script entry point: start training only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    train()