Spaces:
Running
Running
| # Copyright 2025 The HuggingFace Team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """Contains commands to interact with jobs on the Hugging Face Hub. | |
| Usage: | |
| # run a job | |
| hf jobs run <image> <command> | |
| # List running or completed jobs | |
| hf jobs ps [-a] [-f key=value] [--format TEMPLATE] | |
| # Stream logs from a job | |
| hf jobs logs <job-id> | |
| # Inspect detailed information about a job | |
| hf jobs inspect <job-id> | |
| # Cancel a running job | |
| hf jobs cancel <job-id> | |
| """ | |
| import json | |
| import os | |
| import re | |
| from argparse import Namespace, _SubParsersAction | |
| from dataclasses import asdict | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Union | |
| import requests | |
| from huggingface_hub import HfApi, SpaceHardware, get_token | |
| from huggingface_hub.utils import logging | |
| from huggingface_hub.utils._dotenv import load_dotenv | |
| from . import BaseHuggingfaceCLICommand | |
| logger = logging.get_logger(__name__) | |
| SUGGESTED_FLAVORS = [item.value for item in SpaceHardware if item.value != "zero-a10g"] | |
| class JobsCommands(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction): | |
| jobs_parser = parser.add_parser("jobs", help="Run and manage Jobs on the Hub.") | |
| jobs_subparsers = jobs_parser.add_subparsers(help="huggingface.co jobs related commands") | |
| # Show help if no subcommand is provided | |
| jobs_parser.set_defaults(func=lambda args: jobs_parser.print_help()) | |
| # Register commands | |
| InspectCommand.register_subcommand(jobs_subparsers) | |
| LogsCommand.register_subcommand(jobs_subparsers) | |
| PsCommand.register_subcommand(jobs_subparsers) | |
| RunCommand.register_subcommand(jobs_subparsers) | |
| CancelCommand.register_subcommand(jobs_subparsers) | |
| UvCommand.register_subcommand(jobs_subparsers) | |
| ScheduledJobsCommands.register_subcommand(jobs_subparsers) | |
| class RunCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("run", help="Run a Job") | |
| run_parser.add_argument("image", type=str, help="The Docker image to use.") | |
| run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value") | |
| run_parser.add_argument( | |
| "-s", | |
| "--secrets", | |
| action="append", | |
| help=( | |
| "Set secret environment variables. E.g. --secrets SECRET=value " | |
| "or `--secrets HF_TOKEN` to pass your Hugging Face token." | |
| ), | |
| ) | |
| run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") | |
| run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.") | |
| run_parser.add_argument( | |
| "--flavor", | |
| type=str, | |
| help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", | |
| ) | |
| run_parser.add_argument( | |
| "--timeout", | |
| type=str, | |
| help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).", | |
| ) | |
| run_parser.add_argument( | |
| "-d", | |
| "--detach", | |
| action="store_true", | |
| help="Run the Job in the background and print the Job ID.", | |
| ) | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the Job will be created. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", | |
| type=str, | |
| help="A User Access Token generated from https://huggingface.co/settings/tokens", | |
| ) | |
| run_parser.add_argument("command", nargs="...", help="The command to run.") | |
| run_parser.set_defaults(func=RunCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.image: str = args.image | |
| self.command: List[str] = args.command | |
| self.env: dict[str, Optional[str]] = {} | |
| if args.env_file: | |
| self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) | |
| for env_value in args.env or []: | |
| self.env.update(load_dotenv(env_value, environ=os.environ.copy())) | |
| self.secrets: dict[str, Optional[str]] = {} | |
| extended_environ = _get_extended_environ() | |
| if args.secrets_file: | |
| self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) | |
| for secret in args.secrets or []: | |
| self.secrets.update(load_dotenv(secret, environ=extended_environ)) | |
| self.flavor: Optional[SpaceHardware] = args.flavor | |
| self.timeout: Optional[str] = args.timeout | |
| self.detach: bool = args.detach | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| job = api.run_job( | |
| image=self.image, | |
| command=self.command, | |
| env=self.env, | |
| secrets=self.secrets, | |
| flavor=self.flavor, | |
| timeout=self.timeout, | |
| namespace=self.namespace, | |
| ) | |
| # Always print the job ID to the user | |
| print(f"Job started with ID: {job.id}") | |
| print(f"View at: {job.url}") | |
| if self.detach: | |
| return | |
| # Now let's stream the logs | |
| for log in api.fetch_job_logs(job_id=job.id): | |
| print(log) | |
| class LogsCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("logs", help="Fetch the logs of a Job") | |
| run_parser.add_argument("job_id", type=str, help="Job ID") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the job is running. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.set_defaults(func=LogsCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.job_id: str = args.job_id | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| for log in api.fetch_job_logs(job_id=self.job_id, namespace=self.namespace): | |
| print(log) | |
| def _tabulate(rows: List[List[Union[str, int]]], headers: List[str]) -> str: | |
| """ | |
| Inspired by: | |
| - stackoverflow.com/a/8356620/593036 | |
| - stackoverflow.com/questions/9535954/printing-lists-as-tabular-data | |
| """ | |
| col_widths = [max(len(str(x)) for x in col) for col in zip(*rows, headers)] | |
| terminal_width = max(os.get_terminal_size().columns, len(headers) * 12) | |
| while len(headers) + sum(col_widths) > terminal_width: | |
| col_to_minimize = col_widths.index(max(col_widths)) | |
| col_widths[col_to_minimize] //= 2 | |
| if len(headers) + sum(col_widths) <= terminal_width: | |
| col_widths[col_to_minimize] = terminal_width - sum(col_widths) - len(headers) + col_widths[col_to_minimize] | |
| row_format = ("{{:{}}} " * len(headers)).format(*col_widths) | |
| lines = [] | |
| lines.append(row_format.format(*headers)) | |
| lines.append(row_format.format(*["-" * w for w in col_widths])) | |
| for row in rows: | |
| row_format_args = [ | |
| str(x)[: col_width - 3] + "..." if len(str(x)) > col_width else str(x) | |
| for x, col_width in zip(row, col_widths) | |
| ] | |
| lines.append(row_format.format(*row_format_args)) | |
| return "\n".join(lines) | |
| class PsCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("ps", help="List Jobs") | |
| run_parser.add_argument( | |
| "-a", | |
| "--all", | |
| action="store_true", | |
| help="Show all Jobs (default shows just running)", | |
| ) | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace from where it lists the jobs. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", | |
| type=str, | |
| help="A User Access Token generated from https://huggingface.co/settings/tokens", | |
| ) | |
| # Add Docker-style filtering argument | |
| run_parser.add_argument( | |
| "-f", | |
| "--filter", | |
| action="append", | |
| default=[], | |
| help="Filter output based on conditions provided (format: key=value)", | |
| ) | |
| # Add option to format output | |
| run_parser.add_argument( | |
| "--format", | |
| type=str, | |
| help="Format output using a custom template", | |
| ) | |
| run_parser.set_defaults(func=PsCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.all: bool = args.all | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self.format: Optional[str] = args.format | |
| self.filters: Dict[str, str] = {} | |
| # Parse filter arguments (key=value pairs) | |
| for f in args.filter: | |
| if "=" in f: | |
| key, value = f.split("=", 1) | |
| self.filters[key.lower()] = value | |
| else: | |
| print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.") | |
| def run(self) -> None: | |
| """ | |
| Fetch and display job information for the current user. | |
| Uses Docker-style filtering with -f/--filter flag and key=value pairs. | |
| """ | |
| try: | |
| api = HfApi(token=self.token) | |
| # Fetch jobs data | |
| jobs = api.list_jobs(namespace=self.namespace) | |
| # Define table headers | |
| table_headers = ["JOB ID", "IMAGE/SPACE", "COMMAND", "CREATED", "STATUS"] | |
| # Process jobs data | |
| rows = [] | |
| for job in jobs: | |
| # Extract job data for filtering | |
| status = job.status.stage if job.status else "UNKNOWN" | |
| # Skip job if not all jobs should be shown and status doesn't match criteria | |
| if not self.all and status not in ("RUNNING", "UPDATING"): | |
| continue | |
| # Extract job ID | |
| job_id = job.id | |
| # Extract image or space information | |
| image_or_space = job.docker_image or "N/A" | |
| # Extract and format command | |
| command = job.command or [] | |
| command_str = " ".join(command) if command else "N/A" | |
| # Extract creation time | |
| created_at = job.created_at.strftime("%Y-%m-%d %H:%M:%S") if job.created_at else "N/A" | |
| # Create a dict with all job properties for filtering | |
| job_properties = { | |
| "id": job_id, | |
| "image": image_or_space, | |
| "status": status.lower(), | |
| "command": command_str, | |
| } | |
| # Check if job matches all filters | |
| if not self._matches_filters(job_properties): | |
| continue | |
| # Create row | |
| rows.append([job_id, image_or_space, command_str, created_at, status]) | |
| # Handle empty results | |
| if not rows: | |
| filters_msg = "" | |
| if self.filters: | |
| filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}" | |
| print(f"No jobs found{filters_msg}") | |
| return | |
| # Apply custom format if provided or use default tabular format | |
| self._print_output(rows, table_headers) | |
| except requests.RequestException as e: | |
| print(f"Error fetching jobs data: {e}") | |
| except (KeyError, ValueError, TypeError) as e: | |
| print(f"Error processing jobs data: {e}") | |
| except Exception as e: | |
| print(f"Unexpected error - {type(e).__name__}: {e}") | |
| def _matches_filters(self, job_properties: Dict[str, str]) -> bool: | |
| """Check if job matches all specified filters.""" | |
| for key, pattern in self.filters.items(): | |
| # Check if property exists | |
| if key not in job_properties: | |
| return False | |
| # Support pattern matching with wildcards | |
| if "*" in pattern or "?" in pattern: | |
| # Convert glob pattern to regex | |
| regex_pattern = pattern.replace("*", ".*").replace("?", ".") | |
| if not re.search(f"^{regex_pattern}$", job_properties[key], re.IGNORECASE): | |
| return False | |
| # Simple substring matching | |
| elif pattern.lower() not in job_properties[key].lower(): | |
| return False | |
| return True | |
| def _print_output(self, rows, headers): | |
| """Print output according to the chosen format.""" | |
| if self.format: | |
| # Custom template formatting (simplified) | |
| template = self.format | |
| for row in rows: | |
| line = template | |
| for i, field in enumerate(["id", "image", "command", "created", "status"]): | |
| placeholder = f"{{{{.{field}}}}}" | |
| if placeholder in line: | |
| line = line.replace(placeholder, str(row[i])) | |
| print(line) | |
| else: | |
| # Default tabular format | |
| print( | |
| _tabulate( | |
| rows, | |
| headers=headers, | |
| ) | |
| ) | |
| class InspectCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("inspect", help="Display detailed information on one or more Jobs") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the job is running. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.add_argument("job_ids", nargs="...", help="The jobs to inspect") | |
| run_parser.set_defaults(func=InspectCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self.job_ids: List[str] = args.job_ids | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| jobs = [api.inspect_job(job_id=job_id, namespace=self.namespace) for job_id in self.job_ids] | |
| print(json.dumps([asdict(job) for job in jobs], indent=4, default=str)) | |
| class CancelCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("cancel", help="Cancel a Job") | |
| run_parser.add_argument("job_id", type=str, help="Job ID") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the job is running. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.set_defaults(func=CancelCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.job_id: str = args.job_id | |
| self.namespace = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| api.cancel_job(job_id=self.job_id, namespace=self.namespace) | |
| class UvCommand(BaseHuggingfaceCLICommand): | |
| """Run UV scripts on Hugging Face infrastructure.""" | |
| def register_subcommand(parser): | |
| """Register UV run subcommand.""" | |
| uv_parser = parser.add_parser( | |
| "uv", | |
| help="Run UV scripts (Python with inline dependencies) on HF infrastructure", | |
| ) | |
| subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True) | |
| # Run command only | |
| run_parser = subparsers.add_parser( | |
| "run", | |
| help="Run a UV script (local file or URL) on HF infrastructure", | |
| ) | |
| run_parser.add_argument("script", help="UV script to run (local file or URL)") | |
| run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[]) | |
| run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.") | |
| run_parser.add_argument( | |
| "--repo", | |
| help="Repository name for the script (creates ephemeral if not specified)", | |
| ) | |
| run_parser.add_argument( | |
| "--flavor", | |
| type=str, | |
| help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", | |
| ) | |
| run_parser.add_argument("-e", "--env", action="append", help="Environment variables") | |
| run_parser.add_argument( | |
| "-s", | |
| "--secrets", | |
| action="append", | |
| help=( | |
| "Set secret environment variables. E.g. --secrets SECRET=value " | |
| "or `--secrets HF_TOKEN` to pass your Hugging Face token." | |
| ), | |
| ) | |
| run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") | |
| run_parser.add_argument( | |
| "--secrets-file", | |
| type=str, | |
| help="Read in a file of secret environment variables.", | |
| ) | |
| run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)") | |
| run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the Job will be created. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument("--token", type=str, help="HF token") | |
| # UV options | |
| run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_") | |
| run_parser.add_argument( | |
| "-p", "--python", type=str, help="The Python interpreter to use for the run environment" | |
| ) | |
| run_parser.set_defaults(func=UvCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| """Initialize the command with parsed arguments.""" | |
| self.script = args.script | |
| self.script_args = args.script_args | |
| self.dependencies = args.with_ | |
| self.python = args.python | |
| self.image = args.image | |
| self.env: dict[str, Optional[str]] = {} | |
| if args.env_file: | |
| self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) | |
| for env_value in args.env or []: | |
| self.env.update(load_dotenv(env_value, environ=os.environ.copy())) | |
| self.secrets: dict[str, Optional[str]] = {} | |
| extended_environ = _get_extended_environ() | |
| if args.secrets_file: | |
| self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) | |
| for secret in args.secrets or []: | |
| self.secrets.update(load_dotenv(secret, environ=extended_environ)) | |
| self.flavor: Optional[SpaceHardware] = args.flavor | |
| self.timeout: Optional[str] = args.timeout | |
| self.detach: bool = args.detach | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self._repo = args.repo | |
| def run(self) -> None: | |
| """Execute UV command.""" | |
| logging.set_verbosity(logging.INFO) | |
| api = HfApi(token=self.token) | |
| job = api.run_uv_job( | |
| script=self.script, | |
| script_args=self.script_args, | |
| dependencies=self.dependencies, | |
| python=self.python, | |
| image=self.image, | |
| env=self.env, | |
| secrets=self.secrets, | |
| flavor=self.flavor, | |
| timeout=self.timeout, | |
| namespace=self.namespace, | |
| _repo=self._repo, | |
| ) | |
| # Always print the job ID to the user | |
| print(f"Job started with ID: {job.id}") | |
| print(f"View at: {job.url}") | |
| if self.detach: | |
| return | |
| # Now let's stream the logs | |
| for log in api.fetch_job_logs(job_id=job.id): | |
| print(log) | |
| def _get_extended_environ() -> Dict[str, str]: | |
| extended_environ = os.environ.copy() | |
| if (token := get_token()) is not None: | |
| extended_environ["HF_TOKEN"] = token | |
| return extended_environ | |
| class ScheduledJobsCommands(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction): | |
| scheduled_jobs_parser = parser.add_parser("scheduled", help="Create and manage scheduled Jobs on the Hub.") | |
| scheduled_jobs_subparsers = scheduled_jobs_parser.add_subparsers( | |
| help="huggingface.co scheduled jobs related commands" | |
| ) | |
| # Show help if no subcommand is provided | |
| scheduled_jobs_parser.set_defaults(func=lambda args: scheduled_jobs_subparsers.print_help()) | |
| # Register commands | |
| ScheduledRunCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledPsCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledInspectCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledDeleteCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledSuspendCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledResumeCommand.register_subcommand(scheduled_jobs_subparsers) | |
| ScheduledUvCommand.register_subcommand(scheduled_jobs_subparsers) | |
| class ScheduledRunCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("run", help="Schedule a Job") | |
| run_parser.add_argument( | |
| "schedule", | |
| type=str, | |
| help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.", | |
| ) | |
| run_parser.add_argument("image", type=str, help="The Docker image to use.") | |
| run_parser.add_argument( | |
| "--suspend", | |
| action="store_true", | |
| help="Suspend (pause) the scheduled Job", | |
| default=None, | |
| ) | |
| run_parser.add_argument( | |
| "--concurrency", | |
| action="store_true", | |
| help="Allow multiple instances of this Job to run concurrently", | |
| default=None, | |
| ) | |
| run_parser.add_argument("-e", "--env", action="append", help="Set environment variables. E.g. --env ENV=value") | |
| run_parser.add_argument( | |
| "-s", | |
| "--secrets", | |
| action="append", | |
| help=( | |
| "Set secret environment variables. E.g. --secrets SECRET=value " | |
| "or `--secrets HF_TOKEN` to pass your Hugging Face token." | |
| ), | |
| ) | |
| run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") | |
| run_parser.add_argument("--secrets-file", type=str, help="Read in a file of secret environment variables.") | |
| run_parser.add_argument( | |
| "--flavor", | |
| type=str, | |
| help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", | |
| ) | |
| run_parser.add_argument( | |
| "--timeout", | |
| type=str, | |
| help="Max duration: int/float with s (seconds, default), m (minutes), h (hours) or d (days).", | |
| ) | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the scheduled Job will be created. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", | |
| type=str, | |
| help="A User Access Token generated from https://huggingface.co/settings/tokens", | |
| ) | |
| run_parser.add_argument("command", nargs="...", help="The command to run.") | |
| run_parser.set_defaults(func=ScheduledRunCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.schedule: str = args.schedule | |
| self.image: str = args.image | |
| self.command: List[str] = args.command | |
| self.suspend: Optional[bool] = args.suspend | |
| self.concurrency: Optional[bool] = args.concurrency | |
| self.env: dict[str, Optional[str]] = {} | |
| if args.env_file: | |
| self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) | |
| for env_value in args.env or []: | |
| self.env.update(load_dotenv(env_value, environ=os.environ.copy())) | |
| self.secrets: dict[str, Optional[str]] = {} | |
| extended_environ = _get_extended_environ() | |
| if args.secrets_file: | |
| self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) | |
| for secret in args.secrets or []: | |
| self.secrets.update(load_dotenv(secret, environ=extended_environ)) | |
| self.flavor: Optional[SpaceHardware] = args.flavor | |
| self.timeout: Optional[str] = args.timeout | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| scheduled_job = api.create_scheduled_job( | |
| image=self.image, | |
| command=self.command, | |
| schedule=self.schedule, | |
| suspend=self.suspend, | |
| concurrency=self.concurrency, | |
| env=self.env, | |
| secrets=self.secrets, | |
| flavor=self.flavor, | |
| timeout=self.timeout, | |
| namespace=self.namespace, | |
| ) | |
| # Always print the scheduled job ID to the user | |
| print(f"Scheduled Job created with ID: {scheduled_job.id}") | |
| class ScheduledPsCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("ps", help="List scheduled Jobs") | |
| run_parser.add_argument( | |
| "-a", | |
| "--all", | |
| action="store_true", | |
| help="Show all scheduled Jobs (default hides suspended)", | |
| ) | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace from where it lists the jobs. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", | |
| type=str, | |
| help="A User Access Token generated from https://huggingface.co/settings/tokens", | |
| ) | |
| # Add Docker-style filtering argument | |
| run_parser.add_argument( | |
| "-f", | |
| "--filter", | |
| action="append", | |
| default=[], | |
| help="Filter output based on conditions provided (format: key=value)", | |
| ) | |
| # Add option to format output | |
| run_parser.add_argument( | |
| "--format", | |
| type=str, | |
| help="Format output using a custom template", | |
| ) | |
| run_parser.set_defaults(func=ScheduledPsCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.all: bool = args.all | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self.format: Optional[str] = args.format | |
| self.filters: Dict[str, str] = {} | |
| # Parse filter arguments (key=value pairs) | |
| for f in args.filter: | |
| if "=" in f: | |
| key, value = f.split("=", 1) | |
| self.filters[key.lower()] = value | |
| else: | |
| print(f"Warning: Ignoring invalid filter format '{f}'. Use key=value format.") | |
| def run(self) -> None: | |
| """ | |
| Fetch and display scheduked job information for the current user. | |
| Uses Docker-style filtering with -f/--filter flag and key=value pairs. | |
| """ | |
| try: | |
| api = HfApi(token=self.token) | |
| # Fetch jobs data | |
| scheduled_jobs = api.list_scheduled_jobs(namespace=self.namespace) | |
| # Define table headers | |
| table_headers = [ | |
| "ID", | |
| "SCHEDULE", | |
| "IMAGE/SPACE", | |
| "COMMAND", | |
| "LAST RUN", | |
| "NEXT RUN", | |
| "SUSPEND", | |
| ] | |
| # Process jobs data | |
| rows = [] | |
| for scheduled_job in scheduled_jobs: | |
| # Extract job data for filtering | |
| suspend = scheduled_job.suspend | |
| # Skip job if not all jobs should be shown and status doesn't match criteria | |
| if not self.all and suspend: | |
| continue | |
| # Extract job ID | |
| scheduled_job_id = scheduled_job.id | |
| # Extract schedule | |
| schedule = scheduled_job.schedule | |
| # Extract image or space information | |
| image_or_space = scheduled_job.job_spec.docker_image or "N/A" | |
| # Extract and format command | |
| command = scheduled_job.job_spec.command or [] | |
| command_str = " ".join(command) if command else "N/A" | |
| # Extract status | |
| last_job_at = ( | |
| scheduled_job.status.last_job.at.strftime("%Y-%m-%d %H:%M:%S") | |
| if scheduled_job.status.last_job | |
| else "N/A" | |
| ) | |
| next_job_run_at = ( | |
| scheduled_job.status.next_job_run_at.strftime("%Y-%m-%d %H:%M:%S") | |
| if scheduled_job.status.next_job_run_at | |
| else "N/A" | |
| ) | |
| # Create a dict with all job properties for filtering | |
| job_properties = { | |
| "id": scheduled_job_id, | |
| "image": image_or_space, | |
| "suspend": str(suspend), | |
| "command": command_str, | |
| } | |
| # Check if job matches all filters | |
| if not self._matches_filters(job_properties): | |
| continue | |
| # Create row | |
| rows.append( | |
| [ | |
| scheduled_job_id, | |
| schedule, | |
| image_or_space, | |
| command_str, | |
| last_job_at, | |
| next_job_run_at, | |
| suspend, | |
| ] | |
| ) | |
| # Handle empty results | |
| if not rows: | |
| filters_msg = "" | |
| if self.filters: | |
| filters_msg = f" matching filters: {', '.join([f'{k}={v}' for k, v in self.filters.items()])}" | |
| print(f"No scheduled jobs found{filters_msg}") | |
| return | |
| # Apply custom format if provided or use default tabular format | |
| self._print_output(rows, table_headers) | |
| except requests.RequestException as e: | |
| print(f"Error fetching scheduled jobs data: {e}") | |
| except (KeyError, ValueError, TypeError) as e: | |
| print(f"Error processing scheduled jobs data: {e}") | |
| except Exception as e: | |
| print(f"Unexpected error - {type(e).__name__}: {e}") | |
| def _matches_filters(self, job_properties: Dict[str, str]) -> bool: | |
| """Check if scheduled job matches all specified filters.""" | |
| for key, pattern in self.filters.items(): | |
| # Check if property exists | |
| if key not in job_properties: | |
| return False | |
| # Support pattern matching with wildcards | |
| if "*" in pattern or "?" in pattern: | |
| # Convert glob pattern to regex | |
| regex_pattern = pattern.replace("*", ".*").replace("?", ".") | |
| if not re.search(f"^{regex_pattern}$", job_properties[key], re.IGNORECASE): | |
| return False | |
| # Simple substring matching | |
| elif pattern.lower() not in job_properties[key].lower(): | |
| return False | |
| return True | |
| def _print_output(self, rows, headers): | |
| """Print output according to the chosen format.""" | |
| if self.format: | |
| # Custom template formatting (simplified) | |
| template = self.format | |
| for row in rows: | |
| line = template | |
| for i, field in enumerate( | |
| ["id", "schedule", "image", "command", "last_job_at", "next_job_run_at", "suspend"] | |
| ): | |
| placeholder = f"{{{{.{field}}}}}" | |
| if placeholder in line: | |
| line = line.replace(placeholder, str(row[i])) | |
| print(line) | |
| else: | |
| # Default tabular format | |
| print( | |
| _tabulate( | |
| rows, | |
| headers=headers, | |
| ) | |
| ) | |
| class ScheduledInspectCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("inspect", help="Display detailed information on one or more scheduled Jobs") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the scheduled job is. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.add_argument("scheduled_job_ids", nargs="...", help="The scheduled jobs to inspect") | |
| run_parser.set_defaults(func=ScheduledInspectCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self.scheduled_job_ids: List[str] = args.scheduled_job_ids | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| scheduled_jobs = [ | |
| api.inspect_scheduled_job(scheduled_job_id=scheduled_job_id, namespace=self.namespace) | |
| for scheduled_job_id in self.scheduled_job_ids | |
| ] | |
| print(json.dumps([asdict(scheduled_job) for scheduled_job in scheduled_jobs], indent=4, default=str)) | |
| class ScheduledDeleteCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("delete", help="Delete a scheduled Job") | |
| run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the scheduled job is. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.set_defaults(func=ScheduledDeleteCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.scheduled_job_id: str = args.scheduled_job_id | |
| self.namespace = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| api.delete_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) | |
| class ScheduledSuspendCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("suspend", help="Suspend (pause) a scheduled Job") | |
| run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the scheduled job is. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.set_defaults(func=ScheduledSuspendCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.scheduled_job_id: str = args.scheduled_job_id | |
| self.namespace = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| api.suspend_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) | |
| class ScheduledResumeCommand(BaseHuggingfaceCLICommand): | |
| def register_subcommand(parser: _SubParsersAction) -> None: | |
| run_parser = parser.add_parser("resume", help="Resume (unpause) a scheduled Job") | |
| run_parser.add_argument("scheduled_job_id", type=str, help="Scheduled Job ID") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the scheduled job is. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument( | |
| "--token", type=str, help="A User Access Token generated from https://huggingface.co/settings/tokens" | |
| ) | |
| run_parser.set_defaults(func=ScheduledResumeCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| self.scheduled_job_id: str = args.scheduled_job_id | |
| self.namespace = args.namespace | |
| self.token: Optional[str] = args.token | |
| def run(self) -> None: | |
| api = HfApi(token=self.token) | |
| api.resume_scheduled_job(scheduled_job_id=self.scheduled_job_id, namespace=self.namespace) | |
| class ScheduledUvCommand(BaseHuggingfaceCLICommand): | |
| """Schedule UV scripts on Hugging Face infrastructure.""" | |
| def register_subcommand(parser): | |
| """Register UV run subcommand.""" | |
| uv_parser = parser.add_parser( | |
| "uv", | |
| help="Schedule UV scripts (Python with inline dependencies) on HF infrastructure", | |
| ) | |
| subparsers = uv_parser.add_subparsers(dest="uv_command", help="UV commands", required=True) | |
| # Run command only | |
| run_parser = subparsers.add_parser( | |
| "run", | |
| help="Run a UV script (local file or URL) on HF infrastructure", | |
| ) | |
| run_parser.add_argument( | |
| "schedule", | |
| type=str, | |
| help="One of annually, yearly, monthly, weekly, daily, hourly, or a CRON schedule expression.", | |
| ) | |
| run_parser.add_argument("script", help="UV script to run (local file or URL)") | |
| run_parser.add_argument("script_args", nargs="...", help="Arguments for the script", default=[]) | |
| run_parser.add_argument( | |
| "--suspend", | |
| action="store_true", | |
| help="Suspend (pause) the scheduled Job", | |
| default=None, | |
| ) | |
| run_parser.add_argument( | |
| "--concurrency", | |
| action="store_true", | |
| help="Allow multiple instances of this Job to run concurrently", | |
| default=None, | |
| ) | |
| run_parser.add_argument("--image", type=str, help="Use a custom Docker image with `uv` installed.") | |
| run_parser.add_argument( | |
| "--repo", | |
| help="Repository name for the script (creates ephemeral if not specified)", | |
| ) | |
| run_parser.add_argument( | |
| "--flavor", | |
| type=str, | |
| help=f"Flavor for the hardware, as in HF Spaces. Defaults to `cpu-basic`. Possible values: {', '.join(SUGGESTED_FLAVORS)}.", | |
| ) | |
| run_parser.add_argument("-e", "--env", action="append", help="Environment variables") | |
| run_parser.add_argument( | |
| "-s", | |
| "--secrets", | |
| action="append", | |
| help=( | |
| "Set secret environment variables. E.g. --secrets SECRET=value " | |
| "or `--secrets HF_TOKEN` to pass your Hugging Face token." | |
| ), | |
| ) | |
| run_parser.add_argument("--env-file", type=str, help="Read in a file of environment variables.") | |
| run_parser.add_argument( | |
| "--secrets-file", | |
| type=str, | |
| help="Read in a file of secret environment variables.", | |
| ) | |
| run_parser.add_argument("--timeout", type=str, help="Max duration (e.g., 30s, 5m, 1h)") | |
| run_parser.add_argument("-d", "--detach", action="store_true", help="Run in background") | |
| run_parser.add_argument( | |
| "--namespace", | |
| type=str, | |
| help="The namespace where the Job will be created. Defaults to the current user's namespace.", | |
| ) | |
| run_parser.add_argument("--token", type=str, help="HF token") | |
| # UV options | |
| run_parser.add_argument("--with", action="append", help="Run with the given packages installed", dest="with_") | |
| run_parser.add_argument( | |
| "-p", "--python", type=str, help="The Python interpreter to use for the run environment" | |
| ) | |
| run_parser.set_defaults(func=ScheduledUvCommand) | |
| def __init__(self, args: Namespace) -> None: | |
| """Initialize the command with parsed arguments.""" | |
| self.schedule: str = args.schedule | |
| self.script = args.script | |
| self.script_args = args.script_args | |
| self.suspend: Optional[bool] = args.suspend | |
| self.concurrency: Optional[bool] = args.concurrency | |
| self.dependencies = args.with_ | |
| self.python = args.python | |
| self.image = args.image | |
| self.env: dict[str, Optional[str]] = {} | |
| if args.env_file: | |
| self.env.update(load_dotenv(Path(args.env_file).read_text(), environ=os.environ.copy())) | |
| for env_value in args.env or []: | |
| self.env.update(load_dotenv(env_value, environ=os.environ.copy())) | |
| self.secrets: dict[str, Optional[str]] = {} | |
| extended_environ = _get_extended_environ() | |
| if args.secrets_file: | |
| self.secrets.update(load_dotenv(Path(args.secrets_file).read_text(), environ=extended_environ)) | |
| for secret in args.secrets or []: | |
| self.secrets.update(load_dotenv(secret, environ=extended_environ)) | |
| self.flavor: Optional[SpaceHardware] = args.flavor | |
| self.timeout: Optional[str] = args.timeout | |
| self.detach: bool = args.detach | |
| self.namespace: Optional[str] = args.namespace | |
| self.token: Optional[str] = args.token | |
| self._repo = args.repo | |
| def run(self) -> None: | |
| """Schedule UV command.""" | |
| logging.set_verbosity(logging.INFO) | |
| api = HfApi(token=self.token) | |
| job = api.create_scheduled_uv_job( | |
| script=self.script, | |
| script_args=self.script_args, | |
| schedule=self.schedule, | |
| suspend=self.suspend, | |
| concurrency=self.concurrency, | |
| dependencies=self.dependencies, | |
| python=self.python, | |
| image=self.image, | |
| env=self.env, | |
| secrets=self.secrets, | |
| flavor=self.flavor, | |
| timeout=self.timeout, | |
| namespace=self.namespace, | |
| _repo=self._repo, | |
| ) | |
| # Always print the job ID to the user | |
| print(f"Scheduled Job created with ID: {job.id}") | |