import argparse import os from trackio import freeze, show, sync from trackio.cli_helpers import ( error_exit, format_alerts, format_best, format_compare, format_json, format_list, format_metric_values, format_project_summary, format_query_result, format_run_summary, format_snapshot, format_summary, format_system_metric_names, format_system_metrics, ) from trackio.frontend_config import ( TRACKIO_CONFIG_PATH, get_persisted_frontend_dir, set_persisted_frontend_dir, unset_persisted_frontend_dir, ) from trackio.markdown import Markdown from trackio.server import get_project_summary, get_run_summary from trackio.sqlite_storage import SQLiteStorage def _get_space(args): return getattr(args, "space", None) def _get_remote(args): from trackio.remote_client import RemoteClient space = _get_space(args) if not space: return None hf_token = getattr(args, "hf_token", None) return RemoteClient(space, hf_token=hf_token) def _handle_status(): print("Reading local Trackio projects...\n") projects = SQLiteStorage.get_projects() if not projects: print("No Trackio projects found.") return local_projects = [] synced_projects = [] unsynced_projects = [] for project in projects: space_id = SQLiteStorage.get_space_id(project) if space_id is None: local_projects.append(project) elif SQLiteStorage.has_pending_data(project): unsynced_projects.append(project) else: synced_projects.append(project) print("Finished reading Trackio projects") if local_projects: print(f" * {len(local_projects)} local trackio project(s) [OK]") if synced_projects: print(f" * {len(synced_projects)} trackio project(s) synced to Spaces [OK]") if unsynced_projects: print( f" * {len(unsynced_projects)} trackio project(s) with unsynced changes [WARNING]:" ) for p in unsynced_projects: print(f" - {p}") if unsynced_projects: print( f"\nRun `trackio sync --project {unsynced_projects[0]}` to sync. " "Or run `trackio sync --all` to sync all unsynced changes." 
) def _handle_sync(args): from trackio.deploy import sync_incremental if args.sync_all and args.project: error_exit("Cannot use --all and --project together.") if not args.sync_all and not args.project: error_exit("Must provide either --project or --all.") if args.sync_all: projects = SQLiteStorage.get_projects() synced_any = False for project in projects: space_id = SQLiteStorage.get_space_id(project) if space_id and SQLiteStorage.has_pending_data(project): sync_incremental( project, space_id, private=args.private, pending_only=True, frontend_dir=args.frontend, ) synced_any = True if not synced_any: print("No projects with unsynced data found.") else: space_id = args.space_id if space_id is None: space_id = SQLiteStorage.get_space_id(args.project) sync( project=args.project, space_id=space_id, private=args.private, force=args.force, sdk=args.sdk, frontend_dir=args.frontend, ) def _handle_config(args): if args.config_command == "get": frontend_dir = get_persisted_frontend_dir() if frontend_dir is None: print("No Trackio frontend config is set.") print(f"Config file: {TRACKIO_CONFIG_PATH}") return print(f"frontend: {frontend_dir}") print(f"config: {TRACKIO_CONFIG_PATH}") return if args.config_command == "set": try: frontend_dir = set_persisted_frontend_dir(args.frontend) except ValueError as e: error_exit(str(e)) print(f"Saved Trackio default frontend: {frontend_dir}") print("Reset with `trackio config unset frontend`.") return if args.config_command == "unset": removed = unset_persisted_frontend_dir() if removed: print("Removed Trackio default frontend.") else: print("No Trackio default frontend was set.") return def _extract_reports( run: str, logs: list[dict], report_name: str | None = None, run_id: str | None = None, ) -> list[dict]: reports = [] for log in logs: timestamp = log.get("timestamp") step = log.get("step") for key, value in log.items(): if report_name is not None and key != report_name: continue if isinstance(value, dict) and value.get("_type") == 
Markdown.TYPE: content = value.get("_value") if isinstance(content, str): reports.append( { "run": run, "run_id": run_id, "report": key, "step": step, "timestamp": timestamp, "content": content, } ) return reports def _require_project(project: str): db_path = SQLiteStorage.get_project_db_path(project) if not db_path.exists(): error_exit(f"Project '{project}' not found.") def _public_config(cfg: dict) -> dict: return {k: v for k, v in cfg.items() if not k.startswith("_")} def _handle_query(args): remote = _get_remote(args) try: if remote: result = remote.predict(args.project, args.sql, api_name="/query_project") else: result = SQLiteStorage.query_project(args.project, args.sql) except FileNotFoundError as e: error_exit(str(e)) except ValueError as e: error_exit(str(e)) if args.json: print(format_json(result)) else: print(format_query_result(result)) def main(): parser = argparse.ArgumentParser(description="Trackio CLI") parser.add_argument( "--space", required=False, help="HF Space ID (e.g. 'user/space') or Space URL to query remotely.", ) parser.add_argument( "--hf-token", required=False, help="HF token for accessing private Spaces.", ) subparsers = parser.add_subparsers(dest="command") ui_parser = subparsers.add_parser( "show", help="Show the Trackio dashboard UI for a project" ) ui_parser.add_argument( "--project", required=False, help="Project name to show in the dashboard" ) ui_parser.add_argument( "--theme", required=False, default="default", help="A Gradio Theme to use for the dashboard instead of the default, can be a built-in theme (e.g. 'soft', 'citrus'), or a theme from the Hub (e.g. 'gstaff/xkcd').", ) ui_parser.add_argument( "--mcp-server", action="store_true", help="Enable MCP server functionality. The Trackio dashboard will be set up as an MCP server and certain functions will be exposed as MCP tools.", ) ui_parser.add_argument( "--footer", action="store_true", default=True, help="Show the Gradio footer. 
Use --no-footer to hide it.", ) ui_parser.add_argument( "--no-footer", dest="footer", action="store_false", help="Hide the Gradio footer.", ) ui_parser.add_argument( "--color-palette", required=False, help="Comma-separated list of hex color codes for plot lines (e.g. '#FF0000,#00FF00,#0000FF'). If not provided, the TRACKIO_COLOR_PALETTE environment variable will be used, or the default palette if not set.", ) ui_parser.add_argument( "--host", required=False, help="Host to bind the server to (e.g. '0.0.0.0' for remote access). If not provided, defaults to '127.0.0.1' (localhost only).", ) ui_parser.add_argument( "--frontend", required=False, help="Custom frontend directory to serve. Must contain index.html.", ) subparsers.add_parser( "status", help="Show the status of all local Trackio projects, including sync status.", ) sync_parser = subparsers.add_parser( "sync", help="Sync a local project's database to a Hugging Face Space. If the Space does not exist, it will be created.", ) sync_parser.add_argument( "--project", required=False, help="The name of the local project.", ) sync_parser.add_argument( "--space-id", required=False, help="The Hugging Face Space ID where the project will be synced (e.g. username/space_id). If not provided, uses the previously-configured Space.", ) sync_parser.add_argument( "--all", action="store_true", dest="sync_all", help="Sync all projects that have unsynced data to their configured Spaces.", ) sync_parser.add_argument( "--private", action="store_true", help="Make the Hugging Face Space private if creating a new Space. By default, the repo will be public unless the organization's default is private. This value is ignored if the repo already exists.", ) sync_parser.add_argument( "--force", action="store_true", help="Overwrite the existing database without prompting for confirmation.", ) sync_parser.add_argument( "--sdk", choices=["gradio", "static"], default="gradio", help="The type of Space to deploy. 
'gradio' (default) deploys a live Gradio server. 'static' deploys a static Space that reads from an HF Bucket.", ) sync_parser.add_argument( "--frontend", required=False, help="Custom frontend directory to deploy. Must contain index.html.", ) freeze_parser = subparsers.add_parser( "freeze", help="Create a one-time static Space snapshot from a project's data.", ) freeze_parser.add_argument( "--space-id", required=True, help="The source Gradio Space ID (e.g. username/space_id).", ) freeze_parser.add_argument( "--project", required=True, help="The name of the project to freeze into a static snapshot.", ) freeze_parser.add_argument( "--new-space-id", required=False, help="The Space ID for the new static Space. Defaults to {space_id}_static.", ) freeze_parser.add_argument( "--private", action="store_true", help="Make the new static Space private.", ) freeze_parser.add_argument( "--frontend", required=False, help="Custom frontend directory to deploy to the frozen static Space.", ) config_parser = subparsers.add_parser( "config", help="Manage persistent Trackio configuration.", ) config_subparsers = config_parser.add_subparsers( dest="config_command", required=True, ) config_subparsers.add_parser("get", help="Show current Trackio config.") config_set_parser = config_subparsers.add_parser( "set", help="Set a persistent Trackio config value.", ) config_set_parser.add_argument( "key", choices=["frontend"], help="Config key to set.", ) config_set_parser.add_argument( "frontend", help="Frontend directory to persist.", ) config_unset_parser = config_subparsers.add_parser( "unset", help="Unset a persistent Trackio config value.", ) config_unset_parser.add_argument( "key", choices=["frontend"], help="Config key to unset.", ) list_parser = subparsers.add_parser( "list", help="List projects, runs, or metrics", ) list_subparsers = list_parser.add_subparsers(dest="list_type", required=True) list_projects_parser = list_subparsers.add_parser( "projects", help="List all projects", ) 
list_projects_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) list_runs_parser = list_subparsers.add_parser( "runs", help="List runs for a project", ) list_runs_parser.add_argument( "--project", required=True, help="Project name", ) list_runs_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) list_metrics_parser = list_subparsers.add_parser( "metrics", help="List metrics for a run", ) list_metrics_parser.add_argument( "--project", required=True, help="Project name", ) list_metrics_parser.add_argument( "--run", required=True, help="Run name", ) list_metrics_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) list_system_metrics_parser = list_subparsers.add_parser( "system-metrics", help="List system metrics for a run", ) list_system_metrics_parser.add_argument( "--project", required=True, help="Project name", ) list_system_metrics_parser.add_argument( "--run", required=True, help="Run name", ) list_system_metrics_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) list_alerts_parser = list_subparsers.add_parser( "alerts", help="List alerts for a project or run", ) list_alerts_parser.add_argument( "--project", required=True, help="Project name", ) list_alerts_parser.add_argument( "--run", required=False, help="Run name (optional)", ) list_alerts_parser.add_argument( "--level", required=False, help="Filter by alert level (info, warn, error)", ) list_alerts_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) list_alerts_parser.add_argument( "--since", required=False, help="Only show alerts after this ISO 8601 timestamp", ) list_reports_parser = list_subparsers.add_parser( "reports", help="List markdown reports for a project or run", ) list_reports_parser.add_argument( "--project", required=True, help="Project name", ) list_reports_parser.add_argument( "--run", required=False, help="Run name (optional)", ) 
list_reports_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_parser = subparsers.add_parser( "get", help="Get project, run, or metric information", ) get_subparsers = get_parser.add_subparsers(dest="get_type", required=True) get_project_parser = get_subparsers.add_parser( "project", help="Get project summary", ) get_project_parser.add_argument( "--project", required=True, help="Project name", ) get_project_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_run_parser = get_subparsers.add_parser( "run", help="Get run summary", ) get_run_parser.add_argument( "--project", required=True, help="Project name", ) get_run_parser.add_argument( "--run", required=True, help="Run name", ) get_run_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_metric_parser = get_subparsers.add_parser( "metric", help="Get metric values for a run", ) get_metric_parser.add_argument( "--project", required=True, help="Project name", ) get_metric_parser.add_argument( "--run", required=True, help="Run name", ) get_metric_parser.add_argument( "--metric", required=True, help="Metric name", ) get_metric_parser.add_argument( "--step", type=int, required=False, help="Get metric at exactly this step", ) get_metric_parser.add_argument( "--around", type=int, required=False, help="Get metrics around this step (use with --window)", ) get_metric_parser.add_argument( "--at-time", required=False, help="Get metrics around this ISO 8601 timestamp (use with --window)", ) get_metric_parser.add_argument( "--window", type=int, required=False, default=10, help="Window size: ±steps for --around, ±seconds for --at-time (default: 10)", ) get_metric_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_snapshot_parser = get_subparsers.add_parser( "snapshot", help="Get all metrics at/around a step or timestamp", ) get_snapshot_parser.add_argument( "--project", required=True, 
help="Project name", ) get_snapshot_parser.add_argument( "--run", required=True, help="Run name", ) get_snapshot_parser.add_argument( "--step", type=int, required=False, help="Get all metrics at exactly this step", ) get_snapshot_parser.add_argument( "--around", type=int, required=False, help="Get all metrics around this step (use with --window)", ) get_snapshot_parser.add_argument( "--at-time", required=False, help="Get all metrics around this ISO 8601 timestamp (use with --window)", ) get_snapshot_parser.add_argument( "--window", type=int, required=False, default=10, help="Window size: ±steps for --around, ±seconds for --at-time (default: 10)", ) get_snapshot_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_system_metric_parser = get_subparsers.add_parser( "system-metric", help="Get system metric values for a run", ) get_system_metric_parser.add_argument( "--project", required=True, help="Project name", ) get_system_metric_parser.add_argument( "--run", required=True, help="Run name", ) get_system_metric_parser.add_argument( "--metric", required=False, help="System metric name (optional, if not provided returns all system metrics)", ) get_system_metric_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_alerts_parser = get_subparsers.add_parser( "alerts", help="Get alerts for a project or run", ) get_alerts_parser.add_argument( "--project", required=True, help="Project name", ) get_alerts_parser.add_argument( "--run", required=False, help="Run name (optional)", ) get_alerts_parser.add_argument( "--level", required=False, help="Filter by alert level (info, warn, error)", ) get_alerts_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) get_alerts_parser.add_argument( "--since", required=False, help="Only show alerts after this ISO 8601 timestamp", ) get_report_parser = get_subparsers.add_parser( "report", help="Get markdown report entries for a run", ) 
get_report_parser.add_argument( "--project", required=True, help="Project name", ) get_report_parser.add_argument( "--run", required=True, help="Run name", ) get_report_parser.add_argument( "--report", required=True, help="Report metric name", ) get_report_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) query_parser = subparsers.add_parser( "query", help="Run a read-only SQL query against a project database", ) query_subparsers = query_parser.add_subparsers(dest="query_type", required=True) query_project_parser = query_subparsers.add_parser( "project", help="Run a read-only SQL query against a project's SQLite database", ) query_project_parser.add_argument( "--project", required=True, help="Project name", ) query_project_parser.add_argument( "--sql", required=True, help="Read-only SQL query to execute", ) query_project_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) skills_parser = subparsers.add_parser( "skills", help="Manage Trackio skills for AI coding assistants", ) skills_subparsers = skills_parser.add_subparsers( dest="skills_action", required=True ) skills_add_parser = skills_subparsers.add_parser( "add", help="Download and install the Trackio skill for an AI assistant", ) skills_add_parser.add_argument( "--cursor", action="store_true", help="Install for Cursor", ) skills_add_parser.add_argument( "--claude", action="store_true", help="Install for Claude Code", ) skills_add_parser.add_argument( "--codex", action="store_true", help="Install for Codex", ) skills_add_parser.add_argument( "--opencode", action="store_true", help="Install for OpenCode", ) skills_add_parser.add_argument( "--global", dest="global_", action="store_true", help="Install globally (user-level) instead of in the current project directory", ) skills_add_parser.add_argument( "--dest", type=str, required=False, help="Install into a custom destination (path to skills directory)", ) skills_add_parser.add_argument( 
"--force", action="store_true", help="Overwrite existing skill if it already exists", ) best_parser = subparsers.add_parser( "best", help="Find the best run in a project for a given metric", ) best_parser.add_argument( "--project", required=True, help="Project name", ) best_parser.add_argument( "--metric", required=True, help="Metric name to rank runs by", ) best_parser.add_argument( "--direction", choices=["min", "max"], default="min", help="Whether lower ('min', default) or higher ('max') values are better", ) best_parser.add_argument( "--mode", choices=["last", "min", "max"], default="last", help="How to select the value from each run: 'last' (final step), 'min' (minimum), 'max' (maximum). Default: last", ) best_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) best_parser.add_argument( "--include-all", action="store_true", help="Include runs of all statuses (default: only finished runs)", ) compare_parser = subparsers.add_parser( "compare", help="Compare runs side-by-side", ) compare_parser.add_argument( "--project", required=True, help="Project name", ) compare_parser.add_argument( "--runs", required=False, help="Comma-separated run names (default: all runs in project)", ) compare_parser.add_argument( "--metrics", required=False, help="Comma-separated metric names to compare (default: all metrics)", ) compare_parser.add_argument( "--mode", choices=["last", "min", "max"], default="last", help="How to select values: 'last', 'min', 'max'. 
Default: last", ) compare_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) compare_parser.add_argument( "--include-all", action="store_true", help="Include runs of all statuses (default: only finished runs)", ) summary_parser = subparsers.add_parser( "summary", help="Get a full experiment summary for a project", ) summary_parser.add_argument( "--project", required=True, help="Project name", ) summary_parser.add_argument( "--metric", required=False, help="Primary metric to include in summary", ) summary_parser.add_argument( "--json", action="store_true", help="Output in JSON format", ) args, unknown_args = parser.parse_known_args() if unknown_args: trailing_global_parser = argparse.ArgumentParser(add_help=False) trailing_global_parser.add_argument("--space", required=False) trailing_global_parser.add_argument("--hf-token", required=False) trailing_globals, remaining_unknown = trailing_global_parser.parse_known_args( unknown_args ) if remaining_unknown: parser.error(f"unrecognized arguments: {' '.join(remaining_unknown)}") if trailing_globals.space is not None: args.space = trailing_globals.space if trailing_globals.hf_token is not None: args.hf_token = trailing_globals.hf_token if args.command in ( "show", "status", "sync", "freeze", "skills", "best", "compare", "summary", ) and _get_space(args): error_exit( f"The '{args.command}' command does not support --space (remote mode)." 
) if args.command == "show": color_palette = None if args.color_palette: color_palette = [color.strip() for color in args.color_palette.split(",")] show( project=args.project, theme=args.theme, mcp_server=args.mcp_server, footer=args.footer, color_palette=color_palette, host=args.host, frontend_dir=args.frontend, ) elif args.command == "status": _handle_status() elif args.command == "sync": _handle_sync(args) elif args.command == "freeze": freeze( space_id=args.space_id, project=args.project, new_space_id=args.new_space_id, private=args.private, frontend_dir=args.frontend, ) elif args.command == "config": _handle_config(args) elif args.command == "list": remote = _get_remote(args) if args.list_type == "projects": if remote: projects = remote.predict(api_name="/get_all_projects") else: projects = SQLiteStorage.get_projects() if args.json: print(format_json({"projects": projects})) else: print(format_list(projects, "Projects")) elif args.list_type == "runs": if remote: remote_records = remote.predict( args.project, api_name="/get_runs_for_project" ) runs = [r["name"] if isinstance(r, dict) else r for r in remote_records] statuses = SQLiteStorage.get_run_statuses(args.project) if args.json: out = [{"name": r, "status": statuses.get(r)} for r in runs] print(format_json({"project": args.project, "runs": out})) else: annotated = [ f"{r} [{statuses[r]}]" if r in statuses and statuses[r] else r for r in runs ] print(format_list(annotated, f"Runs in '{args.project}'")) else: _require_project(args.project) records = SQLiteStorage.get_run_records(args.project) statuses = SQLiteStorage.get_run_statuses(args.project) names = [r["name"] for r in records] has_dupes = len(names) != len(set(names)) if args.json: out = [ { "id": r["id"], "name": r["name"], "status": statuses.get(r["name"]), "started_at": r["created_at"], "finished_at": r["finished_at"], } for r in records ] print(format_json({"project": args.project, "runs": out})) else: annotated = [] for r in records: label = 
r["name"] if has_dupes: label += f" ({r['id'][:8]})" if statuses.get(r["name"]): label += f" [{statuses[r['name']]}]" annotated.append(label) print(format_list(annotated, f"Runs in '{args.project}'")) elif args.list_type == "metrics": if remote: metrics = remote.predict( args.project, args.run, api_name="/get_metrics_for_run" ) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) metrics = SQLiteStorage.get_all_metrics_for_run(args.project, args.run) if args.json: print( format_json( {"project": args.project, "run": args.run, "metrics": metrics} ) ) else: print( format_list( metrics, f"Metrics for '{args.run}' in '{args.project}'" ) ) elif args.list_type == "system-metrics": if remote: system_metrics = remote.predict( args.project, args.run, api_name="/get_system_metrics_for_run" ) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." 
) system_metrics = SQLiteStorage.get_all_system_metrics_for_run( args.project, args.run ) if args.json: print( format_json( { "project": args.project, "run": args.run, "system_metrics": system_metrics, } ) ) else: print(format_system_metric_names(system_metrics)) elif args.list_type == "alerts": if remote: alerts = remote.predict( args.project, args.run, args.level, args.since, api_name="/get_alerts", ) else: _require_project(args.project) alerts = SQLiteStorage.get_alerts( args.project, run_name=args.run, level=args.level, since=args.since, ) if args.json: print( format_json( { "project": args.project, "run": args.run, "level": args.level, "since": args.since, "alerts": alerts, } ) ) else: print(format_alerts(alerts)) elif args.list_type == "reports": if remote: run_records = remote.predict( args.project, api_name="/get_runs_for_project" ) records = [ r if isinstance(r, dict) else {"name": r, "id": r} for r in run_records ] else: _require_project(args.project) records = SQLiteStorage.get_run_records(args.project) run_names = [r["name"] for r in records] if args.run and args.run not in run_names: error_exit(f"Run '{args.run}' not found in project '{args.project}'.") target_records = ( [r for r in records if r["name"] == args.run] if args.run else records ) target_names = [r["name"] for r in target_records] has_dupes = len(target_names) != len(set(target_names)) all_reports = [] for rec in target_records: run_name = rec["name"] run_id = rec.get("id") if remote: logs = remote.predict(args.project, run_name, api_name="/get_logs") else: logs = SQLiteStorage.get_logs(args.project, run_name, run_id=run_id) all_reports.extend(_extract_reports(run_name, logs, run_id=run_id)) if args.json: print( format_json( { "project": args.project, "run": args.run, "reports": all_reports, } ) ) else: report_lines = [] for entry in all_reports: label = entry["run"] if has_dupes and entry.get("run_id"): label += f" ({entry['run_id'][:8]})" report_lines.append( f"{label} | 
{entry['report']} | step={entry['step']} | {entry['timestamp']}" ) if args.run: print( format_list( report_lines, f"Reports for '{args.run}' in '{args.project}'", ) ) else: print(format_list(report_lines, f"Reports in '{args.project}'")) elif args.command == "get": remote = _get_remote(args) if args.get_type == "project": if remote: summary = remote.predict(args.project, api_name="/get_project_summary") else: _require_project(args.project) summary = get_project_summary(args.project) if args.json: print(format_json(summary)) else: print(format_project_summary(summary)) elif args.get_type == "run": if remote: summary = remote.predict( args.project, args.run, api_name="/get_run_summary" ) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) summary = get_run_summary(args.project, args.run) if args.json: print(format_json(summary)) else: print(format_run_summary(summary)) elif args.get_type == "metric": at_time = getattr(args, "at_time", None) if remote: values = remote.predict( args.project, args.run, args.metric, args.step, args.around, at_time, args.window, api_name="/get_metric_values", ) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) metrics = SQLiteStorage.get_all_metrics_for_run(args.project, args.run) if args.metric not in metrics: error_exit( f"Metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'." 
) values = SQLiteStorage.get_metric_values( args.project, args.run, args.metric, step=args.step, around_step=args.around, at_time=at_time, window=args.window, ) if args.json: print( format_json( { "project": args.project, "run": args.run, "metric": args.metric, "values": values, } ) ) else: print(format_metric_values(values)) elif args.get_type == "snapshot": if not args.step and not args.around and not getattr(args, "at_time", None): error_exit( "Provide --step, --around (with --window), or --at-time (with --window)." ) at_time = getattr(args, "at_time", None) if remote: snapshot = remote.predict( args.project, args.run, args.step, args.around, at_time, args.window, api_name="/get_snapshot", ) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) snapshot = SQLiteStorage.get_snapshot( args.project, args.run, step=args.step, around_step=args.around, at_time=at_time, window=args.window, ) if args.json: result = { "project": args.project, "run": args.run, "metrics": snapshot, } if args.step is not None: result["step"] = args.step if args.around is not None: result["around"] = args.around result["window"] = args.window if at_time is not None: result["at_time"] = at_time result["window"] = args.window print(format_json(result)) else: print(format_snapshot(snapshot)) elif args.get_type == "system-metric": if remote: system_metrics = remote.predict( args.project, args.run, api_name="/get_system_logs" ) if args.metric: all_system_metric_names = remote.predict( args.project, args.run, api_name="/get_system_metrics_for_run", ) if args.metric not in all_system_metric_names: error_exit( f"System metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'." 
) filtered_metrics = [ { k: v for k, v in entry.items() if k == "timestamp" or k == args.metric } for entry in system_metrics if args.metric in entry ] if args.json: print( format_json( { "project": args.project, "run": args.run, "metric": args.metric, "values": filtered_metrics, } ) ) else: print(format_system_metrics(filtered_metrics)) else: if args.json: print( format_json( { "project": args.project, "run": args.run, "system_metrics": system_metrics, } ) ) else: print(format_system_metrics(system_metrics)) else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) if args.metric: system_metrics = SQLiteStorage.get_system_logs( args.project, args.run ) all_system_metric_names = ( SQLiteStorage.get_all_system_metrics_for_run( args.project, args.run ) ) if args.metric not in all_system_metric_names: error_exit( f"System metric '{args.metric}' not found in run '{args.run}' of project '{args.project}'." 
) filtered_metrics = [ { k: v for k, v in entry.items() if k == "timestamp" or k == args.metric } for entry in system_metrics if args.metric in entry ] if args.json: print( format_json( { "project": args.project, "run": args.run, "metric": args.metric, "values": filtered_metrics, } ) ) else: print(format_system_metrics(filtered_metrics)) else: system_metrics = SQLiteStorage.get_system_logs( args.project, args.run ) if args.json: print( format_json( { "project": args.project, "run": args.run, "system_metrics": system_metrics, } ) ) else: print(format_system_metrics(system_metrics)) elif args.get_type == "alerts": if remote: alerts = remote.predict( args.project, args.run, args.level, args.since, api_name="/get_alerts", ) else: _require_project(args.project) alerts = SQLiteStorage.get_alerts( args.project, run_name=args.run, level=args.level, since=args.since, ) if args.json: print( format_json( { "project": args.project, "run": args.run, "level": args.level, "since": args.since, "alerts": alerts, } ) ) else: print(format_alerts(alerts)) elif args.get_type == "report": if remote: logs = remote.predict(args.project, args.run, api_name="/get_logs") else: _require_project(args.project) runs = SQLiteStorage.get_runs(args.project) if args.run not in runs: error_exit( f"Run '{args.run}' not found in project '{args.project}'." ) logs = SQLiteStorage.get_logs(args.project, args.run) reports = _extract_reports(args.run, logs, report_name=args.report) if not reports: error_exit( f"Report '{args.report}' not found in run '{args.run}' of project '{args.project}'." 
) if args.json: print( format_json( { "project": args.project, "run": args.run, "report": args.report, "values": reports, } ) ) else: output = [] for idx, entry in enumerate(reports, start=1): output.append( f"Entry {idx} | step={entry['step']} | timestamp={entry['timestamp']}" ) output.append(entry["content"]) if idx < len(reports): output.append("-" * 80) print("\n".join(output)) elif args.command == "best": _require_project(args.project) minimize = args.direction == "min" status_filter = None if args.include_all else "finished" results = SQLiteStorage.get_final_metric_for_runs( args.project, args.metric, mode=args.mode, status_filter=status_filter ) if not results: qualifier = "" if args.include_all else " finished" error_exit( f"No{qualifier} runs with metric '{args.metric}' found in project '{args.project}'." ) configs = SQLiteStorage.get_all_run_configs(args.project) for r in results: r["config"] = _public_config(configs.get(r["run"]) or {}) statuses_map = SQLiteStorage.get_run_statuses(args.project) all_records = SQLiteStorage.get_run_records(args.project) if status_filter is not None: all_records = [ rec for rec in all_records if statuses_map.get(rec["name"]) == status_filter ] records_by_name: dict[str, list[dict]] = {} for rec in all_records: records_by_name.setdefault(rec["name"], []).append(rec) for r in results: pool = records_by_name.get(r["run"], []) matched = pool.pop(0) if pool else {} r["id"] = matched.get("id") r["started_at"] = matched.get("created_at") r["finished_at"] = matched.get("finished_at") results.sort(key=lambda x: x["value"], reverse=not minimize) best = results[0] run_names = [r["run"] for r in results] has_dupes = len(run_names) != len(set(run_names)) if args.json: print( format_json( { "project": args.project, "metric": args.metric, "direction": args.direction, "mode": args.mode, "best_run": best["run"], "best_value": best["value"], "best_step": best["step"], "ranking": results, } ) ) else: display = [ { **r, "run": f"{r['run']} 
({r['id'][:8]})" if has_dupes and r.get("id") else r["run"], } for r in results ] print(format_best(args.project, args.metric, minimize, args.mode, display)) elif args.command == "compare": _require_project(args.project) status_filter = None if args.include_all else "finished" statuses = SQLiteStorage.get_run_statuses(args.project) all_records = SQLiteStorage.get_run_records(args.project) if args.runs: names_filter = {r.strip() for r in args.runs.split(",")} all_records = [r for r in all_records if r["name"] in names_filter] elif status_filter is not None: all_records = [ r for r in all_records if statuses.get(r["name"]) == status_filter ] metric_names = None if args.metrics: metric_names = [m.strip() for m in args.metrics.split(",")] if not metric_names: metric_set = set() for rec in all_records: metric_set.update( SQLiteStorage.get_all_metrics_for_run(args.project, rec["name"]) ) metric_names = sorted(metric_set) configs = SQLiteStorage.get_all_run_configs(args.project) run_names = [r["name"] for r in all_records] has_dupes = len(run_names) != len(set(run_names)) target_ids = [r["id"] for r in all_records if r.get("id")] metric_values_by_id: dict[str, dict[str, float]] = {} if target_ids: for metric in metric_names: rows = SQLiteStorage.get_final_metric_for_runs( args.project, metric, mode=args.mode, run_ids=target_ids, status_filter=None, ) for row in rows: rid = row.get("run_id") if rid: metric_values_by_id.setdefault(rid, {})[metric] = row["value"] comparison = [] for rec in all_records: run = rec["name"] rid = rec.get("id") run_metrics = metric_values_by_id.get(rid, {}) if rid else {} comparison.append( { "id": rec["id"], "run": run, "status": statuses.get(run), "started_at": rec["created_at"], "finished_at": rec["finished_at"], "config": _public_config(configs.get(run, {})), "metrics": run_metrics, } ) if args.json: print( format_json( { "project": args.project, "mode": args.mode, "runs": comparison, } ) ) else: display = [ { **e, "run": f"{e['run']} 
def _handle_skills_add(args):
    """Install the Trackio agent skill files for one or more coding agents.

    Downloads the skill markdown files from the Trackio GitHub repository into
    a central skills directory (local ``.agents/skills`` or global
    ``~/.agents/skills``), then symlinks that install into each selected
    agent's own skills directory. With ``--dest``, installs directly to the
    given directory instead (no central copy, no symlinks).
    """
    import shutil
    from pathlib import Path

    CENTRAL_LOCAL = Path(".agents/skills")
    CENTRAL_GLOBAL = Path("~/.agents/skills")
    CLAUDE_LOCAL = Path(".claude/skills")
    CLAUDE_GLOBAL = Path("~/.claude/skills")
    SKILL_ID = "trackio"
    GITHUB_RAW = "https://raw.githubusercontent.com/gradio-app/trackio/main"
    SKILL_PREFIX = ".agents/skills/trackio"
    SKILL_FILES = [
        "SKILL.md",
        "alerts.md",
        "logging_metrics.md",
        "retrieving_metrics.md",
        "storage_schema.md",
    ]

    # At least one destination must be selected up front.
    if not any([args.cursor, args.claude, args.codex, args.opencode, args.dest]):
        error_exit(
            "Pick a destination via --cursor, --claude, --codex, --opencode, or --dest."
        )

    def download(url: str) -> str:
        """Fetch one skill file over HTTP; exits the CLI on any failure."""
        # Lazy import keeps huggingface_hub off the import path for other commands.
        from huggingface_hub.utils import get_session

        try:
            resp = get_session().get(url)
            resp.raise_for_status()
        except Exception as e:
            error_exit(
                f"Failed to download {url}\n{e}\n\n"
                "Make sure you have internet access. The skill files are fetched from "
                "the Trackio GitHub repository."
            )
        return resp.text

    def remove_existing(path: Path, force: bool):
        """Delete an existing install at `path`, or exit unless --force was given."""
        # `exists()` is False for a broken symlink, so check `is_symlink()` too.
        if not (path.exists() or path.is_symlink()):
            return
        if not force:
            error_exit(
                f"Skill already exists at {path}.\nRe-run with --force to overwrite."
            )
        if path.is_dir() and not path.is_symlink():
            shutil.rmtree(path)
        else:
            path.unlink()

    def install_to(skills_dir: Path, force: bool) -> Path:
        """Download every skill file into `skills_dir`/trackio and return that path."""
        skills_dir = skills_dir.expanduser().resolve()
        skills_dir.mkdir(parents=True, exist_ok=True)
        target = skills_dir / SKILL_ID
        remove_existing(target, force)
        target.mkdir()
        for fname in SKILL_FILES:
            body = download(f"{GITHUB_RAW}/{SKILL_PREFIX}/{fname}")
            (target / fname).write_text(body, encoding="utf-8")
        return target

    def create_symlink(
        agent_skills_dir: Path, central_skill_path: Path, force: bool
    ) -> Path:
        """Symlink the central install into an agent's skills directory."""
        agent_skills_dir = agent_skills_dir.expanduser().resolve()
        agent_skills_dir.mkdir(parents=True, exist_ok=True)
        link_path = agent_skills_dir / SKILL_ID
        remove_existing(link_path, force)
        # Relative link so the checkout can be moved without breaking it.
        link_path.symlink_to(os.path.relpath(central_skill_path, agent_skills_dir))
        return link_path

    global_targets = {
        "cursor": Path("~/.cursor/skills"),
        "claude": CLAUDE_GLOBAL,
        "codex": Path("~/.codex/skills"),
        "opencode": Path("~/.opencode/skills"),
    }
    local_targets = {
        "cursor": Path(".cursor/skills"),
        "claude": CLAUDE_LOCAL,
        "codex": Path(".codex/skills"),
        "opencode": Path(".opencode/skills"),
    }
    targets_dict = global_targets if args.global_ else local_targets

    # --dest is a direct install and is mutually exclusive with everything else.
    if args.dest:
        if args.cursor or args.claude or args.codex or args.opencode or args.global_:
            error_exit("--dest cannot be combined with agent flags or --global.")
        skill_dest = install_to(Path(args.dest), args.force)
        print(f"Installed '{SKILL_ID}' to {skill_dest}")
        return

    # Preserve the fixed cursor/claude/codex/opencode ordering of the targets.
    selected = [
        flag
        for flag in ("cursor", "claude", "codex", "opencode")
        if getattr(args, flag)
    ]
    agent_targets = [targets_dict[flag] for flag in selected]

    central_path = CENTRAL_GLOBAL if args.global_ else CENTRAL_LOCAL
    central_skill_path = install_to(central_path, args.force)
    print(f"Installed '{SKILL_ID}' to central location: {central_skill_path}")

    for agent_target in agent_targets:
        link_path = create_symlink(agent_target, central_skill_path, args.force)
        print(f"Created symlink: {link_path}")


if __name__ == "__main__":
    main()