# trackio/cli.py
# Uploaded via huggingface_hub by abidlabs (HF Staff) — commit 4c7dda8 (verified)
import argparse
import os
from trackio import freeze, show, sync
from trackio.cli_helpers import (
error_exit,
format_alerts,
format_best,
format_compare,
format_json,
format_list,
format_metric_values,
format_project_summary,
format_query_result,
format_run_summary,
format_snapshot,
format_summary,
format_system_metric_names,
format_system_metrics,
)
from trackio.frontend_config import (
TRACKIO_CONFIG_PATH,
get_persisted_frontend_dir,
set_persisted_frontend_dir,
unset_persisted_frontend_dir,
)
from trackio.markdown import Markdown
from trackio.server import get_project_summary, get_run_summary
from trackio.sqlite_storage import SQLiteStorage
def _get_space(args):
return getattr(args, "space", None)
def _get_remote(args):
    """Build a RemoteClient targeting the --space given on the CLI.

    Returns None when no space was requested, i.e. the command should run
    against the local SQLite databases.

    The remote-client import is deferred until after the early return so that
    purely local commands never pay for loading the remote dependency.
    """
    space = _get_space(args)
    if not space:
        return None
    from trackio.remote_client import RemoteClient

    hf_token = getattr(args, "hf_token", None)
    return RemoteClient(space, hf_token=hf_token)
def _handle_status():
    """Print a sync-status report for every local Trackio project.

    Projects are bucketed into: purely local (no configured Space), synced
    (Space configured, nothing pending), and unsynced (pending local data).
    """
    print("Reading local Trackio projects...\n")
    projects = SQLiteStorage.get_projects()
    if not projects:
        print("No Trackio projects found.")
        return

    buckets = {"local": [], "synced": [], "unsynced": []}
    for name in projects:
        if SQLiteStorage.get_space_id(name) is None:
            buckets["local"].append(name)
        elif SQLiteStorage.has_pending_data(name):
            buckets["unsynced"].append(name)
        else:
            buckets["synced"].append(name)

    print("Finished reading Trackio projects")
    if buckets["local"]:
        print(f" * {len(buckets['local'])} local trackio project(s) [OK]")
    if buckets["synced"]:
        print(f" * {len(buckets['synced'])} trackio project(s) synced to Spaces [OK]")
    unsynced = buckets["unsynced"]
    if unsynced:
        print(
            f" * {len(unsynced)} trackio project(s) with unsynced changes [WARNING]:"
        )
        for name in unsynced:
            print(f"   - {name}")
        print(
            f"\nRun `trackio sync --project {unsynced[0]}` to sync. "
            "Or run `trackio sync --all` to sync all unsynced changes."
        )
def _handle_sync(args):
    """Handle `trackio sync`: push one project, or all pending projects, to Spaces."""
    from trackio.deploy import sync_incremental

    if args.sync_all and args.project:
        error_exit("Cannot use --all and --project together.")
    if not args.sync_all and not args.project:
        error_exit("Must provide either --project or --all.")

    if not args.sync_all:
        # Single-project sync; fall back to the previously configured Space
        # when --space-id was not provided.
        space_id = args.space_id
        if space_id is None:
            space_id = SQLiteStorage.get_space_id(args.project)
        sync(
            project=args.project,
            space_id=space_id,
            private=args.private,
            force=args.force,
            sdk=args.sdk,
            frontend_dir=args.frontend,
        )
        return

    # --all: incrementally sync every project with a configured Space and
    # pending local data.
    synced_any = False
    for name in SQLiteStorage.get_projects():
        target = SQLiteStorage.get_space_id(name)
        if not (target and SQLiteStorage.has_pending_data(name)):
            continue
        sync_incremental(
            name,
            target,
            private=args.private,
            pending_only=True,
            frontend_dir=args.frontend,
        )
        synced_any = True
    if not synced_any:
        print("No projects with unsynced data found.")
def _handle_config(args):
    """Handle `trackio config get|set|unset` for the persisted frontend setting."""
    cmd = args.config_command
    if cmd == "get":
        current = get_persisted_frontend_dir()
        if current is None:
            print("No Trackio frontend config is set.")
            print(f"Config file: {TRACKIO_CONFIG_PATH}")
        else:
            print(f"frontend: {current}")
            print(f"config: {TRACKIO_CONFIG_PATH}")
    elif cmd == "set":
        try:
            saved = set_persisted_frontend_dir(args.frontend)
        except ValueError as e:
            # Invalid frontend directory (e.g. missing index.html).
            error_exit(str(e))
        print(f"Saved Trackio default frontend: {saved}")
        print("Reset with `trackio config unset frontend`.")
    elif cmd == "unset":
        if unset_persisted_frontend_dir():
            print("Removed Trackio default frontend.")
        else:
            print("No Trackio default frontend was set.")
