# Page header from the file viewer this source was copied from:
# Spaces: Sleeping | File size: 7,196 Bytes | 190f35c (presumably the revision hash)
import re
import os
import pandas as pd
class TelemetryParser:
    """
    Parses the training telemetry .txt files that MLX LoRA outputs
    and converts them into clean CSV / DataFrame for analysis.

    Usage:
        runner = TelemetryParser("./data/training_telemetry.txt")
        df = runner.parse()
        runner.export_csv("./data/training_telemetry.csv")
        runner.summary()
    """

    # Regex pattern matching MLX training output lines like:
    # Iter 10: Train loss 4.002, Learning Rate 1.000e-05, It/sec 0.383, Tokens/sec 64.202, Trained Tokens 1677, Peak mem 8.324 GB
    TRAIN_PATTERN = re.compile(
        r"Iter (\d+): Train loss ([\d.]+), "
        r"Learning Rate ([\d.e+-]+), "
        r"It/sec ([\d.]+), "
        r"Tokens/sec ([\d.]+), "
        r"Trained Tokens (\d+), "
        r"Peak mem ([\d.]+) GB"
    )

    # Regex pattern matching MLX validation output lines like:
    # Iter 200: Val loss 3.817, Val took 3.723s
    VAL_PATTERN = re.compile(
        r"Iter (\d+): Val loss ([\d.]+), Val took ([\d.]+)s"
    )

    # (column name, converter) pairs, in the same order as the capture groups
    # of TRAIN_PATTERN / VAL_PATTERN. They also fix the column layout of the
    # resulting DataFrames, so an empty result still has every column.
    _TRAIN_COLUMNS = (
        ("Iteration", int),
        ("Train Loss", float),
        ("Learning Rate", float),
        ("It/sec", float),
        ("Tokens/sec", float),
        ("Trained Tokens", int),
        ("Peak Memory (GB)", float),
    )
    _VAL_COLUMNS = (
        ("Iteration", int),
        ("Val Loss", float),
        ("Val Time (s)", float),
    )

    def __init__(self, telemetry_filepath):
        """
        Initializes the runner with a path to the telemetry log file.

        telemetry_filepath: Path to the .txt file from mlx_lm.lora output.
        """
        self.telemetry_filepath = telemetry_filepath
        # Both frames stay None until parse() has successfully read the file.
        self.train_df = None
        self.val_df = None

    @staticmethod
    def _row_from_match(match, columns):
        """Converts the capture groups of a regex match into a column -> value dict."""
        return {
            name: convert(text)
            for (name, convert), text in zip(columns, match.groups())
        }

    @staticmethod
    def _build_frame(rows, columns):
        """
        Builds a DataFrame with a fixed column order and dtypes.

        Passing the columns explicitly keeps them present when rows is empty;
        a bare pd.DataFrame([]) has no columns at all, which breaks any later
        merge on "Iteration".
        """
        names = [name for name, _ in columns]
        dtypes = {
            name: "int64" if convert is int else "float64"
            for name, convert in columns
        }
        return pd.DataFrame(rows, columns=names).astype(dtypes)

    def parse(self):
        """
        Reads the telemetry file and extracts training + validation metrics
        into two separate DataFrames (self.train_df and self.val_df).

        Returns the training DataFrame, or None (after printing a message)
        when the telemetry file does not exist. Lines matching neither
        pattern are ignored.
        """
        if not os.path.exists(self.telemetry_filepath):
            print(f"β Telemetry file not found: {self.telemetry_filepath}")
            return None

        train_rows = []
        val_rows = []
        with open(self.telemetry_filepath, "r", encoding="utf-8") as f:
            for line in f:
                # Try to match a training line
                train_match = self.TRAIN_PATTERN.search(line)
                if train_match:
                    train_rows.append(
                        self._row_from_match(train_match, self._TRAIN_COLUMNS)
                    )
                # Try to match a validation line
                val_match = self.VAL_PATTERN.search(line)
                if val_match:
                    val_rows.append(
                        self._row_from_match(val_match, self._VAL_COLUMNS)
                    )

        self.train_df = self._build_frame(train_rows, self._TRAIN_COLUMNS)
        self.val_df = self._build_frame(val_rows, self._VAL_COLUMNS)
        print(f"π Parsed {len(self.train_df)} training entries and {len(self.val_df)} validation entries.")
        return self.train_df

    def export_csv(self, output_path=None):
        """
        Exports the parsed training data to a CSV file.
        Merges training and validation data on the Iteration column.

        output_path: Destination file. When None, the telemetry file's path
            with its extension replaced by ".csv" is used.

        Returns the path written, or None (after printing a message) when
        there is no training data to export. Parses the file first if that
        has not happened yet.
        """
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("β No data to export!")
            return None

        # Merge train and val on Iteration (val rows only exist every N steps);
        # a left join keeps every training row and leaves the validation
        # columns empty where no validation ran.
        merged = self.train_df.merge(self.val_df, on="Iteration", how="left")

        if output_path is None:
            # Default: same name as input but with .csv extension
            base = os.path.splitext(self.telemetry_filepath)[0]
            output_path = base + ".csv"

        merged.to_csv(output_path, index=False)
        print(f"πΎ Exported telemetry CSV to: {output_path}")
        return output_path

    def summary(self):
        """
        Prints a human-readable summary of the training run.

        Parses the file first if that has not happened yet. Prints a short
        message and returns when there is no training data. The validation
        section is only printed when validation entries were found.
        """
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("β No data to summarize!")
            return

        train = self.train_df
        print("\nβββ Training Run Summary βββ")
        print(f" Total Iterations: {train['Iteration'].max()}")
        print(f" Total Tokens Trained: {train['Trained Tokens'].max():,}")
        print(f" Peak Memory: {train['Peak Memory (GB)'].max():.2f} GB")
        print(f" Avg Speed: {train['Tokens/sec'].mean():.1f} tokens/sec")

        # Train loss: first vs last
        first_loss = train['Train Loss'].iloc[0]
        last_loss = train['Train Loss'].iloc[-1]
        train_improvement = ((first_loss - last_loss) / first_loss) * 100
        print(f" Train Loss: {first_loss:.3f} β {last_loss:.3f} ({train_improvement:+.1f}%)")

        # Validation loss: first vs best
        if self.val_df is not None and not self.val_df.empty:
            val = self.val_df
            first_val = val['Val Loss'].iloc[0]
            best_val = val['Val Loss'].min()
            best_iter = val.loc[val['Val Loss'].idxmin(), 'Iteration']
            val_improvement = ((first_val - best_val) / first_val) * 100
            print(f" Val Loss: {first_val:.3f} β {best_val:.3f} ({val_improvement:+.1f}%)")
            print(f" π Best Checkpoint: Iter {best_iter} (Val Loss {best_val:.3f})")
        print("βββββββββββββββββββββββββββ\n")

    def get_best_checkpoint(self):
        """
        Returns the iteration number with the lowest validation loss.

        Parses the file first if that has not happened yet. Returns None when
        the file is missing or contains no validation entries.
        """
        if self.val_df is None:
            self.parse()
        if self.val_df is None or self.val_df.empty:
            return None
        best_idx = self.val_df['Val Loss'].idxmin()
        return int(self.val_df.loc[best_idx, 'Iteration'])
# ──────────────────────────────────────────────────
# EXECUTION BLOCK — Run standalone for quick analysis
# ──────────────────────────────────────────────────
if __name__ == "__main__":
    # Standalone mode: analyze every training_telemetry*.txt file found in the
    # data/ directory that sits one level above this file's directory.
    import sys

    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    data_dir = os.path.join(project_root, "data")

    # A missing data/ directory is treated like an empty one so the user gets
    # the friendly message below instead of a traceback from os.listdir.
    try:
        entries = os.listdir(data_dir)
    except (FileNotFoundError, NotADirectoryError):
        entries = []

    # Find all telemetry files in data/
    telemetry_files = sorted(
        name for name in entries
        if name.startswith("training_telemetry") and name.endswith(".txt")
    )
    if not telemetry_files:
        print("β No telemetry files found in data/. Run a training first!")
        sys.exit(1)

    for filename in telemetry_files:
        filepath = os.path.join(data_dir, filename)
        print(f"\nπ Analyzing: {filename}")
        print("=" * 50)
        runner = TelemetryParser(filepath)
        runner.parse()
        runner.summary()
        # No explicit path: the CSV is written next to the telemetry file.
        runner.export_csv()
|