#!/usr/bin/env python3
"""
DeBERTa CWE Classification - Minimal Training Interface
=======================================================

Minimal Gradio interface for training DeBERTa models on CVE-CWE classification.
Optimized for 4x NVIDIA L4 GPUs (96GB total VRAM).

Author: Berghem - Smart Information Security
License: MIT
"""

import importlib.util
import os
import sys
import time
import traceback

import gradio as gr
import numpy as np
import torch
from datasets import DatasetDict, load_dataset, load_from_disk
from sklearn.metrics import accuracy_score, f1_score
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainerCallback,
    TrainingArguments,
)

# ============================================================================
# CONFIGURATION
# ============================================================================

MODELS = {
    "DeBERTa-v3-Base (86M params) - Recommended": "microsoft/deberta-v3-base",
}

DATASET_PATH = "./dataset/cleaned"


# ============================================================================
# CUDA CACHE CLEARING CALLBACK
# ============================================================================


class CUDACacheClearCallback(TrainerCallback):
    """Clear CUDA cache after each epoch to prevent memory buildup."""

    def on_epoch_end(self, args, state, control, **kwargs):
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            print(f"\n🧹 CUDA cache cleared after epoch {state.epoch}")


# ============================================================================
# TRAINING HELPERS
# ============================================================================


def _select_precision() -> tuple[bool, bool]:
    """Return the ``(use_fp16, use_bf16)`` mixed-precision flags for GPU 0.

    bf16 is chosen on GPUs with compute capability >= 8.0 (A100, H100, L4,
    L40, ...), fp16 on older CUDA GPUs, and neither on CPU.
    """
    if not torch.cuda.is_available():
        return False, False
    major, _minor = torch.cuda.get_device_capability(0)
    if major >= 8:
        return False, True
    return True, False


