#!/usr/bin/env python3
"""
Router Finetune Agent - Monitoring Dashboard

A real-time monitoring dashboard for tracking router finetuning progress.
Displays key metrics with live updates using Rich library.
"""

import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

from rich import print  # NOTE: intentionally shadows builtin print to render Rich markup
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.table import Table
from rich.text import Text

# Configuration: log files are expected next to this script under logs/.
BASE_DIR = Path(__file__).parent
EVAL_RESULTS_PATH = BASE_DIR / "logs" / "eval_results.json"
RETRY_HISTORY_PATH = BASE_DIR / "logs" / "retry_history.json"

# Target thresholds (accuracy/F1: higher is better; latency/retries: lower is better).
TARGETS = {
    "routing_accuracy": 0.85,
    "macro_f1": 0.80,
    "avg_latency_ms": 20.0,
    "retry_count": 10,
}

# Pipeline stage names, in execution order.
STAGES = ["Data Gen", "Labeling", "Training", "Eval", "Export"]


class MetricsReader:
    """Reads and parses metrics from JSON log files."""

    @staticmethod
    def read_json(file_path: Path) -> Optional[dict]:
        """Read and parse a JSON file; return None if it is missing or invalid."""
        if not file_path.exists():
            return None
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return None

    @staticmethod
    def _get_metric(data: Optional[dict], key: str) -> Optional[Any]:
        """Look up *key* at the top level of *data*, falling back to the
        nested "metrics" object.

        Uses explicit None checks rather than ``or`` so a legitimate
        metric value of 0 / 0.0 is returned instead of being skipped
        (the previous ``a or b`` form treated 0.0 as "missing").
        """
        if data is None:
            return None
        value = data.get(key)
        if value is not None:
            return value
        return data.get("metrics", {}).get(key)

    @staticmethod
    def get_routing_accuracy(data: Optional[dict]) -> Optional[float]:
        """Extract routing accuracy from eval results."""
        return MetricsReader._get_metric(data, "routing_accuracy")

    @staticmethod
    def get_macro_f1(data: Optional[dict]) -> Optional[float]:
        """Extract macro F1 from eval results."""
        return MetricsReader._get_metric(data, "macro_f1")

    @staticmethod
    def get_avg_latency(data: Optional[dict]) -> Optional[float]:
        """Extract average latency (milliseconds) from eval results."""
        return MetricsReader._get_metric(data, "avg_latency_ms")

    @staticmethod
    def get_current_stage(data: Optional[dict]) -> str:
        """Extract the current pipeline stage name; "Unknown" when unavailable."""
        if data is None:
            return "Unknown"
        return data.get("current_stage", "Unknown")

    @staticmethod
    def get_training_progress(data: Optional[dict]) -> Optional[dict]:
        """Extract training progress (epoch, total_epochs, loss) or None.

        Accepts either a "training" or "training_progress" sub-object,
        and either "total_epochs" or "epochs" for the epoch count.
        """
        if data is None:
            return None
        training = data.get("training", {}) or data.get("training_progress", {})
        if training:
            return {
                "epoch": training.get("current_epoch", 0),
                "total_epochs": training.get("total_epochs", training.get("epochs", 0)),
                "loss": training.get("loss"),
            }
        return None

    @staticmethod
    def get_retry_count(data: Optional[dict]) -> int:
        """Get the retry count from the retry-history file.

        NOTE: *data* is accepted for signature parity with the other
        getters but is ignored — the count is always re-read from
        RETRY_HISTORY_PATH. Returns 0 when the file is missing/invalid.
        """
        retry_data = MetricsReader.read_json(RETRY_HISTORY_PATH)
        if retry_data is None:
            return 0
        return retry_data.get("total_retries", len(retry_data.get("retries", [])))


def get_status_color(current: Optional[float], target: float, higher_is_better: bool = True) -> str:
    """Determine status color based on current value vs target.

    Returns "yellow" while the value is unknown, otherwise "green"/"red"
    according to whether the value meets the target in the given direction.
    """
    if current is None:
        return "yellow"
    if higher_is_better:
        return "green" if current >= target else "red"
    return "green" if current <= target else "red"


