import torch
from transformers import (
    T5ForConditionalGeneration,
    T5Tokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForSeq2Seq,
    EarlyStoppingCallback
)
from datasets import Dataset
import pandas as pd
import numpy as np
import os
import logging
import sys
from difflib import SequenceMatcher

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Check environment and dependencies first
def check_dependencies():
    """Check all required dependencies before starting"""
    missing_deps = []

    try:
        import torch
        logger.info(f"✓ PyTorch {torch.__version__}")
    except ImportError:
        missing_deps.append("torch")

    try:
        import transformers
        logger.info(f"✓ Transformers {transformers.__version__}")
    except ImportError:
        missing_deps.append("transformers")

    try:
        import peft
        logger.info(f"✓ PEFT {peft.__version__}")
    except ImportError:
        missing_deps.append("peft")

    try:
        import accelerate
        logger.info(f"✓ Accelerate {accelerate.__version__}")
    except ImportError:
        missing_deps.append("accelerate")

    try:
        import datasets
        logger.info(f"✓ Datasets {datasets.__version__}")
    except ImportError:
        missing_deps.append("datasets")

    if missing_deps:
        raise ImportError(
            f"Missing dependencies: {', '.join(missing_deps)}. "
            f"Install with: pip install {' '.join(missing_deps)}"
        )

    return True


# Run dependency check
check_dependencies()

# Environment check
logger.info("=== Environment Check ===")
logger.info(f"Python version: {sys.version}")
logger.info(f"PyTorch version: {torch.__version__}")
logger.info(f"CUDA available: {torch.cuda.is_available()}")

if torch.cuda.is_available():
    logger.info(f"CUDA devices: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):
        device_props = torch.cuda.get_device_properties(i)
        logger.info(f"  Device {i}: {torch.cuda.get_device_name(i)} ({device_props.total_memory / 1e9:.1f}GB)")


class ResumeNormalizationTrainer:
    def __init__(self, model_name="google/flan-t5-base", use_lora=True):
        self.model_name = model_name
        self.use_lora = use_lora
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        logger.info(f"Initializing model: {model_name}")
        logger.info(f"Using LoRA: {use_lora}")
        logger.info(f"Device: {self.device}")

        # Determine optimal batch size based on GPU memory
        self.train_batch_size, self.eval_batch_size = self._determine_batch_sizes()

        # Load tokenizer and model
        try:
            self.tokenizer = T5Tokenizer.from_pretrained(model_name)
            self.model = T5ForConditionalGeneration.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
            )
            self.model = self.model.to(self.device)
            logger.info("Model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise

        # Setup LoRA if requested
        if use_lora:
            self._setup_lora()

    def _determine_batch_sizes(self):
        """Determine optimal batch sizes based on available GPU memory"""
        if not torch.cuda.is_available():
            logger.warning("No GPU available, using minimal batch sizes")
            return 2, 4

        # Get GPU memory in GB
        gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1e9
        logger.info(f"GPU memory: {gpu_memory:.1f}GB")

        # Conservative batch size selection based on GPU memory
        if gpu_memory < 8:  # Less than 8GB (e.g., older GPUs)
            train_batch_size = 4
            eval_batch_size = 8
        elif gpu_memory < 16:  # 8-16GB (e.g., RTX 2080, V100)
            train_batch_size = 8
            eval_batch_size = 16
        elif gpu_memory < 24:  # 16-24GB (e.g., RTX 3090, A100-40GB)
            train_batch_size = 16
            eval_batch_size = 32
        else:  # 24GB+ (e.g., A100-80GB, L4)
            train_batch_size = 16  # Still conservative for stability
            eval_batch_size = 32

        logger.info(f"Selected batch sizes - Train: {train_batch_size}, Eval: {eval_batch_size}")
        return train_batch_size, eval_batch_size

    def _setup_lora(self):
        """Configure LoRA for efficient fine-tuning"""
        try:
            from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training

            # Prepare model for LoRA training
            self.model = prepare_model_for_kbit_training(
                self.model,
                use_gradient_checkpointing=False  # Disable for LoRA compatibility
            )

            lora_config = LoraConfig(
                r=16,  # rank
                lora_alpha=32,
                target_modules=["q", "v"],  # T5 attention layers
                lora_dropout=0.1,
                bias="none",
                task_type=TaskType.SEQ_2_SEQ_LM,
            )

            self.model = get_peft_model(self.model, lora_config)
            self.model.print_trainable_parameters()

            # Enable training mode
            self.model.train()

            logger.info("LoRA configuration applied successfully")
        except Exception as e:
            logger.error(f"Failed to setup LoRA: {e}")
            raise

    def validate_dataset(self, df):
        """Validate that dataset has required columns"""
        required_cols = ['instruction', 'output', 'task_type']
        optional_cols = ['quality_score']

        # Check required columns
        missing = [col for col in required_cols if col not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}. Found columns: {df.columns.tolist()}")

        # Log optional columns
        for col in optional_cols:
            if col not in df.columns:
                logger.warning(f"Optional column '{col}' not found, continuing without it")

        # Validate data types and non-empty
        if df['instruction'].isna().any() or df['output'].isna().any():
            raise ValueError("Found NaN values in instruction or output columns")

        logger.info(f"Dataset validation passed. Columns: {df.columns.tolist()}")
        return True

    def load_dataset(self, data_path):
        """Load and prepare dataset with validation and shuffling"""
        logger.info(f"Loading dataset from: {data_path}")

        if not os.path.exists(data_path):
            raise FileNotFoundError(f"Data file not found: {data_path}")

        try:
            df = pd.read_csv(data_path)
            logger.info(f"Loaded {len(df)} examples")
        except Exception as e:
            logger.error(f"Failed to load CSV: {e}")
            raise

        # Validate dataset
        self.validate_dataset(df)

        # Add task prefixes if not present
        def add_task_prefix(row):
            task = row['task_type']
            instruction = row['instruction']

            # Skip if already has prefix
            if instruction.startswith('['):
                return instruction

            prefix_map = {
                'normalize_company': '[COMPANY]',
                'normalize_job_title': '[JOB]',
                'normalize_skill': '[SKILLS]',
                'company_equivalence': '[COMPANY]',
                'job_title_equivalence': '[JOB]',
                'achievement_equivalence': '[ACHIEVEMENT]'
            }

            prefix = prefix_map.get(task, '')
            return f"{prefix} {instruction}" if prefix else instruction

        df['instruction'] = df.apply(add_task_prefix, axis=1)

        # Shuffle data before splitting
        df = df.sample(frac=1, random_state=42).reset_index(drop=True)
        logger.info("Data shuffled")

        # Split into train/validation
        train_size = int(0.9 * len(df))
        train_df = df[:train_size]
        val_df = df[train_size:]

        logger.info(f"Train set: {len(train_df)} examples")
        logger.info(f"Validation set: {len(val_df)} examples")

        # Convert to HuggingFace Dataset
        train_dataset = Dataset.from_pandas(train_df)
        val_dataset = Dataset.from_pandas(val_df)

        return train_dataset, val_dataset

    def preprocess_function(self, examples):
        """Tokenize inputs and targets with dynamic padding"""
        inputs = examples['instruction']
        targets = examples['output']

        # Tokenize inputs with dynamic padding
        model_inputs = self.tokenizer(
            inputs,
            max_length=256,
            truncation=True,
            padding=True  # Dynamic padding instead of max_length
        )

        # Tokenize targets
        labels = self.tokenizer(
            text_target=targets,
            max_length=128,
            truncation=True,
            padding=True  # Dynamic padding
        )

        # Replace padding token ids in the labels with -100 so they are ignored by the loss
        labels["input_ids"] = [
            [(l if l != self.tokenizer.pad_token_id else -100) for l in label]
            for label in labels["input_ids"]
        ]

        model_inputs["labels"] = labels["input_ids"]
        return model_inputs

    def compute_metrics(self, eval_pred):
        """Compute both exact match and fuzzy match metrics"""
        predictions, labels = eval_pred

        # Trainer passes raw logits (and possibly other tensors) for seq2seq models,
        # so reduce them to token ids before decoding. Note these are argmax picks
        # under teacher forcing, not free-running generation.
        if isinstance(predictions, tuple):
            predictions = predictions[0]
        if predictions.ndim == 3:
            predictions = np.argmax(predictions, axis=-1)

        # Decode predictions
        decoded_preds = self.tokenizer.batch_decode(
            predictions,
            skip_special_tokens=True
        )

        # Replace -100 in the labels as we can't decode them
        labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id)
        decoded_labels = self.tokenizer.batch_decode(
            labels,
            skip_special_tokens=True
        )

        # Calculate exact match accuracy
        exact_match = sum(
            pred.strip().lower() == label.strip().lower()
            for pred, label in zip(decoded_preds, decoded_labels)
        ) / len(decoded_preds)

        # Calculate fuzzy match (>90% similarity)
        fuzzy_match = sum(
            SequenceMatcher(None, pred.strip().lower(), label.strip().lower()).ratio() > 0.9
            for pred, label in zip(decoded_preds, decoded_labels)
        ) / len(decoded_preds)

        # Calculate character-level accuracy
        char_accuracy = np.mean([
            SequenceMatcher(None, pred.strip().lower(), label.strip().lower()).ratio()
            for pred, label in zip(decoded_preds, decoded_labels)
        ])

        logger.info(f"Exact match: {exact_match:.4f}, Fuzzy match: {fuzzy_match:.4f}, Char accuracy: {char_accuracy:.4f}")

        return {
            "exact_match": exact_match,
            "fuzzy_match": fuzzy_match,
            "char_accuracy": char_accuracy
        }

    def train(self, train_dataset, val_dataset, output_dir, hf_token=None, hub_username=None, num_epochs=5):
        """Train the model with production-ready settings"""
        logger.info("Starting training preparation...")

        # Get columns to remove (handle optional columns)
        columns_to_remove = ['instruction', 'output', 'task_type']
        if 'quality_score' in train_dataset.column_names:
            columns_to_remove.append('quality_score')

        # Tokenize datasets
        train_dataset = train_dataset.map(
            self.preprocess_function,
            batched=True,
            remove_columns=columns_to_remove
        )
        val_dataset = val_dataset.map(
            self.preprocess_function,
            batched=True,
            remove_columns=columns_to_remove
        )

        # Data collator with dynamic padding
        data_collator = DataCollatorForSeq2Seq(
            self.tokenizer,
            model=self.model,
            label_pad_token_id=-100,
            padding=True,  # Dynamic padding
            pad_to_multiple_of=8 if torch.cuda.is_available() else None
        )

        # Calculate gradient accumulation steps to maintain effective batch size
        effective_batch_size = 32  # Target effective batch size
        gradient_accumulation_steps = max(1, effective_batch_size // self.train_batch_size)

        # Training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=num_epochs,
            per_device_train_batch_size=self.train_batch_size,
            per_device_eval_batch_size=self.eval_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            gradient_checkpointing=False,  # Disabled for LoRA
            fp16=torch.cuda.is_available(),
            bf16=False,  # Use fp16 instead for better compatibility
            optim="adamw_torch",
            learning_rate=3e-4 if self.use_lora else 5e-5,
            warmup_steps=min(500, len(train_dataset) // self.train_batch_size // 10),
            logging_steps=25,
            eval_strategy="steps",
            eval_steps=250,
            save_strategy="steps",
            save_steps=250,
            load_best_model_at_end=True,
            metric_for_best_model="fuzzy_match",  # More forgiving than exact match
            greater_is_better=True,
            push_to_hub=bool(hf_token),
            hub_model_id=f"{hub_username}/resume-normalizer-flan-t5" if hub_username else None,
            hub_token=hf_token,
            report_to=["tensorboard"] if torch.cuda.is_available() else [],
            dataloader_num_workers=0,  # Avoid multiprocessing issues
            remove_unused_columns=False,
            label_names=["labels"],
            ddp_find_unused_parameters=False if torch.cuda.device_count() > 1 else None,
            dataloader_pin_memory=True if torch.cuda.is_available() else False,
        )

        # Create trainer
        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            tokenizer=self.tokenizer,
            data_collator=data_collator,
            compute_metrics=self.compute_metrics,
            callbacks=[
                EarlyStoppingCallback(
                    early_stopping_patience=5,  # More patient
                    early_stopping_threshold=0.001  # Minimum improvement
                )
            ],
        )

        logger.info("Training configuration:")
        logger.info(f"  Total examples: {len(train_dataset)}")
        logger.info(f"  Batch size: {self.train_batch_size}")
        logger.info(f"  Gradient accumulation: {gradient_accumulation_steps}")
        logger.info(f"  Effective batch size: {self.train_batch_size * gradient_accumulation_steps}")
        logger.info(f"  Total optimization steps: {len(train_dataset) // (self.train_batch_size * gradient_accumulation_steps) * num_epochs}")

        # Train
        try:
            trainer.train()
        except KeyboardInterrupt:
            logger.info("Training interrupted by user")
            trainer.save_model(output_dir + "_interrupted")
            raise
        except Exception as e:
            logger.error(f"Training failed: {e}")
            raise

        # Save model
        logger.info("Saving model...")
        if self.use_lora:
            # Save LoRA adapter
            self.model.save_pretrained(output_dir)
            self.tokenizer.save_pretrained(output_dir)
        else:
            trainer.save_model(output_dir)

        # Push to hub if token provided
        if hf_token and hub_username:
            logger.info("Pushing model to HuggingFace Hub...")
            try:
                trainer.push_to_hub(
                    commit_message="Final model trained on resume normalization data"
                )
            except Exception as e:
                logger.error(f"Failed to push to hub: {e}")
                logger.info("Model saved locally but not pushed to hub")

        logger.info("Training completed successfully!")
        return trainer


def main():
    """Main training function"""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--data_path", type=str, required=True)
    parser.add_argument("--model_size", type=str, default="base")
    parser.add_argument("--hf_token", type=str, default=None)
    parser.add_argument("--hub_username", type=str, default=None)
    parser.add_argument("--num_epochs", type=int, default=5)
    parser.add_argument("--use_lora", action="store_true")
    args = parser.parse_args()

    # Set HF token if provided
    if args.hf_token:
        from huggingface_hub import HfFolder
        HfFolder.save_token(args.hf_token)

    # Select model based on size
    model_name = "google/flan-t5-base" if args.model_size == "base" else "google/flan-t5-large"

    try:
        # Initialize trainer
        trainer = ResumeNormalizationTrainer(
            model_name=model_name,
            use_lora=args.use_lora
        )

        # Load dataset
        train_dataset, val_dataset = trainer.load_dataset(args.data_path)

        # Train
        output_dir = "./resume-normalizer-model"
        trainer.train(
            train_dataset=train_dataset,
            val_dataset=val_dataset,
            output_dir=output_dir,
            hf_token=args.hf_token,
            hub_username=args.hub_username,
            num_epochs=args.num_epochs
        )

        print("Training completed successfully!")
        print(f"Model saved to: {output_dir}")

        if args.hf_token and args.hub_username:
            print(f"Model available at: https://huggingface.co/{args.hub_username}/resume-normalizer-flan-t5")

    except Exception as e:
        logger.error(f"Training script failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
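
# ---------------------------------------------------------------------------
# Example invocation (a sketch; the script filename, data path, and hub
# username below are placeholders, not defined anywhere in this file):
#
#   python train_resume_normalizer.py \
#       --data_path ./resume_normalization_data.csv \
#       --model_size base \
#       --num_epochs 5 \
#       --use_lora \
#       --hf_token "$HF_TOKEN" \
#       --hub_username your-username
#
# Loading the trained adapter for inference (a minimal sketch, assuming the
# run used --use_lora so that only a LoRA adapter and tokenizer were saved to
# ./resume-normalizer-model; the example prompt is illustrative and should
# mirror the format of the dataset's 'instruction' column):
#
#   from transformers import T5ForConditionalGeneration, T5Tokenizer
#   from peft import PeftModel
#
#   base = T5ForConditionalGeneration.from_pretrained("google/flan-t5-base")
#   model = PeftModel.from_pretrained(base, "./resume-normalizer-model")
#   tokenizer = T5Tokenizer.from_pretrained("./resume-normalizer-model")
#
#   prompt = "[COMPANY] Normalize this company name: Google LLC"
#   input_ids = tokenizer(prompt, return_tensors="pt").input_ids
#   output_ids = model.generate(input_ids, max_new_tokens=32)
#   print(tokenizer.decode(output_ids[0], skip_special_tokens=True))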