def _plan_batch(batch_size: int, num_gpus: int) -> tuple[int, int]:
    """Split a requested total batch into per-device batch and accumulation.

    Returns ``(per_device_batch, gradient_accumulation_steps)`` such that
    ``per_device_batch * gradient_accumulation_steps * num_gpus`` stays close
    to ``batch_size`` for any GPU count.
    """
    num_gpus = max(1, num_gpus)
    if num_gpus > 1:
        # Spread the batch over the devices, but keep at least 4 per device.
        per_device = max(4, batch_size // num_gpus)
    else:
        # Single device: cap the micro-batch at 8 and accumulate the rest.
        per_device = max(1, min(8, batch_size))
    gradient_accum = max(1, batch_size // (per_device * num_gpus))
    return per_device, gradient_accum


def _select_optimizer() -> str:
    """Return the Trainer ``optim`` name usable in this environment.

    The paged 8-bit AdamW comes from bitsandbytes and needs CUDA; otherwise
    fall back to the regular PyTorch AdamW.
    """
    if torch.cuda.is_available() and importlib.util.find_spec("bitsandbytes") is not None:
        return "paged_adamw_8bit"
    return "adamw_torch"


# ============================================================================
# TRAINING FUNCTION
# ============================================================================


def train_model(
    model_name: str = "microsoft/deberta-v3-base",
    epochs: int = 10,
    batch_size: int = 32,
    learning_rate: float = 2e-5,
    max_length: int = 256,
    early_stopping_patience: int = 5,
) -> str:
    """Train DeBERTa model on CVE-CWE dataset.

    Args:
        model_name: Hugging Face model id or local path of the base model.
        epochs: Maximum number of training epochs.
        batch_size: Requested total (effective) batch size across all devices.
        learning_rate: Peak learning rate for the cosine schedule.
        max_length: Token length every description is padded/truncated to.
        early_stopping_patience: Number of evaluations without F1 improvement
            before training stops.

    Returns:
        The accumulated log text. Errors are not raised to the caller; the
        message and traceback are appended to the returned log instead.
    """
    logs = []

    def log(msg):
        logs.append(msg)
        print(msg)  # Also print to console

    try:
        log("=" * 80)
        log("DEBERTA CWE CLASSIFICATION TRAINING")
        log("=" * 80)
        log(f"Model: {model_name}")
        log(f"Epochs: {epochs}")
        log(f"Total batch size: {batch_size}")
        log(f"Learning rate: {learning_rate}")
        log(f"Max length: {max_length}")
        log("=" * 80)

        # Check device
        cuda_available = torch.cuda.is_available()
        if cuda_available:
            device = "cuda"
            log(f"\n🖥️ Device: {device}")
            log(f" GPU: {torch.cuda.get_device_name(0)}")
        else:
            device = "cpu"
            log(f"\n🖥️ Device: {device} (CPU only)")

        # Load cleaned dataset (try local first, then HuggingFace)
        log("\n📦 Loading cleaned dataset...")
        if os.path.exists(DATASET_PATH):
            log(f" Using local: {DATASET_PATH}")
            dataset = load_from_disk(DATASET_PATH)
        else:
            log(" Local dataset not found, downloading from HuggingFace...")
            dataset = load_dataset("LorenzoNava/cve-cwe-dataset-cleaned")
        log(f" ✅ Loaded {len(dataset['train']):,} samples (cleaned, no NVD-CWE-Other)")

        # Always end up with a validation split: evaluation, best-model
        # selection and early stopping all depend on it.
        if "validation" in dataset:
            train_split = dataset["train"]
            eval_split = dataset["validation"]
        elif "test" in dataset:
            log("\n📊 Using existing test split for validation...")
            train_split = dataset["train"]
            eval_split = dataset["test"]
        else:
            log("\n📊 Creating 90/10 train/validation split...")
            split_dataset = dataset["train"].train_test_split(test_size=0.1, seed=42)
            train_split = split_dataset["train"]
            eval_split = split_dataset["test"]
        dataset = DatasetDict({"train": train_split, "validation": eval_split})
        log(f" Train: {len(dataset['train']):,} samples")
        log(f" Validation: {len(dataset['validation']):,} samples")

        # Build label mapping from the training split only
        log("\n🏷️ Building CWE label mapping...")
        cwe_list = sorted({cwe for cwe in dataset["train"]["CWE-ID"] if cwe})
        label2id = {cwe: idx for idx, cwe in enumerate(cwe_list)}
        id2label = {idx: cwe for cwe, idx in label2id.items()}
        num_labels = len(label2id)
        if num_labels == 0:
            raise ValueError("No CWE-ID labels found in the training split")
        log(f" ✅ Found {num_labels} unique CWE classes")

        # Load tokenizer
        log(f"\n📚 Loading tokenizer: {model_name}")
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Tokenize and label in a single batched pass. Labels are read from
        # the batch itself, so no per-row lookups into the raw dataset are
        # needed; unknown or missing CWE ids get -100 and are filtered below.
        log("\n🔤 Tokenizing dataset...")

        def encode_batch(examples):
            features = dict(
                tokenizer(
                    examples["DESCRIPTION"],
                    padding="max_length",
                    truncation=True,
                    max_length=max_length,
                )
            )
            features["labels"] = [label2id.get(cwe, -100) for cwe in examples["CWE-ID"]]
            return features

        tokenized_dataset = dataset.map(
            encode_batch,
            batched=True,
            remove_columns=dataset["train"].column_names,
        )
        log(" ✅ Tokenization complete")

        # Clear CUDA cache
        if cuda_available:
            torch.cuda.empty_cache()

        # Filter invalid labels
        log("\n🔍 Filtering invalid samples...")
        for split_name in ("train", "validation"):
            tokenized_dataset[split_name] = tokenized_dataset[split_name].filter(
                lambda x: x["labels"] != -100
            )
        log(f" ✅ Train: {len(tokenized_dataset['train']):,} valid samples")
        log(f" ✅ Validation: {len(tokenized_dataset['validation']):,} valid samples")

        # Load model
        log(f"\n🤖 Loading model: {model_name}")
        # Keep the weights in fp32. Loading them in fp16 and then enabling
        # fp16 mixed precision makes the gradient scaler fail with
        # "Attempting to unscale FP16 gradients"; the Trainer's autocast
        # handles the half-precision compute on its own.
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
            label2id=label2id,
            id2label=id2label,
        )
        model = model.to(device)
        log(f" ✅ Model loaded on {device}")
        log(f" Parameters: {sum(p.numel() for p in model.parameters()):,}")

        # Clear CUDA cache
        if cuda_available:
            torch.cuda.empty_cache()

        # Training configuration
        log("\n⚙️ Configuring training...")
        output_dir = "./models/deberta-cwe-final"

        # Precision settings
        use_fp16, use_bf16 = _select_precision()
        if cuda_available:
            gpu_name = torch.cuda.get_device_name(0).upper()
            if use_bf16:
                log(f" Using bf16 precision (optimal for {gpu_name})")
            else:
                log(f" Using fp16 precision ({gpu_name})")

        # Multi-GPU detection
        num_gpus = torch.cuda.device_count() if cuda_available else 1
        log(f" GPUs detected: {num_gpus}")

        # Memory monitoring
        if cuda_available:
            for i in range(num_gpus):
                mem_total = torch.cuda.get_device_properties(i).total_memory / 1e9
                mem_allocated = torch.cuda.memory_allocated(i) / 1e9
                log(f" GPU {i}: {mem_total:.1f}GB total, {mem_allocated:.1f}GB allocated")

        # Batch size distribution (tuned for 4x L4, valid for any GPU count)
        per_device_batch, gradient_accum = _plan_batch(batch_size, num_gpus)
        log(f" Per-device batch: {per_device_batch}")
        log(f" Gradient accumulation: {gradient_accum}")
        log(f" Effective batch: {per_device_batch * gradient_accum * num_gpus}")

        optimizer_name = _select_optimizer()
        log(f" Optimizer: {optimizer_name}")

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=per_device_batch,
            per_device_eval_batch_size=per_device_batch * 2,
            gradient_accumulation_steps=gradient_accum,
            learning_rate=learning_rate,
            weight_decay=0.01,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            eval_strategy="steps",
            eval_steps=500,
            save_strategy="steps",
            save_steps=500,
            save_total_limit=2,
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            logging_steps=100,
            logging_dir=f"{output_dir}/logs",
            fp16=use_fp16,
            bf16=use_bf16,
            dataloader_num_workers=0,
            report_to="none",
            push_to_hub=False,
            ddp_find_unused_parameters=False if num_gpus > 1 else None,
            # CRITICAL: Disable gradient checkpointing (fixes backward pass error)
            gradient_checkpointing=False,
            optim=optimizer_name,
            max_grad_norm=1.0,
        )

        # Metrics
        def compute_metrics(eval_pred):
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            acc = accuracy_score(labels, predictions)
            f1 = f1_score(labels, predictions, average="weighted")
            return {"accuracy": acc, "f1": f1}

        # Create trainer
        log("\n🚀 Starting training...")
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset["train"],
            eval_dataset=tokenized_dataset["validation"],
            tokenizer=tokenizer,
            compute_metrics=compute_metrics,
            callbacks=[
                EarlyStoppingCallback(early_stopping_patience=early_stopping_patience),
                CUDACacheClearCallback(),
            ],
        )

        # Clear cache before training
        if cuda_available:
            torch.cuda.empty_cache()

        # Train with OOM error handling
        try:
            train_result = trainer.train()
        except torch.cuda.OutOfMemoryError:
            log("\n❌ Out of Memory!")
            log(" Reduce batch size or max length")
            raise

        # Evaluate
        log("\n📊 Final evaluation...")
        eval_result = trainer.evaluate()

        log("\n✅ Training complete!")
        log(f" Final Loss: {train_result.training_loss:.4f}")
        log(f" Accuracy: {eval_result.get('eval_accuracy', 0):.4f}")
        log(f" F1 Score: {eval_result.get('eval_f1', 0):.4f}")

        # Save model
        log(f"\n💾 Saving model to: {output_dir}")
        trainer.save_model(output_dir)
        tokenizer.save_pretrained(output_dir)

        # Final CUDA cache clear
        if cuda_available:
            torch.cuda.empty_cache()
            log("\n🧹 Final CUDA cache clear complete")

        log("\n🎉 Done! Model saved successfully.")
        log("=" * 80)

        return "\n".join(logs)

    except Exception as e:
        # UI boundary: report the failure in the log box instead of raising.
        log(f"\n❌ Error: {str(e)}")
        log(f"\n{traceback.format_exc()}")
        return "\n".join(logs)


