# submission / challenge-cli.py
# hyeongjun User — SAFE challenge ManipDet submission (8792786)
# SPDX-FileCopyrightText: 2025 UL Research Institutes
# SPDX-License-Identifier: Apache-2.0
import functools
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import click
import httpx
from dyff.client import Client, errors
from dyff.schema.platform import *
from dyff.schema.requests import *
from app.api.models import PredictionResponse
# ----------------------------------------------------------------------------
def _wait_for_status(
get_entity_fn, target_status: str | list[str], *, timeout: timedelta
) -> str:
if isinstance(target_status, str):
target_status = [target_status]
then = datetime.now(timezone.utc)
while True:
try:
entity = get_entity_fn()
if entity.status in target_status:
return entity.status
elif entity.status in ["Error", "Failed"]:
raise RuntimeError(
f"operation failed: {entity.status} ({entity.reason})"
)
except errors.HTTPError as ex:
if ex.status != 404:
raise
except httpx.HTTPStatusError as ex:
if ex.response.status_code != 404:
raise
if (datetime.now(timezone.utc) - then) >= timeout:
break
time.sleep(5)
raise AssertionError("timeout")
def _directory_size_bytes(directory: Path | str) -> int:
directory = Path(directory)
size: int = 0
for f in directory.rglob("*"):
if f.exists() and not f.is_symlink():
size += f.stat().st_size
return size
def _storage_for_size(size_bytes: int) -> str:
"""Convert bytes to k8s Quantity and add some overhead in case the size
is different on different filesystems.
"""
Gi = int(1024**3)
size_B_with_overhead = size_bytes + 2*Gi
size_Gi_with_overhead = size_B_with_overhead // Gi
return f"{size_Gi_with_overhead}Gi"
def _common_options(f):
    """Decorator adding the --account and --timeout-seconds options shared by every command."""
    account_option = click.option(
        "--account",
        type=str,
        required=True,
        help="Your account ID",
        metavar="ID",
    )
    timeout_option = click.option(
        "--timeout-seconds",
        type=int,
        default=120,
        help="Timeout for Dyff client operations",
        metavar="SECONDS",
    )

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        return f(*args, **kwargs)

    # Apply innermost-first to preserve the original decorator stacking order.
    return account_option(timeout_option(wrapped))
# Root command group; `upload-submission` and `submit` register themselves on it.
# (No docstring here on purpose: click would surface it as --help text.)
@click.group()
def cli():
    pass
@cli.command()
@_common_options
@click.option(
    "--name",
    type=str,
    required=True,
    help="The name of your detector model. For display and querying purposes only.",
)
@click.option(
    "--image",
    type=str,
    default=None,
    help="The Docker image to upload (e.g., 'some/image:latest')."
    " Must exist in your local Docker daemon."
    " Required if --artifact is not specified.",
)
@click.option(
    "--endpoint",
    type=str,
    default="predict",
    help="The endpoint to call on your service to make a prediction.",
)
@click.option(
    "--volume",
    type=click.Path(exists=True, file_okay=False, readable=True, resolve_path=True, path_type=Path),
    default=None,
    help="A local directory path containing files to upload and mount in the running Docker container."
    " You should use this if your submission includes large files like neural network weights."
)
@click.option(
    "--volume-mount",
    type=click.Path(exists=False, path_type=Path),
    default=None,
    help="The path to mount your uploaded directory in the running Docker container."
    " Must be an absolute path."
    " Required if --volume is specified.")
