Spaces:
Sleeping
Sleeping
import os
import re
import sys

import pandas as pd
class TelemetryParser:
    """
    Parses the training telemetry .txt files that MLX LoRA outputs
    and converts them into clean CSV / DataFrame for analysis.

    Usage:
        runner = TelemetryParser("./data/training_telemetry.txt")
        df = runner.parse()
        runner.export_csv("./data/training_telemetry.csv")
        runner.summary()
    """

    # Regex pattern matching MLX training output lines like:
    # Iter 10: Train loss 4.002, Learning Rate 1.000e-05, It/sec 0.383, Tokens/sec 64.202, Trained Tokens 1677, Peak mem 8.324 GB
    TRAIN_PATTERN = re.compile(
        r"Iter (\d+): Train loss ([\d.]+), "
        r"Learning Rate ([\d.e+-]+), "
        r"It/sec ([\d.]+), "
        r"Tokens/sec ([\d.]+), "
        r"Trained Tokens (\d+), "
        r"Peak mem ([\d.]+) GB"
    )

    # Regex pattern matching MLX validation output lines like:
    # Iter 200: Val loss 3.817, Val took 3.723s
    VAL_PATTERN = re.compile(
        r"Iter (\d+): Val loss ([\d.]+), Val took ([\d.]+)s"
    )

    # Column name -> dtype for the two DataFrames. Building every frame
    # against a fixed schema guarantees the columns exist even when no line
    # matched, so column lookups and the merge on "Iteration" never hit a
    # column-less frame.
    TRAIN_SCHEMA = {
        "Iteration": "int64",
        "Train Loss": "float64",
        "Learning Rate": "float64",
        "It/sec": "float64",
        "Tokens/sec": "float64",
        "Trained Tokens": "int64",
        "Peak Memory (GB)": "float64",
    }
    VAL_SCHEMA = {
        "Iteration": "int64",
        "Val Loss": "float64",
        "Val Time (s)": "float64",
    }

    def __init__(self, telemetry_filepath):
        """
        Initializes the runner with a path to the telemetry log file.

        telemetry_filepath: Path to the .txt file from mlx_lm.lora output.
        """
        self.telemetry_filepath = telemetry_filepath
        # Both frames stay None until parse() has read the file successfully.
        self.train_df = None
        self.val_df = None

    @staticmethod
    def _build_frame(rows, schema):
        """
        Builds a DataFrame from a list of row dicts with the columns and
        dtypes of `schema`, including when `rows` is empty.
        """
        return pd.DataFrame(rows, columns=list(schema)).astype(schema)

    def parse(self):
        """
        Reads the telemetry file and extracts training + validation metrics
        into two separate DataFrames (self.train_df and self.val_df).

        Returns the training DataFrame, or None (after printing a message)
        when the telemetry file does not exist. Lines matching neither
        pattern are ignored.
        """
        if not os.path.exists(self.telemetry_filepath):
            print(f"β Telemetry file not found: {self.telemetry_filepath}")
            return None

        train_rows = []
        val_rows = []
        with open(self.telemetry_filepath, "r", encoding="utf-8") as f:
            for line in f:
                # Try to match a training line
                train_match = self.TRAIN_PATTERN.search(line)
                if train_match:
                    train_rows.append({
                        "Iteration": int(train_match.group(1)),
                        "Train Loss": float(train_match.group(2)),
                        "Learning Rate": float(train_match.group(3)),
                        "It/sec": float(train_match.group(4)),
                        "Tokens/sec": float(train_match.group(5)),
                        "Trained Tokens": int(train_match.group(6)),
                        "Peak Memory (GB)": float(train_match.group(7)),
                    })

                # Try to match a validation line
                val_match = self.VAL_PATTERN.search(line)
                if val_match:
                    val_rows.append({
                        "Iteration": int(val_match.group(1)),
                        "Val Loss": float(val_match.group(2)),
                        "Val Time (s)": float(val_match.group(3)),
                    })

        self.train_df = self._build_frame(train_rows, self.TRAIN_SCHEMA)
        self.val_df = self._build_frame(val_rows, self.VAL_SCHEMA)
        print(f"π Parsed {len(self.train_df)} training entries and {len(self.val_df)} validation entries.")
        return self.train_df

    def export_csv(self, output_path=None):
        """
        Exports the parsed training data to a CSV file.
        Merges training and validation data on the Iteration column.

        output_path: Destination CSV path. Defaults to the telemetry file
            path with its extension replaced by .csv.

        Returns the path written, or None when there is no training data.
        """
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("β No data to export!")
            return

        # A run without validation lines must still export: fall back to an
        # empty, correctly-typed frame so the merge below has an "Iteration"
        # column to join on and the CSV keeps the validation columns.
        val_df = self.val_df
        if val_df is None or "Iteration" not in val_df.columns:
            val_df = self._build_frame([], self.VAL_SCHEMA)

        # Merge train and val on Iteration (val rows only exist every N steps).
        # NOTE: this is a left join, so a validation row whose iteration has
        # no training row is not written to the CSV.
        merged = self.train_df.merge(val_df, on="Iteration", how="left")

        if output_path is None:
            # Default: same name as input but with .csv extension
            base = os.path.splitext(self.telemetry_filepath)[0]
            output_path = base + ".csv"

        merged.to_csv(output_path, index=False)
        print(f"πΎ Exported telemetry CSV to: {output_path}")
        return output_path

    def summary(self):
        """Prints a human-readable summary of the training run."""
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("β No data to summarize!")
            return

        print("\nβββ Training Run Summary βββ")
        print(f" Total Iterations: {self.train_df['Iteration'].max()}")
        print(f" Total Tokens Trained: {self.train_df['Trained Tokens'].max():,}")
        print(f" Peak Memory: {self.train_df['Peak Memory (GB)'].max():.2f} GB")
        print(f" Avg Speed: {self.train_df['Tokens/sec'].mean():.1f} tokens/sec")

        # Train loss: first vs last (relative change, positive = loss went down)
        first_loss = self.train_df['Train Loss'].iloc[0]
        last_loss = self.train_df['Train Loss'].iloc[-1]
        train_improvement = ((first_loss - last_loss) / first_loss) * 100
        print(f" Train Loss: {first_loss:.3f} β {last_loss:.3f} ({train_improvement:+.1f}%)")

        # Validation loss: first vs best
        if self.val_df is not None and not self.val_df.empty:
            first_val = self.val_df['Val Loss'].iloc[0]
            best_val = self.val_df['Val Loss'].min()
            best_iter = self.val_df.loc[self.val_df['Val Loss'].idxmin(), 'Iteration']
            val_improvement = ((first_val - best_val) / first_val) * 100
            print(f" Val Loss: {first_val:.3f} β {best_val:.3f} ({val_improvement:+.1f}%)")
            print(f" π Best Checkpoint: Iter {best_iter} (Val Loss {best_val:.3f})")

        print("βββββββββββββββββββββββββββ\n")

    def get_best_checkpoint(self):
        """
        Returns the iteration number with the lowest validation loss,
        or None when no validation entries are available.
        """
        if self.val_df is None:
            self.parse()
        if self.val_df is None or self.val_df.empty:
            return None
        best_idx = self.val_df['Val Loss'].idxmin()
        return int(self.val_df.loc[best_idx, 'Iteration'])
# --------------------------------------------------
# EXECUTION BLOCK - Run standalone for quick analysis
# --------------------------------------------------
def main():
    """
    Analyzes every training_telemetry*.txt file in the project's data/
    directory: parses it, prints a summary and exports a CSV next to it.

    The data/ directory is resolved relative to this file (the parent of
    the directory containing it). Exits with status 1 when no telemetry
    file is found.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    data_dir = os.path.join(project_root, "data")

    # A missing data/ directory is treated like an empty one, so the user
    # gets the "no telemetry files" message instead of a traceback.
    try:
        entries = os.listdir(data_dir)
    except FileNotFoundError:
        entries = []

    # Find all telemetry files in data/
    telemetry_files = sorted(
        name for name in entries
        if name.startswith("training_telemetry") and name.endswith(".txt")
    )
    if not telemetry_files:
        print("β No telemetry files found in data/. Run a training first!")
        sys.exit(1)

    for filename in telemetry_files:
        filepath = os.path.join(data_dir, filename)
        print(f"\nπ Analyzing: {filename}")
        print("=" * 50)
        runner = TelemetryParser(filepath)
        runner.parse()
        runner.summary()
        runner.export_csv()


if __name__ == "__main__":
    main()