| import torch |
| import json |
| import time |
| from datetime import datetime |
|
|
|
|
| import os |
| import torch.nn as nn |
|
|
| import numpy as np |
| import random |
| import transformers |
|
|
| import platform |
| from transformers import TrainerCallback, TrainingArguments, TrainerState, TrainerControl |
|
|
|
|
class ExperimentMonitorCallback(TrainerCallback):
    """
    Callback to monitor training performance and log system stats to a JSON file.

    It captures:
      1. Experiment metadata (GPU info, batch size, learning rate, etc.)
      2. Runtime metrics (average time per step, steps per second)
      3. Memory metrics (allocated, reserved, and peak allocated CUDA memory)

    The complete log is rewritten to ``log_file_path`` once when training
    begins and again whenever ``state.global_step`` is a positive multiple of
    ``log_interval``.

    NOTE(review): every process that runs this callback writes the same file;
    for multi-process training consider gating the writes on
    ``state.is_world_process_zero`` -- confirm against how the Trainer is launched.
    """

    def __init__(self, log_file_path: str, run_name: str = "experiment", log_interval: int = 100):
        """
        Args:
            log_file_path: Destination of the JSON log (overwritten on every save).
            run_name: Free-form label stored in the log metadata.
            log_interval: Number of global steps between two metric entries.

        Raises:
            ValueError: If ``log_interval`` is not a positive integer.
        """
        # Fail at construction instead of with a modulo-by-zero (or negative
        # averages) in the middle of a training run.
        if log_interval <= 0:
            raise ValueError(f"log_interval must be a positive integer, got {log_interval!r}")

        self.log_file_path = log_file_path
        self.run_name = run_name
        self.log_interval = log_interval

        # Timing state; populated by on_train_begin.
        self.start_time = None
        self.last_log_time = None
        # Global step at which the current timing window started.
        self._last_log_step = 0

        self.log_data = {
            "metadata": {},
            "metrics": []
        }

    def _get_gpu_info(self):
        """Return a dict describing CUDA device 0, or the string ``"CPU_ONLY"``."""
        if not torch.cuda.is_available():
            return "CPU_ONLY"
        return {
            "name": torch.cuda.get_device_name(0),
            "count": torch.cuda.device_count(),
            "capability": torch.cuda.get_device_capability(0)
        }

    def on_train_begin(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
        """Start the timers, reset CUDA peak-memory stats and write the metadata."""
        self.start_time = time.perf_counter()
        self.last_log_time = self.start_time
        # Remember where this run starts counting so the first window is
        # averaged over the steps actually executed (presumably non-zero when
        # resuming from a checkpoint -- TODO confirm for the Trainer version in use).
        self._last_log_step = state.global_step

        if torch.cuda.is_available():
            # Make "peak" mean "peak during this training run".
            torch.cuda.reset_peak_memory_stats()

        self.log_data["metadata"] = {
            "run_name": self.run_name,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "python_version": platform.python_version(),
            "pytorch_version": torch.__version__,
            "gpu_info": self._get_gpu_info(),
            "configuration": {
                "batch_size_per_device": args.per_device_train_batch_size,
                "learning_rate": args.learning_rate,
                "max_steps": args.max_steps,
                "num_train_epochs": args.num_train_epochs,
                "fp16": args.fp16,
                "bf16": args.bf16,
                "optim": args.optim,
            }
        }

        self._save_log()

    def on_step_end(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
        """Append a metric entry every ``log_interval`` global steps and persist the log."""
        current_step = state.global_step
        if current_step <= 0 or current_step % self.log_interval != 0:
            return

        current_time = time.perf_counter()
        elapsed_since_last = current_time - self.last_log_time

        # Average over the steps that really happened in this window; fall
        # back to the nominal interval if the step counter did not advance.
        steps_in_window = current_step - self._last_log_step
        if steps_in_window <= 0:
            steps_in_window = self.log_interval
        avg_time_per_step = elapsed_since_last / steps_in_window

        # A zero-length window would otherwise raise ZeroDivisionError.
        steps_per_second = round(1.0 / avg_time_per_step, 2) if avg_time_per_step > 0 else None

        mem_stats = {}
        if torch.cuda.is_available():
            gib = 1024 ** 3
            mem_stats["allocated_gb"] = torch.cuda.memory_allocated() / gib
            mem_stats["reserved_gb"] = torch.cuda.memory_reserved() / gib
            mem_stats["peak_allocated_gb"] = torch.cuda.max_memory_allocated() / gib

        metric_entry = {
            "step": current_step,
            "epoch": state.epoch,
            "timestamp": datetime.now().isoformat(),
            "performance": {
                "avg_time_per_step_s": round(avg_time_per_step, 4),
                "steps_per_second": steps_per_second
            },
            "memory": mem_stats
        }

        self.log_data["metrics"].append(metric_entry)
        self._save_log()

        # Open the next timing window.
        self.last_log_time = current_time
        self._last_log_step = current_step

        # The value is scaled to milliseconds, so label it as such.
        print(
            f" -> Step {current_step}: {avg_time_per_step * 1000:.1f} ms/step | "
            f"Peak Mem: {mem_stats.get('peak_allocated_gb', 0):.2f} GB | "
            f"Reserved: {mem_stats.get('reserved_gb', 0):.2f} GB"
        )

    def _save_log(self):
        """Write ``self.log_data`` to ``self.log_file_path`` as indented JSON (best effort)."""
        # Logging must never abort training, so any failure is reported and swallowed.
        try:
            parent_dir = os.path.dirname(self.log_file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(self.log_file_path, 'w', encoding='utf-8') as f:
                json.dump(self.log_data, f, indent=4)
        except Exception as e:
            print(f"Error saving experiment log: {e}")
|
|
def debug_masking_visualizer(processed_batch, tokenizer):
    """
    Print a token-by-token table for the first example of a batch so that the
    alignment between ``input_ids`` and ``labels`` can be checked by eye.

    Args:
        processed_batch: Mapping with ``'input_ids'`` and ``'labels'`` entries;
            only index ``0`` of each is displayed. Tensors, arrays and plain
            nested lists are all accepted.
        tokenizer: Object exposing ``decode(list_of_ids) -> str``.

    A label equal to ``-100`` is shown as masked; any other label is shown as
    a training target.
    """
    def _as_ints(sequence):
        # Tensors/arrays expose tolist(); converting up front keeps the table
        # free of reprs such as "tensor(42)".
        if hasattr(sequence, "tolist"):
            sequence = sequence.tolist()
        return [int(value) for value in sequence]

    input_ids = _as_ints(processed_batch['input_ids'][0])
    labels = _as_ints(processed_batch['labels'][0])

    rule = "=" * 80
    print("\n" + rule)
    print(f"{'TOKEN (Decoded)':<30} | {'INPUT ID':<10} | {'LABEL ID':<10} | {'STATUS'}")
    print(rule)

    for token_id, label_id in zip(input_ids, labels):
        # Escape newlines so every token stays on a single table row.
        token_text = tokenizer.decode([token_id]).replace("\n", "\\n")

        if label_id == -100:
            status = "❌ MASKED (Instruction)"
            label_display = "IGNORE"
        else:
            status = "✅ TRAIN (Response)"
            label_display = str(label_id)

        print(f"{token_text:<30} | {token_id:<10} | {label_display:<10} | {status}")

    print(rule + "\n")

    # zip() stops at the shorter sequence; a length mismatch is itself an
    # alignment problem, so report it instead of hiding it.
    if len(input_ids) != len(labels):
        print(
            f"WARNING: length mismatch - input_ids has {len(input_ids)} entries "
            f"but labels has {len(labels)}; the extra entries are not shown."
        )
|
|
|
|
def trainable_parameters_to_file(model: nn.Module, save_dir: str):
    """
    Count the model's parameters and write a report of every trainable
    parameter tensor to ``<save_dir>/model_parameters_report.txt``.

    Args:
        model: Module whose ``named_parameters()`` are inspected.
        save_dir: Directory for the report; created (with parents) if missing.

    Returns:
        The path of the report file that was written.
    """
    trainable_params = 0
    all_param = 0
    trainable_layers = []

    for name, param in model.named_parameters():
        num_params = param.numel()
        all_param += num_params
        if param.requires_grad:
            trainable_params += num_params
            trainable_layers.append({
                "name": name,
                "shape": str(list(param.shape)),
                "count": num_params,
            })

    # The name column is at least 20 characters wide and grows to fit the
    # longest trainable parameter name.
    max_name_len = max([20] + [len(layer["name"]) for layer in trainable_layers])
    name_col_width = max_name_len + 4

    trainable_pct = 100 * trainable_params / all_param if all_param > 0 else 0
    summary_text = (
        f"Total Parameters: {all_param:,}\n"
        f"Trainable Parameters: {trainable_params:,}\n"
        f"Trainable Percentage: {trainable_pct:.4f}%\n"
    )

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(save_dir, exist_ok=True)
    file_path = os.path.join(save_dir, "model_parameters_report.txt")

    header = f"{'Layer Name':<{name_col_width}} | {'Shape':<25} | {'Count':<15}"

    with open(file_path, "w", encoding="utf-8") as f:
        f.write("=== GLOBAL STATISTICS ===\n")
        f.write(summary_text)
        f.write("\n" + "=" * (name_col_width + 40) + "\n")
        f.write("=== DETAILED TRAINABLE MATRICES LIST ===\n")

        f.write(header + "\n")
        # The rule matches the visible header width (newline excluded).
        f.write("-" * len(header) + "\n")

        for layer in trainable_layers:
            f.write(
                f"{layer['name']:<{name_col_width}} | "
                f"{layer['shape']:<25} | "
                f"{layer['count']:,}\n"
            )

    return file_path
|
|
| |
| |
def set_seed_all(seed: int):
    """Apply the same seed to ``random``, NumPy, PyTorch (CPU and CUDA) and ``transformers``."""
    # Order mirrors the individual calls: stdlib, NumPy, torch CPU, torch CUDA, transformers.
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        transformers.set_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
| |
| |