@click.option(
    "--artifact",
    "artifact_id",
    type=str,
    default=None,
    help="The ID of the Artifact (i.e., Docker image) to use in the submission, if it already exists."
    " You can pass the artifact.id from a previous invocation.",
    metavar="ID",
)
@click.option(
    "--model",
    "model_id",
    type=str,
    default=None,
    help="The ID of the Model (i.e., neural network weights) to use in the submission, if it already exists."
    " You can pass the model.id from a previous invocation.",
    metavar="ID",
)
@click.option(
    "--gpu",
    is_flag=True,
    default=False,
    help="Request a GPU (NVIDIA L4) for the inference service.",
)
def upload_submission(
    account: str,
    name: str,
    image: str | None,
    endpoint: str,
    volume: Path | None,
    volume_mount: Path | None,
    artifact_id: str | None,
    model_id: str | None,
    gpu: bool,
    timeout_seconds: int,
) -> None:
    """Create (or reuse) an Artifact and optional Model, then register a
    runnable InferenceService that wraps them for the challenge.

    Workflow: push the Docker image as an Artifact (unless --artifact is
    given), optionally upload --volume as a Model (unless --model is given),
    then create the InferenceService referencing both.
    """
    # Fail fast on the combination the --image help text promises to reject;
    # otherwise we would try to push the literal source "docker-daemon:None".
    if artifact_id is None and image is None:
        raise click.UsageError("--image is required when --artifact is not specified")
    dyffapi = Client(timeout=httpx.Timeout(timeout_seconds))
    # Upload the image
    if artifact_id is None:
        # Create an Artifact resource
        click.echo("creating Artifact ...")
        artifact = dyffapi.artifacts.create(ArtifactCreateRequest(account=account))
        click.echo(f"artifact.id: \"{artifact.id}\"")
        _wait_for_status(
            lambda: dyffapi.artifacts.get(artifact.id),
            "WaitingForUpload",
            timeout=timedelta(seconds=timeout_seconds),
        )
        # Push the image from the local Docker daemon
        click.echo("pushing Artifact ...")
        dyffapi.artifacts.push(artifact, source=f"docker-daemon:{image}")
        time.sleep(5)
        # Indicate that we're done pushing
        dyffapi.artifacts.finalize(artifact.id)
        _wait_for_status(
            lambda: dyffapi.artifacts.get(artifact.id),
            "Ready",
            timeout=timedelta(seconds=timeout_seconds),
        )
        click.echo("... done")
    else:
        # Reuse an existing Artifact by ID.
        artifact = dyffapi.artifacts.get(artifact_id)
        assert artifact is not None
    model: Model | None = None
    model_storage_quantity: str | None = None
    if model_id is None:
        if volume is not None:
            if volume_mount is None:
                raise click.UsageError("--volume-mount is required when --volume is used")
            # Size the Model's storage request from the local directory.
            model_storage_quantity = _storage_for_size(_directory_size_bytes(volume))
            click.echo(f"creating Model from local directory with storage={model_storage_quantity} ...")
            model = dyffapi.models.create_from_volume(
                volume,
                name="model_volume",
                account=account,
                resources=ModelResources(storage=model_storage_quantity),
            )
            assert model is not None
            click.echo(f"model.id: \"{model.id}\"")
            _wait_for_status(
                lambda: dyffapi.models.get(model.id),
                "WaitingForUpload",
                timeout=timedelta(seconds=timeout_seconds),
            )
            click.echo("uploading Model ...")
            dyffapi.models.upload_volume(model, volume)
            _wait_for_status(
                lambda: dyffapi.models.get(model.id),
                "Ready",
                timeout=timedelta(seconds=timeout_seconds),
            )
            click.echo("... done")
        # No --volume: model stays None (no weights to mount).
    else:
        # Reuse an existing Model by ID; it must carry a storage quantity
        # because the InferenceService runner needs it below.
        model = dyffapi.models.get(model_id)
        if model is None:
            raise click.UsageError(f"--model={model_id}: model not found")
        model_storage_quantity = model.resources.storage
        if model_storage_quantity is None:
            raise click.UsageError(
                f"--model={model_id}: model.resources.storage not set;"
                " was this model created with the challenge-cli tool?"
            )
    # Create a runnable InferenceService
    if volume_mount is not None:
        if model is None:
            raise click.UsageError("--volume-mount requires --volume or --model")
        if model_storage_quantity is None:
            raise click.UsageError(
                f"model {model.id}: model.resources.storage not set;"
                " was this model created with the challenge-cli tool?"
            )
        if not volume_mount.is_absolute():
            raise click.UsageError("--volume-mount must be an absolute path")
        volumeMounts = [
            VolumeMount(
                kind=VolumeMountKind.data,
                name="model",
                mountPath=volume_mount,
                data=VolumeMountData(
                    source=EntityIdentifier.of(model),
                ),
            ),
        ]
    else:
        if model is not None:
            raise click.UsageError("--model requires --volume-mount")
        else:
            volumeMounts = None
    accelerator: Accelerator | None = None
    if gpu:
        accelerator = Accelerator(
            kind="GPU",
            gpu=AcceleratorGPU(
                hardwareTypes=["nvidia.com/gpu-l4"],
                count=1,
            ),
        )
    # Don't change this
    service_request = InferenceServiceCreateRequest(
        account=account,
        name=name,
        model=None,
        runner=InferenceServiceRunner(
            kind=InferenceServiceRunnerKind.CONTAINER,
            imageRef=EntityIdentifier.of(artifact),
            resources=ModelResources(storage=model_storage_quantity),
            volumeMounts=volumeMounts,
            accelerator=accelerator,
        ),
        interface=InferenceInterface(
            endpoint=endpoint,
            outputSchema=DataSchema.make_output_schema(PredictionResponse),
        ),
    )
    click.echo("creating InferenceService ...")
    service = dyffapi.inferenceservices.create(service_request)
    click.echo(f"service.id: \"{service.id}\"")
    click.echo("... done")
