import importlib.metadata import io import json as json_mod import os import shutil import sys import tempfile import threading import time import warnings from collections import Counter from importlib.resources import files from pathlib import Path if sys.version_info >= (3, 11): import tomllib else: import tomli as tomllib import httpx import huggingface_hub from gradio_client import handle_file from httpx import ReadTimeout from huggingface_hub import Volume from huggingface_hub.errors import ( BucketNotFoundError, HfHubHTTPError, RepositoryNotFoundError, ) import trackio from trackio.bucket_storage import ( create_bucket_if_not_exists, export_from_bucket_for_static, upload_project_to_bucket, upload_project_to_bucket_for_static, ) from trackio.frontend_config import resolve_frontend_dir from trackio.remote_client import RemoteClient from trackio.sqlite_storage import SQLiteStorage from trackio.utils import ( MEDIA_DIR, get_or_create_project_hash, on_spaces, preprocess_space_and_dataset_ids, ) SPACE_HOST_URL = "https://{user_name}-{space_name}.hf.space/" SPACE_URL = "https://huggingface.co/spaces/{space_id}" _BOLD_ORANGE = "\033[1m\033[38;5;208m" _RESET = "\033[0m" def raise_if_space_is_frozen_for_logging(space_id: str) -> None: try: info = huggingface_hub.HfApi().space_info(space_id) except RepositoryNotFoundError: return if getattr(info, "sdk", None) == "static": raise RuntimeError( f"Cannot log to Hugging Face Space '{space_id}' because it has been frozen " f"(it uses the static SDK: a read-only dashboard with no live Trackio server).\n\n" f"Use a different space_id for training, or create a new Gradio Trackio Space. " f"Freezing converts a live Gradio Space to static after a run; a frozen Space " f'cannot accept new logs. See trackio.sync(..., sdk="static") in the Trackio docs.' ) def _readme_linked_hub_yaml(dataset_id: str | None) -> str: if dataset_id is not None: return f"datasets:\n - {dataset_id}\n" return "" _CUSTOM_SPACE_FRONTEND_DIR = "trackio_custom_frontend" def _space_app_py(frontend_dir: str | None = None) -> str: if frontend_dir is None: return "import trackio\ntrackio.show()\n" return f'import trackio\ntrackio.show(frontend_dir="{frontend_dir}")\n' def _upload_frontend_folder( hf_api: huggingface_hub.HfApi, *, repo_id: str, repo_type: str, folder_path: str | Path, path_in_repo: str | None = None, ) -> None: kwargs = { "repo_id": repo_id, "repo_type": repo_type, "folder_path": str(folder_path), } if path_in_repo is not None: kwargs["path_in_repo"] = path_in_repo hf_api.upload_folder(**kwargs) def _retry_hf_write(op_name: str, fn, retries: int = 4, initial_delay: float = 1.5): delay = initial_delay for attempt in range(1, retries + 1): try: return fn() except ReadTimeout: if attempt == retries: raise print( f"* {op_name} timed out (attempt {attempt}/{retries}). Retrying in {delay:.1f}s..." ) time.sleep(delay) delay = min(delay * 2, 12) except HfHubHTTPError as e: status = e.response.status_code if e.response is not None else None if status is None or status < 500 or attempt == retries: raise print( f"* {op_name} failed with HTTP {status} (attempt {attempt}/{retries}). Retrying in {delay:.1f}s..." ) time.sleep(delay) delay = min(delay * 2, 12) def _get_space_volumes( space_id: str, hf_api: huggingface_hub.HfApi | None = None ) -> list[Volume]: """ Return mounted volumes for a Space. `HfApi.get_space_runtime()` does not always populate `volumes`, even when the mount exists. Fall back to `space_info().runtime.volumes`, which currently carries the volume metadata for running Spaces. """ hf_api = hf_api or huggingface_hub.HfApi() runtime = hf_api.get_space_runtime(space_id) if runtime.volumes: return list(runtime.volumes) info = hf_api.space_info(space_id) if info.runtime and info.runtime.volumes: return list(info.runtime.volumes) return [] def _get_space_bucket_at_data_mount( space_id: str, hf_api: huggingface_hub.HfApi | None = None ) -> str | None: for volume in _get_space_volumes(space_id, hf_api=hf_api): if volume.type == "bucket" and volume.mount_path == "/data": return volume.source return None def _get_existing_space_bucket( space_id: str, hf_api: huggingface_hub.HfApi | None = None ) -> str | None: """Return the Trackio bucket for a Space, preferring the canonical /data mount.""" bucket_at_data = _get_space_bucket_at_data_mount(space_id, hf_api=hf_api) if bucket_at_data is not None: return bucket_at_data for volume in _get_space_volumes(space_id, hf_api=hf_api): if volume.type == "bucket": return volume.source return None def _get_existing_static_space_bucket( space_id: str, hf_api: huggingface_hub.HfApi | None = None ) -> str | None: hf_api = hf_api or huggingface_hub.HfApi() try: config_path = hf_api.hf_hub_download( repo_id=space_id, repo_type="space", filename="config.json", ) except (FileNotFoundError, HfHubHTTPError, OSError, ValueError): return None try: with open(config_path, encoding="utf-8") as config_file: config = json_mod.load(config_file) except (OSError, ValueError, TypeError): return None bucket_id = config.get("bucket_id") if isinstance(bucket_id, str) and bucket_id: return bucket_id return None def _ensure_bucket_mounted_at_data( space_id: str, bucket_id: str, hf_api: huggingface_hub.HfApi | None = None, ) -> None: hf_api = hf_api or huggingface_hub.HfApi() existing = _get_space_volumes(space_id, hf_api=hf_api) already_mounted = any( v.type == "bucket" and v.source == bucket_id and v.mount_path == "/data" for v in existing ) if not already_mounted: preserved = [ v for v in existing if not ( v.type == "bucket" and (v.source == bucket_id or v.mount_path == "/data") ) ] hf_api.set_space_volumes( space_id, preserved + [Volume(type="bucket", source=bucket_id, mount_path="/data")], ) print(f"* Attached bucket {bucket_id} at '/data'") existing_variables = hf_api.get_space_variables(space_id) current_trackio_dir = getattr(existing_variables.get("TRACKIO_DIR"), "value", None) if current_trackio_dir != "/data/trackio": huggingface_hub.add_space_variable(space_id, "TRACKIO_DIR", "/data/trackio") current_bucket_id = getattr( existing_variables.get("TRACKIO_BUCKET_ID"), "value", None ) if current_bucket_id != bucket_id: huggingface_hub.add_space_variable(space_id, "TRACKIO_BUCKET_ID", bucket_id) def _bucket_exists(bucket_id: str, hf_api: huggingface_hub.HfApi | None = None) -> bool: hf_api = hf_api or huggingface_hub.HfApi() try: hf_api.bucket_info(bucket_id) return True except BucketNotFoundError: return False def _find_available_bucket_id( preferred_bucket_id: str, hf_api: huggingface_hub.HfApi | None = None ) -> str: hf_api = hf_api or huggingface_hub.HfApi() if not _bucket_exists(preferred_bucket_id, hf_api): return preferred_bucket_id suffix = 2 while True: candidate = f"{preferred_bucket_id}-{suffix}" if not _bucket_exists(candidate, hf_api): return candidate suffix += 1 def resolve_auto_bucket_id( space_id: str, preferred_bucket_id: str, hf_api: huggingface_hub.HfApi | None = None, ) -> str: """ Resolve the bucket to use for an auto-generated bucket ID. Rules: - Existing Space with a bucket mounted at /data -> reuse that bucket. - Existing static Space with a bucket_id in config.json -> reuse that bucket. - Otherwise -> use the preferred auto bucket ID if free, or a suffixed variant. """ hf_api = hf_api or huggingface_hub.HfApi() try: info = hf_api.space_info(space_id) except RepositoryNotFoundError: pass else: existing_bucket_id = _get_existing_space_bucket(space_id, hf_api=hf_api) if existing_bucket_id is None and getattr(info, "sdk", None) == "static": existing_bucket_id = _get_existing_static_space_bucket( space_id, hf_api=hf_api ) if existing_bucket_id is not None: return existing_bucket_id bucket_id = _find_available_bucket_id(preferred_bucket_id, hf_api) if bucket_id != preferred_bucket_id: print( f"* Auto-generated bucket {preferred_bucket_id} already exists; " f"using {bucket_id} instead" ) return bucket_id def _get_source_install_dependencies() -> str: """Get trackio dependencies from pyproject.toml for source installs.""" trackio_path = files("trackio") pyproject_path = Path(trackio_path).parent / "pyproject.toml" with open(pyproject_path, "rb") as f: pyproject = tomllib.load(f) deps = pyproject["project"]["dependencies"] spaces_deps = ( pyproject["project"].get("optional-dependencies", {}).get("spaces", []) ) mcp_deps = pyproject["project"].get("optional-dependencies", {}).get("mcp", []) return "\n".join(deps + spaces_deps + mcp_deps) def _get_space_install_requirement() -> str: return f"trackio[spaces,mcp]=={trackio.__version__}" def _is_trackio_installed_from_source() -> bool: """Check if trackio is installed from source/editable install vs PyPI.""" try: trackio_file = trackio.__file__ if "site-packages" not in trackio_file and "dist-packages" not in trackio_file: return True dist = importlib.metadata.distribution("trackio") if dist.files: files = list(dist.files) has_pth = any(".pth" in str(f) for f in files) if has_pth: return True return False except ( AttributeError, importlib.metadata.PackageNotFoundError, importlib.metadata.MetadataError, ValueError, TypeError, ): return True def deploy_as_space( space_id: str, space_storage: huggingface_hub.SpaceStorage | None = None, dataset_id: str | None = None, bucket_id: str | None = None, private: bool | None = None, frontend_dir: str | Path | None = None, ): if on_spaces(): # in case a repo with this function is uploaded to spaces return if dataset_id is not None and bucket_id is not None: raise ValueError( "Cannot use bucket volume options together with dataset_id; use one persistence mode." ) trackio_path = files("trackio") hf_api = huggingface_hub.HfApi() try: huggingface_hub.create_repo( space_id, private=private, space_sdk="gradio", space_storage=space_storage, repo_type="space", exist_ok=True, ) except HfHubHTTPError as e: if e.response.status_code in [401, 403]: # unauthorized or forbidden print("Need 'write' access token to create a Spaces repo.") huggingface_hub.login(add_to_git_credential=False) huggingface_hub.create_repo( space_id, private=private, space_sdk="gradio", space_storage=space_storage, repo_type="space", exist_ok=True, ) else: raise ValueError(f"Failed to create Space: {e}") # We can assume huggingface-hub is available; requirements.txt pins trackio. # Make sure necessary dependencies are installed by creating a requirements.txt. is_source_install = _is_trackio_installed_from_source() resolved_frontend = resolve_frontend_dir(frontend_dir, announce=True) if bucket_id is not None: create_bucket_if_not_exists(bucket_id, private=private) with open(Path(trackio_path, "README.md"), "r", encoding="utf-8") as f: readme_content = f.read() readme_content = readme_content.replace("sdk_version: {GRADIO_VERSION}\n", "") readme_content = readme_content.replace("{APP_FILE}", "app.py") readme_content = readme_content.replace( "{LINKED_HUB_METADATA}", _readme_linked_hub_yaml(dataset_id) ) readme_buffer = io.BytesIO(readme_content.encode("utf-8")) hf_api.upload_file( path_or_fileobj=readme_buffer, path_in_repo="README.md", repo_id=space_id, repo_type="space", ) if is_source_install: requirements_content = _get_source_install_dependencies() else: requirements_content = _get_space_install_requirement() requirements_buffer = io.BytesIO(requirements_content.encode("utf-8")) hf_api.upload_file( path_or_fileobj=requirements_buffer, path_in_repo="requirements.txt", repo_id=space_id, repo_type="space", ) huggingface_hub.utils.disable_progress_bars() if is_source_install: dist_index = ( Path(trackio.__file__).resolve().parent / "frontend" / "dist" / "index.html" ) if not dist_index.is_file() and not resolved_frontend.is_custom: raise ValueError( "The Trackio frontend build is missing. From the repository root run " "`cd trackio/frontend && npm ci && npm run build`, then deploy again." ) hf_api.upload_folder( repo_id=space_id, repo_type="space", folder_path=trackio_path, path_in_repo="trackio", ignore_patterns=[ "README.md", "frontend/node_modules/**", "frontend/src/**", "frontend/.gitignore", "frontend/package.json", "frontend/package-lock.json", "frontend/vite.config.js", "frontend/svelte.config.js", "**/__pycache__/**", "*.pyc", ], ) if resolved_frontend.is_custom: _upload_frontend_folder( hf_api, repo_id=space_id, repo_type="space", folder_path=resolved_frontend.path, path_in_repo=_CUSTOM_SPACE_FRONTEND_DIR, ) app_file_content = _space_app_py( _CUSTOM_SPACE_FRONTEND_DIR if resolved_frontend.is_custom else None ) app_file_buffer = io.BytesIO(app_file_content.encode("utf-8")) hf_api.upload_file( path_or_fileobj=app_file_buffer, path_in_repo="app.py", repo_id=space_id, repo_type="space", ) if hf_token := huggingface_hub.utils.get_token(): huggingface_hub.add_space_secret(space_id, "HF_TOKEN", hf_token) if bucket_id is not None: _ensure_bucket_mounted_at_data(space_id, bucket_id, hf_api) elif dataset_id is not None: huggingface_hub.add_space_variable(space_id, "TRACKIO_DATASET_ID", dataset_id) if logo_light_url := os.environ.get("TRACKIO_LOGO_LIGHT_URL"): huggingface_hub.add_space_variable( space_id, "TRACKIO_LOGO_LIGHT_URL", logo_light_url ) if logo_dark_url := os.environ.get("TRACKIO_LOGO_DARK_URL"): huggingface_hub.add_space_variable( space_id, "TRACKIO_LOGO_DARK_URL", logo_dark_url ) if plot_order := os.environ.get("TRACKIO_PLOT_ORDER"): huggingface_hub.add_space_variable(space_id, "TRACKIO_PLOT_ORDER", plot_order) if theme := os.environ.get("TRACKIO_THEME"): huggingface_hub.add_space_variable(space_id, "TRACKIO_THEME", theme) huggingface_hub.add_space_variable(space_id, "GRADIO_MCP_SERVER", "True") def create_space_if_not_exists( space_id: str, space_storage: huggingface_hub.SpaceStorage | None = None, dataset_id: str | None = None, bucket_id: str | None = None, private: bool | None = None, frontend_dir: str | Path | None = None, ) -> None: """ Creates a new Hugging Face Space if it does not exist. Args: space_id (`str`): The ID of the Space to create. space_storage ([`~huggingface_hub.SpaceStorage`], *optional*): Choice of persistent storage tier for the Space. dataset_id (`str`, *optional*): Deprecated. Use `bucket_id` instead. bucket_id (`str`, *optional*): Full Hub bucket id (`namespace/name`) to attach via the Hub volumes API (platform mount). Sets `TRACKIO_DIR` to the mount path. private (`bool`, *optional*): Whether to make the Space private. If `None` (default), the repo will be public unless the organization's default is private. This value is ignored if the repo already exists. """ if "/" not in space_id: raise ValueError( f"Invalid space ID: {space_id}. Must be in the format: username/reponame or orgname/reponame." ) if dataset_id is not None and "/" not in dataset_id: raise ValueError( f"Invalid dataset ID: {dataset_id}. Must be in the format: username/datasetname or orgname/datasetname." ) if bucket_id is not None and "/" not in bucket_id: raise ValueError( f"Invalid bucket ID: {bucket_id}. Must be in the format: username/bucketname or orgname/bucketname." ) try: huggingface_hub.repo_info(space_id, repo_type="space") print( f"* Found existing space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" ) if bucket_id is not None: create_bucket_if_not_exists(bucket_id, private=private) _ensure_bucket_mounted_at_data(space_id, bucket_id) elif dataset_id is not None: huggingface_hub.add_space_variable( space_id, "TRACKIO_DATASET_ID", dataset_id ) resolved_frontend = resolve_frontend_dir(frontend_dir, announce=False) if resolved_frontend.is_custom: deploy_as_space( space_id, space_storage, dataset_id, bucket_id, private, frontend_dir=frontend_dir, ) return except RepositoryNotFoundError: pass except HfHubHTTPError as e: if e.response.status_code in [401, 403]: # unauthorized or forbidden print("Need 'write' access token to create a Spaces repo.") huggingface_hub.login(add_to_git_credential=False) else: raise ValueError(f"Failed to create Space: {e}") print( f"* Creating new space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" ) deploy_as_space( space_id, space_storage, dataset_id, bucket_id, private, frontend_dir=frontend_dir, ) print("* Waiting for Space to be ready...") _wait_until_space_running(space_id) def _wait_until_space_running(space_id: str, timeout: int = 300) -> None: hf_api = huggingface_hub.HfApi() start = time.time() delay = 2 request_timeout = 45.0 failure_stages = frozenset( ("NO_APP_FILE", "CONFIG_ERROR", "BUILD_ERROR", "RUNTIME_ERROR") ) while time.time() - start < timeout: try: info = hf_api.space_info(space_id, timeout=request_timeout) if info.runtime: stage = str(info.runtime.stage) if stage in failure_stages: raise RuntimeError( f"Space {space_id} entered terminal stage {stage}. " "Fix README.md or app files; see build logs on the Hub." ) if stage == "RUNNING": return except RuntimeError: raise except (huggingface_hub.utils.HfHubHTTPError, httpx.RequestError): pass time.sleep(delay) delay = min(delay * 1.5, 15) raise TimeoutError( f"Space {space_id} did not reach RUNNING within {timeout}s. " "Check status and build logs on the Hub." ) def wait_until_space_exists( space_id: str, ) -> None: """ Blocks the current thread until the Space exists. Args: space_id (`str`): The ID of the Space to wait for. Raises: `TimeoutError`: If waiting for the Space takes longer than expected. """ hf_api = huggingface_hub.HfApi() delay = 1 for _ in range(30): try: hf_api.space_info(space_id) return except (huggingface_hub.utils.HfHubHTTPError, httpx.RequestError): time.sleep(delay) delay = min(delay * 2, 60) raise TimeoutError("Waiting for space to exist took longer than expected") def upload_db_to_space(project: str, space_id: str, force: bool = False) -> None: """ Uploads the database of a local Trackio project to a Hugging Face Space. This uses the Trackio remote client so newer Trackio Spaces can speak the direct HTTP API while older Gradio-based Spaces still work through `gradio_client`. Args: project (`str`): The name of the project to upload. space_id (`str`): The ID of the Space to upload to. force (`bool`, *optional*, defaults to `False`): If `True`, overwrites the existing database without prompting. If `False`, prompts for confirmation. """ db_path = SQLiteStorage.get_project_db_path(project) client = RemoteClient( space_id, hf_token=huggingface_hub.utils.get_token(), httpx_kwargs={"timeout": 90}, ) if not force: try: existing_projects = client.predict(api_name="/get_all_projects") if project in existing_projects: response = input( f"Database for project '{project}' already exists on Space '{space_id}'. " f"Overwrite it? (y/N): " ) if response.lower() not in ["y", "yes"]: print("* Upload cancelled.") return except Exception as e: print(f"* Warning: Could not check if project exists on Space: {e}") print("* Proceeding with upload...") client.predict( api_name="/upload_db_to_space", project=project, uploaded_db=handle_file(db_path), hf_token=huggingface_hub.utils.get_token(), ) SYNC_BATCH_SIZE = 500 def sync_incremental( project: str, space_id: str, private: bool | None = None, pending_only: bool = False, frontend_dir: str | Path | None = None, ) -> None: """ Syncs a local Trackio project to a Space via the bulk_log API endpoints instead of uploading the entire DB file. Supports incremental sync. Args: project: The name of the project to sync. space_id: The HF Space ID to sync to. private: Whether to make the Space private if creating. pending_only: If True, only sync rows tagged with space_id (pending data). """ print( f"* Syncing project '{project}' to: {SPACE_URL.format(space_id=space_id)} (please wait...)" ) create_space_if_not_exists(space_id, private=private, frontend_dir=frontend_dir) wait_until_space_exists(space_id) hf_token = huggingface_hub.utils.get_token() expected_run_counts: Counter[str] = Counter() client = RemoteClient( space_id, hf_token=hf_token, httpx_kwargs={"timeout": 90}, ) if pending_only: pending_logs = SQLiteStorage.get_pending_logs(project) if pending_logs: logs = pending_logs["logs"] expected_run_counts.update(log["run"] for log in logs) for i in range(0, len(logs), SYNC_BATCH_SIZE): batch = logs[i : i + SYNC_BATCH_SIZE] print( f" Syncing metrics: {min(i + SYNC_BATCH_SIZE, len(logs))}/{len(logs)}..." ) client.predict(api_name="/bulk_log", logs=batch, hf_token=hf_token) SQLiteStorage.clear_pending_logs(project, pending_logs["ids"]) pending_sys = SQLiteStorage.get_pending_system_logs(project) if pending_sys: logs = pending_sys["logs"] for i in range(0, len(logs), SYNC_BATCH_SIZE): batch = logs[i : i + SYNC_BATCH_SIZE] print( f" Syncing system metrics: {min(i + SYNC_BATCH_SIZE, len(logs))}/{len(logs)}..." ) client.predict( api_name="/bulk_log_system", logs=batch, hf_token=hf_token ) SQLiteStorage.clear_pending_system_logs(project, pending_sys["ids"]) pending_uploads = SQLiteStorage.get_pending_uploads(project) if pending_uploads: upload_entries = [] for u in pending_uploads["uploads"]: fp = u["file_path"] if os.path.exists(fp): upload_entries.append( { "project": u["project"], "run": u["run"], "step": u["step"], "relative_path": u["relative_path"], "uploaded_file": handle_file(fp), } ) if upload_entries: print(f" Syncing {len(upload_entries)} media files...") client.predict( api_name="/bulk_upload_media", uploads=upload_entries, hf_token=hf_token, ) SQLiteStorage.clear_pending_uploads(project, pending_uploads["ids"]) else: all_logs = SQLiteStorage.get_all_logs_for_sync(project) if all_logs: expected_run_counts.update(log["run"] for log in all_logs) for i in range(0, len(all_logs), SYNC_BATCH_SIZE): batch = all_logs[i : i + SYNC_BATCH_SIZE] print( f" Syncing metrics: {min(i + SYNC_BATCH_SIZE, len(all_logs))}/{len(all_logs)}..." ) client.predict(api_name="/bulk_log", logs=batch, hf_token=hf_token) all_sys_logs = SQLiteStorage.get_all_system_logs_for_sync(project) if all_sys_logs: for i in range(0, len(all_sys_logs), SYNC_BATCH_SIZE): batch = all_sys_logs[i : i + SYNC_BATCH_SIZE] print( f" Syncing system metrics: {min(i + SYNC_BATCH_SIZE, len(all_sys_logs))}/{len(all_sys_logs)}..." ) client.predict( api_name="/bulk_log_system", logs=batch, hf_token=hf_token ) _wait_for_remote_sync(client, project, expected_run_counts) SQLiteStorage.set_project_metadata(project, "space_id", space_id) print( f"* Synced successfully to space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" ) def _build_remote_client_with_retry( space_id: str, timeout: int = 360, verbose: bool = False, ) -> RemoteClient: deadline = time.time() + timeout delay = 2 last_error: Exception | None = None while time.time() < deadline: try: return RemoteClient(space_id, verbose=verbose, httpx_kwargs={"timeout": 90}) except (ValueError, ConnectionError) as e: last_error = e time.sleep(delay) delay = min(delay * 1.5, 15) raise ConnectionError( f"Could not connect to Space '{space_id}' within {timeout}s: {last_error}" ) def _wait_for_remote_sync( client: RemoteClient, project: str, expected_run_counts: Counter[str], timeout: int = 180, ) -> None: if not expected_run_counts: return deadline = time.time() + timeout delay = 2 last_error: Exception | None = None pending = dict(expected_run_counts) while time.time() < deadline and pending: completed = [] for run_name, expected_num_logs in pending.items(): try: summary = client.predict( project=project, run=run_name, api_name="/get_run_summary" ) if summary.get("num_logs") == expected_num_logs: completed.append(run_name) except Exception as e: last_error = e for run_name in completed: pending.pop(run_name, None) if pending: time.sleep(delay) delay = min(delay * 1.5, 15) if pending: raise TimeoutError( f"Remote sync for project '{project}' did not become visible for runs " f"{sorted(pending.items())} within {timeout}s. " f"Last error: {last_error!r}" ) def upload_dataset_for_static( project: str, dataset_id: str, private: bool | None = None, ) -> None: hf_api = huggingface_hub.HfApi() try: huggingface_hub.create_repo( dataset_id, private=private, repo_type="dataset", exist_ok=True, ) except HfHubHTTPError as e: if e.response.status_code in [401, 403]: print("Need 'write' access token to create a Dataset repo.") huggingface_hub.login(add_to_git_credential=False) huggingface_hub.create_repo( dataset_id, private=private, repo_type="dataset", exist_ok=True, ) else: raise ValueError(f"Failed to create Dataset: {e}") with tempfile.TemporaryDirectory() as tmp_dir: output_dir = Path(tmp_dir) SQLiteStorage.export_for_static_space(project, output_dir) media_dir = MEDIA_DIR / project if media_dir.exists(): dest = output_dir / "media" shutil.copytree(media_dir, dest) _retry_hf_write( "Dataset upload", lambda: hf_api.upload_folder( repo_id=dataset_id, repo_type="dataset", folder_path=str(output_dir), ), ) print(f"* Dataset uploaded: https://huggingface.co/datasets/{dataset_id}") def deploy_as_static_space( space_id: str, dataset_id: str | None, project: str, bucket_id: str | None = None, private: bool | None = None, hf_token: str | None = None, frontend_dir: str | Path | None = None, ) -> None: if on_spaces(): return if private is True: raise ValueError( "private=True is not supported for static Trackio Spaces. Static Spaces " "run entirely in the browser, so their snapshot data must be public. " "Use sdk='gradio' for a private dashboard." ) hf_api = huggingface_hub.HfApi() try: huggingface_hub.create_repo( space_id, private=False, space_sdk="static", repo_type="space", exist_ok=True, ) except HfHubHTTPError as e: if e.response.status_code in [401, 403]: print("Need 'write' access token to create a Spaces repo.") huggingface_hub.login(add_to_git_credential=False) huggingface_hub.create_repo( space_id, private=False, space_sdk="static", repo_type="space", exist_ok=True, ) else: raise ValueError(f"Failed to create Space: {e}") linked = _readme_linked_hub_yaml(dataset_id) readme_content = ( f"---\nemoji: 🎯\nsdk: static\npinned: false\ntags:\n - trackio\n{linked}---\n" ) _retry_hf_write( "Static Space README upload", lambda: hf_api.upload_file( path_or_fileobj=io.BytesIO(readme_content.encode("utf-8")), path_in_repo="README.md", repo_id=space_id, repo_type="space", ), ) resolved_frontend = resolve_frontend_dir(frontend_dir, announce=True) _retry_hf_write( "Static Space frontend upload", lambda: hf_api.upload_folder( repo_id=space_id, repo_type="space", folder_path=str(resolved_frontend.path), ), ) config = { "mode": "static", "project": project, "private": bool(private), } if bucket_id is not None: config["bucket_id"] = bucket_id if dataset_id is not None: config["dataset_id"] = dataset_id if hf_token is not None: warnings.warn( "`hf_token` is ignored by deploy_as_static_space() for static Space " "deployment and will be removed in a future release.", DeprecationWarning, stacklevel=2, ) _retry_hf_write( "Static Space config upload", lambda: hf_api.upload_file( path_or_fileobj=io.BytesIO(json_mod.dumps(config).encode("utf-8")), path_in_repo="config.json", repo_id=space_id, repo_type="space", ), ) assets_dir = Path(trackio.__file__).resolve().parent / "assets" if assets_dir.is_dir(): _retry_hf_write( "Static Space assets upload", lambda: hf_api.upload_folder( repo_id=space_id, repo_type="space", folder_path=str(assets_dir), path_in_repo="assets", ), ) print( f"* Static Space deployed: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" ) def sync( project: str, space_id: str | None = None, private: bool | None = None, force: bool = False, run_in_background: bool = False, sdk: str = "gradio", dataset_id: str | None = None, bucket_id: str | None = None, frontend_dir: str | Path | None = None, ) -> str: """ Syncs a local Trackio project's database to a Hugging Face Space. If the Space does not exist, it will be created. Local data is never deleted. **Freezing:** Passing ``sdk="static"`` deploys a static Space backed by an HF Bucket (read-only dashboard, no Gradio server). You can sync the same project again later to refresh that static Space. If you want a one-time snapshot of an existing Gradio Space, use ``freeze()`` instead. Args: project (`str`): The name of the project to upload. space_id (`str`, *optional*): The ID of the Space to upload to (e.g., `"username/space_id"`). If not provided, checks project metadata first, then generates a random space_id. private (`bool`, *optional*): Whether to make the Space private. If None (default), the repo will be public unless the organization's default is private. This value is ignored if the repo already exists. Not supported with ``sdk="static"`` because static Trackio dashboards read snapshot data directly from the browser. force (`bool`, *optional*, defaults to `False`): If `True`, overwrite the existing database without prompting for confirmation. If `False`, prompt the user before overwriting an existing database. run_in_background (`bool`, *optional*, defaults to `False`): If `True`, the Space creation and database upload will be run in a background thread. If `False`, all the steps will be run synchronously. sdk (`str`, *optional*, defaults to `"gradio"`): The type of Space to deploy. `"gradio"` deploys a Gradio Space with a live server. `"static"` freezes the Space: deploys a static Space that reads from an HF Bucket (no server needed). dataset_id (`str`, *optional*): Deprecated. Use `bucket_id` instead. bucket_id (`str`, *optional*): The ID of the HF Bucket to sync to. By default, a bucket is auto-generated from the space_id. Returns: `str`: The Space ID of the synced project. """ if sdk not in ("gradio", "static"): raise ValueError(f"sdk must be 'gradio' or 'static', got '{sdk}'") if sdk == "static" and private is True: raise ValueError( "private=True is not supported for static Trackio Spaces. Static Spaces " "run entirely in the browser, so their snapshot data must be public. " "Use sdk='gradio' for a private dashboard." ) bucket_id_was_explicit = bucket_id is not None if space_id is None: space_id = SQLiteStorage.get_space_id(project) if space_id is None: space_id = f"{project}-{get_or_create_project_hash(project)}" space_id, dataset_id, bucket_id = preprocess_space_and_dataset_ids( space_id, dataset_id, bucket_id ) if dataset_id is None and bucket_id is not None and not bucket_id_was_explicit: bucket_id = resolve_auto_bucket_id(space_id, bucket_id) def _do_sync(): try: info = huggingface_hub.HfApi().space_info(space_id) existing_sdk = info.sdk if existing_sdk and existing_sdk != sdk: raise ValueError( f"Space '{space_id}' is a '{existing_sdk}' Space but sdk='{sdk}' was requested. " f"The sdk must match the existing Space type." ) except RepositoryNotFoundError: pass if sdk == "static": if dataset_id is not None: upload_dataset_for_static(project, dataset_id, private=False) deploy_as_static_space( space_id, dataset_id, project, private=False, frontend_dir=frontend_dir, ) elif bucket_id is not None: create_bucket_if_not_exists(bucket_id, private=False) upload_project_to_bucket_for_static(project, bucket_id) print( f"* Project data uploaded to bucket: https://huggingface.co/buckets/{bucket_id}" ) deploy_as_static_space( space_id, None, project, bucket_id=bucket_id, private=False, frontend_dir=frontend_dir, ) else: if bucket_id is not None: create_bucket_if_not_exists(bucket_id, private=private) upload_project_to_bucket(project, bucket_id) print( f"* Project data uploaded to bucket: https://huggingface.co/buckets/{bucket_id}" ) create_space_if_not_exists( space_id, bucket_id=bucket_id, private=private, frontend_dir=frontend_dir, ) _wait_until_space_running(space_id) _wait_for_remote_sync( _build_remote_client_with_retry(space_id), project, Counter( log["run"] for log in SQLiteStorage.get_all_logs_for_sync(project) ), ) else: sync_incremental( project, space_id, private=private, pending_only=False, frontend_dir=frontend_dir, ) SQLiteStorage.set_project_metadata(project, "space_id", space_id) if run_in_background: threading.Thread(target=_do_sync).start() else: _do_sync() return space_id def _get_source_bucket(space_id: str) -> str: bucket_id = _get_existing_space_bucket(space_id) if bucket_id is not None: _ensure_bucket_mounted_at_data(space_id, bucket_id) return bucket_id raise ValueError( f"Space '{space_id}' has no bucket mounted at '/data'. " f"freeze() requires the source Space to use bucket storage." ) def freeze( space_id: str, project: str, new_space_id: str | None = None, private: bool | None = None, bucket_id: str | None = None, frontend_dir: str | Path | None = None, ) -> str: """ Creates a new static Hugging Face Space containing a read-only snapshot of the data for the specified project from the source Gradio Space. The data is read from the bucket attached to the source Space at freeze time. The original Space is not modified, and the new static Space does not automatically reflect metrics uploaded to the original Gradio Space after the freeze completes. Args: space_id (`str`): The ID of the source Gradio Space (e.g., `"username/my-space"` or a short repo name with the logged-in namespace inferred, like `init()`). Must be a Gradio Space with a bucket mounted at `/data`. project (`str`): The name of the project whose data to include in the frozen Space. new_space_id (`str`, *optional*): The ID for the new static Space. If not provided, defaults to `"{space_id}_static"`. private (`bool`, *optional*): Not supported. Frozen static dashboards read snapshot data directly from the browser, so the destination snapshot must be public. bucket_id (`str`, *optional*): The ID of the HF Bucket for the new static Space's data storage. If not provided, one is auto-generated from the new Space ID. Returns: `str`: The Space ID of the newly created static Space. """ if private is True: raise ValueError( "private=True is not supported for frozen static Trackio Spaces. Static " "Spaces run entirely in the browser, so their snapshot data must be " "public. Use a Gradio Space if the frozen dashboard must stay private." ) space_id, _, _ = preprocess_space_and_dataset_ids(space_id, None, None) try: info = huggingface_hub.HfApi().space_info(space_id) if info.sdk != "gradio": raise ValueError( f"Space '{space_id}' is not a Gradio Space (sdk='{info.sdk}'). " f"freeze() requires a Gradio Space as the source." ) except RepositoryNotFoundError: raise ValueError( f"Space '{space_id}' not found. Provide an existing Gradio Space ID." ) source_bucket_id = _get_source_bucket(space_id) print(f"* Reading project data from bucket: {source_bucket_id}") bucket_id_was_explicit = bucket_id is not None if new_space_id is None: new_space_id = f"{space_id}_static" new_space_id, _dataset_id, bucket_id = preprocess_space_and_dataset_ids( new_space_id, None, bucket_id ) if bucket_id is not None and not bucket_id_was_explicit: bucket_id = resolve_auto_bucket_id(new_space_id, bucket_id) hf_api = huggingface_hub.HfApi() try: dest_info = hf_api.space_info(new_space_id) tags = dest_info.tags or [] if dest_info.sdk != "static" or "trackio" not in tags: raise ValueError( f"Space '{new_space_id}' already exists and is not a Trackio static Space " f"(sdk='{dest_info.sdk}', tags={tags}). Choose a different new_space_id " f"or delete the existing Space first." ) except RepositoryNotFoundError: pass create_bucket_if_not_exists(bucket_id, private=False) export_from_bucket_for_static(source_bucket_id, bucket_id, project) print( f"* Project data uploaded to bucket: https://huggingface.co/buckets/{bucket_id}" ) deploy_as_static_space( new_space_id, None, project, bucket_id=bucket_id, private=False, frontend_dir=frontend_dir, ) return new_space_id