# LorenzoNava's picture
# feat: Use cleaned dataset (local or HuggingFace)
# 152f94b
#!/usr/bin/env python3
"""
DeBERTa CWE Classification - Minimal Training Interface
=======================================================
Minimal Gradio interface for training DeBERTa models on CVE-CWE classification.
Optimized for 4x NVIDIA L4 GPUs (96GB total VRAM).
Author: Berghem - Smart Information Security
License: MIT
"""
import os
import sys
import gradio as gr
import torch
from datasets import load_dataset, load_from_disk
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
EarlyStoppingCallback,
TrainerCallback,
)
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
import time
# ============================================================================
# CONFIGURATION
# ============================================================================
# Human-readable label -> model identifier. The single entry's value is the
# same identifier train_model() uses as its default `model_name`.
# NOTE(review): MODELS is not referenced anywhere in the visible part of this
# file — presumably kept for a model dropdown; confirm before removing.
MODELS = {
    "DeBERTa-v3-Base (86M params) - Recommended": "microsoft/deberta-v3-base",
}
# Local directory of the cleaned dataset. train_model() loads it with
# load_from_disk() when this path exists and otherwise falls back to
# downloading the dataset with load_dataset().
DATASET_PATH = "./dataset/cleaned"
# ============================================================================
# CUDA CACHE CLEARING CALLBACK
# ============================================================================
class CUDACacheClearCallback(TrainerCallback):
    """Trainer callback that empties the CUDA caching allocator after every epoch."""

    def on_epoch_end(self, args, state, control, **kwargs):
        """Release cached CUDA memory and report which epoch just finished.

        Does nothing when CUDA is not available.
        """
        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()
        print(f"\n🧹 CUDA cache cleared after epoch {state.epoch}")
# ============================================================================
# TRAINING FUNCTION
# ============================================================================
def _detect_precision():
    """Pick the mixed-precision mode for the first visible CUDA device.

    Returns:
        A ``(use_bf16, use_fp16, gpu_name)`` tuple. Both flags are ``False``
        and ``gpu_name`` is ``None`` when CUDA is unavailable.
    """
    if not torch.cuda.is_available():
        return False, False, None
    gpu_name = torch.cuda.get_device_name(0).upper()
    # bf16 is chosen purely by device-name match; every other GPU gets fp16.
    use_bf16 = any(tag in gpu_name for tag in ("A100", "H100", "L4", "L40"))
    return use_bf16, not use_bf16, gpu_name


