# model-router / monitor.py
# Provenance: uploaded by dknguyen2304 via huggingface_hub ("Upload folder
# using huggingface_hub", revision 8bc62e2, verified).
#!/usr/bin/env python3
"""
Router Finetune Agent - Monitoring Dashboard
A real-time monitoring dashboard for tracking router finetuning progress.
Displays key metrics with live updates using Rich library.
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from rich import print
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.table import Table
from rich.text import Text
# Configuration: log files produced by the finetune pipeline, resolved
# relative to this script's directory.
BASE_DIR = Path(__file__).parent
EVAL_RESULTS_PATH = BASE_DIR / "logs" / "eval_results.json"
RETRY_HISTORY_PATH = BASE_DIR / "logs" / "retry_history.json"
# Target thresholds per metric. Accuracy and F1 are minimums (higher is
# better); latency (ms) and retry count are maximums (lower is better).
TARGETS = {
    "routing_accuracy": 0.85,
    "macro_f1": 0.80,
    "avg_latency_ms": 20.0,
    "retry_count": 10,
}
# Stage definitions: pipeline stages in execution order, used for the
# stage indicator and progress bar.
STAGES = ["Data Gen", "Labeling", "Training", "Eval", "Export"]
class MetricsReader:
    """Reads and parses metrics from JSON log files."""

    @staticmethod
    def read_json(file_path: Path) -> Optional[dict]:
        """Read and parse a JSON file.

        Returns None if the file doesn't exist, can't be read, or contains
        invalid JSON — callers treat None as "no data yet".
        """
        if not file_path.exists():
            return None
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return None

    @staticmethod
    def _get_metric(data: Optional[dict], key: str) -> Optional[float]:
        """Look up *key* at the top level, falling back to data["metrics"].

        Uses explicit None checks instead of `a or b` so a legitimate
        metric value of 0 / 0.0 is returned rather than treated as missing.
        """
        if data is None:
            return None
        value = data.get(key)
        if value is not None:
            return value
        return data.get("metrics", {}).get(key)

    @staticmethod
    def get_routing_accuracy(data: Optional[dict]) -> Optional[float]:
        """Extract routing accuracy from eval results."""
        return MetricsReader._get_metric(data, "routing_accuracy")

    @staticmethod
    def get_macro_f1(data: Optional[dict]) -> Optional[float]:
        """Extract macro F1 from eval results."""
        return MetricsReader._get_metric(data, "macro_f1")

    @staticmethod
    def get_avg_latency(data: Optional[dict]) -> Optional[float]:
        """Extract average latency (milliseconds) from eval results."""
        return MetricsReader._get_metric(data, "avg_latency_ms")

    @staticmethod
    def get_current_stage(data: Optional[dict]) -> str:
        """Extract the current pipeline stage name, or "Unknown"."""
        if data is None:
            return "Unknown"
        return data.get("current_stage", "Unknown")

    @staticmethod
    def get_training_progress(data: Optional[dict]) -> Optional[dict]:
        """Extract training progress as {epoch, total_epochs, loss}.

        Accepts either a "training" or "training_progress" sub-dict;
        returns None when neither is present or non-empty.
        """
        if data is None:
            return None
        training = data.get("training", {}) or data.get("training_progress", {})
        if training:
            return {
                "epoch": training.get("current_epoch", 0),
                "total_epochs": training.get("total_epochs", training.get("epochs", 0)),
                "loss": training.get("loss"),
            }
        return None

    @staticmethod
    def get_retry_count(data: Optional[dict]) -> int:
        """Get retry count from the retry history file.

        NOTE: *data* is unused (kept for signature compatibility with the
        other getters) — the count is always read fresh from
        RETRY_HISTORY_PATH. Falls back to len(retries) when the
        "total_retries" key is absent; 0 when the file is unreadable.
        """
        retry_data = MetricsReader.read_json(RETRY_HISTORY_PATH)
        if retry_data is None:
            return 0
        return retry_data.get("total_retries", len(retry_data.get("retries", [])))
def get_status_color(current: Optional[float], target: float, higher_is_better: bool = True) -> str:
    """Map a metric value against its target to a display color.

    Returns "yellow" while the value is still missing, "green" once the
    target is met, and "red" otherwise.
    """
    if current is None:
        return "yellow"
    meets_target = current >= target if higher_is_better else current <= target
    return "green" if meets_target else "red"
def format_metric_value(value: Optional[float], suffix: str = "", precision: int = 3) -> str:
    """Render a metric value with Rich markup.

    None becomes a yellow "Waiting..." placeholder; floats are rounded to
    *precision* decimal places; any other value is shown verbatim. The
    optional *suffix* (e.g. " ms") is appended outside the color tags.
    """
    if value is None:
        return "[yellow]Waiting...[/yellow]"
    rendered = f"{value:.{precision}f}" if isinstance(value, float) else f"{value}"
    return f"[cyan]{rendered}[/cyan]{suffix}"
def create_header() -> Panel:
    """Build the blue dashboard header: title line plus refresh timestamp."""
    now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    title = Text("Router Finetune Agent - Monitoring Dashboard", style="bold white on dark_blue")
    timestamp = Text(f"Updated: {now_str}", style="italic")
    body = title + "\n" + timestamp
    return Panel(
        body,
        title="[bold]ROUTE FINETUNE MONITOR[/bold]",
        border_style="blue",
        padding=(1, 2),
    )
def create_metrics_table(eval_data: Optional[dict]) -> Table:
    """Create the main metrics table comparing current values with targets.

    Each row shows a metric name, its live value, the target threshold,
    and a PASS/FAIL/WAIT status cell. (The original computed an unused
    `status_color` and duplicated the WAIT logic per row; both removed.)
    """
    table = Table(
        title="[bold]Key Metrics[/bold]",
        show_header=True,
        header_style="bold magenta",
        border_style="bright_black",
        row_styles=["", "dim"],
    )
    table.add_column("Metric", style="bold", width=20)
    table.add_column("Current", justify="center", width=15)
    table.add_column("Target", justify="center", width=15)
    table.add_column("Status", justify="center", width=10)

    def _status_cell(value: Optional[float], target: float, higher_is_better: bool) -> str:
        """PASS/FAIL against target, or WAIT while the value is missing."""
        if value is None:
            return "[yellow]WAIT[/yellow]"
        ok = value >= target if higher_is_better else value <= target
        return "[green]PASS[/green]" if ok else "[red]FAIL[/red]"

    # Routing Accuracy (higher is better)
    routing_acc = MetricsReader.get_routing_accuracy(eval_data)
    table.add_row(
        "Routing Accuracy",
        format_metric_value(routing_acc),
        f"[green]>={TARGETS['routing_accuracy']}[/green]",
        _status_cell(routing_acc, TARGETS["routing_accuracy"], True),
    )
    # Macro F1 (higher is better)
    macro_f1 = MetricsReader.get_macro_f1(eval_data)
    table.add_row(
        "Macro F1",
        format_metric_value(macro_f1),
        f"[green]>={TARGETS['macro_f1']}[/green]",
        _status_cell(macro_f1, TARGETS["macro_f1"], True),
    )
    # Avg Latency (lower is better)
    latency = MetricsReader.get_avg_latency(eval_data)
    table.add_row(
        "Avg Latency",
        format_metric_value(latency, " ms"),
        f"[green]<={TARGETS['avg_latency_ms']} ms[/green]",
        _status_cell(latency, TARGETS["avg_latency_ms"], False),
    )
    # Retry Count (lower is better; always an int, so never WAIT)
    retry_count = MetricsReader.get_retry_count(eval_data)
    table.add_row(
        "Retry Count",
        f"[cyan]{retry_count}[/cyan]",
        f"[green]<={TARGETS['retry_count']}[/green]",
        _status_cell(retry_count, TARGETS["retry_count"], False),
    )
    return table
def create_stage_panel(eval_data: Optional[dict]) -> Panel:
    """Create the pipeline stage panel with a visual progress indicator.

    Fix: the original called STAGES.index(current_stage) in the elif branch
    for every non-active stage, which raised ValueError whenever the stage
    was "Unknown" (i.e. before any eval data exists). The index is now
    resolved once, guarded by membership.
    """
    current_stage = MetricsReader.get_current_stage(eval_data)
    # Index of the active stage; None when the stage name isn't recognized.
    stage_index = STAGES.index(current_stage) if current_stage in STAGES else None
    # Stage list: completed stages dim-green, active one highlighted,
    # upcoming (or all, when unknown) dimmed.
    stage_display = []
    for i, stage in enumerate(STAGES):
        if stage_index is not None and i == stage_index:
            stage_display.append(f"[bold bright_yellow]>> {stage} <<[/bold bright_yellow]")
        elif stage_index is not None and i < stage_index:
            stage_display.append(f"[dim green] {stage} [/dim green]")
        else:
            stage_display.append(f"[dim] {stage} [/dim]")
    # Progress bar: 4 cells per stage (blue = done, yellow = active).
    if stage_index is not None:
        progress_pct = (stage_index / (len(STAGES) - 1)) * 100
        progress_bar = f"[bright_blue]{'β–ˆ' * (stage_index * 4)}[/bright_blue][bright_yellow]{'β–ˆ' * 4}[/bright_yellow][dim]{'β–‘' * ((len(STAGES) - stage_index - 1) * 4)}[/dim]"
    else:
        progress_bar = "[dim]β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘[/dim]"
        progress_pct = 0
    content = "\n".join(stage_display) + "\n\n" + progress_bar + f"\n[dim]Progress: {progress_pct:.0f}%[/dim]"
    return Panel(
        content,
        title="[bold]Current Stage[/bold]",
        border_style="cyan",
        padding=(1, 2),
    )
def create_training_progress_panel(eval_data: Optional[dict]) -> Panel:
    """Create training progress panel (only populated during Training stage)."""
    stage = MetricsReader.get_current_stage(eval_data)
    # Outside the Training stage the panel is just a dim placeholder.
    if stage != "Training":
        return Panel(
            "[dim]Training progress available during Training stage[/dim]",
            title="[bold]Training Progress[/bold]",
            border_style="dim",
            padding=(1, 2),
        )
    info = MetricsReader.get_training_progress(eval_data)
    # No progress record (or zero epochs) yet: show a waiting state.
    if info is None or info.get("total_epochs", 0) == 0:
        return Panel(
            "[yellow]Waiting for training data...[/yellow]",
            title="[bold]Training Progress[/bold]",
            border_style="yellow",
            padding=(1, 2),
        )
    epoch = info.get("epoch", 0)
    total_epochs = info.get("total_epochs", 0)
    loss = info.get("loss")
    # Render a 30-cell epoch progress bar.
    if total_epochs > 0:
        bar_length = 30
        filled = int((epoch / total_epochs) * bar_length)
        empty = bar_length - filled
        progress_bar = f"[bright_green]{'β–ˆ' * filled}[/bright_green][dim]{'β–‘' * empty}[/dim]"
        progress_text = f"Epoch {epoch}/{total_epochs}"
    else:
        progress_bar = "[dim]β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘[/dim]"
        progress_text = "Initializing..."
    if loss is not None:
        loss_text = f"Loss: [cyan]{loss:.4f}[/cyan]"
    else:
        loss_text = "Loss: [dim]N/A[/dim]"
    content = f"{progress_bar}\n\n[bold]{progress_text}[/bold]\n{loss_text}"
    return Panel(
        content,
        title="[bold]Training Progress[/bold]",
        border_style="green",
        padding=(1, 2),
    )
def create_status_summary(eval_data: Optional[dict]) -> Panel:
    """Create the overall status summary panel.

    Fix: retry_count is always an int, so `total` could never be 0 and the
    "AWAITING DATA" branch was unreachable — with no eval data the panel
    misleadingly showed "SUCCESS / All 1 metrics meet targets". The
    waiting state is now gated on the eval-derived metrics instead.
    """
    routing_acc = MetricsReader.get_routing_accuracy(eval_data)
    macro_f1 = MetricsReader.get_macro_f1(eval_data)
    latency = MetricsReader.get_avg_latency(eval_data)
    retry_count = MetricsReader.get_retry_count(eval_data)
    # (name, value, target, higher_is_better)
    checks = [
        ("Routing Accuracy", routing_acc, TARGETS["routing_accuracy"], True),
        ("Macro F1", macro_f1, TARGETS["macro_f1"], True),
        ("Avg Latency", latency, TARGETS["avg_latency_ms"], False),
        ("Retry Count", retry_count, TARGETS["retry_count"], False),
    ]
    passed = sum(1 for _, val, target, higher_better in checks
                 if val is not None and (val >= target if higher_better else val <= target))
    total = sum(1 for _, val, _, _ in checks if val is not None)
    # Only the three eval metrics indicate real data; retry_count alone
    # must not count as "data".
    have_eval_data = any(v is not None for v in (routing_acc, macro_f1, latency))
    if not have_eval_data:
        status = "[yellow]AWAITING DATA[/yellow]"
        summary = "Waiting for evaluation results..."
    elif passed == len(checks):
        status = "[bold bright_green]ALL CHECKS PASSED[/bold bright_green]"
        summary = f"All {len(checks)} metrics meet targets"
    elif passed == total:
        # Every metric seen so far passes, but some are still missing.
        status = "[bold bright_green]SUCCESS[/bold bright_green]"
        summary = f"All {total} metrics meet targets"
    else:
        status = "[bold bright_red]METRICS NEED ATTENTION[/bold bright_red]"
        summary = f"{passed}/{len(checks)} metrics passing"
    content = f"{status}\n\n[dim]{summary}[/dim]"
    return Panel(
        content,
        title="[bold]Overall Status[/bold]",
        border_style="bright_black",
        padding=(1, 2),
    )
def generate_layout(eval_data: Optional[dict]) -> Layout:
    """Assemble the full dashboard layout from the individual panels.

    Structure: header (5 rows) on top, status-summary footer (3 rows) on
    the bottom, and a main area split 2:1 between the metrics table and a
    right-hand column holding the stage + training panels.
    """
    root = Layout()
    root.split_column(
        Layout(name="header", size=5),
        Layout(name="main"),
        Layout(name="footer", size=3),
    )
    root["main"].split_row(
        Layout(name="metrics", ratio=2),
        Layout(name="stage_panel", ratio=1),
    )
    root["header"].update(create_header())
    root["metrics"].update(create_metrics_table(eval_data))
    # Right column stacks the stage indicator above training progress.
    root["stage_panel"].split_column(
        Layout(create_stage_panel(eval_data), name="stage"),
        Layout(create_training_progress_panel(eval_data), name="training"),
    )
    root["footer"].update(create_status_summary(eval_data))
    return root
def read_current_data() -> tuple[Optional[dict], Optional[dict]]:
    """Read the latest (eval_results, retry_history) snapshots from disk.

    Either element is None when its file is missing or unparseable.
    """
    return (
        MetricsReader.read_json(EVAL_RESULTS_PATH),
        MetricsReader.read_json(RETRY_HISTORY_PATH),
    )
def main():
    """Main entry point: run the live-updating monitoring dashboard.

    Polls the eval/retry JSON files twice a second and redraws the Rich
    layout until interrupted with Ctrl+C.
    """
    import time  # hoisted: the original re-ran `import time` on every loop iteration

    console = Console()
    # Check if Rich is properly installed.
    # NOTE(review): largely redundant — the module-level rich imports would
    # already have failed — but kept for its friendly install hint.
    try:
        from rich.layout import Layout
        from rich.live import Live
    except ImportError:
        console.print("[red]Error: Rich library is required.[/red]")
        console.print("Please install it with: pip install rich")
        sys.exit(1)
    console.print("\n[bold bright_blue]Starting Router Finetune Agent Monitor...[/bold bright_blue]")
    console.print(f"Monitoring: {EVAL_RESULTS_PATH}")
    console.print(f"Retry History: {RETRY_HISTORY_PATH}")
    console.print("\nPress [bold]Ctrl+C[/bold] to exit.\n")
    # Initial read to show starting state
    eval_data, _ = read_current_data()
    try:
        with Live(
            generate_layout(eval_data),
            console=console,
            refresh_per_second=2,
            transient=False,
            screen=True,
        ) as live:
            while True:
                eval_data, _ = read_current_data()
                live.update(generate_layout(eval_data))
                time.sleep(0.5)  # Update twice per second
    except KeyboardInterrupt:
        # Normal shutdown path: user pressed Ctrl+C.
        console.print("\n[bold bright_yellow]Monitor stopped.[/bold bright_yellow]")
        sys.exit(0)
# Run the dashboard only when executed as a script, not on import.
if __name__ == "__main__":
    main()