# ============================================================================
# GRADIO INTERFACE - MINIMAL VERSION
# ============================================================================

with gr.Blocks(title="DeBERTa CWE Training", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🤖 DeBERTa CWE Classification Training

        **Optimized for 4x NVIDIA L4 GPUs (96GB VRAM)**

        Click the button below to start training with optimized settings:
        - Model: DeBERTa-v3-Base (86M params)
        - Batch size: 32 (8 per GPU)
        - Epochs: 10
        - Learning rate: 2e-5
        - Gradient checkpointing: DISABLED (fixes errors, we have plenty of VRAM)
        """
    )

    train_btn = gr.Button("🚀 Start Training", variant="primary", size="lg")

    logs_output = gr.Textbox(
        label="Training Logs",
        lines=30,
        max_lines=50,
        interactive=False,
        show_copy_button=True,
    )

    gr.Markdown(
        """
        ---
        ### After Training

        Model will be saved to: `./models/deberta-cwe-final/`

        **Developed by:** Berghem - Smart Information Security | **License:** MIT
        """
    )

    # Wire up the training
    train_btn.click(fn=train_model, inputs=[], outputs=logs_output)


# ============================================================================
# LAUNCH
# ============================================================================

if __name__ == "__main__":
    # Check if running in HuggingFace Spaces
    is_hf_space = os.getenv("SPACE_ID") is not None

    demo.queue()

    if is_hf_space:
        # HF Spaces: explicit server configuration
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            show_error=True,
            quiet=False,
            prevent_thread_lock=False,
            inbrowser=False,
            show_api=False,
        )
    else:
        # Local development
        demo.launch(share=True)