"""Parse MLX LoRA training telemetry logs into pandas DataFrames / CSV."""

import os
import re
import sys
from typing import Optional

import pandas as pd


class TelemetryParser:
    """
    Parses the training telemetry .txt files that MLX LoRA outputs
    and converts them into clean CSV / DataFrame for analysis.

    Usage:
        runner = TelemetryParser("./data/training_telemetry.txt")
        df = runner.parse()
        runner.export_csv("./data/training_telemetry.csv")
        runner.summary()
    """

    # Regex pattern matching MLX training output lines like:
    # Iter 10: Train loss 4.002, Learning Rate 1.000e-05, It/sec 0.383, Tokens/sec 64.202, Trained Tokens 1677, Peak mem 8.324 GB
    TRAIN_PATTERN = re.compile(
        r"Iter (\d+): Train loss ([\d.]+), "
        r"Learning Rate ([\d.e+-]+), "
        r"It/sec ([\d.]+), "
        r"Tokens/sec ([\d.]+), "
        r"Trained Tokens (\d+), "
        r"Peak mem ([\d.]+) GB"
    )

    # Regex pattern matching MLX validation output lines like:
    # Iter 200: Val loss 3.817, Val took 3.723s
    VAL_PATTERN = re.compile(
        r"Iter (\d+): Val loss ([\d.]+), Val took ([\d.]+)s"
    )

    # Column names, in the same order as the capture groups of TRAIN_PATTERN.
    TRAIN_COLUMNS = (
        "Iteration",
        "Train Loss",
        "Learning Rate",
        "It/sec",
        "Tokens/sec",
        "Trained Tokens",
        "Peak Memory (GB)",
    )

    # Column names, in the same order as the capture groups of VAL_PATTERN.
    VAL_COLUMNS = ("Iteration", "Val Loss", "Val Time (s)")

    # Converters applied to each capture group, aligned with the columns above.
    _TRAIN_CASTS = (int, float, float, float, float, int, float)
    _VAL_CASTS = (int, float, float)

    def __init__(self, telemetry_filepath):
        """
        Initializes the runner with a path to the telemetry log file.

        telemetry_filepath: Path to the .txt file from mlx_lm.lora output.
        """
        self.telemetry_filepath = telemetry_filepath
        # Both frames stay None until parse() has successfully read the file.
        self.train_df = None
        self.val_df = None

    @staticmethod
    def _row_from_match(match, columns, casts):
        """Builds one row dict by converting each capture group of *match*."""
        return {
            column: cast(raw)
            for column, cast, raw in zip(columns, casts, match.groups())
        }

    def parse(self) -> Optional[pd.DataFrame]:
        """
        Reads the telemetry file and extracts training + validation metrics
        into two separate DataFrames (self.train_df and self.val_df).

        Both frames always carry their full column schema, even when no line
        matched, so downstream code can rely on the columns being present.

        Returns the training DataFrame, or None if the path is not a file.
        """
        # isfile (not exists) so that a directory path is reported as missing
        # instead of crashing inside open().
        if not os.path.isfile(self.telemetry_filepath):
            print(f"❌ Telemetry file not found: {self.telemetry_filepath}")
            return None

        train_rows = []
        val_rows = []

        # errors="replace": captured terminal output may contain stray bytes;
        # both patterns are pure ASCII, so replacement characters cannot
        # change which lines match.
        with open(
            self.telemetry_filepath, "r", encoding="utf-8", errors="replace"
        ) as f:
            for line in f:
                # Try to match a training line
                train_match = self.TRAIN_PATTERN.search(line)
                if train_match:
                    train_rows.append(
                        self._row_from_match(
                            train_match, self.TRAIN_COLUMNS, self._TRAIN_CASTS
                        )
                    )

                # Try to match a validation line
                val_match = self.VAL_PATTERN.search(line)
                if val_match:
                    val_rows.append(
                        self._row_from_match(
                            val_match, self.VAL_COLUMNS, self._VAL_CASTS
                        )
                    )

        # Explicit columns keep the schema intact when a list is empty
        # (pd.DataFrame([]) would otherwise have no columns at all).
        self.train_df = pd.DataFrame(train_rows, columns=list(self.TRAIN_COLUMNS))
        self.val_df = pd.DataFrame(val_rows, columns=list(self.VAL_COLUMNS))

        print(f"📊 Parsed {len(self.train_df)} training entries and {len(self.val_df)} validation entries.")
        return self.train_df

    def export_csv(self, output_path=None) -> Optional[str]:
        """
        Exports the parsed training data to a CSV file.
        Merges training and validation data on the Iteration column.

        output_path: Destination CSV path. Defaults to the telemetry file's
                     path with its extension replaced by ".csv".

        Returns the path written, or None when there is nothing to export.
        """
        if self.train_df is None:
            self.parse()

        if self.train_df is None or self.train_df.empty:
            print("❌ No data to export!")
            return None

        if self.val_df is None or self.val_df.empty:
            # A run without validation output has nothing to merge; merging on
            # a frame that lacks the key column would raise KeyError. Add the
            # validation columns as NaN so the CSV schema stays the same.
            merged = self.train_df.copy()
            for column in self.VAL_COLUMNS[1:]:
                merged[column] = float("nan")
        else:
            # Merge train and val on Iteration (val rows only exist every N steps)
            merged = self.train_df.merge(self.val_df, on="Iteration", how="left")

        if output_path is None:
            # Default: same name as input but with .csv extension
            base = os.path.splitext(self.telemetry_filepath)[0]
            output_path = base + ".csv"

        merged.to_csv(output_path, index=False)
        print(f"💾 Exported telemetry CSV to: {output_path}")
        return output_path

    def summary(self) -> None:
        """Prints a human-readable summary of the training run."""
        if self.train_df is None:
            self.parse()

        if self.train_df is None or self.train_df.empty:
            print("❌ No data to summarize!")
            return

        train = self.train_df

        print("\n━━━ Training Run Summary ━━━")
        print(f"  Total Iterations:    {train['Iteration'].max()}")
        print(f"  Total Tokens Trained: {train['Trained Tokens'].max():,}")
        print(f"  Peak Memory:         {train['Peak Memory (GB)'].max():.2f} GB")
        print(f"  Avg Speed:           {train['Tokens/sec'].mean():.1f} tokens/sec")

        # Train loss: first vs last
        first_loss = train['Train Loss'].iloc[0]
        last_loss = train['Train Loss'].iloc[-1]
        train_improvement = ((first_loss - last_loss) / first_loss) * 100
        print(f"  Train Loss:          {first_loss:.3f} → {last_loss:.3f} ({train_improvement:+.1f}%)")

        # Validation loss: first vs best
        if self.val_df is not None and not self.val_df.empty:
            val = self.val_df
            first_val = val['Val Loss'].iloc[0]
            best_val = val['Val Loss'].min()
            best_iter = val.loc[val['Val Loss'].idxmin(), 'Iteration']
            val_improvement = ((first_val - best_val) / first_val) * 100
            print(f"  Val Loss:            {first_val:.3f} → {best_val:.3f} ({val_improvement:+.1f}%)")
            print(f"  🏆 Best Checkpoint:  Iter {best_iter} (Val Loss {best_val:.3f})")

        print("━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")

    def get_best_checkpoint(self) -> Optional[int]:
        """
        Returns the iteration number with the lowest validation loss,
        or None when no validation entries are available.
        """
        if self.val_df is None:
            self.parse()

        if self.val_df is None or self.val_df.empty:
            return None

        best_idx = self.val_df['Val Loss'].idxmin()
        return int(self.val_df.loc[best_idx, 'Iteration'])


# ══════════════════════════════════════════════════
# EXECUTION BLOCK — Run standalone for quick analysis
# ══════════════════════════════════════════════════
def main() -> None:
    """Analyzes every training_telemetry*.txt file found in <project_root>/data."""
    # The project root is taken to be the parent of this script's directory.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    data_dir = os.path.join(project_root, "data")

    # Find all telemetry files in data/ (a missing data/ directory is treated
    # the same as an empty one instead of crashing in os.listdir).
    telemetry_files = []
    if os.path.isdir(data_dir):
        telemetry_files = sorted(
            f for f in os.listdir(data_dir)
            if f.startswith("training_telemetry") and f.endswith(".txt")
        )

    if not telemetry_files:
        print("❌ No telemetry files found in data/. Run a training first!")
        sys.exit(1)

    for filename in telemetry_files:
        filepath = os.path.join(data_dir, filename)
        print(f"\n📄 Analyzing: {filename}")
        print("=" * 50)

        runner = TelemetryParser(filepath)
        runner.parse()
        runner.summary()
        runner.export_csv()


if __name__ == "__main__":
    main()