@cli.command()
@_common_options
@click.option(
    "--task",
    "task_id",
    type=str,
    required=True,
    help="The Task ID to submit to.",
    metavar="ID",
)
@click.option(
    "--team",
    "team_id",
    type=str,
    required=True,
    help="The Team ID making the submission.",
    metavar="ID",
)
@click.option(
    "--service",
    "service_id",
    type=str,
    required=True,
    help="The InferenceService ID to submit.",
    metavar="ID",
)
@click.option(
    "--challenge",
    "challenge_id",
    type=str,
    default="dc509a8c771b492b90c43012fde9a04f",
    help="The Challenge ID to submit to.",
    metavar="ID",
)
def submit(account: str, task_id: str, team_id: str, service_id: str, challenge_id: str, timeout_seconds: int) -> None:
    """Submit an existing InferenceService to a challenge task."""
    dyffapi = Client(timeout=httpx.Timeout(timeout_seconds))
    # Friendly aliases for known task IDs in the default challenge; --task may
    # be either one of these names or a raw Task ID.
    challenge_tasks = {
        "dc509a8c771b492b90c43012fde9a04f": {
            "pilot-1": "567316f19d37490b97aa647e7017ef44",
            "main-1": "7c849a35f1834c1c8a60e39624221919",
            "main-1-v2": "132e0ce09a1145f8af12fd800ae2479e",
        }
    }
    task_id_by_name = challenge_tasks.get(challenge_id, {}).get(task_id)
    if task_id_by_name is not None:
        task_id = task_id_by_name
    challenge = dyffapi.challenges.get(challenge_id)
    # Translate an unknown task into a CLI usage error instead of a raw
    # KeyError traceback.
    try:
        challengetask = challenge.tasks[task_id]
    except KeyError:
        raise click.UsageError(
            f"--task={task_id}: task not found in challenge {challenge_id}"
        ) from None
    team = dyffapi.teams.get(team_id)
    service = dyffapi.inferenceservices.get(service_id)
    submission = dyffapi.challenges.submit(
        challenge.id,
        challengetask.id,
        SubmissionCreateRequest(
            account=account,
            team=team.id,
            submission=EntityIdentifier(kind="InferenceService", id=service.id),
        ),
    )
    click.echo(submission.model_dump_json(indent=2))
    click.echo(f"submission.id: \"{submission.id}\"")
if __name__ == "__main__":
    # show_default=True makes click render option defaults in --help output.
    cli(show_default=True)