def _extract_reports(
run: str,
logs: list[dict],
report_name: str | None = None,
run_id: str | None = None,
) -> list[dict]:
reports = []
for log in logs:
timestamp = log.get("timestamp")
step = log.get("step")
for key, value in log.items():
if report_name is not None and key != report_name:
continue
if isinstance(value, dict) and value.get("_type") == Markdown.TYPE:
content = value.get("_value")
if isinstance(content, str):
reports.append(
{
"run": run,
"run_id": run_id,
"report": key,
"step": step,
"timestamp": timestamp,
"content": content,
}
)
return reports
def _require_project(project: str):
    """Exit with an error message unless *project* has a local database file."""
    if not SQLiteStorage.get_project_db_path(project).exists():
        error_exit(f"Project '{project}' not found.")
def _public_config(cfg: dict) -> dict:
return {k: v for k, v in cfg.items() if not k.startswith("_")}
def _handle_query(args):
    """Handle `trackio query project`: run a read-only SQL query.

    Routes to the remote Space when --space was given, otherwise queries the
    local SQLite database. Output is JSON when --json is set, else a
    formatted table.
    """
    remote = _get_remote(args)
    try:
        if remote:
            result = remote.predict(args.project, args.sql, api_name="/query_project")
        else:
            result = SQLiteStorage.query_project(args.project, args.sql)
    # Missing project DB or invalid/non-read-only SQL: report and exit.
    # (The original had two identical except branches; merged into one.)
    except (FileNotFoundError, ValueError) as e:
        error_exit(str(e))
    if args.json:
        print(format_json(result))
    else:
        print(format_query_result(result))
