| """ | |
| Fine-tuning script for Iain Morris style article generation | |
| Uses QLoRA for efficient training | |
| """ | |
| import os | |
| import json | |
| import torch | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| TrainingArguments, | |
| Trainer, | |
| DataCollatorForLanguageModeling, | |
| BitsAndBytesConfig | |
| ) | |
| from peft import ( | |
| LoraConfig, | |
| get_peft_model, | |
| TaskType, | |
| prepare_model_for_kbit_training | |
| ) | |
| from datasets import Dataset, load_from_disk | |
| import logging | |
| from typing import Dict, List | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |

class IainMorrisFineTuner:
    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        """
        Initialize the fine-tuner.

        Args:
            model_name: Base model to fine-tune. Defaults to Zephyr-7B-Beta,
                which follows instructions well and requires no auth token.
        """
        self.model_name = model_name

        # Configure device, preferring Apple Silicon (MPS), then CUDA, then CPU
        if torch.backends.mps.is_available():
            self.device = torch.device("mps")
            self.use_mps = True
            self.use_cuda = False
            logger.info("Using Apple Silicon MPS acceleration")
        elif torch.cuda.is_available():
            self.device = torch.device("cuda")
            self.use_mps = False
            self.use_cuda = True
            logger.info("Using CUDA acceleration")
        else:
            self.device = torch.device("cpu")
            self.use_mps = False
            self.use_cuda = False
            logger.info("Using CPU")
        logger.info(f"Using device: {self.device}")

        # Quantize only on CUDA - bitsandbytes does not support MPS yet
        if self.use_cuda:
            self.bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16
            )
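            # Back-of-envelope memory math for this config (an estimate, not
            # a measurement): a 7B-parameter model stored in 4-bit NF4 needs
            # about 7e9 * 0.5 bytes ~ 3.5 GB, versus ~14 GB in fp16. Double
            # quantization also quantizes the quantization constants, saving
            # roughly another 0.4 bits per parameter per the QLoRA paper.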
        else:
            self.bnb_config = None
            if self.use_mps:
                logger.info("Quantization not supported on MPS. Using full precision with memory optimization.")
            else:
                logger.info("Quantization not available on CPU. Using full precision.")

        # LoRA configuration - higher rank on MPS/CUDA, smaller on CPU
        lora_rank = 16 if (self.use_mps or self.use_cuda) else 8
        self.lora_config = LoraConfig(
            r=lora_rank,            # LoRA rank
            lora_alpha=32,          # Alpha parameter for LoRA scaling
            target_modules=[
                "q_proj",
                "k_proj",
                "v_proj",
                "o_proj",
                "gate_proj",
                "up_proj",
                "down_proj",
                "lm_head",
            ],
            bias="none",
            lora_dropout=0.05,
            task_type=TaskType.CAUSAL_LM,
        )
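        # Rough scale of what this trains (assuming Zephyr-7B inherits
        # Mistral-7B's geometry: 32 layers, hidden size 4096): each adapted
        # weight W of shape (d_out, d_in) gains low-rank factors B (d_out x r)
        # and A (r x d_in), i.e. r * (d_in + d_out) trainable parameters -
        # e.g. ~131K for a 4096x4096 q_proj at r=16. Summed over all target
        # modules this stays well under 1% of the 7B base weights;
        # print_trainable_parameters() below reports the exact count.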

    def load_model_and_tokenizer(self):
        """Load the base model and tokenizer"""
        logger.info(f"Loading model: {self.model_name}")

        # Load tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            padding_side="left"
        )

        # Add pad token if it doesn't exist
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load model with device-appropriate settings
        model_kwargs = {
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }
        if self.use_cuda:
            # CUDA settings with quantization
            model_kwargs.update({
                "quantization_config": self.bnb_config,
                "device_map": "auto",
                "torch_dtype": torch.bfloat16
            })
        elif self.use_mps:
            # MPS (Apple Silicon) optimized settings
            model_kwargs.update({
                "torch_dtype": torch.float16,  # float16 works well on MPS
                "device_map": None,            # handle device placement manually
            })
        else:
            # CPU settings
            model_kwargs.update({
                "torch_dtype": torch.float32,
                "device_map": None,
            })

        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            **model_kwargs
        )

        # Move model to device if not using device_map
        if not self.use_cuda:
            self.model = self.model.to(self.device)

        # Prepare model for training
        if self.use_cuda:
            self.model = prepare_model_for_kbit_training(self.model)
        else:
            # For MPS/CPU training, just ensure the model is in training mode
            self.model.train()

        # Add LoRA adapters
        self.model = get_peft_model(self.model, self.lora_config)

        # Print trainable parameters
        self.model.print_trainable_parameters()

        logger.info("Model and tokenizer loaded successfully")
| def format_chat_template(self, example: Dict) -> str: | |
| """ | |
| Format example using chat template | |
| Args: | |
| example: Training example with messages | |
| Returns: | |
| Formatted text | |
| """ | |
| messages = example['messages'] | |
| # Use the tokenizer's chat template if available | |
| if hasattr(self.tokenizer, 'apply_chat_template'): | |
| try: | |
| return self.tokenizer.apply_chat_template( | |
| messages, | |
| tokenize=False, | |
| add_generation_prompt=False | |
| ) | |
| except: | |
| pass | |
| # Fallback formatting | |
| formatted = "" | |
| for message in messages: | |
| role = message['role'] | |
| content = message['content'] | |
| if role == 'system': | |
| formatted += f"<|system|>\n{content}\n" | |
| elif role == 'user': | |
| formatted += f"<|user|>\n{content}\n" | |
| elif role == 'assistant': | |
| formatted += f"<|assistant|>\n{content}\n" | |
| return formatted | |
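    # The fallback above renders a three-turn example roughly the way
    # Zephyr's own template does (minus the </s> separators the real
    # template inserts), e.g.:
    #   <|system|>\n{system prompt}\n<|user|>\n{request}\n<|assistant|>\n{article}\n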
| def tokenize_function(self, examples: Dict) -> Dict: | |
| """ | |
| Tokenize examples for training | |
| Args: | |
| examples: Batch of examples | |
| Returns: | |
| Tokenized examples | |
| """ | |
| # Format each example | |
| texts = [] | |
| for i in range(len(examples['messages'])): | |
| example = {'messages': examples['messages'][i]} | |
| formatted_text = self.format_chat_template(example) | |
| texts.append(formatted_text) | |
| # Tokenize | |
| tokenized = self.tokenizer( | |
| texts, | |
| truncation=True, | |
| padding=False, | |
| max_length=2048, | |
| return_overflowing_tokens=False, | |
| ) | |
| # Set labels for causal language modeling | |
| tokenized["labels"] = tokenized["input_ids"].copy() | |
| return tokenized | |
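    # Note: labels are an unshifted copy of input_ids - the model's forward
    # pass applies the one-token shift internally, and the
    # DataCollatorForLanguageModeling(mlm=False) used in train() masks
    # padding positions with -100 so they are excluded from the loss.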
| def load_datasets(self, data_dir: str = "data"): | |
| """ | |
| Load training and validation datasets | |
| Args: | |
| data_dir: Directory containing the datasets | |
| """ | |
| logger.info("Loading datasets...") | |
| try: | |
| # Try to load HF datasets first | |
| self.train_dataset = load_from_disk(f"{data_dir}/train_hf_dataset") | |
| self.val_dataset = load_from_disk(f"{data_dir}/val_hf_dataset") | |
| except: | |
| # Fallback to JSON files - prioritize enhanced dataset | |
| try: | |
| # Try enhanced dataset first (includes non-telecom examples) | |
| with open(f"{data_dir}/enhanced_train_dataset.json", 'r') as f: | |
| train_data = json.load(f) | |
| logger.info("Using enhanced training dataset with non-telecom examples") | |
| except FileNotFoundError: | |
| try: | |
| # Fall back to improved dataset (updated system prompts) | |
| with open(f"{data_dir}/improved_train_dataset.json", 'r') as f: | |
| train_data = json.load(f) | |
| logger.info("Using improved training dataset with updated system prompts") | |
| except FileNotFoundError: | |
| # Final fallback to original dataset | |
| with open(f"{data_dir}/train_dataset.json", 'r') as f: | |
| train_data = json.load(f) | |
| logger.info("Using original training dataset") | |
| # Load validation dataset (use improved if available) | |
| try: | |
| with open(f"{data_dir}/improved_val_dataset.json", 'r') as f: | |
| val_data = json.load(f) | |
| logger.info("Using improved validation dataset") | |
| except FileNotFoundError: | |
| with open(f"{data_dir}/val_dataset.json", 'r') as f: | |
| val_data = json.load(f) | |
| logger.info("Using original validation dataset") | |
| self.train_dataset = Dataset.from_list(train_data) | |
| self.val_dataset = Dataset.from_list(val_data) | |
| logger.info(f"Loaded {len(self.train_dataset)} training examples") | |
| logger.info(f"Loaded {len(self.val_dataset)} validation examples") | |
| # Tokenize datasets | |
| logger.info("Tokenizing datasets...") | |
| self.train_dataset = self.train_dataset.map( | |
| self.tokenize_function, | |
| batched=True, | |
| remove_columns=self.train_dataset.column_names | |
| ) | |
| self.val_dataset = self.val_dataset.map( | |
| self.tokenize_function, | |
| batched=True, | |
| remove_columns=self.val_dataset.column_names | |
| ) | |
| logger.info("Datasets tokenized successfully") | |
| def setup_training_args(self, output_dir: str = "models/iain-morris-model-enhanced"): | |
| """ | |
| Setup training arguments optimized for M3 | |
| Args: | |
| output_dir: Directory to save the model | |
| """ | |
| # Base training arguments - improved based on training guide recommendations | |
| training_kwargs = { | |
| "output_dir": output_dir, | |
| "num_train_epochs": 4 if self.use_mps else 4, # Increased epochs for better style learning | |
| "per_device_train_batch_size": 1, | |
| "per_device_eval_batch_size": 1, | |
| "gradient_accumulation_steps": 8 if self.use_mps else 4, # More accumulation for MPS | |
| "save_steps": 50, | |
| "logging_steps": 10, | |
| "learning_rate": 5e-5 if self.use_mps else 5e-5, # Lower LR as recommended (5e-5) | |
| "weight_decay": 0.001, | |
| "max_grad_norm": 0.3, | |
| "max_steps": -1, | |
| "warmup_ratio": 0.03, | |
| "group_by_length": True, | |
| "lr_scheduler_type": "constant", | |
| "report_to": "none", # Disable reporting to avoid tensorboard dependency | |
| "eval_strategy": "steps", | |
| "eval_steps": 50, | |
| "save_total_limit": 3, # Keep more checkpoints for better model selection | |
| "load_best_model_at_end": True, | |
| "metric_for_best_model": "eval_loss", | |
| "greater_is_better": False, | |
| "dataloader_pin_memory": False, | |
| } | |

        # Device-specific optimizations
        if self.use_cuda:
            training_kwargs.update({
                "optim": "paged_adamw_32bit",
                "fp16": False,
                "bf16": True,
            })
        elif self.use_mps:
            training_kwargs.update({
                "optim": "adamw_torch",       # Standard optimizer for MPS
                "fp16": False,                # fp16 not supported on MPS in this version
                "bf16": False,                # bf16 not supported on MPS
                "dataloader_num_workers": 0,  # Avoid multiprocessing issues on MPS
            })
        else:
            training_kwargs.update({
                "optim": "adamw_torch",
                "fp16": False,
                "bf16": False,
                "dataloader_num_workers": 0,
            })

        self.training_args = TrainingArguments(**training_kwargs)
        logger.info(f"Training configured for {self.device} with {training_kwargs['num_train_epochs']} epochs")

    def train(self):
        """Train the model"""
        logger.info("Starting training...")

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,
        )

        # Initialize trainer
        trainer = Trainer(
            model=self.model,
            args=self.training_args,
            train_dataset=self.train_dataset,
            eval_dataset=self.val_dataset,
            tokenizer=self.tokenizer,
            data_collator=data_collator,
        )

        # Train
        trainer.train()

        # Save the final model
        trainer.save_model()
        self.tokenizer.save_pretrained(self.training_args.output_dir)
        logger.info(f"Training completed. Model saved to {self.training_args.output_dir}")
| def save_lora_adapters(self, output_dir: str = "models/lora_adapters"): | |
| """ | |
| Save only the LoRA adapters | |
| Args: | |
| output_dir: Directory to save adapters | |
| """ | |
| os.makedirs(output_dir, exist_ok=True) | |
| self.model.save_pretrained(output_dir) | |
| self.tokenizer.save_pretrained(output_dir) | |
| logger.info(f"LoRA adapters saved to {output_dir}") | |
| def run_full_pipeline(self, data_dir: str = "data"): | |
| """ | |
| Run the complete fine-tuning pipeline | |
| Args: | |
| data_dir: Directory containing training data | |
| """ | |
| try: | |
| # Load model and tokenizer | |
| self.load_model_and_tokenizer() | |
| # Load datasets | |
| self.load_datasets(data_dir) | |
| # Setup training arguments | |
| self.setup_training_args() | |
| # Train | |
| self.train() | |
| # Save LoRA adapters separately | |
| self.save_lora_adapters() | |
| logger.info("Fine-tuning pipeline completed successfully!") | |
| except Exception as e: | |
| logger.error(f"Error in fine-tuning pipeline: {e}") | |
| raise | |

def main():
    """Main entry point for fine-tuning"""
    # Report the available accelerator up front
    if torch.cuda.is_available():
        logger.info(f"CUDA available. GPU: {torch.cuda.get_device_name()}")
        logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    elif torch.backends.mps.is_available():
        logger.info("Apple Silicon MPS available.")
    else:
        logger.warning("No GPU acceleration available. Training will be slow on CPU.")

    # Initialize the fine-tuner and run the pipeline
    fine_tuner = IainMorrisFineTuner()
    fine_tuner.run_full_pipeline()


if __name__ == "__main__":
    main()
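
# Usage - run this module directly once the data/ files described in
# load_datasets() are in place (the filename here is illustrative):
#
#   python fine_tune.py
#
# Checkpoints land in models/iain-morris-model-enhanced/ and standalone
# adapters in models/lora_adapters/.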