# Codette-Reasoning / observatory / performance_tracker.py
# Raiff1982's picture
# Upload 120 files
# ed1b365 verified
"""
Performance Tracker - analyses training metrics history to identify
improvement trends, best adapters, and score progression.
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Put the directory above this file's folder on sys.path so the
# ``observatory`` package import below resolves when this file is run
# directly as a script rather than imported as part of the package.
_THIS_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _THIS_DIR.parent
if str(_PROJECT_ROOT) not in sys.path:
    # Insert at the front so the local project takes precedence.
    sys.path.insert(0, str(_PROJECT_ROOT))
from observatory.metrics_logger import MetricsLogger
class PerformanceTracker:
    """Analyse training metrics to track improvement over time.

    All data is read through ``self.logger``, which must provide
    ``get_all()``, ``get_by_adapter(name)`` and ``get_unique_adapters()``.
    Entries are treated as dicts; the keys read here are ``timestamp``,
    ``adapter``, ``reasoning_score``, ``loss``, ``epoch`` and
    ``dataset_size``. Missing keys fall back to ``0`` (or ``None`` for
    timestamps) instead of raising.
    """

    def __init__(self, logger: Optional["MetricsLogger"] = None, log_file: Optional[str] = None):
        """Use *logger* if given, otherwise build a ``MetricsLogger`` for *log_file*."""
        # Compare against None (not truthiness) so a caller-supplied logger
        # is always honoured, even if it happens to evaluate as falsy.
        self.logger = logger if logger is not None else MetricsLogger(log_file=log_file)

    @staticmethod
    def _chronological(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return *entries* sorted by their ``timestamp`` value.

        Entries whose timestamp is missing or ``None`` are keyed as ``""``
        so the sort cannot fail on a ``None``-vs-``str`` comparison.
        NOTE(review): ordering assumes timestamps compare chronologically
        (e.g. ISO-8601 strings) -- confirm against the logger's format.
        """
        def sort_key(entry: Dict[str, Any]) -> Any:
            stamp = entry.get("timestamp")
            return "" if stamp is None else stamp

        return sorted(entries, key=sort_key)

    # -- trend analysis ----------------------------------------------------

    def score_progression(self, adapter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get score progression over time for an adapter (or all).

        Returns a timestamp-ordered list of dicts with ``timestamp``,
        ``adapter``, ``reasoning_score``, ``loss``, ``epoch`` and
        ``dataset_size``.
        """
        if adapter:
            entries = self.logger.get_by_adapter(adapter)
        else:
            entries = self.logger.get_all()
        return [
            {
                "timestamp": e.get("timestamp"),
                "adapter": e.get("adapter"),
                "reasoning_score": e.get("reasoning_score", 0),
                "loss": e.get("loss", 0),
                "epoch": e.get("epoch", 0),
                "dataset_size": e.get("dataset_size", 0),
            }
            for e in self._chronological(entries)
        ]

    def calculate_improvement(self, adapter: str) -> Dict[str, Any]:
        """Calculate improvement between first and last run for an adapter.

        Returns a dict with ``adapter``, ``num_runs``, ``first_score``,
        ``last_score``, ``delta``, ``percent_change``, ``first_timestamp``,
        ``last_timestamp`` and ``sufficient_data``. With fewer than two
        runs, ``delta`` and ``percent_change`` are ``0.0`` and
        ``sufficient_data`` is ``False``.
        """
        entries = self._chronological(self.logger.get_by_adapter(adapter))

        if len(entries) < 2:
            # Zero or one run: first and last are the same entry (if any).
            only = entries[0] if entries else {}
            only_score = only.get("reasoning_score", 0)
            only_stamp = only.get("timestamp")
            return {
                "adapter": adapter,
                "num_runs": len(entries),
                "first_score": only_score,
                "last_score": only_score,
                "delta": 0.0,
                "percent_change": 0.0,
                "first_timestamp": only_stamp,
                "last_timestamp": only_stamp,
                "sufficient_data": False,
            }

        first, last = entries[0], entries[-1]
        first_score = first.get("reasoning_score", 0)
        last_score = last.get("reasoning_score", 0)
        delta = last_score - first_score
        # A relative change is undefined for a non-positive baseline.
        pct = (delta / first_score * 100) if first_score > 0 else 0.0
        return {
            "adapter": adapter,
            "num_runs": len(entries),
            "first_score": round(first_score, 6),
            "last_score": round(last_score, 6),
            "delta": round(delta, 6),
            "percent_change": round(pct, 2),
            "first_timestamp": first.get("timestamp"),
            "last_timestamp": last.get("timestamp"),
            "sufficient_data": True,
        }

    def improvement_trends(self) -> List[Dict[str, Any]]:
        """Calculate improvement trends for all adapters, largest delta first."""
        trends = [
            self.calculate_improvement(adapter)
            for adapter in self.logger.get_unique_adapters()
        ]
        trends.sort(key=lambda t: t.get("delta", 0), reverse=True)
        return trends

    def best_adapters(self, top_n: int = 5) -> List[Dict[str, Any]]:
        """Find the best-performing adapter versions by reasoning score.

        Keeps the single highest-scoring entry per adapter (the earliest
        one seen wins ties) and returns up to *top_n* of those entries,
        sorted by highest ``reasoning_score``.
        """
        entries = self.logger.get_all()
        if not entries:
            return []

        # Group by adapter, take best score for each
        best: Dict[str, Dict[str, Any]] = {}
        for e in entries:
            adapter = e.get("adapter", "unknown")
            score = e.get("reasoning_score", 0)
            if adapter not in best or score > best[adapter].get("reasoning_score", 0):
                best[adapter] = e

        ranked = sorted(best.values(), key=lambda e: e.get("reasoning_score", 0), reverse=True)
        return ranked[:top_n]

    def run_to_run_deltas(self, adapter: str) -> List[Dict[str, Any]]:
        """Calculate score and loss deltas between consecutive runs of an adapter.

        Each item holds ``run`` (1-based index of the later run),
        ``from_timestamp``, ``to_timestamp``, ``score_delta`` and
        ``loss_delta``. Fewer than two runs yields an empty list.
        """
        entries = self._chronological(self.logger.get_by_adapter(adapter))
        deltas: List[Dict[str, Any]] = []
        for run, (prev, curr) in enumerate(zip(entries, entries[1:]), 1):
            deltas.append({
                "run": run,
                "from_timestamp": prev.get("timestamp"),
                "to_timestamp": curr.get("timestamp"),
                "score_delta": round(
                    curr.get("reasoning_score", 0) - prev.get("reasoning_score", 0), 6
                ),
                "loss_delta": round(curr.get("loss", 0) - prev.get("loss", 0), 6),
            })
        return deltas

    def loss_progression(self, adapter: Optional[str] = None) -> List[Tuple[str, float]]:
        """Get ``(timestamp, loss)`` pairs over time for an adapter (or all)."""
        if adapter:
            entries = self.logger.get_by_adapter(adapter)
        else:
            entries = self.logger.get_all()
        return [
            (e.get("timestamp", ""), e.get("loss", 0))
            for e in self._chronological(entries)
        ]

    # -- report ------------------------------------------------------------

    def format_report(self) -> str:
        """Generate a formatted text report of performance tracking.

        The report has a summary header, a top-10 table of best adapters,
        a first-run/last-run improvement table, and an ASCII sparkline for
        up to the first eight adapters.
        """
        heavy_rule = "=" * 74
        light_rule = "-" * 74

        lines: List[str] = [
            heavy_rule,
            " CODETTE PERFORMANCE TRACKING REPORT",
            heavy_rule,
        ]

        entries = self.logger.get_all()
        lines.append(f" Total logged runs: {len(entries)}")
        lines.append(f" Unique adapters: {len(self.logger.get_unique_adapters())}")
        lines.append("")

        # Best adapters table
        best = self.best_adapters(top_n=10)
        if best:
            lines.append(light_rule)
            lines.append(" TOP ADAPTERS BY REASONING SCORE")
            lines.append(light_rule)
            lines.append(f" {'Rank':<5} {'Adapter':<28} {'Score':>8} {'Loss':>8} {'Epoch':>6} {'Data':>6}")
            lines.append(f" {'----':<5} {'-------':<28} {'-----':>8} {'----':>8} {'-----':>6} {'----':>6}")
            for i, entry in enumerate(best, 1):
                name = entry.get("adapter", "?")[:27]
                score = entry.get("reasoning_score", 0)
                loss = entry.get("loss", 0)
                epoch = entry.get("epoch", 0)
                ds = entry.get("dataset_size", 0)
                lines.append(
                    f" {i:<5} {name:<28} {score:>8.4f} {loss:>8.4f} {epoch:>6} {ds:>6}"
                )
            lines.append("")

        # Improvement trends
        trends = self.improvement_trends()
        if trends:
            lines.append(light_rule)
            lines.append(" IMPROVEMENT TRENDS (first run -> last run)")
            lines.append(light_rule)
            lines.append(
                f" {'Adapter':<28} {'First':>8} {'Last':>8} {'Delta':>8} {'Change':>8} {'Runs':>5}"
            )
            lines.append(
                f" {'-------':<28} {'-----':>8} {'----':>8} {'-----':>8} {'------':>8} {'----':>5}"
            )
            for t in trends:
                name = t["adapter"][:27]
                first = t["first_score"]
                last = t["last_score"]
                delta = t["delta"]
                pct = t["percent_change"]
                runs = t["num_runs"]
                # Let the format spec emit the sign: this keeps the sign
                # adjacent to the digits and gives every row the same
                # width as the 8-character header columns.
                lines.append(
                    f" {name:<28} {first:>8.4f} {last:>8.4f} "
                    f"{delta:>+8.4f} {pct:>+7.1f}% {runs:>5}"
                )
            lines.append("")

        # Score progression chart (ASCII sparkline per adapter)
        adapters = self.logger.get_unique_adapters()
        if adapters:
            lines.append(light_rule)
            lines.append(" SCORE PROGRESSION (ASCII sparkline)")
            lines.append(light_rule)
            for adapter in adapters[:8]:
                progression = self.score_progression(adapter)
                if not progression:
                    continue
                scores = [p["reasoning_score"] for p in progression]
                sparkline = self._sparkline(scores, width=40)
                name = adapter[:24]
                lines.append(f" {name:<25} {sparkline} [{scores[0]:.3f} -> {scores[-1]:.3f}]")
            lines.append("")

        lines.append(heavy_rule)
        return "\n".join(lines)

    @staticmethod
    def _sparkline(values: List[float], width: int = 40) -> str:
        """Create an ASCII sparkline of exactly *width* characters.

        Returns ``""`` for no values and ``"-"`` for a single value. A
        non-positive *width* yields ``""``. Longer series are down-sampled
        to *width* points; shorter ones are padded with their last value.
        Levels are scaled against the min/max of the full input series.
        """
        if not values:
            return ""
        if len(values) == 1:
            return "-"
        if width <= 0:
            # Nothing can be drawn; also avoids dividing by zero below.
            return ""

        min_v = min(values)
        max_v = max(values)
        # A flat series would give a zero range; use 1.0 so it maps to level 0.
        range_v = max_v - min_v if max_v > min_v else 1.0
        chars = " _.-~^"
        n_chars = len(chars) - 1
        count = len(values)

        if count > width:
            # Resample to fit width
            step = count / width
            samples = [values[min(int(i * step), count - 1)] for i in range(width)]
        else:
            # Pad with last value
            samples = values + [values[-1]] * (width - count)

        return "".join(
            chars[max(0, min(int((v - min_v) / range_v * n_chars), n_chars))]
            for v in samples
        )
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point for the performance tracker.

    Parses *argv* (``sys.argv[1:]`` when ``None``) and prints exactly one
    of the following, checked in this order: the top-N adapters
    (``--best``), run-to-run deltas for one adapter (``--deltas``), the
    first/last improvement summary for one adapter (``--adapter``), or
    the full report when no option is given.
    """
    parser = argparse.ArgumentParser(
        description="Codette Performance Tracker - analyse training run history"
    )
    parser.add_argument(
        "--log-file", "-l",
        default=None,
        help="Path to observatory_metrics.json (default: auto-detect)",
    )
    parser.add_argument(
        "--adapter", "-a",
        default=None,
        help="Filter to a specific adapter name",
    )
    parser.add_argument(
        "--best", "-b",
        type=int,
        default=None,
        help="Show top N best adapters",
    )
    parser.add_argument(
        "--deltas", "-d",
        default=None,
        help="Show run-to-run deltas for a specific adapter",
    )
    args = parser.parse_args(argv)

    tracker = PerformanceTracker(log_file=args.log_file)

    # Test against None so an explicit ``--best 0`` is honoured instead of
    # silently falling through to the full report.
    if args.best is not None:
        best = tracker.best_adapters(top_n=args.best)
        for i, entry in enumerate(best, 1):
            print(f" {i}. {entry.get('adapter', '?')} - "
                  f"score: {entry.get('reasoning_score', 0):.4f}, "
                  f"loss: {entry.get('loss', 0):.4f}")
        return

    if args.deltas:
        deltas = tracker.run_to_run_deltas(args.deltas)
        if not deltas:
            print(f"No run-to-run data for adapter: {args.deltas}")
            return
        for d in deltas:
            # Each value carries its own sign; score and loss often move
            # in opposite directions, so a shared sign prefix is wrong.
            print(f" Run {d['run']}: score {d['score_delta']:+.6f}, "
                  f"loss {d['loss_delta']:+.6f}")
        return

    if args.adapter:
        improvement = tracker.calculate_improvement(args.adapter)
        print(f" Adapter: {improvement['adapter']}")
        print(f" Runs: {improvement['num_runs']}")
        print(f" First score: {improvement['first_score']:.6f}")
        print(f" Last score: {improvement['last_score']:.6f}")
        print(f" Delta: {improvement['delta']:+.6f}")
        print(f" Change: {improvement['percent_change']:+.2f}%")
        return

    # Full report
    print(tracker.format_report())


if __name__ == "__main__":
    main()