| |
| """ |
| Router Finetune Agent - Monitoring Dashboard |
| |
| A real-time monitoring dashboard for tracking router finetuning progress. |
| Displays key metrics with live updates using Rich library. |
| """ |
|
|
import json
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

from rich import print
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.table import Table
from rich.text import Text
|
|
|
|
| |
# Resolve log paths relative to this file so the dashboard works no matter
# what the current working directory is.
BASE_DIR = Path(__file__).parent
EVAL_RESULTS_PATH = BASE_DIR / "logs" / "eval_results.json"
RETRY_HISTORY_PATH = BASE_DIR / "logs" / "retry_history.json"

# Pass/fail thresholds shown on the dashboard. Accuracy and macro F1 must be
# at least their target; latency (ms) and retry count must be at most theirs.
TARGETS = {
    "routing_accuracy": 0.85,
    "macro_f1": 0.80,
    "avg_latency_ms": 20.0,
    "retry_count": 10,
}

# Pipeline stages in execution order; drives the stage progress display.
STAGES = ["Data Gen", "Labeling", "Training", "Eval", "Export"]
|
|
|
|
class MetricsReader:
    """Reads and parses metrics from JSON log files."""

    @staticmethod
    def read_json(file_path: Path) -> Optional[dict]:
        """Read and parse a JSON file.

        Returns None if the file does not exist, cannot be read, or does
        not contain valid JSON.
        """
        if not file_path.exists():
            return None
        try:
            with open(file_path, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return None

    @staticmethod
    def _lookup_metric(data: Optional[dict], key: str) -> Optional[Any]:
        """Return data[key], falling back to data["metrics"][key].

        Uses explicit None checks instead of `or` so that a legitimate
        falsy top-level value (e.g. an accuracy or latency of 0.0) is
        returned rather than being skipped in favour of the nested lookup.
        """
        if data is None:
            return None
        value = data.get(key)
        if value is not None:
            return value
        return data.get("metrics", {}).get(key)

    @staticmethod
    def get_routing_accuracy(data: Optional[dict]) -> Optional[float]:
        """Extract routing accuracy from eval results."""
        return MetricsReader._lookup_metric(data, "routing_accuracy")

    @staticmethod
    def get_macro_f1(data: Optional[dict]) -> Optional[float]:
        """Extract macro F1 from eval results."""
        return MetricsReader._lookup_metric(data, "macro_f1")

    @staticmethod
    def get_avg_latency(data: Optional[dict]) -> Optional[float]:
        """Extract average latency (ms) from eval results."""
        return MetricsReader._lookup_metric(data, "avg_latency_ms")

    @staticmethod
    def get_current_stage(data: Optional[dict]) -> str:
        """Extract the current stage name, or "Unknown" when unavailable."""
        if data is None:
            return "Unknown"
        return data.get("current_stage", "Unknown")

    @staticmethod
    def get_training_progress(data: Optional[dict]) -> Optional[dict]:
        """Extract training progress as {epoch, total_epochs, loss}, or None."""
        if data is None:
            return None
        # Either key may hold the progress dict; `or` also falls through
        # when "training" is present but empty.
        training = data.get("training", {}) or data.get("training_progress", {})
        if training:
            return {
                "epoch": training.get("current_epoch", 0),
                "total_epochs": training.get("total_epochs", training.get("epochs", 0)),
                "loss": training.get("loss"),
            }
        return None

    @staticmethod
    def get_retry_count(data: Optional[dict]) -> int:
        """Get total retry count from the retry-history file.

        NOTE(review): the `data` argument is ignored; the count is always
        read from RETRY_HISTORY_PATH. The parameter is kept so existing
        callers (which pass eval data) keep working.
        """
        retry_data = MetricsReader.read_json(RETRY_HISTORY_PATH)
        if retry_data is None:
            return 0
        return retry_data.get("total_retries", len(retry_data.get("retries", [])))
|
|
|
|
def get_status_color(current: Optional[float], target: float, higher_is_better: bool = True) -> str:
    """Return a Rich color name reflecting how *current* compares to *target*.

    Yellow means no data yet, green means the target is met, red means it
    is missed. *higher_is_better* selects the comparison direction.
    """
    if current is None:
        return "yellow"
    meets_target = current >= target if higher_is_better else current <= target
    return "green" if meets_target else "red"
|
|
|
def format_metric_value(value: Optional[float], suffix: str = "", precision: int = 3) -> str:
    """Render a metric as cyan Rich markup, or a yellow placeholder for None.

    Floats are rounded to *precision* decimal places; any other type
    (e.g. int) is rendered verbatim. *suffix* is appended outside the
    markup (e.g. " ms").
    """
    if value is None:
        return "[yellow]Waiting...[/yellow]"
    if not isinstance(value, float):
        return f"[cyan]{value}[/cyan]{suffix}"
    return f"[cyan]{value:.{precision}f}[/cyan]{suffix}"
|
|
|
def create_header() -> Panel:
    """Build the dashboard's title panel with a live timestamp."""
    heading = Text("Router Finetune Agent - Monitoring Dashboard", style="bold white on dark_blue")
    stamp = Text(f"Updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", style="italic")
    return Panel(
        heading + "\n" + stamp,
        title="[bold]ROUTE FINETUNE MONITOR[/bold]",
        border_style="blue",
        padding=(1, 2),
    )
|
|
|
def _pass_fail_status(value: Optional[float], target: float, higher_is_better: bool = True) -> str:
    """Render PASS/FAIL/WAIT Rich markup for a metric vs. its target."""
    if value is None:
        return "[yellow]WAIT[/yellow]"
    ok = value >= target if higher_is_better else value <= target
    return "[green]PASS[/green]" if ok else "[red]FAIL[/red]"


def create_metrics_table(eval_data: Optional[dict]) -> Table:
    """Create main metrics table with current vs target values.

    Review fixes: removed an unused `status_color` local and a redundant
    second None check on the routing-accuracy row (the status string
    already renders WAIT for None); the repeated PASS/FAIL/WAIT ternary
    chains are factored into `_pass_fail_status`.
    """
    table = Table(
        title="[bold]Key Metrics[/bold]",
        show_header=True,
        header_style="bold magenta",
        border_style="bright_black",
        row_styles=["", "dim"],
    )

    table.add_column("Metric", style="bold", width=20)
    table.add_column("Current", justify="center", width=15)
    table.add_column("Target", justify="center", width=15)
    table.add_column("Status", justify="center", width=10)

    # Higher-is-better metrics.
    routing_acc = MetricsReader.get_routing_accuracy(eval_data)
    table.add_row(
        "Routing Accuracy",
        format_metric_value(routing_acc),
        f"[green]>={TARGETS['routing_accuracy']}[/green]",
        _pass_fail_status(routing_acc, TARGETS["routing_accuracy"]),
    )

    macro_f1 = MetricsReader.get_macro_f1(eval_data)
    table.add_row(
        "Macro F1",
        format_metric_value(macro_f1),
        f"[green]>={TARGETS['macro_f1']}[/green]",
        _pass_fail_status(macro_f1, TARGETS["macro_f1"]),
    )

    # Lower-is-better metrics.
    latency = MetricsReader.get_avg_latency(eval_data)
    table.add_row(
        "Avg Latency",
        format_metric_value(latency, " ms"),
        f"[green]<={TARGETS['avg_latency_ms']} ms[/green]",
        _pass_fail_status(latency, TARGETS["avg_latency_ms"], higher_is_better=False),
    )

    # Retry count is always an int (never None), so it never shows WAIT.
    retry_count = MetricsReader.get_retry_count(eval_data)
    table.add_row(
        "Retry Count",
        f"[cyan]{retry_count}[/cyan]",
        f"[green]<={TARGETS['retry_count']}[/green]",
        _pass_fail_status(retry_count, TARGETS["retry_count"], higher_is_better=False),
    )

    return table
|
|
|
|
def create_stage_panel(eval_data: Optional[dict]) -> Panel:
    """Create current stage panel with visual progress.

    Bug fix: the original called STAGES.index(current_stage) inside the
    display loop even when current_stage was not in STAGES (e.g. the
    "Unknown" returned before any data exists), raising ValueError. The
    stage index is now computed once, guarded by membership.
    """
    current_stage = MetricsReader.get_current_stage(eval_data)
    # None means "stage unknown / not started": all stages render dim.
    stage_index = STAGES.index(current_stage) if current_stage in STAGES else None

    stage_display = []
    for i, stage in enumerate(STAGES):
        if stage_index is not None and i == stage_index:
            # Highlight the active stage.
            stage_display.append(f"[bold bright_yellow]>> {stage} <<[/bold bright_yellow]")
        elif stage_index is not None and i < stage_index:
            # Completed stages.
            stage_display.append(f"[dim green] {stage} [/dim green]")
        else:
            # Pending (or unknown) stages.
            stage_display.append(f"[dim] {stage} [/dim]")

    if stage_index is not None:
        progress_pct = (stage_index / (len(STAGES) - 1)) * 100
        # 4 bar characters per stage: blue for done, yellow for current, dim for pending.
        progress_bar = (
            f"[bright_blue]{'β' * (stage_index * 4)}[/bright_blue]"
            f"[bright_yellow]{'β' * 4}[/bright_yellow]"
            f"[dim]{'β' * ((len(STAGES) - stage_index - 1) * 4)}[/dim]"
        )
    else:
        progress_bar = "[dim]ββββββββββββββββββ[/dim]"
        progress_pct = 0

    content = "\n".join(stage_display) + "\n\n" + progress_bar + f"\n[dim]Progress: {progress_pct:.0f}%[/dim]"

    return Panel(
        content,
        title="[bold]Current Stage[/bold]",
        border_style="cyan",
        padding=(1, 2),
    )
|
|
|
|
def create_training_progress_panel(eval_data: Optional[dict]) -> Panel:
    """Create training progress panel (only shown during training stage)."""
    # Outside the Training stage, show a muted placeholder instead of metrics.
    if MetricsReader.get_current_stage(eval_data) != "Training":
        return Panel(
            "[dim]Training progress available during Training stage[/dim]",
            title="[bold]Training Progress[/bold]",
            border_style="dim",
            padding=(1, 2),
        )

    info = MetricsReader.get_training_progress(eval_data)

    # No progress data (or epochs unset) yet: show a waiting panel.
    if info is None or info.get("total_epochs", 0) == 0:
        return Panel(
            "[yellow]Waiting for training data...[/yellow]",
            title="[bold]Training Progress[/bold]",
            border_style="yellow",
            padding=(1, 2),
        )

    epoch = info.get("epoch", 0)
    total_epochs = info.get("total_epochs", 0)
    loss = info.get("loss")

    if total_epochs > 0:
        bar_length = 30
        filled = int((epoch / total_epochs) * bar_length)
        bar = f"[bright_green]{'β' * filled}[/bright_green][dim]{'β' * (bar_length - filled)}[/dim]"
        progress_text = f"Epoch {epoch}/{total_epochs}"
    else:
        bar = "[dim]ββββββββββββββββββββββββ[/dim]"
        progress_text = "Initializing..."

    if loss is None:
        loss_text = "Loss: [dim]N/A[/dim]"
    else:
        loss_text = f"Loss: [cyan]{loss:.4f}[/cyan]"

    return Panel(
        f"{bar}\n\n[bold]{progress_text}[/bold]\n{loss_text}",
        title="[bold]Training Progress[/bold]",
        border_style="green",
        padding=(1, 2),
    )
|
|
|
|
def create_status_summary(eval_data: Optional[dict]) -> Panel:
    """Create overall status summary panel."""
    # (name, current value, target, higher-is-better) for each tracked metric.
    checks = [
        ("Routing Accuracy", MetricsReader.get_routing_accuracy(eval_data), TARGETS["routing_accuracy"], True),
        ("Macro F1", MetricsReader.get_macro_f1(eval_data), TARGETS["macro_f1"], True),
        ("Avg Latency", MetricsReader.get_avg_latency(eval_data), TARGETS["avg_latency_ms"], False),
        ("Retry Count", MetricsReader.get_retry_count(eval_data), TARGETS["retry_count"], False),
    ]

    passed = 0
    total = 0
    for _, value, target, higher_better in checks:
        if value is None:
            continue
        total += 1
        if (value >= target) if higher_better else (value <= target):
            passed += 1

    # NOTE(review): retry count is always an int, so total >= 1 and the
    # "AWAITING DATA" branch below appears unreachable in practice.
    if total == 0:
        status = "[yellow]AWAITING DATA[/yellow]"
        summary = "Waiting for evaluation results..."
    elif passed == len(checks):
        status = "[bold bright_green]ALL CHECKS PASSED[/bold bright_green]"
        summary = f"All {len(checks)} metrics meet targets"
    elif passed == total:
        status = "[bold bright_green]SUCCESS[/bold bright_green]"
        summary = f"All {total} metrics meet targets"
    else:
        status = "[bold bright_red]METRICS NEED ATTENTION[/bold bright_red]"
        summary = f"{passed}/{len(checks)} metrics passing"

    return Panel(
        f"{status}\n\n[dim]{summary}[/dim]",
        title="[bold]Overall Status[/bold]",
        border_style="bright_black",
        padding=(1, 2),
    )
|
|
|
|
def generate_layout(eval_data: Optional[dict]) -> Layout:
    """Assemble the full dashboard layout from its component panels."""
    root = Layout()

    # Three horizontal bands: header, main content, footer summary.
    root.split_column(
        Layout(name="header", size=5),
        Layout(name="main"),
        Layout(name="footer", size=3),
    )

    # Main band: metrics table on the left (2/3), stage info on the right (1/3).
    root["main"].split_row(
        Layout(name="metrics", ratio=2),
        Layout(name="stage_panel", ratio=1),
    )

    # Right column stacks the stage tracker above the training progress.
    root["stage_panel"].split_column(
        Layout(create_stage_panel(eval_data), name="stage"),
        Layout(create_training_progress_panel(eval_data), name="training"),
    )

    root["header"].update(create_header())
    root["metrics"].update(create_metrics_table(eval_data))
    root["footer"].update(create_status_summary(eval_data))

    return root
|
|
|
|
def read_current_data() -> tuple[Optional[dict], Optional[dict]]:
    """Read the latest eval results and retry history from disk.

    Returns an (eval_data, retry_data) pair; either element is None when
    its file is missing or unreadable.
    """
    return (
        MetricsReader.read_json(EVAL_RESULTS_PATH),
        MetricsReader.read_json(RETRY_HISTORY_PATH),
    )
|
|
|
|
def main():
    """Run the live monitoring dashboard until interrupted.

    Re-reads the eval/retry JSON logs every 0.5 s and redraws the Rich
    dashboard in an alternate-screen Live session. Exits cleanly on
    Ctrl+C.

    Review fixes: `import time` was executed inside the while loop on
    every iteration (now a module-level import), and the local
    Layout/Live re-import guarded by `except ImportError` was removed —
    the module-level rich imports would already have failed before
    main() could run, so that branch was unreachable.
    """
    console = Console()

    console.print("\n[bold bright_blue]Starting Router Finetune Agent Monitor...[/bold bright_blue]")
    console.print(f"Monitoring: {EVAL_RESULTS_PATH}")
    console.print(f"Retry History: {RETRY_HISTORY_PATH}")
    console.print("\nPress [bold]Ctrl+C[/bold] to exit.\n")

    # Initial read so the first frame is populated before the loop starts.
    eval_data, _ = read_current_data()

    try:
        with Live(
            generate_layout(eval_data),
            console=console,
            refresh_per_second=2,
            transient=False,
            screen=True,
        ) as live:
            while True:
                eval_data, _ = read_current_data()
                live.update(generate_layout(eval_data))
                time.sleep(0.5)
    except KeyboardInterrupt:
        console.print("\n[bold bright_yellow]Monitor stopped.[/bold bright_yellow]")
        sys.exit(0)
|
|
|
|
# Start the dashboard only when run as a script, not when imported.
if __name__ == "__main__":
    main()
|
|