test_521 / trackio /cli_helpers.py
abidlabs's picture
abidlabs HF Staff
Upload folder using huggingface_hub
33098e6 verified
import json
import sys
from typing import Any
def format_json(data: Any) -> str:
"""Format data as JSON."""
return json.dumps(data, indent=2)
def format_list(items: list[str], title: str | None = None) -> str:
"""Format a list of items in human-readable format."""
if not items:
return f"No {title.lower() if title else 'items'} found."
output = []
if title:
output.append(f"{title}:")
for item in items:
output.append(f" - {item}")
return "\n".join(output)
def format_project_summary(summary: dict) -> str:
"""Format project summary in human-readable format."""
output = [f"Project: {summary['project']}"]
output.append(f"Number of runs: {summary['num_runs']}")
if summary["runs"]:
output.append("\nRuns:")
for run in summary["runs"]:
output.append(f" - {run}")
else:
output.append("\nNo runs found.")
if summary.get("last_activity"):
output.append(f"\nLast activity (max step): {summary['last_activity']}")
return "\n".join(output)
def format_run_summary(summary: dict) -> str:
"""Format run summary in human-readable format."""
output = [f"Project: {summary['project']}"]
output.append(f"Run: {summary['run']}")
if summary.get("status"):
output.append(f"Status: {summary['status']}")
output.append(f"Number of logs: {summary['num_logs']}")
if summary.get("last_step") is not None:
output.append(f"Last step: {summary['last_step']}")
if summary.get("metrics"):
output.append("\nMetrics:")
for metric in summary["metrics"]:
output.append(f" - {metric}")
else:
output.append("\nNo metrics found.")
config = summary.get("config")
if config:
output.append("\nConfig:")
config_display = {k: v for k, v in config.items() if not k.startswith("_")}
if config_display:
for key, value in config_display.items():
output.append(f" {key}: {value}")
else:
output.append(" (no config)")
else:
output.append("\nConfig: (no config)")
return "\n".join(output)
def format_metric_values(values: list[dict]) -> str:
"""Format metric values in human-readable format."""
if not values:
return "No metric values found."
output = [f"Found {len(values)} value(s):\n"]
output.append("Step | Timestamp | Value")
output.append("-" * 50)
for value in values:
step = value.get("step", "N/A")
timestamp = value.get("timestamp", "N/A")
val = value.get("value", "N/A")
output.append(f"{step} | {timestamp} | {val}")
return "\n".join(output)
def format_system_metrics(metrics: list[dict]) -> str:
"""Format system metrics in human-readable format."""
if not metrics:
return "No system metrics found."
output = [f"Found {len(metrics)} system metric entry/entries:\n"]
for i, entry in enumerate(metrics):
timestamp = entry.get("timestamp", "N/A")
output.append(f"\nEntry {i + 1} (Timestamp: {timestamp}):")
for key, value in entry.items():
if key != "timestamp":
output.append(f" {key}: {value}")
return "\n".join(output)
def format_system_metric_names(names: list[str]) -> str:
"""Format system metric names in human-readable format."""
return format_list(names, "System Metrics")
def format_snapshot(snapshot: dict[str, list[dict]]) -> str:
"""Format a metrics snapshot in human-readable format."""
if not snapshot:
return "No metrics found in the specified range."
output = []
for metric_name, values in sorted(snapshot.items()):
output.append(f"\n{metric_name}:")
output.append(" Step | Timestamp | Value")
output.append(" " + "-" * 48)
for v in values:
step = v.get("step", "N/A")
ts = v.get("timestamp", "N/A")
val = v.get("value", "N/A")
output.append(f" {step} | {ts} | {val}")
return "\n".join(output)
def format_alerts(alerts: list[dict]) -> str:
"""Format alerts in human-readable format."""
if not alerts:
return "No alerts found."
output = [f"Found {len(alerts)} alert(s):\n"]
for a in alerts:
ts = a.get("timestamp", "N/A")
run = a.get("run", "N/A")
level = (a.get("level") or "N/A").upper()
title = a.get("title", "")
text = a.get("text", "") or ""
step = a.get("step", "N/A")
line = f"[{level}] {title} | run={run} step={step} ts={ts}"
if text:
line += f"\n {text}"
data = a.get("data")
if data:
data_str = ", ".join(f"{k}={v}" for k, v in data.items())
line += f"\n data: {data_str}"
output.append(line)
return "\n".join(output)
def format_best(
project: str,
metric: str,
minimize: bool,
mode: str,
ranking: list[dict],
) -> str:
direction = "minimize" if minimize else "maximize"
output = [f"Project: {project}"]
output.append(f"Metric: {metric} ({direction}, mode={mode})")
output.append(f"Best run: {ranking[0]['run']} = {ranking[0]['value']}")
output.append(f"\nRanking ({len(ranking)} runs):")
output.append(f" {'#':<4} {'Run':<30} {'Value':<15} {'Step':<10}")
output.append(" " + "-" * 60)
for i, r in enumerate(ranking, 1):
output.append(
f" {i:<4} {r['run']:<30} {r['value']:<15} {r.get('step', 'N/A'):<10}"
)
return "\n".join(output)
def format_compare(
project: str,
metric_names: list[str],
comparison: list[dict],
) -> str:
output = [f"Project: {project}"]
output.append(
f"Comparing {len(comparison)} runs across {len(metric_names)} metrics\n"
)
max_run_name_w = 40
for e in comparison:
if len(e["run"]) > max_run_name_w:
e["run"] = e["run"][: max_run_name_w - 1] + "…"
run_w = max((len(e["run"]) for e in comparison), default=3)
run_w = max(run_w, 3)
status_w = 10
col_ws = []
for m in metric_names:
vals = [e["metrics"].get(m) for e in comparison]
val_w = max(
(
len(f"{v:.4f}" if isinstance(v, float) else str(v))
for v in vals
if v is not None
),
default=3,
)
col_ws.append(max(len(m), val_w))
header = f" {'Run':<{run_w}} {'Status':<{status_w}}"
for m, w in zip(metric_names, col_ws):
header += f" {m:<{w}}"
output.append(header)
sep_w = run_w + status_w + sum(w + 2 for w in col_ws) + 2
output.append(" " + "-" * sep_w)
for entry in comparison:
line = f" {entry['run']:<{run_w}} {(entry.get('status') or '?'):<{status_w}}"
for m, w in zip(metric_names, col_ws):
val = entry["metrics"].get(m)
if val is not None:
formatted = f"{val:.4f}" if isinstance(val, float) else str(val)
else:
formatted = "N/A"
line += f" {formatted:<{w}}"
output.append(line)
return "\n".join(output)
def format_summary(summary: dict) -> str:
output = [f"Project: {summary['project']}"]
output.append(f"Total runs: {summary['num_runs']}")
output.append(f"Total alerts: {summary['total_alerts']}")
if summary.get("metric"):
output.append(f"Primary metric: {summary['metric']}")
output.append("\nRuns:")
for r in summary["runs"]:
status = r.get("status") or "?"
line = f" {r['run']} [{status}] - {r['num_logs']} logs, last_step={r.get('last_step', 'N/A')}"
if summary.get("metric") and r.get("metric_value") is not None:
line += f", {summary['metric']}={r['metric_value']}"
output.append(line)
if r.get("config"):
cfg_str = ", ".join(f"{k}={v}" for k, v in r["config"].items())
output.append(f" config: {cfg_str}")
return "\n".join(output)
def format_query_result(result: dict[str, Any]) -> str:
"""Format SQL query results in human-readable format."""
columns = result.get("columns", [])
rows = result.get("rows", [])
row_count = result.get("row_count", 0)
if not columns:
return f"Query returned {row_count} row(s)."
rendered_rows = []
for row in rows:
rendered_rows.append(
[
"" if row.get(column) is None else str(row.get(column))
for column in columns
]
)
widths = []
for idx, column in enumerate(columns):
cell_width = max(
(len(rendered_row[idx]) for rendered_row in rendered_rows), default=0
)
widths.append(max(len(column), cell_width))
header = " | ".join(
column.ljust(width) for column, width in zip(columns, widths, strict=False)
)
separator = "-+-".join("-" * width for width in widths)
output = [f"Query returned {row_count} row(s).", header, separator]
if not rendered_rows:
output.append("(no rows)")
return "\n".join(output)
for rendered_row in rendered_rows:
output.append(
" | ".join(
value.ljust(width)
for value, width in zip(rendered_row, widths, strict=False)
)
)
return "\n".join(output)
def error_exit(message: str, code: int = 1) -> None:
"""Print error message and exit."""
print(f"Error: {message}", file=sys.stderr)
sys.exit(code)