def main():
parser = argparse.ArgumentParser(description="Trackio CLI")
parser.add_argument(
"--space",
required=False,
help="HF Space ID (e.g. 'user/space') or Space URL to query remotely.",
)
parser.add_argument(
"--hf-token",
required=False,
help="HF token for accessing private Spaces.",
)
subparsers = parser.add_subparsers(dest="command")
ui_parser = subparsers.add_parser(
"show", help="Show the Trackio dashboard UI for a project"
)
ui_parser.add_argument(
"--project", required=False, help="Project name to show in the dashboard"
)
ui_parser.add_argument(
"--theme",
required=False,
default="default",
help="A Gradio Theme to use for the dashboard instead of the default, can be a built-in theme (e.g. 'soft', 'citrus'), or a theme from the Hub (e.g. 'gstaff/xkcd').",
)
ui_parser.add_argument(
"--mcp-server",
action="store_true",
help="Enable MCP server functionality. The Trackio dashboard will be set up as an MCP server and certain functions will be exposed as MCP tools.",
)
ui_parser.add_argument(
"--footer",
action="store_true",
default=True,
help="Show the Gradio footer. Use --no-footer to hide it.",
)
ui_parser.add_argument(
"--no-footer",
dest="footer",
action="store_false",
help="Hide the Gradio footer.",
)
ui_parser.add_argument(
"--color-palette",
required=False,
help="Comma-separated list of hex color codes for plot lines (e.g. '#FF0000,#00FF00,#0000FF'). If not provided, the TRACKIO_COLOR_PALETTE environment variable will be used, or the default palette if not set.",
)
ui_parser.add_argument(
"--host",
required=False,
help="Host to bind the server to (e.g. '0.0.0.0' for remote access). If not provided, defaults to '127.0.0.1' (localhost only).",
)
ui_parser.add_argument(
"--frontend",
required=False,
help="Custom frontend directory to serve. Must contain index.html.",
)
subparsers.add_parser(
"status",
help="Show the status of all local Trackio projects, including sync status.",
)
sync_parser = subparsers.add_parser(
"sync",
help="Sync a local project's database to a Hugging Face Space. If the Space does not exist, it will be created.",
)
sync_parser.add_argument(
"--project",
required=False,
help="The name of the local project.",
)
sync_parser.add_argument(
"--space-id",
required=False,
help="The Hugging Face Space ID where the project will be synced (e.g. username/space_id). If not provided, uses the previously-configured Space.",
)
sync_parser.add_argument(
"--all",
action="store_true",
dest="sync_all",
help="Sync all projects that have unsynced data to their configured Spaces.",
)
sync_parser.add_argument(
"--private",
action="store_true",
help="Make the Hugging Face Space private if creating a new Space. By default, the repo will be public unless the organization's default is private. This value is ignored if the repo already exists.",
)
sync_parser.add_argument(
"--force",
action="store_true",
help="Overwrite the existing database without prompting for confirmation.",
)
sync_parser.add_argument(
"--sdk",
choices=["gradio", "static"],
default="gradio",
help="The type of Space to deploy. 'gradio' (default) deploys a live Gradio server. 'static' deploys a static Space that reads from an HF Bucket.",
)
sync_parser.add_argument(
"--frontend",
required=False,
help="Custom frontend directory to deploy. Must contain index.html.",
)
freeze_parser = subparsers.add_parser(
"freeze",
help="Create a one-time static Space snapshot from a project's data.",
)
freeze_parser.add_argument(
"--space-id",
required=True,
help="The source Gradio Space ID (e.g. username/space_id).",
)
freeze_parser.add_argument(
"--project",
required=True,
help="The name of the project to freeze into a static snapshot.",
)
freeze_parser.add_argument(
"--new-space-id",
required=False,
help="The Space ID for the new static Space. Defaults to {space_id}_static.",
)
freeze_parser.add_argument(
"--private",
action="store_true",
help="Make the new static Space private.",
)
freeze_parser.add_argument(
"--frontend",
required=False,
help="Custom frontend directory to deploy to the frozen static Space.",
)
config_parser = subparsers.add_parser(
"config",
help="Manage persistent Trackio configuration.",
)
config_subparsers = config_parser.add_subparsers(
dest="config_command",
required=True,
)
config_subparsers.add_parser("get", help="Show current Trackio config.")
config_set_parser = config_subparsers.add_parser(
"set",
help="Set a persistent Trackio config value.",
)
config_set_parser.add_argument(
"key",
choices=["frontend"],
help="Config key to set.",
)
config_set_parser.add_argument(
"frontend",
help="Frontend directory to persist.",
)
config_unset_parser = config_subparsers.add_parser(
"unset",
help="Unset a persistent Trackio config value.",
)
config_unset_parser.add_argument(
"key",
choices=["frontend"],
help="Config key to unset.",
)
list_parser = subparsers.add_parser(
"list",
help="List projects, runs, or metrics",
)
list_subparsers = list_parser.add_subparsers(dest="list_type", required=True)
list_projects_parser = list_subparsers.add_parser(
"projects",
help="List all projects",
)
list_projects_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
list_runs_parser = list_subparsers.add_parser(
"runs",
help="List runs for a project",
)
list_runs_parser.add_argument(
"--project",
required=True,
help="Project name",
)
list_runs_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
list_metrics_parser = list_subparsers.add_parser(
"metrics",
help="List metrics for a run",
)
list_metrics_parser.add_argument(
"--project",
required=True,
help="Project name",
)
list_metrics_parser.add_argument(
"--run",
required=True,
help="Run name",
)
list_metrics_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
list_system_metrics_parser = list_subparsers.add_parser(
"system-metrics",
help="List system metrics for a run",
)
list_system_metrics_parser.add_argument(
"--project",
required=True,
help="Project name",
)
list_system_metrics_parser.add_argument(
"--run",
required=True,
help="Run name",
)
list_system_metrics_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
list_alerts_parser = list_subparsers.add_parser(
"alerts",
help="List alerts for a project or run",
)
list_alerts_parser.add_argument(
"--project",
required=True,
help="Project name",
)
list_alerts_parser.add_argument(
"--run",
required=False,
help="Run name (optional)",
)
list_alerts_parser.add_argument(
"--level",
required=False,
help="Filter by alert level (info, warn, error)",
)
list_alerts_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
list_alerts_parser.add_argument(
"--since",
required=False,
help="Only show alerts after this ISO 8601 timestamp",
)
list_reports_parser = list_subparsers.add_parser(
"reports",
help="List markdown reports for a project or run",
)
list_reports_parser.add_argument(
"--project",
required=True,
help="Project name",
)
list_reports_parser.add_argument(
"--run",
required=False,
help="Run name (optional)",
)
list_reports_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_parser = subparsers.add_parser(
"get",
help="Get project, run, or metric information",
)
get_subparsers = get_parser.add_subparsers(dest="get_type", required=True)
get_project_parser = get_subparsers.add_parser(
"project",
help="Get project summary",
)
get_project_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_project_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_run_parser = get_subparsers.add_parser(
"run",
help="Get run summary",
)
get_run_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_run_parser.add_argument(
"--run",
required=True,
help="Run name",
)
get_run_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_metric_parser = get_subparsers.add_parser(
"metric",
help="Get metric values for a run",
)
get_metric_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_metric_parser.add_argument(
"--run",
required=True,
help="Run name",
)
get_metric_parser.add_argument(
"--metric",
required=True,
help="Metric name",
)
get_metric_parser.add_argument(
"--step",
type=int,
required=False,
help="Get metric at exactly this step",
)
get_metric_parser.add_argument(
"--around",
type=int,
required=False,
help="Get metrics around this step (use with --window)",
)
get_metric_parser.add_argument(
"--at-time",
required=False,
help="Get metrics around this ISO 8601 timestamp (use with --window)",
)
get_metric_parser.add_argument(
"--window",
type=int,
required=False,
default=10,
help="Window size: ±steps for --around, ±seconds for --at-time (default: 10)",
)
get_metric_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_snapshot_parser = get_subparsers.add_parser(
"snapshot",
help="Get all metrics at/around a step or timestamp",
)
get_snapshot_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_snapshot_parser.add_argument(
"--run",
required=True,
help="Run name",
)
get_snapshot_parser.add_argument(
"--step",
type=int,
required=False,
help="Get all metrics at exactly this step",
)
get_snapshot_parser.add_argument(
"--around",
type=int,
required=False,
help="Get all metrics around this step (use with --window)",
)
get_snapshot_parser.add_argument(
"--at-time",
required=False,
help="Get all metrics around this ISO 8601 timestamp (use with --window)",
)
get_snapshot_parser.add_argument(
"--window",
type=int,
required=False,
default=10,
help="Window size: ±steps for --around, ±seconds for --at-time (default: 10)",
)
get_snapshot_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_system_metric_parser = get_subparsers.add_parser(
"system-metric",
help="Get system metric values for a run",
)
get_system_metric_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_system_metric_parser.add_argument(
"--run",
required=True,
help="Run name",
)
get_system_metric_parser.add_argument(
"--metric",
required=False,
help="System metric name (optional, if not provided returns all system metrics)",
)
get_system_metric_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_alerts_parser = get_subparsers.add_parser(
"alerts",
help="Get alerts for a project or run",
)
get_alerts_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_alerts_parser.add_argument(
"--run",
required=False,
help="Run name (optional)",
)
get_alerts_parser.add_argument(
"--level",
required=False,
help="Filter by alert level (info, warn, error)",
)
get_alerts_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
get_alerts_parser.add_argument(
"--since",
required=False,
help="Only show alerts after this ISO 8601 timestamp",
)
get_report_parser = get_subparsers.add_parser(
"report",
help="Get markdown report entries for a run",
)
get_report_parser.add_argument(
"--project",
required=True,
help="Project name",
)
get_report_parser.add_argument(
"--run",
required=True,
help="Run name",
)
get_report_parser.add_argument(
"--report",
required=True,
help="Report metric name",
)
get_report_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
query_parser = subparsers.add_parser(
"query",
help="Run a read-only SQL query against a project database",
)
query_subparsers = query_parser.add_subparsers(dest="query_type", required=True)
query_project_parser = query_subparsers.add_parser(
"project",
help="Run a read-only SQL query against a project's SQLite database",
)
query_project_parser.add_argument(
"--project",
required=True,
help="Project name",
)
query_project_parser.add_argument(
"--sql",
required=True,
help="Read-only SQL query to execute",
)
query_project_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
skills_parser = subparsers.add_parser(
"skills",
help="Manage Trackio skills for AI coding assistants",
)
skills_subparsers = skills_parser.add_subparsers(
dest="skills_action", required=True
)
skills_add_parser = skills_subparsers.add_parser(
"add",
help="Download and install the Trackio skill for an AI assistant",
)
skills_add_parser.add_argument(
"--cursor",
action="store_true",
help="Install for Cursor",
)
skills_add_parser.add_argument(
"--claude",
action="store_true",
help="Install for Claude Code",
)
skills_add_parser.add_argument(
"--codex",
action="store_true",
help="Install for Codex",
)
skills_add_parser.add_argument(
"--opencode",
action="store_true",
help="Install for OpenCode",
)
skills_add_parser.add_argument(
"--global",
dest="global_",
action="store_true",
help="Install globally (user-level) instead of in the current project directory",
)
skills_add_parser.add_argument(
"--dest",
type=str,
required=False,
help="Install into a custom destination (path to skills directory)",
)
skills_add_parser.add_argument(
"--force",
action="store_true",
help="Overwrite existing skill if it already exists",
)
best_parser = subparsers.add_parser(
"best",
help="Find the best run in a project for a given metric",
)
best_parser.add_argument(
"--project",
required=True,
help="Project name",
)
best_parser.add_argument(
"--metric",
required=True,
help="Metric name to rank runs by",
)
best_parser.add_argument(
"--direction",
choices=["min", "max"],
default="min",
help="Whether lower ('min', default) or higher ('max') values are better",
)
best_parser.add_argument(
"--mode",
choices=["last", "min", "max"],
default="last",
help="How to select the value from each run: 'last' (final step), 'min' (minimum), 'max' (maximum). Default: last",
)
best_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
best_parser.add_argument(
"--include-all",
action="store_true",
help="Include runs of all statuses (default: only finished runs)",
)
compare_parser = subparsers.add_parser(
"compare",
help="Compare runs side-by-side",
)
compare_parser.add_argument(
"--project",
required=True,
help="Project name",
)
compare_parser.add_argument(
"--runs",
required=False,
help="Comma-separated run names (default: all runs in project)",
)
compare_parser.add_argument(
"--metrics",
required=False,
help="Comma-separated metric names to compare (default: all metrics)",
)
compare_parser.add_argument(
"--mode",
choices=["last", "min", "max"],
default="last",
help="How to select values: 'last', 'min', 'max'. Default: last",
)
compare_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
compare_parser.add_argument(
"--include-all",
action="store_true",
help="Include runs of all statuses (default: only finished runs)",
)
summary_parser = subparsers.add_parser(
"summary",
help="Get a full experiment summary for a project",
)
summary_parser.add_argument(
"--project",
required=True,
help="Project name",
)
summary_parser.add_argument(
"--metric",
required=False,
help="Primary metric to include in summary",
)
summary_parser.add_argument(
"--json",
action="store_true",
help="Output in JSON format",
)
args, unknown_args = parser.parse_known_args()
if unknown_args:
trailing_global_parser = argparse.ArgumentParser(add_help=False)
trailing_global_parser.add_argument("--space", required=False)
trailing_global_parser.add_argument("--hf-token", required=False)
trailing_globals, remaining_unknown = trailing_global_parser.parse_known_args(
unknown_args
)
if remaining_unknown:
parser.error(f"unrecognized arguments: {' '.join(remaining_unknown)}")
if trailing_globals.space is not None:
args.space = trailing_globals.space
if trailing_globals.hf_token is not None:
args.hf_token = trailing_globals.hf_token
if args.command in (
"show",
"status",
"sync",
"freeze",
"skills",
"best",
"compare",
"summary",
) and _get_space(args):
error_exit(
f"The '{args.command}' command does not support --space (remote mode)."
)
if args.command == "show":
color_palette = None
if args.color_palette:
color_palette = [color.strip() for color in args.color_palette.split(",")]
show(
project=args.project,
theme=args.theme,
mcp_server=args.mcp_server,
footer=args.footer,
color_palette=color_palette,
host=args.host,
frontend_dir=args.frontend,
)
elif args.command == "status":
_handle_status()
elif args.command == "sync":
_handle_sync(args)
elif args.command == "freeze":
freeze(
space_id=args.space_id,
project=args.project,
new_space_id=args.new_space_id,
private=args.private,
frontend_dir=args.frontend,
)
elif args.command == "config":
_handle_config(args)
elif args.command == "list":
remote = _get_remote(args)
if args.list_type == "projects":
if remote:
projects = remote.predict(api_name="/get_all_projects")
else:
projects = SQLiteStorage.get_projects()
if args.json:
print(format_json({"projects": projects}))
else:
print(format_list(projects, "Projects"))
elif args.list_type == "runs":
if remote:
remote_records = remote.predict(
args.project, api_name="/get_runs_for_project"
)
runs = [r["name"] if isinstance(r, dict) else r for r in remote_records]
statuses = SQLiteStorage.get_run_statuses(args.project)
if args.json:
out = [{"name": r, "status": statuses.get(r)} for r in runs]
print(format_json({"project": args.project, "runs": out}))
else:
annotated = [
f"{r} [{statuses[r]}]" if r in statuses and statuses[r] else r
for r in runs
]
print(format_list(annotated, f"Runs in '{args.project}'"))
else:
_require_project(args.project)
records = SQLiteStorage.get_run_records(args.project)
statuses = SQLiteStorage.get_run_statuses(args.project)
names = [r["name"] for r in records]
has_dupes = len(names) != len(set(names))
if args.json:
out = [
{
"id": r["id"],
"name": r["name"],
"status": statuses.get(r["name"]),
"started_at": r["created_at"],
"finished_at": r["finished_at"],
}
for r in records
]
print(format_json({"project": args.project, "runs": out}))
else:
annotated = []
for r in records:
label = r["name"]
if has_dupes:
label += f" ({r['id'][:8]})"
if statuses.get(r["name"]):
label += f" [{statuses[r['name']]}]"
annotated.append(label)
print(format_list(annotated, f"Runs in '{args.project}'"))
elif args.list_type == "metrics":
if remote:
metrics = remote.predict(
args.project, args.run, api_name="/get_metrics_for_run"
)
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
metrics = SQLiteStorage.get_all_metrics_for_run(args.project, args.run)
if args.json:
print(
format_json(
{"project": args.project, "run": args.run, "metrics": metrics}
)
)
else:
print(
format_list(
metrics, f"Metrics for '{args.run}' in '{args.project}'"
)
)
elif args.list_type == "system-metrics":
if remote:
system_metrics = remote.predict(
args.project, args.run, api_name="/get_system_metrics_for_run"
)
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
system_metrics = SQLiteStorage.get_all_system_metrics_for_run(
args.project, args.run
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"system_metrics": system_metrics,
}
)
)
else:
print(format_system_metric_names(system_metrics))
elif args.list_type == "alerts":
if remote:
alerts = remote.predict(
args.project,
args.run,
args.level,
args.since,
api_name="/get_alerts",
)
else:
_require_project(args.project)
alerts = SQLiteStorage.get_alerts(
args.project,
run_name=args.run,
level=args.level,
since=args.since,
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"level": args.level,
"since": args.since,
"alerts": alerts,
}
)
)
else:
print(format_alerts(alerts))
elif args.list_type == "reports":
if remote:
run_records = remote.predict(
args.project, api_name="/get_runs_for_project"
)
records = [
r if isinstance(r, dict) else {"name": r, "id": r}
for r in run_records
]
else:
_require_project(args.project)
records = SQLiteStorage.get_run_records(args.project)
run_names = [r["name"] for r in records]
if args.run and args.run not in run_names:
error_exit(f"Run '{args.run}' not found in project '{args.project}'.")
target_records = (
[r for r in records if r["name"] == args.run] if args.run else records
)
target_names = [r["name"] for r in target_records]
has_dupes = len(target_names) != len(set(target_names))
all_reports = []
for rec in target_records:
run_name = rec["name"]
run_id = rec.get("id")
if remote:
logs = remote.predict(args.project, run_name, api_name="/get_logs")
else:
logs = SQLiteStorage.get_logs(args.project, run_name, run_id=run_id)
all_reports.extend(_extract_reports(run_name, logs, run_id=run_id))
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"reports": all_reports,
}
)
)
else:
report_lines = []
for entry in all_reports:
label = entry["run"]
if has_dupes and entry.get("run_id"):
label += f" ({entry['run_id'][:8]})"
report_lines.append(
f"{label} | {entry['report']} | step={entry['step']} | {entry['timestamp']}"
)
if args.run:
print(
format_list(
report_lines,
f"Reports for '{args.run}' in '{args.project}'",
)
)
else:
print(format_list(report_lines, f"Reports in '{args.project}'"))
elif args.command == "get":
remote = _get_remote(args)
if args.get_type == "project":
if remote:
summary = remote.predict(args.project, api_name="/get_project_summary")
else:
_require_project(args.project)
summary = get_project_summary(args.project)
if args.json:
print(format_json(summary))
else:
print(format_project_summary(summary))
elif args.get_type == "run":
if remote:
summary = remote.predict(
args.project, args.run, api_name="/get_run_summary"
)
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
summary = get_run_summary(args.project, args.run)
if args.json:
print(format_json(summary))
else:
print(format_run_summary(summary))
elif args.get_type == "metric":
at_time = getattr(args, "at_time", None)
if remote:
values = remote.predict(
args.project,
args.run,
args.metric,
args.step,
args.around,
at_time,
args.window,
api_name="/get_metric_values",
)
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
metrics = SQLiteStorage.get_all_metrics_for_run(args.project, args.run)
if args.metric not in metrics:
error_exit(
f"Metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'."
)
values = SQLiteStorage.get_metric_values(
args.project,
args.run,
args.metric,
step=args.step,
around_step=args.around,
at_time=at_time,
window=args.window,
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"metric": args.metric,
"values": values,
}
)
)
else:
print(format_metric_values(values))
elif args.get_type == "snapshot":
if not args.step and not args.around and not getattr(args, "at_time", None):
error_exit(
"Provide --step, --around (with --window), or --at-time (with --window)."
)
at_time = getattr(args, "at_time", None)
if remote:
snapshot = remote.predict(
args.project,
args.run,
args.step,
args.around,
at_time,
args.window,
api_name="/get_snapshot",
)
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
snapshot = SQLiteStorage.get_snapshot(
args.project,
args.run,
step=args.step,
around_step=args.around,
at_time=at_time,
window=args.window,
)
if args.json:
result = {
"project": args.project,
"run": args.run,
"metrics": snapshot,
}
if args.step is not None:
result["step"] = args.step
if args.around is not None:
result["around"] = args.around
result["window"] = args.window
if at_time is not None:
result["at_time"] = at_time
result["window"] = args.window
print(format_json(result))
else:
print(format_snapshot(snapshot))
elif args.get_type == "system-metric":
if remote:
system_metrics = remote.predict(
args.project, args.run, api_name="/get_system_logs"
)
if args.metric:
all_system_metric_names = remote.predict(
args.project,
args.run,
api_name="/get_system_metrics_for_run",
)
if args.metric not in all_system_metric_names:
error_exit(
f"System metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'."
)
filtered_metrics = [
{
k: v
for k, v in entry.items()
if k == "timestamp" or k == args.metric
}
for entry in system_metrics
if args.metric in entry
]
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"metric": args.metric,
"values": filtered_metrics,
}
)
)
else:
print(format_system_metrics(filtered_metrics))
else:
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"system_metrics": system_metrics,
}
)
)
else:
print(format_system_metrics(system_metrics))
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
if args.metric:
system_metrics = SQLiteStorage.get_system_logs(
args.project, args.run
)
all_system_metric_names = (
SQLiteStorage.get_all_system_metrics_for_run(
args.project, args.run
)
)
if args.metric not in all_system_metric_names:
error_exit(
f"System metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'."
)
filtered_metrics = [
{
k: v
for k, v in entry.items()
if k == "timestamp" or k == args.metric
}
for entry in system_metrics
if args.metric in entry
]
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"metric": args.metric,
"values": filtered_metrics,
}
)
)
else:
print(format_system_metrics(filtered_metrics))
else:
system_metrics = SQLiteStorage.get_system_logs(
args.project, args.run
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"system_metrics": system_metrics,
}
)
)
else:
print(format_system_metrics(system_metrics))
elif args.get_type == "alerts":
if remote:
alerts = remote.predict(
args.project,
args.run,
args.level,
args.since,
api_name="/get_alerts",
)
else:
_require_project(args.project)
alerts = SQLiteStorage.get_alerts(
args.project,
run_name=args.run,
level=args.level,
since=args.since,
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"level": args.level,
"since": args.since,
"alerts": alerts,
}
)
)
else:
print(format_alerts(alerts))
elif args.get_type == "report":
if remote:
logs = remote.predict(args.project, args.run, api_name="/get_logs")
else:
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
if args.run not in runs:
error_exit(
f"Run '{args.run}' not found in project '{args.project}'."
)
logs = SQLiteStorage.get_logs(args.project, args.run)
reports = _extract_reports(args.run, logs, report_name=args.report)
if not reports:
error_exit(
f"Report '{args.report}' not found in run '{args.run}' of project '{args.project}'."
)
if args.json:
print(
format_json(
{
"project": args.project,
"run": args.run,
"report": args.report,
"values": reports,
}
)
)
else:
output = []
for idx, entry in enumerate(reports, start=1):
output.append(
f"Entry {idx} | step={entry['step']} | timestamp={entry['timestamp']}"
)
output.append(entry["content"])
if idx < len(reports):
output.append("-" * 80)
print("\n".join(output))
elif args.command == "best":
_require_project(args.project)
minimize = args.direction == "min"
status_filter = None if args.include_all else "finished"
results = SQLiteStorage.get_final_metric_for_runs(
args.project, args.metric, mode=args.mode, status_filter=status_filter
)
if not results:
qualifier = "" if args.include_all else " finished"
error_exit(
f"No{qualifier} runs with metric '{args.metric}' found in project '{args.project}'."
)
configs = SQLiteStorage.get_all_run_configs(args.project)
for r in results:
r["config"] = _public_config(configs.get(r["run"]) or {})
statuses_map = SQLiteStorage.get_run_statuses(args.project)
all_records = SQLiteStorage.get_run_records(args.project)
if status_filter is not None:
all_records = [
rec
for rec in all_records
if statuses_map.get(rec["name"]) == status_filter
]
records_by_name: dict[str, list[dict]] = {}
for rec in all_records:
records_by_name.setdefault(rec["name"], []).append(rec)
for r in results:
pool = records_by_name.get(r["run"], [])
matched = pool.pop(0) if pool else {}
r["id"] = matched.get("id")
r["started_at"] = matched.get("created_at")
r["finished_at"] = matched.get("finished_at")
results.sort(key=lambda x: x["value"], reverse=not minimize)
best = results[0]
run_names = [r["run"] for r in results]
has_dupes = len(run_names) != len(set(run_names))
if args.json:
print(
format_json(
{
"project": args.project,
"metric": args.metric,
"direction": args.direction,
"mode": args.mode,
"best_run": best["run"],
"best_value": best["value"],
"best_step": best["step"],
"ranking": results,
}
)
)
else:
display = [
{
**r,
"run": f"{r['run']} ({r['id'][:8]})"
if has_dupes and r.get("id")
else r["run"],
}
for r in results
]
print(format_best(args.project, args.metric, minimize, args.mode, display))
elif args.command == "compare":
_require_project(args.project)
status_filter = None if args.include_all else "finished"
statuses = SQLiteStorage.get_run_statuses(args.project)
all_records = SQLiteStorage.get_run_records(args.project)
if args.runs:
names_filter = {r.strip() for r in args.runs.split(",")}
all_records = [r for r in all_records if r["name"] in names_filter]
elif status_filter is not None:
all_records = [
r for r in all_records if statuses.get(r["name"]) == status_filter
]
metric_names = None
if args.metrics:
metric_names = [m.strip() for m in args.metrics.split(",")]
if not metric_names:
metric_set = set()
for rec in all_records:
metric_set.update(
SQLiteStorage.get_all_metrics_for_run(args.project, rec["name"])
)
metric_names = sorted(metric_set)
configs = SQLiteStorage.get_all_run_configs(args.project)
run_names = [r["name"] for r in all_records]
has_dupes = len(run_names) != len(set(run_names))
comparison = []
for rec in all_records:
run = rec["name"]
run_metrics = {}
for metric in metric_names:
values = SQLiteStorage.get_final_metric_for_runs(
args.project,
metric,
mode=args.mode,
run_names=[run],
status_filter=None,
)
if values:
run_metrics[metric] = values[0]["value"]
comparison.append(
{
"id": rec["id"],
"run": run,
"status": statuses.get(run),
"started_at": rec["created_at"],
"finished_at": rec["finished_at"],
"config": _public_config(configs.get(run, {})),
"metrics": run_metrics,
}
)
if args.json:
print(
format_json(
{
"project": args.project,
"mode": args.mode,
"runs": comparison,
}
)
)
else:
display = [
{
**e,
"run": f"{e['run']} ({e['id'][:8]})"
if has_dupes and e.get("id")
else e["run"],
}
for e in comparison
]
print(format_compare(args.project, metric_names, display))
elif args.command == "summary":
_require_project(args.project)
runs = SQLiteStorage.get_runs(args.project)
configs = SQLiteStorage.get_all_run_configs(args.project)
statuses = SQLiteStorage.get_run_statuses(args.project)
alert_count = SQLiteStorage.get_alert_count(args.project)
run_summaries = []
for run in runs:
last_step = SQLiteStorage.get_last_step(args.project, run)
num_logs = SQLiteStorage.get_log_count(args.project, run)
metric_value = None
if args.metric:
values = SQLiteStorage.get_final_metric_for_runs(
args.project,
args.metric,
mode="last",
run_names=[run],
status_filter=None,
)
if values:
metric_value = values[0]["value"]
run_summaries.append(
{
"run": run,
"status": statuses.get(run),
"last_step": last_step,
"num_logs": num_logs,
"config": _public_config(configs.get(run, {})),
"metric_value": metric_value,
}
)
summary_data = {
"project": args.project,
"num_runs": len(runs),
"total_alerts": alert_count,
"metric": args.metric,
"runs": run_summaries,
}
if args.json:
print(format_json(summary_data))
else:
print(format_summary(summary_data))
elif args.command == "query":
if args.query_type == "project":
_handle_query(args)
elif args.command == "skills":
if args.skills_action == "add":
_handle_skills_add(args)
else:
parser.print_help()
def _handle_skills_add(args):
    """Install the Trackio agent skill for one or more coding agents.

    Downloads the skill files from the Trackio GitHub repository into a
    central skills directory (``.agents/skills`` locally, ``~/.agents/skills``
    with ``--global``, or a custom ``--dest``), then symlinks the central copy
    into each selected agent's own skills directory.
    """
    import shutil
    from pathlib import Path

    skill_id = "trackio"
    raw_base = "https://raw.githubusercontent.com/gradio-app/trackio/main"
    skill_prefix = ".agents/skills/trackio"
    skill_files = (
        "SKILL.md",
        "alerts.md",
        "logging_metrics.md",
        "retrieving_metrics.md",
        "storage_schema.md",
    )

    # Which agent integrations were requested, in a fixed display order.
    agent_flags = {
        "cursor": args.cursor,
        "claude": args.claude,
        "codex": args.codex,
        "opencode": args.opencode,
    }
    if not (any(agent_flags.values()) or args.dest):
        error_exit(
            "Pick a destination via --cursor, --claude, --codex, --opencode, or --dest."
        )

    def fetch(url: str) -> str:
        # Skill files live in the Trackio GitHub repo; fetch them over HTTP.
        from huggingface_hub.utils import get_session

        try:
            resp = get_session().get(url)
            resp.raise_for_status()
        except Exception as e:
            error_exit(
                f"Failed to download {url}\n{e}\n\n"
                "Make sure you have internet access. The skill files are fetched from "
                "the Trackio GitHub repository."
            )
        return resp.text

    def clear_existing(target: Path, force: bool):
        # Refuse to clobber an existing install unless --force was given.
        # Checking is_symlink() too keeps broken symlinks from slipping through.
        if not (target.exists() or target.is_symlink()):
            return
        if not force:
            error_exit(
                f"Skill already exists at {target}.\nRe-run with --force to overwrite."
            )
        if target.is_dir() and not target.is_symlink():
            shutil.rmtree(target)
        else:
            target.unlink()

    def materialize(skills_dir: Path, force: bool) -> Path:
        # Download every skill file into <skills_dir>/<skill_id>.
        skills_dir = skills_dir.expanduser().resolve()
        skills_dir.mkdir(parents=True, exist_ok=True)
        target = skills_dir / skill_id
        clear_existing(target, force)
        target.mkdir()
        for fname in skill_files:
            (target / fname).write_text(
                fetch(f"{raw_base}/{skill_prefix}/{fname}"), encoding="utf-8"
            )
        return target

    def link_into(agent_dir: Path, central: Path, force: bool) -> Path:
        # Create a relative symlink from the agent's skills dir to the
        # central copy so the tree stays relocatable.
        agent_dir = agent_dir.expanduser().resolve()
        agent_dir.mkdir(parents=True, exist_ok=True)
        link = agent_dir / skill_id
        clear_existing(link, force)
        link.symlink_to(os.path.relpath(central, agent_dir))
        return link

    if args.dest:
        if any(agent_flags.values()) or args.global_:
            error_exit("--dest cannot be combined with agent flags or --global.")
        installed = materialize(Path(args.dest), args.force)
        print(f"Installed '{skill_id}' to {installed}")
        return

    # Global installs go under the home directory; local ones under CWD.
    prefix = "~/" if args.global_ else ""
    selected = [
        Path(f"{prefix}.{agent}/skills")
        for agent, requested in agent_flags.items()
        if requested
    ]

    central = materialize(Path(f"{prefix}.agents/skills"), args.force)
    print(f"Installed '{skill_id}' to central location: {central}")
    for agent_dir in selected:
        print(f"Created symlink: {link_into(agent_dir, central, args.force)}")
# Script entry point: dispatch to the CLI's `main` (defined earlier in this
# file) when the module is executed directly rather than imported.
if __name__ == "__main__":
    main()