def format_metric_value(value: Optional[float], suffix: str = "", precision: int = 3) -> str:
    """Format a metric value for display with Rich markup.

    None renders as a yellow "Waiting..." placeholder; floats are rounded
    to *precision* digits; other values are rendered as-is.
    """
    if value is None:
        return "[yellow]Waiting...[/yellow]"
    if isinstance(value, float):
        return f"[cyan]{value:.{precision}f}[/cyan]{suffix}"
    return f"[cyan]{value}[/cyan]{suffix}"


def create_header() -> Panel:
    """Create the dashboard header panel (title plus last-update timestamp)."""
    title = Text("Router Finetune Agent - Monitoring Dashboard", style="bold white on dark_blue")
    timestamp = Text(f"Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", style="italic")
    return Panel(
        title + "\n" + timestamp,
        title="[bold]ROUTE FINETUNE MONITOR[/bold]",
        border_style="blue",
        padding=(1, 2),
    )
def create_metrics_table(eval_data: Optional[dict]) -> Table:
    """Create the main metrics table comparing current values vs targets.

    Each row shows a metric's current value, its target threshold, and a
    status cell: WAIT while the value is unavailable, otherwise PASS/FAIL.

    Fixes vs. original: removes the unused ``status_color`` local, drops
    the redundant second WAIT check on the first row, and factors the
    four duplicated PASS/FAIL/WAIT ternaries into one helper.
    """

    def threshold_status(value, target, higher_is_better: bool = True) -> str:
        # Shared status-cell logic, previously copy-pasted per row.
        if value is None:
            return "[yellow]WAIT[/yellow]"
        ok = value >= target if higher_is_better else value <= target
        return "[green]PASS[/green]" if ok else "[red]FAIL[/red]"

    table = Table(
        title="[bold]Key Metrics[/bold]",
        show_header=True,
        header_style="bold magenta",
        border_style="bright_black",
        row_styles=["", "dim"],
    )
    table.add_column("Metric", style="bold", width=20)
    table.add_column("Current", justify="center", width=15)
    table.add_column("Target", justify="center", width=15)
    table.add_column("Status", justify="center", width=10)

    # Routing Accuracy (higher is better)
    routing_acc = MetricsReader.get_routing_accuracy(eval_data)
    table.add_row(
        "Routing Accuracy",
        format_metric_value(routing_acc),
        f"[green]>={TARGETS['routing_accuracy']}[/green]",
        threshold_status(routing_acc, TARGETS["routing_accuracy"]),
    )

    # Macro F1 (higher is better)
    macro_f1 = MetricsReader.get_macro_f1(eval_data)
    table.add_row(
        "Macro F1",
        format_metric_value(macro_f1),
        f"[green]>={TARGETS['macro_f1']}[/green]",
        threshold_status(macro_f1, TARGETS["macro_f1"]),
    )

    # Avg Latency (lower is better)
    latency = MetricsReader.get_avg_latency(eval_data)
    table.add_row(
        "Avg Latency",
        format_metric_value(latency, " ms"),
        f"[green]<={TARGETS['avg_latency_ms']} ms[/green]",
        threshold_status(latency, TARGETS["avg_latency_ms"], higher_is_better=False),
    )

    # Retry Count (lower is better; get_retry_count never returns None,
    # so this row can never show WAIT)
    retry_count = MetricsReader.get_retry_count(eval_data)
    table.add_row(
        "Retry Count",
        f"[cyan]{retry_count}[/cyan]",
        f"[green]<={TARGETS['retry_count']}[/green]",
        threshold_status(retry_count, TARGETS["retry_count"], higher_is_better=False),
    )

    return table
