ai-sonnet-generator / src /telemetry_parser.py
AlexKa03's picture
Upload folder using huggingface_hub
190f35c verified
import re
import os
import pandas as pd
class TelemetryParser:
    """
    Parses the training telemetry .txt files that MLX LoRA outputs
    and converts them into clean CSV / DataFrame for analysis.

    Usage:
        runner = TelemetryParser("./data/training_telemetry.txt")
        df = runner.parse()
        runner.export_csv("./data/training_telemetry.csv")
        runner.summary()
    """

    # Regex pattern matching MLX training output lines like:
    # Iter 10: Train loss 4.002, Learning Rate 1.000e-05, It/sec 0.383, Tokens/sec 64.202, Trained Tokens 1677, Peak mem 8.324 GB
    TRAIN_PATTERN = re.compile(
        r"Iter (\d+): Train loss ([\d.]+), "
        r"Learning Rate ([\d.e+-]+), "
        r"It/sec ([\d.]+), "
        r"Tokens/sec ([\d.]+), "
        r"Trained Tokens (\d+), "
        r"Peak mem ([\d.]+) GB"
    )

    # Regex pattern matching MLX validation output lines like:
    # Iter 200: Val loss 3.817, Val took 3.723s
    VAL_PATTERN = re.compile(
        r"Iter (\d+): Val loss ([\d.]+), Val took ([\d.]+)s"
    )

    # (column name, converter) pairs, listed in the same order as the capture
    # groups of the matching pattern. They drive both row construction and the
    # column layout of the resulting DataFrames.
    _TRAIN_FIELDS = (
        ("Iteration", int),
        ("Train Loss", float),
        ("Learning Rate", float),
        ("It/sec", float),
        ("Tokens/sec", float),
        ("Trained Tokens", int),
        ("Peak Memory (GB)", float),
    )
    _VAL_FIELDS = (
        ("Iteration", int),
        ("Val Loss", float),
        ("Val Time (s)", float),
    )

    def __init__(self, telemetry_filepath):
        """
        Initializes the runner with a path to the telemetry log file.

        telemetry_filepath: Path to the .txt file from mlx_lm.lora output.
        """
        self.telemetry_filepath = telemetry_filepath
        # Both stay None until parse() has successfully read the file.
        self.train_df = None
        self.val_df = None

    @staticmethod
    def _row_from_match(match, fields):
        """Converts the capture groups of *match* into a {column: value} dict."""
        return {
            name: convert(raw)
            for (name, convert), raw in zip(fields, match.groups())
        }

    @staticmethod
    def _to_frame(rows, fields):
        """
        Builds a DataFrame with a fixed column set and dtypes.

        Declaring the columns up front means a log without any matching lines
        still yields a frame that has the expected columns (just zero rows),
        so later merges on "Iteration" keep working.
        """
        columns = [name for name, _ in fields]
        dtypes = {
            name: "int64" if convert is int else "float64"
            for name, convert in fields
        }
        return pd.DataFrame(rows, columns=columns).astype(dtypes)

    def parse(self) -> "pd.DataFrame | None":
        """
        Reads the telemetry file and extracts training + validation metrics
        into two separate DataFrames (self.train_df and self.val_df).

        Returns the training DataFrame, or None (after printing a message)
        when the telemetry file does not exist.
        """
        if not os.path.exists(self.telemetry_filepath):
            print(f"❌ Telemetry file not found: {self.telemetry_filepath}")
            return None

        train_rows = []
        val_rows = []
        with open(self.telemetry_filepath, "r", encoding="utf-8") as f:
            for line in f:
                # Each line is tested against both patterns independently.
                train_match = self.TRAIN_PATTERN.search(line)
                if train_match:
                    train_rows.append(
                        self._row_from_match(train_match, self._TRAIN_FIELDS)
                    )
                val_match = self.VAL_PATTERN.search(line)
                if val_match:
                    val_rows.append(
                        self._row_from_match(val_match, self._VAL_FIELDS)
                    )

        self.train_df = self._to_frame(train_rows, self._TRAIN_FIELDS)
        self.val_df = self._to_frame(val_rows, self._VAL_FIELDS)
        print(f"📊 Parsed {len(self.train_df)} training entries and {len(self.val_df)} validation entries.")
        return self.train_df

    def export_csv(self, output_path=None) -> "str | None":
        """
        Exports the parsed training data to a CSV file.
        Merges training and validation data on the Iteration column.

        output_path: Destination file. Defaults to the telemetry file path
            with its extension replaced by ".csv".

        Returns the path written, or None when there is nothing to export.
        """
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("❌ No data to export!")
            return None

        # Left merge: validation rows only exist every N steps, so training
        # rows without a validation entry get empty validation columns. This
        # also covers logs that contain no validation lines at all.
        merged = self.train_df.merge(self.val_df, on="Iteration", how="left")

        if output_path is None:
            # Default: same name as input but with .csv extension
            base = os.path.splitext(self.telemetry_filepath)[0]
            output_path = base + ".csv"

        merged.to_csv(output_path, index=False)
        print(f"💾 Exported telemetry CSV to: {output_path}")
        return output_path

    def summary(self) -> None:
        """Prints a human-readable summary of the training run."""
        if self.train_df is None:
            self.parse()
        if self.train_df is None or self.train_df.empty:
            print("❌ No data to summarize!")
            return

        train = self.train_df
        print("\n━━━ Training Run Summary ━━━")
        print(f" Total Iterations: {train['Iteration'].max()}")
        print(f" Total Tokens Trained: {train['Trained Tokens'].max():,}")
        print(f" Peak Memory: {train['Peak Memory (GB)'].max():.2f} GB")
        print(f" Avg Speed: {train['Tokens/sec'].mean():.1f} tokens/sec")

        # Train loss: first vs last
        first_loss = train['Train Loss'].iloc[0]
        last_loss = train['Train Loss'].iloc[-1]
        train_improvement = ((first_loss - last_loss) / first_loss) * 100
        print(f" Train Loss: {first_loss:.3f} → {last_loss:.3f} ({train_improvement:+.1f}%)")

        # Validation loss: first vs best
        if self.val_df is not None and not self.val_df.empty:
            val_loss = self.val_df['Val Loss']
            first_val = val_loss.iloc[0]
            best_val = val_loss.min()
            best_iter = int(self.val_df.loc[val_loss.idxmin(), 'Iteration'])
            val_improvement = ((first_val - best_val) / first_val) * 100
            print(f" Val Loss: {first_val:.3f} → {best_val:.3f} ({val_improvement:+.1f}%)")
            print(f" 🏆 Best Checkpoint: Iter {best_iter} (Val Loss {best_val:.3f})")
        print("━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")

    def get_best_checkpoint(self) -> "int | None":
        """
        Returns the iteration number with the lowest validation loss,
        or None when no validation entries are available.
        """
        if self.val_df is None:
            self.parse()
        if self.val_df is None or self.val_df.empty:
            return None
        best_idx = self.val_df['Val Loss'].idxmin()
        return int(self.val_df.loc[best_idx, 'Iteration'])
# ══════════════════════════════════════════════════
# EXECUTION BLOCK — Run standalone for quick analysis
# ══════════════════════════════════════════════════
if __name__ == "__main__":
    # Standalone entry point: analyze every training_telemetry*.txt file in
    # the project's data/ directory (summary on stdout + CSV next to the log).
    import sys

    # This file lives in <project_root>/src/, so go two levels up.
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    data_dir = os.path.join(project_root, "data")

    # A missing data/ directory is handled like an empty one, so the user
    # sees the friendly hint below instead of a FileNotFoundError traceback.
    try:
        dir_entries = os.listdir(data_dir)
    except FileNotFoundError:
        dir_entries = []

    # Find all telemetry files in data/
    telemetry_files = sorted(
        name for name in dir_entries
        if name.startswith("training_telemetry") and name.endswith(".txt")
    )
    if not telemetry_files:
        print("❌ No telemetry files found in data/. Run a training first!")
        sys.exit(1)

    for filename in telemetry_files:
        filepath = os.path.join(data_dir, filename)
        print(f"\n📄 Analyzing: {filename}")
        print("=" * 50)
        runner = TelemetryParser(filepath)
        runner.parse()
        runner.summary()
        runner.export_csv()