def _plan_batch(batch_size, num_devices):
    """Split a total batch size into per-device batch and accumulation steps.

    Args:
        batch_size: Requested total (effective) batch size.
        num_devices: Number of devices the batch is spread over (>= 1).

    Returns:
        A ``(per_device_batch, gradient_accumulation_steps)`` tuple.
    """
    if num_devices >= 2:
        # Any multi-GPU host (including 3 GPUs) shares the batch evenly,
        # with a floor of 4 samples per device.
        per_device = max(4, batch_size // num_devices)
    else:
        # Single device: cap the micro-batch at 8 and accumulate the rest.
        per_device = max(1, min(8, batch_size))
    grad_accum = max(1, batch_size // (per_device * num_devices))
    return per_device, grad_accum


def _pick_optimizer():
    """Return the Trainer ``optim`` name usable in the current environment.

    The paged 8-bit AdamW optimizer needs both CUDA and the ``bitsandbytes``
    package; without them the standard PyTorch AdamW is used instead.
    """
    import importlib.util

    if torch.cuda.is_available() and importlib.util.find_spec("bitsandbytes") is not None:
        return "paged_adamw_8bit"
    return "adamw_torch"


def train_model(
    model_name="microsoft/deberta-v3-base",
    epochs=10,
    batch_size=32,
    learning_rate=2e-5,
    max_length=256,
    early_stopping_patience=5,
):
    """Train a DeBERTa sequence classifier on the CVE-CWE dataset.

    Loads the cleaned dataset (from ``DATASET_PATH`` if present, otherwise from
    the Hub), builds the CWE label mapping from the training split, tokenizes
    the ``DESCRIPTION`` column, fine-tunes the model with the Hugging Face
    ``Trainer`` and saves model and tokenizer to ``./models/deberta-cwe-final``.

    Args:
        model_name: Model identifier passed to ``from_pretrained``.
        epochs: Number of training epochs.
        batch_size: Total (effective) batch size across all devices.
        learning_rate: Peak learning rate.
        max_length: Token length every description is padded/truncated to.
        early_stopping_patience: Evaluations without F1 improvement before
            training stops early.

    Returns:
        The full training log as a single newline-joined string. Errors are
        not raised to the caller; the message and traceback are appended to
        the log, which is returned as usual.
    """
    logs = []

    def log(msg):
        logs.append(msg)
        print(msg)  # Also print to console
        return "\n".join(logs)

    try:
        log("=" * 80)
        log("DEBERTA CWE CLASSIFICATION TRAINING")
        log("=" * 80)
        log(f"Model: {model_name}")
        log(f"Epochs: {epochs}")
        log(f"Total batch size: {batch_size}")
        log(f"Learning rate: {learning_rate}")
        log(f"Max length: {max_length}")
        log("=" * 80)

        # Check device
        if torch.cuda.is_available():
            device = "cuda"
            log(f"\n🖥️ Device: {device}")
            log(f" GPU: {torch.cuda.get_device_name(0)}")
        else:
            device = "cpu"
            log(f"\n🖥️ Device: {device} (CPU only)")

        # Load cleaned dataset (try local first, then HuggingFace)
        log("\n📦 Loading cleaned dataset...")
        if os.path.exists(DATASET_PATH):
            log(f" Using local: {DATASET_PATH}")
            dataset = load_from_disk(DATASET_PATH)
        else:
            log(" Local dataset not found, downloading from HuggingFace...")
            dataset = load_dataset("LorenzoNava/cve-cwe-dataset-cleaned")
        log(f" ✅ Loaded {len(dataset['train']):,} samples (cleaned, no NVD-CWE-Other)")

        # Create a validation split whenever one is missing. Evaluation, best
        # model selection and early stopping all need it, so a dataset that
        # ships only train/test still gets a validation set carved from train
        # (the test split is left untouched).
        if "validation" not in dataset:
            log("\n📊 Creating 90/10 train/validation split...")
            split_dataset = dataset["train"].train_test_split(test_size=0.1, seed=42)
            dataset["train"] = split_dataset["train"]
            dataset["validation"] = split_dataset["test"]
        log(f" Train: {len(dataset['train']):,} samples")
        log(f" Validation: {len(dataset['validation']):,} samples")

        # Build label mapping from the label column only (no need to decode
        # every full example just to collect the distinct CWE ids).
        log("\n🏷️ Building CWE label mapping...")
        cwe_list = sorted({cwe for cwe in dataset["train"]["CWE-ID"] if cwe})
        label2id = {cwe: idx for idx, cwe in enumerate(cwe_list)}
        id2label = {idx: cwe for cwe, idx in label2id.items()}
        num_labels = len(label2id)
        if num_labels == 0:
            raise ValueError("No CWE-ID labels found in the training split")
        log(f" ✅ Found {num_labels} unique CWE classes")

        # Load tokenizer
        log(f"\n📚 Loading tokenizer: {model_name}")
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # Tokenize dataset and attach labels in the same batched pass, so the
        # labels come from the very rows being tokenized instead of a second
        # pass doing per-row random access into the raw dataset.
        log("\n🔤 Tokenizing dataset...")

        def tokenize_function(examples):
            encoded = tokenizer(
                examples["DESCRIPTION"],
                padding="max_length",
                truncation=True,
                max_length=max_length,
            )
            # CWE ids unseen in the training split (or missing) map to -100
            # and are filtered out below.
            encoded["labels"] = [label2id.get(cwe, -100) for cwe in examples["CWE-ID"]]
            return encoded

        tokenized_dataset = dataset.map(
            tokenize_function, batched=True, remove_columns=dataset["train"].column_names
        )
        log(" ✅ Tokenization complete")

        # Clear CUDA cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Filter invalid labels
        log("\n🔍 Filtering invalid samples...")
        for split in ("train", "validation"):
            tokenized_dataset[split] = tokenized_dataset[split].filter(
                lambda row: row["labels"] != -100
            )
        log(f" ✅ Train: {len(tokenized_dataset['train']):,} valid samples")

        # Load model. The weights stay in the default full precision: the
        # fp16/bf16 flags below enable mixed precision, and fp16 AMP refuses
        # to unscale gradients of half-precision parameters.
        log(f"\n🤖 Loading model: {model_name}")
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
            label2id=label2id,
            id2label=id2label,
        )
        model = model.to(device)
        log(f" ✅ Model loaded on {device}")
        log(f" Parameters: {sum(p.numel() for p in model.parameters()):,}")

        # Clear CUDA cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Training configuration
        log("\n⚙️ Configuring training...")
        output_dir = "./models/deberta-cwe-final"

        # Precision settings
        use_bf16, use_fp16, gpu_name = _detect_precision()
        if use_bf16:
            log(f" Using bf16 precision (optimal for {gpu_name})")
        elif use_fp16:
            log(f" Using fp16 precision ({gpu_name})")

        # Multi-GPU detection (0 on a CPU-only host)
        gpu_count = torch.cuda.device_count() if torch.cuda.is_available() else 0
        log(f" GPUs detected: {gpu_count}")

        # Memory monitoring
        for i in range(gpu_count):
            mem_total = torch.cuda.get_device_properties(i).total_memory / 1e9
            mem_allocated = torch.cuda.memory_allocated(i) / 1e9
            log(f" GPU {i}: {mem_total:.1f}GB total, {mem_allocated:.1f}GB allocated")

        # Batch size distribution; a CPU-only run counts as one device.
        num_devices = max(1, gpu_count)
        per_device_batch, gradient_accum = _plan_batch(batch_size, num_devices)
        log(f" Per-device batch: {per_device_batch}")
        log(f" Gradient accumulation: {gradient_accum}")
        log(f" Effective batch: {per_device_batch * gradient_accum * num_devices}")

        optimizer_name = _pick_optimizer()
        log(f" Optimizer: {optimizer_name}")

        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=epochs,
            per_device_train_batch_size=per_device_batch,
            per_device_eval_batch_size=per_device_batch * 2,
            gradient_accumulation_steps=gradient_accum,
            learning_rate=learning_rate,
            weight_decay=0.01,
            warmup_ratio=0.1,
            lr_scheduler_type="cosine",
            eval_strategy="steps",
            eval_steps=500,
            save_strategy="steps",
            save_steps=500,
            save_total_limit=2,
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            greater_is_better=True,
            logging_steps=100,
            logging_dir=f"{output_dir}/logs",
            fp16=use_fp16,
            bf16=use_bf16,
            dataloader_num_workers=0,
            report_to="none",
            push_to_hub=False,
            ddp_find_unused_parameters=False if gpu_count > 1 else None,
            # CRITICAL: Disable gradient checkpointing (fixes backward pass error)
            gradient_checkpointing=False,
            optim=optimizer_name,
            max_grad_norm=1.0,
        )

        # Metrics
        def compute_metrics(eval_pred):
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            acc = accuracy_score(labels, predictions)
            f1 = f1_score(labels, predictions, average="weighted")
            return {"accuracy": acc, "f1": f1}

        # Create trainer
        log("\n🚀 Starting training...")
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_dataset["train"],
            eval_dataset=tokenized_dataset["validation"],
            # NOTE(review): newer transformers releases appear to rename this
            # argument to `processing_class` — confirm against the pinned version.
            tokenizer=tokenizer,
            compute_metrics=compute_metrics,
            callbacks=[
                EarlyStoppingCallback(early_stopping_patience=early_stopping_patience),
                CUDACacheClearCallback(),
            ],
        )

        # Clear cache before training
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        # Train with OOM error handling
        try:
            train_result = trainer.train()
        except torch.cuda.OutOfMemoryError:
            log("\n❌ Out of Memory!")
            log(" Reduce batch size or max length")
            raise

        # Evaluate
        log("\n📊 Final evaluation...")
        eval_result = trainer.evaluate()
        log("\n✅ Training complete!")
        log(f" Final Loss: {train_result.training_loss:.4f}")
        log(f" Accuracy: {eval_result.get('eval_accuracy', 0):.4f}")
        log(f" F1 Score: {eval_result.get('eval_f1', 0):.4f}")

        # Save model
        log(f"\n💾 Saving model to: {output_dir}")
        trainer.save_model(output_dir)
        tokenizer.save_pretrained(output_dir)

        # Final CUDA cache clear
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            log("\n🧹 Final CUDA cache clear complete")

        log("\n🎉 Done! Model saved successfully.")
        log("=" * 80)
        return "\n".join(logs)
    except Exception as e:
        # Top-level UI boundary: report the failure in the returned log
        # instead of letting it propagate to the caller.
        import traceback

        log(f"\n❌ Error: {str(e)}")
        log(f"\n{traceback.format_exc()}")
        return "\n".join(logs)