def create_stage_panel(eval_data: Optional[dict]) -> Panel:
    """Create the current-stage panel with a visual progress indicator.

    Shows each pipeline stage (done / active / pending) plus a segmented
    progress bar, 4 characters per stage.

    Fix vs. original: ``STAGES.index(current_stage)`` was called inside the
    loop and raised ValueError whenever the stage was not one of STAGES
    (e.g. the "Unknown" placeholder before any data arrives). The index is
    now computed once, with -1 meaning "not a known stage". The empty
    fallback bar is also width-matched to the 4 * len(STAGES) segments.
    """
    current_stage = MetricsReader.get_current_stage(eval_data)
    # -1 sentinel: stage name is absent or not (yet) one of the known STAGES.
    stage_index = STAGES.index(current_stage) if current_stage in STAGES else -1

    # One indicator line per stage: completed (dim green), active
    # (highlighted), or pending (dim).
    stage_display = []
    for i, stage in enumerate(STAGES):
        if i == stage_index:
            stage_display.append(f"[bold bright_yellow]>> {stage} <<[/bold bright_yellow]")
        elif stage_index > i:
            stage_display.append(f"[dim green] {stage} [/dim green]")
        else:
            stage_display.append(f"[dim] {stage} [/dim]")

    # Segmented bar: blue = completed stages, yellow = active, dim = remaining.
    if stage_index >= 0:
        progress_pct = (stage_index / (len(STAGES) - 1)) * 100
        progress_bar = (
            f"[bright_blue]{'█' * (stage_index * 4)}[/bright_blue]"
            f"[bright_yellow]{'█' * 4}[/bright_yellow]"
            f"[dim]{'░' * ((len(STAGES) - stage_index - 1) * 4)}[/dim]"
        )
    else:
        # Unknown stage: fully-empty bar at the same width as the known-stage bar.
        progress_bar = f"[dim]{'░' * (len(STAGES) * 4)}[/dim]"
        progress_pct = 0

    content = "\n".join(stage_display) + "\n\n" + progress_bar + f"\n[dim]Progress: {progress_pct:.0f}%[/dim]"
    return Panel(
        content,
        title="[bold]Current Stage[/bold]",
        border_style="cyan",
        padding=(1, 2),
    )


def create_training_progress_panel(eval_data: Optional[dict]) -> Panel:
    """Create the training-progress panel (populated only during Training).

    Outside the Training stage a dim placeholder is shown; during Training
    but before progress data exists, a yellow "waiting" panel is shown.
    """
    current_stage = MetricsReader.get_current_stage(eval_data)
    if current_stage != "Training":
        return Panel(
            "[dim]Training progress available during Training stage[/dim]",
            title="[bold]Training Progress[/bold]",
            border_style="dim",
            padding=(1, 2),
        )

    progress_info = MetricsReader.get_training_progress(eval_data)
    if progress_info is None or progress_info.get("total_epochs", 0) == 0:
        return Panel(
            "[yellow]Waiting for training data...[/yellow]",
            title="[bold]Training Progress[/bold]",
            border_style="yellow",
            padding=(1, 2),
        )

    epoch = progress_info.get("epoch", 0)
    total_epochs = progress_info.get("total_epochs", 0)
    loss = progress_info.get("loss")

    # 30-character epoch progress bar.
    if total_epochs > 0:
        bar_length = 30
        filled = int((epoch / total_epochs) * bar_length)
        progress_bar = f"[bright_green]{'█' * filled}[/bright_green][dim]{'░' * (bar_length - filled)}[/dim]"
        progress_text = f"Epoch {epoch}/{total_epochs}"
    else:
        # Unreachable given the total_epochs == 0 guard above; kept defensively.
        progress_bar = "[dim]░░░░░░░░░░░░░░░░░░░░░░░░[/dim]"
        progress_text = "Initializing..."

    loss_text = f"Loss: [cyan]{loss:.4f}[/cyan]" if loss is not None else "Loss: [dim]N/A[/dim]"

    content = f"{progress_bar}\n\n[bold]{progress_text}[/bold]\n{loss_text}"
    return Panel(
        content,
        title="[bold]Training Progress[/bold]",
        border_style="green",
        padding=(1, 2),
    )
