Spaces:
Running
Running
| # SPDX-FileCopyrightText: 2025 UL Research Institutes | |
| # SPDX-License-Identifier: Apache-2.0 | |
| import functools | |
| import time | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| import click | |
| import httpx | |
| from dyff.client import Client, errors | |
| from dyff.schema.platform import * | |
| from dyff.schema.requests import * | |
| from app.api.models import PredictionResponse | |
| # ---------------------------------------------------------------------------- | |
| def _wait_for_status( | |
| get_entity_fn, target_status: str | list[str], *, timeout: timedelta | |
| ) -> str: | |
| if isinstance(target_status, str): | |
| target_status = [target_status] | |
| then = datetime.now(timezone.utc) | |
| while True: | |
| try: | |
| entity = get_entity_fn() | |
| if entity.status in target_status: | |
| return entity.status | |
| elif entity.status in ["Error", "Failed"]: | |
| raise RuntimeError( | |
| f"operation failed: {entity.status} ({entity.reason})" | |
| ) | |
| except errors.HTTPError as ex: | |
| if ex.status != 404: | |
| raise | |
| except httpx.HTTPStatusError as ex: | |
| if ex.response.status_code != 404: | |
| raise | |
| if (datetime.now(timezone.utc) - then) >= timeout: | |
| break | |
| time.sleep(5) | |
| raise AssertionError("timeout") | |
| def _directory_size_bytes(directory: Path | str) -> int: | |
| directory = Path(directory) | |
| size: int = 0 | |
| for f in directory.rglob("*"): | |
| if f.exists() and not f.is_symlink(): | |
| size += f.stat().st_size | |
| return size | |
| def _storage_for_size(size_bytes: int) -> str: | |
| """Convert bytes to k8s Quantity and add some overhead in case the size | |
| is different on different filesystems. | |
| """ | |
| Gi = int(1024**3) | |
| size_B_with_overhead = size_bytes + 2*Gi | |
| size_Gi_with_overhead = size_B_with_overhead // Gi | |
| return f"{size_Gi_with_overhead}Gi" | |
| def _common_options(f): | |
| def wrapper(*args, **kwargs): | |
| return f(*args, **kwargs) | |
| return wrapper | |
# NOTE(review): the __main__ guard calls ``cli(show_default=True)``, which only
# works if this function carries a ``@click.group(...)`` decorator — the
# decorator appears to have been lost in this copy; confirm against the
# original source.
def cli():
    """Entry point for the challenge CLI; subcommands attach to this group."""
    pass