# ============================================================================
# GRADIO INTERFACE - MINIMAL VERSION
# ============================================================================
# Build the single-page UI: a header, one "start" button, a read-only log box
# and a footer. `demo` is the Blocks app that the launch section starts.
with gr.Blocks(title="DeBERTa CWE Training", theme=gr.themes.Soft()) as demo:
    # Static header describing the fixed training settings.
    gr.Markdown(
        """
# 🤖 DeBERTa CWE Classification Training
**Optimized for 4x NVIDIA L4 GPUs (96GB VRAM)**
Click the button below to start training with optimized settings:
- Model: DeBERTa-v3-Base (86M params)
- Batch size: 32 (8 per GPU)
- Epochs: 10
- Learning rate: 2e-5
- Gradient checkpointing: DISABLED (fixes errors, we have plenty of VRAM)
"""
    )
    train_btn = gr.Button("🚀 Start Training", variant="primary", size="lg")
    # Read-only textbox that receives the log string returned by train_model.
    logs_output = gr.Textbox(
        label="Training Logs",
        lines=30,
        max_lines=50,
        interactive=False,
        show_copy_button=True,
    )
    # Static footer with the output location and credits.
    gr.Markdown(
        """
---
### After Training
Model will be saved to: `./models/deberta-cwe-final/`
**Developed by:** Berghem - Smart Information Security | **License:** MIT
"""
    )
    # Wire up the training
    # No inputs are passed, so train_model runs with its default arguments.
    # train_model returns its log as one string, so the textbox is filled once
    # the call returns rather than line by line.
    train_btn.click(fn=train_model, inputs=[], outputs=logs_output)
# ============================================================================
# LAUNCH
# ============================================================================
if __name__ == "__main__":
    # A SPACE_ID environment variable is taken to mean "running in HuggingFace Spaces".
    is_hf_space = "SPACE_ID" in os.environ

    demo.queue()

    if not is_hf_space:
        # Local development
        launch_options = {"share": True}
    else:
        # HF Spaces: explicit server configuration
        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": False,
            "show_error": True,
            "quiet": False,
            "prevent_thread_lock": False,
            "inbrowser": False,
            "show_api": False,
        }

    demo.launch(**launch_options)