"""Win-Stack train.py: production-ready training script with all robustness improvements."""
import torch
from transformers import (
T5ForConditionalGeneration,
T5Tokenizer,
TrainingArguments,
Trainer,
DataCollatorForSeq2Seq,
EarlyStoppingCallback
)
from datasets import Dataset
import pandas as pd
import numpy as np
import os
import logging
import sys
from difflib import SequenceMatcher
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Check environment and dependencies first
def check_dependencies():
"""Check all required dependencies before starting"""
missing_deps = []
try:
import torch
logger.info(f"βœ“ PyTorch {torch.__version__}")
except ImportError:
missing_deps.append("torch")
try:
import transformers
logger.info(f"βœ“ Transformers {transformers.__version__}")
except ImportError:
missing_deps.append("transformers")
try:
import peft
logger.info(f"βœ“ PEFT {peft.__version__}")
except ImportError:
missing_deps.append("peft")
try:
import accelerate
logger.info(f"βœ“ Accelerate {accelerate.__version__}")
except ImportError:
missing_deps.append("accelerate")
try:
import datasets
logger.info(f"βœ“ Datasets {datasets.__version__}")
except ImportError:
missing_deps.append("datasets")
if missing_deps:
raise ImportError(f"Missing dependencies: {', '.join(missing_deps)}. Install with: pip install {' '.join(missing_deps)}")
return True
# Run dependency check
check_dependencies()
# Environment check
logger.info("=== Environment Check ===")
logger.info(f"Python version: {sys.version}")
logger.info(f"PyTorch version: {torch.__version__}")
logger.info(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
logger.info(f"CUDA devices: {torch.cuda.device_count()}")
for i in range(torch.cuda.device_count()):
device_props = torch.cuda.get_device_properties(i)
logger.info(f" Device {i}: {torch.cuda.get_device_name(i)} ({device_props.total_memory / 1e9:.1f}GB)")
class ResumeNormalizationTrainer:
def __init__(self, model_name="google/flan-t5-base", use_lora=True):
self.model_name = model_name
self.use_lora = use_lora
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Initializing model: {model_name}")
logger.info(f"Using LoRA: {use_lora}")
logger.info(f"Device: {self.device}")
# Determine optimal batch size based on GPU memory
self.train_batch_size, self.eval_batch_size = self._determine_batch_sizes()
# Load tokenizer and model
try:
self.tokenizer = T5Tokenizer.from_pretrained(model_name)
            self.model = T5ForConditionalGeneration.from_pretrained(
                model_name,
                # Load half precision only for LoRA on GPU; full fine-tuning keeps fp32
                # weights, otherwise the fp16 grad scaler refuses to unscale fp16 parameters
                torch_dtype=torch.float16 if (torch.cuda.is_available() and use_lora) else torch.float32
            )
self.model = self.model.to(self.device)
logger.info("Model loaded successfully")
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
# Setup LoRA if requested
if use_lora:
self._setup_lora()
def _determine_batch_sizes(self):
"""Determine optimal batch sizes based on available GPU memory"""
if not torch.cuda.is_available():
logger.warning("No GPU available, using minimal batch sizes")
return 2, 4
# Get GPU memory in GB
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
logger.info(f"GPU memory: {gpu_memory:.1f}GB")
# Conservative batch size selection based on GPU memory
        if gpu_memory < 8:  # under 8GB (entry-level or older GPUs)
            train_batch_size = 4
            eval_batch_size = 8
        elif gpu_memory < 16:  # 8-16GB (e.g., RTX 2080, RTX 3060)
            train_batch_size = 8
            eval_batch_size = 16
        elif gpu_memory < 24:  # 16-24GB (e.g., 16GB V100)
            train_batch_size = 16
            eval_batch_size = 32
        else:  # 24GB+ (e.g., RTX 3090/4090, A100, L4)
            train_batch_size = 16  # still conservative for stability
            eval_batch_size = 32
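        # These tiers are heuristics for flan-t5-base with the 256/128 token limits used in
        # preprocessing; flan-t5-large will likely need the next smaller tier at the same memory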
logger.info(f"Selected batch sizes - Train: {train_batch_size}, Eval: {eval_batch_size}")
return train_batch_size, eval_batch_size
def _setup_lora(self):
"""Configure LoRA for efficient fine-tuning"""
try:
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training
# Prepare model for LoRA training
self.model = prepare_model_for_kbit_training(
self.model,
use_gradient_checkpointing=False # Disable for LoRA compatibility
)
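            # Note: prepare_model_for_kbit_training is aimed at quantized models; here it is
            # used to freeze the base weights and upcast small layers (e.g. layer norms) to
            # float32 before the adapters are attached; exact behavior varies by peft version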
lora_config = LoraConfig(
r=16, # rank
lora_alpha=32,
target_modules=["q", "v"], # T5 attention layers
lora_dropout=0.1,
bias="none",
task_type=TaskType.SEQ_2_SEQ_LM,
)
self.model = get_peft_model(self.model, lora_config)
self.model.print_trainable_parameters()
# Enable training mode
self.model.train()
logger.info("LoRA configuration applied successfully")
except Exception as e:
logger.error(f"Failed to setup LoRA: {e}")
raise
def validate_dataset(self, df):
"""Validate that dataset has required columns"""
required_cols = ['instruction', 'output', 'task_type']
optional_cols = ['quality_score']
# Check required columns
missing = [col for col in required_cols if col not in df.columns]
if missing:
raise ValueError(f"Missing required columns: {missing}. Found columns: {df.columns.tolist()}")
# Log optional columns
for col in optional_cols:
if col not in df.columns:
logger.warning(f"Optional column '{col}' not found, continuing without it")
# Validate data types and non-empty
if df['instruction'].isna().any() or df['output'].isna().any():
raise ValueError("Found NaN values in instruction or output columns")
logger.info(f"Dataset validation passed. Columns: {df.columns.tolist()}")
return True
def load_dataset(self, data_path):
"""Load and prepare dataset with validation and shuffling"""
logger.info(f"Loading dataset from: {data_path}")
if not os.path.exists(data_path):
raise FileNotFoundError(f"Data file not found: {data_path}")
try:
df = pd.read_csv(data_path)
logger.info(f"Loaded {len(df)} examples")
except Exception as e:
logger.error(f"Failed to load CSV: {e}")
raise
# Validate dataset
self.validate_dataset(df)
# Add task prefixes if not present
def add_task_prefix(row):
task = row['task_type']
instruction = row['instruction']
# Skip if already has prefix
if instruction.startswith('['):
return instruction
prefix_map = {
'normalize_company': '[COMPANY]',
'normalize_job_title': '[JOB]',
'normalize_skill': '[SKILLS]',
'company_equivalence': '[COMPANY]',
'job_title_equivalence': '[JOB]',
'achievement_equivalence': '[ACHIEVEMENT]'
}
prefix = prefix_map.get(task, '')
return f"{prefix} {instruction}" if prefix else instruction
df['instruction'] = df.apply(add_task_prefix, axis=1)
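        # Illustrative example (hypothetical data): a 'normalize_company' row whose instruction
        # is "Normalize: Gooogle Inc" becomes "[COMPANY] Normalize: Gooogle Inc"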
# Shuffle data before splitting
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
logger.info("Data shuffled")
# Split into train/validation
train_size = int(0.9 * len(df))
train_df = df[:train_size]
val_df = df[train_size:]
logger.info(f"Train set: {len(train_df)} examples")
logger.info(f"Validation set: {len(val_df)} examples")
# Convert to HuggingFace Dataset
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)
return train_dataset, val_dataset
    def preprocess_function(self, examples):
        """Tokenize inputs and targets; padding is left to the data collator"""
        inputs = examples['instruction']
        targets = examples['output']
        # Tokenize inputs without padding; DataCollatorForSeq2Seq pads each batch
        # dynamically at training time, so no static padding is baked into the dataset
        model_inputs = self.tokenizer(
            inputs,
            max_length=256,
            truncation=True
        )
        # Tokenize targets; the collator pads labels with -100 so padding is ignored by the loss
        labels = self.tokenizer(
            text_target=targets,
            max_length=128,
            truncation=True
        )
        model_inputs["labels"] = labels["input_ids"]
        return model_inputs
def compute_metrics(self, eval_pred):
"""Compute both exact match and fuzzy match metrics"""
        predictions, labels = eval_pred
        # The stock Trainer passes raw logits (sometimes a tuple) rather than generated
        # token ids, so reduce them to token ids before decoding
        if isinstance(predictions, tuple):
            predictions = predictions[0]
        if predictions.ndim == 3:
            predictions = np.argmax(predictions, axis=-1)
        decoded_preds = self.tokenizer.batch_decode(
            predictions, skip_special_tokens=True
        )
# Replace -100 in the labels as we can't decode them
labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id)
decoded_labels = self.tokenizer.batch_decode(
labels, skip_special_tokens=True
)
# Calculate exact match accuracy
exact_match = sum(
pred.strip().lower() == label.strip().lower()
for pred, label in zip(decoded_preds, decoded_labels)
) / len(decoded_preds)
# Calculate fuzzy match (>90% similarity)
fuzzy_match = sum(
SequenceMatcher(None, pred.strip().lower(), label.strip().lower()).ratio() > 0.9
for pred, label in zip(decoded_preds, decoded_labels)
) / len(decoded_preds)
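        # For reference, difflib's ratio() is 2 * matches / total_chars, so e.g.
        # SequenceMatcher(None, "google", "googel").ratio() is roughly 0.83 and would
        # count as a miss under the 0.9 threshold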
# Calculate character-level accuracy
char_accuracy = np.mean([
SequenceMatcher(None, pred.strip().lower(), label.strip().lower()).ratio()
for pred, label in zip(decoded_preds, decoded_labels)
])
logger.info(f"Exact match: {exact_match:.4f}, Fuzzy match: {fuzzy_match:.4f}, Char accuracy: {char_accuracy:.4f}")
return {
"exact_match": exact_match,
"fuzzy_match": fuzzy_match,
"char_accuracy": char_accuracy
}
def train(self, train_dataset, val_dataset, output_dir, hf_token=None, hub_username=None, num_epochs=5):
"""Train the model with production-ready settings"""
logger.info("Starting training preparation...")
# Get columns to remove (handle optional columns)
columns_to_remove = ['instruction', 'output', 'task_type']
if 'quality_score' in train_dataset.column_names:
columns_to_remove.append('quality_score')
# Tokenize datasets
train_dataset = train_dataset.map(
self.preprocess_function,
batched=True,
remove_columns=columns_to_remove
)
val_dataset = val_dataset.map(
self.preprocess_function,
batched=True,
remove_columns=columns_to_remove
)
# Data collator with dynamic padding
data_collator = DataCollatorForSeq2Seq(
self.tokenizer,
model=self.model,
label_pad_token_id=-100,
padding=True, # Dynamic padding
pad_to_multiple_of=8 if torch.cuda.is_available() else None
)
# Calculate gradient accumulation steps to maintain effective batch size
effective_batch_size = 32 # Target effective batch size
gradient_accumulation_steps = max(1, effective_batch_size // self.train_batch_size)
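        # e.g. with a per-device train batch size of 8, this gives 32 // 8 = 4 accumulation
        # steps, so the effective batch size stays at 32 across the GPU tiers above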
# Training arguments
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=num_epochs,
per_device_train_batch_size=self.train_batch_size,
per_device_eval_batch_size=self.eval_batch_size,
gradient_accumulation_steps=gradient_accumulation_steps,
gradient_checkpointing=False, # Disabled for LoRA
fp16=torch.cuda.is_available(),
            bf16=False,  # fp16 kept for wider GPU support; switch to bf16 on Ampere+ if fp16 losses go NaN (T5 is prone to fp16 overflow)
optim="adamw_torch",
learning_rate=3e-4 if self.use_lora else 5e-5,
warmup_steps=min(500, len(train_dataset) // self.train_batch_size // 10),
logging_steps=25,
eval_strategy="steps",
eval_steps=250,
save_strategy="steps",
save_steps=250,
load_best_model_at_end=True,
metric_for_best_model="fuzzy_match", # More forgiving than exact match
greater_is_better=True,
push_to_hub=True if hf_token else False,
hub_model_id=f"{hub_username}/resume-normalizer-flan-t5" if hub_username else None,
hub_token=hf_token,
report_to=["tensorboard"] if torch.cuda.is_available() else [],
dataloader_num_workers=0, # Avoid multiprocessing issues
remove_unused_columns=False,
label_names=["labels"],
ddp_find_unused_parameters=False if torch.cuda.device_count() > 1 else None,
dataloader_pin_memory=True if torch.cuda.is_available() else False,
)
# Create trainer
trainer = Trainer(
model=self.model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
tokenizer=self.tokenizer,
data_collator=data_collator,
compute_metrics=self.compute_metrics,
callbacks=[
EarlyStoppingCallback(
                    early_stopping_patience=5,        # stop after 5 evaluations with no improvement
                    early_stopping_threshold=0.001    # minimum fuzzy_match gain that counts as improvement
)
],
)
logger.info("Training configuration:")
logger.info(f" Total examples: {len(train_dataset)}")
logger.info(f" Batch size: {self.train_batch_size}")
logger.info(f" Gradient accumulation: {gradient_accumulation_steps}")
logger.info(f" Effective batch size: {self.train_batch_size * gradient_accumulation_steps}")
logger.info(f" Total optimization steps: {len(train_dataset) // (self.train_batch_size * gradient_accumulation_steps) * num_epochs}")
# Train
try:
trainer.train()
except KeyboardInterrupt:
logger.info("Training interrupted by user")
trainer.save_model(output_dir + "_interrupted")
raise
except Exception as e:
logger.error(f"Training failed: {e}")
raise
# Save model
logger.info("Saving model...")
if self.use_lora:
# Save LoRA adapter
self.model.save_pretrained(output_dir)
self.tokenizer.save_pretrained(output_dir)
else:
trainer.save_model(output_dir)
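        # Sketch of reloading the saved artifacts for inference when LoRA was used
        # (assumes the same base checkpoint; PeftModel.from_pretrained attaches the adapter):
        #   from peft import PeftModel
        #   base = T5ForConditionalGeneration.from_pretrained(self.model_name)
        #   model = PeftModel.from_pretrained(base, output_dir)
        #   model = model.merge_and_unload()  # optionally fold the adapter into the base weights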
# Push to hub if token provided
if hf_token and hub_username:
logger.info("Pushing model to HuggingFace Hub...")
try:
trainer.push_to_hub(
commit_message="Final model trained on resume normalization data"
)
except Exception as e:
logger.error(f"Failed to push to hub: {e}")
logger.info("Model saved locally but not pushed to hub")
logger.info("Training completed successfully!")
return trainer
def main():
"""Main training function"""
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--data_path", type=str, required=True)
parser.add_argument("--model_size", type=str, default="base")
parser.add_argument("--hf_token", type=str, default=None)
parser.add_argument("--hub_username", type=str, default=None)
parser.add_argument("--num_epochs", type=int, default=5)
parser.add_argument("--use_lora", action="store_true")
args = parser.parse_args()
# Set HF token if provided
if args.hf_token:
from huggingface_hub import HfFolder
HfFolder.save_token(args.hf_token)
# Select model based on size
model_name = "google/flan-t5-base" if args.model_size == "base" else "google/flan-t5-large"
try:
# Initialize trainer
trainer = ResumeNormalizationTrainer(
model_name=model_name,
use_lora=args.use_lora
)
# Load dataset
train_dataset, val_dataset = trainer.load_dataset(args.data_path)
# Train
output_dir = "./resume-normalizer-model"
trainer.train(
train_dataset=train_dataset,
val_dataset=val_dataset,
output_dir=output_dir,
hf_token=args.hf_token,
hub_username=args.hub_username,
num_epochs=args.num_epochs
)
print("Training completed successfully!")
print(f"Model saved to: {output_dir}")
if args.hf_token and args.hub_username:
print(f"Model available at: https://huggingface.co/{args.hub_username}/resume-normalizer-flan-t5")
except Exception as e:
logger.error(f"Training script failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
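# Example invocation (data path, token, and username are placeholders):
#   python train.py --data_path data/training_pairs.csv --model_size base \
#       --num_epochs 5 --use_lora --hf_token $HF_TOKEN --hub_username your-username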