def upload_submission(
    account: str,
    name: str,
    image: str | None,
    endpoint: str,
    volume: Path | None,
    volume_mount: Path | None,
    artifact_id: str | None,
    model_id: str | None,
    gpu: bool,
    timeout_seconds: int,
) -> None:
    """Upload a submission image (and optionally a model volume) and create
    a runnable InferenceService for it.

    :param account: Dyff account that will own the created resources.
    :param name: Name for the created InferenceService.
    :param image: Local Docker image to push; used only when ``artifact_id``
        is None. NOTE(review): presumably required in that case — the push
        would otherwise reference ``docker-daemon:None``; confirm the CLI
        option layer enforces this.
    :param endpoint: Inference endpoint path exposed by the container.
    :param volume: Local directory to upload as a Model (mutually exclusive
        with ``model_id``).
    :param volume_mount: Absolute path where the model volume is mounted
        inside the container; required with ``volume`` or ``model_id``.
    :param artifact_id: Reuse an existing Artifact instead of pushing.
    :param model_id: Reuse an existing Model instead of uploading ``volume``.
    :param gpu: Request one NVIDIA L4 GPU for the service.
    :param timeout_seconds: HTTP client timeout and per-step wait timeout.
    """
    dyffapi = Client(timeout=httpx.Timeout(timeout_seconds))

    # Upload the image
    if artifact_id is None:
        # Create an Artifact resource
        click.echo("creating Artifact ...")
        artifact = dyffapi.artifacts.create(ArtifactCreateRequest(account=account))
        click.echo(f"artifact.id: \"{artifact.id}\"")
        # The Artifact must reach WaitingForUpload before we may push to it.
        _wait_for_status(
            lambda: dyffapi.artifacts.get(artifact.id),
            "WaitingForUpload",
            timeout=timedelta(seconds=timeout_seconds),
        )
        # Push the image from the local Docker daemon
        click.echo("pushing Artifact ...")
        dyffapi.artifacts.push(artifact, source=f"docker-daemon:{image}")
        # NOTE(review): fixed pause before finalize — presumably lets the
        # backend register the pushed layers; confirm whether it is needed.
        time.sleep(5)
        # Indicate that we're done pushing
        dyffapi.artifacts.finalize(artifact.id)
        _wait_for_status(
            lambda: dyffapi.artifacts.get(artifact.id),
            "Ready",
            timeout=timedelta(seconds=timeout_seconds),
        )
        click.echo("... done")
    else:
        artifact = dyffapi.artifacts.get(artifact_id)
    assert artifact is not None

    model: Model | None = None
    model_storage_quantity: str | None = None
    if model_id is None:
        if volume is not None:
            if volume_mount is None:
                raise click.UsageError("--volume-mount is required when --volume is used")
            # Size the Model's storage request from the local directory.
            model_storage_quantity = _storage_for_size(_directory_size_bytes(volume))
            click.echo(f"creating Model from local directory with storage={model_storage_quantity} ...")
            model = dyffapi.models.create_from_volume(
                volume,
                name="model_volume",
                account=account,
                resources=ModelResources(storage=model_storage_quantity),
            )
            assert model is not None
            click.echo(f"model.id: \"{model.id}\"")
            # Same create -> WaitingForUpload -> upload -> Ready dance as the
            # Artifact above.
            _wait_for_status(
                lambda: dyffapi.models.get(model.id),
                "WaitingForUpload",
                timeout=timedelta(seconds=timeout_seconds),
            )
            click.echo("uploading Model ...")
            dyffapi.models.upload_volume(model, volume)
            _wait_for_status(
                lambda: dyffapi.models.get(model.id),
                "Ready",
                timeout=timedelta(seconds=timeout_seconds),
            )
            click.echo("... done")
        else:
            model = None
    else:
        # Reusing an existing Model: it must exist and must carry a storage
        # quantity (set when created via this tool) so the runner can size
        # its volume.
        model = dyffapi.models.get(model_id)
        if model is None:
            raise click.UsageError(f"--model={model_id}: model not found")
        model_storage_quantity = model.resources.storage
        if model_storage_quantity is None:
            raise click.UsageError(
                f"--model={model_id}: model.resources.storage not set;"
                " was this model created with the challenge-cli tool?"
            )

    # Create a runnable InferenceService
    if volume_mount is not None:
        if model is None:
            raise click.UsageError("--volume-mount requires --volume or --model")
        if model_storage_quantity is None:
            raise click.UsageError(
                f"model {model.id}: model.resources.storage not set;"
                " was this model created with the challenge-cli tool?"
            )
        if not volume_mount.is_absolute():
            raise click.UsageError("--volume-mount must be an absolute path")
        # Mount the Model's data volume into the container at volume_mount.
        volumeMounts = [
            VolumeMount(
                kind=VolumeMountKind.data,
                name="model",
                mountPath=volume_mount,
                data=VolumeMountData(
                    source=EntityIdentifier.of(model),
                ),
            ),
        ]
    else:
        if model is not None:
            raise click.UsageError("--model requires --volume-mount")
        else:
            volumeMounts = None

    accelerator: Accelerator | None = None
    if gpu:
        accelerator = Accelerator(
            kind="GPU",
            gpu=AcceleratorGPU(
                hardwareTypes=["nvidia.com/gpu-l4"],
                count=1,
            ),
        )

    # Don't change this
    service_request = InferenceServiceCreateRequest(
        account=account,
        name=name,
        model=None,
        runner=InferenceServiceRunner(
            kind=InferenceServiceRunnerKind.CONTAINER,
            imageRef=EntityIdentifier.of(artifact),
            resources=ModelResources(storage=model_storage_quantity),
            volumeMounts=volumeMounts,
            accelerator=accelerator,
        ),
        interface=InferenceInterface(
            endpoint=endpoint,
            # The service's output must conform to the challenge's
            # PredictionResponse schema.
            outputSchema=DataSchema.make_output_schema(PredictionResponse),
        ),
    )
    click.echo("creating InferenceService ...")
    service = dyffapi.inferenceservices.create(service_request)
    click.echo(f"service.id: \"{service.id}\"")
    click.echo("... done")
def submit(account: str, task_id: str, team_id: str, service_id: str, challenge_id: str, timeout_seconds: int) -> None:
    """Submit an existing InferenceService to a challenge task.

    :param account: Dyff account that owns the submission.
    :param task_id: Task ID, or a known task *name* (e.g. ``"main-1"``)
        which is translated to its ID for known challenges.
    :param team_id: Team making the submission.
    :param service_id: InferenceService being submitted.
    :param challenge_id: Challenge to submit to.
    :param timeout_seconds: HTTP client timeout.
    :raises click.UsageError: If ``task_id`` names no task in the challenge.
    """
    dyffapi = Client(timeout=httpx.Timeout(timeout_seconds))
    # Convenience aliases: map human-readable task names to raw task IDs
    # for known challenges, so users can pass e.g. "main-1".
    challenge_tasks = {
        "dc509a8c771b492b90c43012fde9a04f": {
            "pilot-1": "567316f19d37490b97aa647e7017ef44",
            "main-1": "7c849a35f1834c1c8a60e39624221919",
            "main-1-v2": "132e0ce09a1145f8af12fd800ae2479e",
        }
    }
    task_id_by_name = challenge_tasks.get(challenge_id, {}).get(task_id)
    if task_id_by_name is not None:
        task_id = task_id_by_name
    challenge = dyffapi.challenges.get(challenge_id)
    # Guard the lookup: a bare KeyError here would be opaque to CLI users.
    if task_id not in challenge.tasks:
        raise click.UsageError(
            f"--task={task_id}: no such task in challenge {challenge_id}"
        )
    challengetask = challenge.tasks[task_id]
    team = dyffapi.teams.get(team_id)
    service = dyffapi.inferenceservices.get(service_id)
    submission = dyffapi.challenges.submit(
        challenge.id,
        challengetask.id,
        SubmissionCreateRequest(
            account=account,
            team=team.id,
            submission=EntityIdentifier(kind="InferenceService", id=service.id),
        ),
    )
    click.echo(submission.model_dump_json(indent=2))
    click.echo(f"submission.id: \"{submission.id}\"")
if __name__ == "__main__":
    # NOTE(review): ``show_default=True`` is forwarded through the command
    # invocation; this call shape presumes ``cli`` is a click Group (its
    # decorators appear stripped in this copy) — confirm against the
    # original source.
    cli(show_default=True)