def create_status_summary(eval_data: Optional[dict]) -> Panel:
    """Create the overall status summary panel.

    Fix vs. original: ``get_retry_count`` always returns an int, so the
    "total == 0" AWAITING DATA branch was unreachable — with no eval
    results the panel claimed "All 1 metrics meet targets". We now show
    AWAITING DATA until at least one eval metric is present.
    """
    routing_acc = MetricsReader.get_routing_accuracy(eval_data)
    macro_f1 = MetricsReader.get_macro_f1(eval_data)
    latency = MetricsReader.get_avg_latency(eval_data)
    retry_count = MetricsReader.get_retry_count(eval_data)

    # (name, value, target, higher_is_better)
    checks = [
        ("Routing Accuracy", routing_acc, TARGETS["routing_accuracy"], True),
        ("Macro F1", macro_f1, TARGETS["macro_f1"], True),
        ("Avg Latency", latency, TARGETS["avg_latency_ms"], False),
        ("Retry Count", retry_count, TARGETS["retry_count"], False),
    ]

    # Only metrics with a known value count toward passed/total.
    passed = sum(
        1
        for _, val, target, higher_better in checks
        if val is not None and (val >= target if higher_better else val <= target)
    )
    total = sum(1 for _, val, _, _ in checks if val is not None)
    eval_metrics_seen = any(v is not None for v in (routing_acc, macro_f1, latency))

    if not eval_metrics_seen:
        status = "[yellow]AWAITING DATA[/yellow]"
        summary = "Waiting for evaluation results..."
    elif passed == len(checks):
        status = "[bold bright_green]ALL CHECKS PASSED[/bold bright_green]"
        summary = f"All {len(checks)} metrics meet targets"
    elif passed == total:
        # Everything we can measure passes, but some metrics are still missing.
        status = "[bold bright_green]SUCCESS[/bold bright_green]"
        summary = f"All {total} metrics meet targets"
    else:
        status = "[bold bright_red]METRICS NEED ATTENTION[/bold bright_red]"
        summary = f"{passed}/{len(checks)} metrics passing"

    content = f"{status}\n\n[dim]{summary}[/dim]"
    return Panel(
        content,
        title="[bold]Overall Status[/bold]",
        border_style="bright_black",
        padding=(1, 2),
    )
def generate_layout(eval_data: Optional[dict]) -> Layout:
    """Generate the complete dashboard layout for one refresh cycle."""
    layout = Layout()

    # Vertical split: fixed-height header and footer around the main area.
    layout.split_column(
        Layout(name="header", size=5),
        Layout(name="main"),
        Layout(name="footer", size=3),
    )

    # Main area: metrics table (2/3 width) beside the side panels (1/3).
    layout["main"].split_row(
        Layout(name="metrics", ratio=2),
        Layout(name="stage_panel", ratio=1),
    )

    layout["header"].update(create_header())
    layout["metrics"].update(create_metrics_table(eval_data))

    # Side column stacks the stage indicator above the training progress.
    layout["stage_panel"].split_column(
        Layout(create_stage_panel(eval_data), name="stage"),
        Layout(create_training_progress_panel(eval_data), name="training"),
    )

    layout["footer"].update(create_status_summary(eval_data))
    return layout


def read_current_data() -> tuple[Optional[dict], Optional[dict]]:
    """Read current data from all sources: (eval results, retry history)."""
    eval_data = MetricsReader.read_json(EVAL_RESULTS_PATH)
    retry_data = MetricsReader.read_json(RETRY_HISTORY_PATH)
    return eval_data, retry_data


def main():
    """Main entry point: run the live dashboard until Ctrl+C."""
    import time  # hoisted: previously re-imported on every loop iteration

    console = Console()
    # NOTE: the original re-imported rich.layout/rich.live here to print an
    # install hint, but that branch was unreachable — the module-level rich
    # imports would already have raised ImportError before main() could run.

    console.print("\n[bold bright_blue]Starting Router Finetune Agent Monitor...[/bold bright_blue]")
    console.print(f"Monitoring: {EVAL_RESULTS_PATH}")
    console.print(f"Retry History: {RETRY_HISTORY_PATH}")
    console.print("\nPress [bold]Ctrl+C[/bold] to exit.\n")

    # Initial read so the dashboard shows the current state immediately.
    eval_data, _ = read_current_data()

    try:
        with Live(
            generate_layout(eval_data),
            console=console,
            refresh_per_second=2,
            transient=False,
            screen=True,
        ) as live:
            while True:
                eval_data, _ = read_current_data()
                live.update(generate_layout(eval_data))
                time.sleep(0.5)  # Update twice per second
    except KeyboardInterrupt:
        console.print("\n[bold bright_yellow]Monitor stopped.[/bold bright_yellow]")
        sys.exit(0)


if __name__ == "__main__":
    main()