Ark-kun committed on
Commit
7c5d028
·
1 Parent(s): b9249db

feat: Support namespace in launcher

Browse files
huggingface_overlay/cloud_pipelines_backend/launchers/huggingface_launchers.py ADDED
@@ -0,0 +1,465 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import copy
2
+ import dataclasses
3
+ import datetime
4
+ import logging
5
+ import pathlib
6
+ import typing
7
+ from typing import Any, Optional
8
+
9
+ import huggingface_hub
10
+
11
+ from cloud_pipelines.orchestration.launchers import naming_utils
12
+ from ..storage_providers import huggingface_repo_storage
13
+ from .. import component_structures as structures
14
+ from . import container_component_utils
15
+ from . import interfaces
16
+
17
+
18
# Module-level logger for this launcher module.
_logger = logging.getLogger(__name__)

# Maximum size (in bytes) of artifact data that may be consumed inline as a value
# (bigger artifacts must be consumed as files instead).
_MAX_INPUT_VALUE_SIZE = 10000

# Name of the file holding artifact data inside each container input directory.
_CONTAINER_FILE_NAME = "data"
23
+
24
+
25
class HuggingFaceJobsContainerLauncher(
    interfaces.ContainerTaskLauncher["LaunchedHuggingFaceJobContainer"]
):
    """Launcher that runs container tasks as HuggingFace Jobs.

    Input artifacts are downloaded from and outputs/logs uploaded to
    HuggingFace repos by a wrapper shell script injected into the job
    command line.
    """
29
+
30
+ def __init__(
31
+ self,
32
+ *,
33
+ client: Optional[huggingface_hub.HfApi] = None,
34
+ namespace: Optional[str] = None,
35
+ hf_token: Optional[str] = None,
36
+ hf_job_token: Optional[str] = None,
37
+ job_timeout: Optional[int | float | str] = None,
38
+ ):
39
+ # The HF Jobs that we launch need token to write the output artifacts and logs
40
+ hf_token = hf_token or huggingface_hub.get_token()
41
+ hf_job_token = hf_job_token or hf_token
42
+ self._api_client = client or huggingface_hub.HfApi(token=hf_token)
43
+ self._namespace: str = namespace or self._api_client.whoami()["name"]
44
+ self._storage_provider = (
45
+ huggingface_repo_storage.HuggingFaceRepoStorageProvider()
46
+ )
47
+ self._job_timeout = job_timeout
48
+ self._hf_job_token = hf_job_token
49
+ # Test the access
50
+ # _ = self._api_client.list_models(filter="non-existing")
51
+ _ = self._api_client.list_jobs(namespace=self._namespace)
52
+
53
+ def launch_container_task(
54
+ self,
55
+ *,
56
+ component_spec: structures.ComponentSpec,
57
+ # Input arguments may be updated with new downloaded values and new URIs of uploaded values.
58
+ input_arguments: dict[str, interfaces.InputArgument],
59
+ output_uris: dict[str, str],
60
+ log_uri: str,
61
+ annotations: dict[str, Any] | None = None,
62
+ ) -> "LaunchedHuggingFaceJobContainer":
63
+ if not isinstance(
64
+ component_spec.implementation, structures.ContainerImplementation
65
+ ):
66
+ raise TypeError(
67
+ f"Container launchers only support container implementations. Got {component_spec=}"
68
+ )
69
+ container_spec = component_spec.implementation.container
70
+
71
+ # TODO: Validate the input/output URIs.
72
+ container_inputs_root = pathlib.PurePosixPath("/tmp/component/inputs")
73
+ container_outputs_root = pathlib.PurePosixPath("/tmp/component/outputs")
74
+
75
+ # download_input_uris: dict[huggingface_repo_storage.HuggingFaceRepoUri, str] = {}
76
+ # upload_output_uris: dict[str, huggingface_repo_storage.HuggingFaceRepoUri] = {}
77
+ download_input_uris: dict[str, str] = {}
78
+ upload_output_uris: dict[str, str] = {}
79
+ # TODO: Derive common prefix for the upload_output_uris (also log_uri) and upload everything at once
80
+
81
+ # Callbacks for the command-line resolving
82
+ # Their main purpose is to return input/output path or value.
83
+ # They add volumes and volume mounts when needed.
84
+ # They also upload/download artifact data when needed.
85
+ def get_input_value(input_name: str) -> str:
86
+ input_argument = input_arguments[input_name]
87
+ if input_argument.is_dir:
88
+ raise interfaces.LauncherError(
89
+ f"Cannot consume directory as value. {input_name=}, {input_argument=}"
90
+ )
91
+ if input_argument.total_size > _MAX_INPUT_VALUE_SIZE:
92
+ raise interfaces.LauncherError(
93
+ f"Artifact is too big to consume as value. Consume it as file instead. {input_name=}, {input_argument=}"
94
+ )
95
+ value = input_argument.value
96
+ if value is None:
97
+ # Download artifact data
98
+ if not input_argument.uri:
99
+ raise interfaces.LauncherError(
100
+ f"Artifact data has no value and no uri. This cannot happen. {input_name=}, {input_argument=}"
101
+ )
102
+ uri_reader = self._storage_provider.make_uri(
103
+ input_argument.uri
104
+ ).get_reader()
105
+ try:
106
+ data = uri_reader.download_as_bytes()
107
+ except Exception as ex:
108
+ raise interfaces.LauncherError(
109
+ f"Error downloading artifact data. {input_name=}, {input_argument.uri=}"
110
+ ) from ex
111
+ try:
112
+ value = data.decode("utf-8")
113
+ except Exception as ex:
114
+ raise interfaces.LauncherError(
115
+ f"Error converting artifact data to text. {input_name=}, {input_argument.uri=}"
116
+ ) from ex
117
+ # Updating the input_arguments with the downloaded value
118
+ input_argument.value = value
119
+ return value
120
+
121
+ def get_input_path(input_name: str) -> str:
122
+ input_argument = input_arguments[input_name]
123
+ uri = input_argument.uri
124
+ if not uri:
125
+ if input_argument.value is None:
126
+ raise interfaces.LauncherError(
127
+ f"Artifact data has no value and no uri. This cannot happen. {input_name=}, {input_argument=}"
128
+ )
129
+ uri_writer = self._storage_provider.make_uri(
130
+ input_argument.staging_uri
131
+ ).get_writer()
132
+ try:
133
+ uri_writer.upload_from_text(input_argument.value)
134
+ except Exception as ex:
135
+ raise interfaces.LauncherError(
136
+ f"Error uploading argument value. {input_name=}, {input_argument=}"
137
+ ) from ex
138
+ uri = input_argument.staging_uri
139
+ # Updating the input_arguments with the URI of the uploaded value
140
+ input_argument.uri = uri
141
+
142
+ container_path = (
143
+ container_inputs_root
144
+ / naming_utils.sanitize_file_name(input_name)
145
+ / _CONTAINER_FILE_NAME
146
+ ).as_posix()
147
+ # hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(uri)
148
+ download_input_uris[uri] = container_path
149
+ return container_path
150
+
151
+ def get_output_path(output_name: str) -> str:
152
+ uri = output_uris[output_name]
153
+ # container_path = (
154
+ # container_outputs_root
155
+ # / naming_utils.sanitize_file_name(output_name)
156
+ # / _CONTAINER_FILE_NAME
157
+ # ).as_posix()
158
+ hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(uri)
159
+ uri_path_in_repo = hf_uri.path
160
+ container_path = str(container_outputs_root / uri_path_in_repo)
161
+ upload_output_uris[container_path] = uri
162
+ return container_path
163
+
164
+ def get_log_path() -> str:
165
+ # TODO: Use common URI here
166
+ hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
167
+ uri_path_in_repo = hf_uri.path
168
+ container_path = str(container_outputs_root / uri_path_in_repo)
169
+ return container_path
170
+
171
+ def get_exit_code_path() -> str:
172
+ # TODO: Use common URI here
173
+ hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
174
+ uri_path_in_repo = hf_uri.path
175
+ container_path = str(
176
+ (container_outputs_root / uri_path_in_repo).with_name("exit_code.txt")
177
+ )
178
+ return container_path
179
+
180
+ container_log_path = get_log_path()
181
+ exit_code_path = get_exit_code_path()
182
+
183
+ # Resolving the command line.
184
+ # Also indirectly populates volumes and volume_mounts.
185
+ resolved_cmd = container_component_utils.resolve_container_command_line(
186
+ component_spec=component_spec,
187
+ provided_input_names=set(input_arguments.keys()),
188
+ get_input_value=get_input_value,
189
+ get_input_path=get_input_path,
190
+ get_output_path=get_output_path,
191
+ )
192
+
193
+ # Preparing the artifact uploader wrapper
194
+ # TODO: Use common URI here
195
+ # TODO: Add: --commit-message '{path_in_repo}' once path_in_repo becomes non-empty
196
+ path_in_repo = ""
197
+ # commit_message = path_in_repo
198
+
199
+ hf_repo_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
200
+
201
+ # It's hard to download data from HuggingFace.
202
+ # First, there is no way to download a directory:
203
+ # 1. The CLI only downloads to cache. So we have to correctly find the data location in the cache sna copy the data out.
204
+ # 2. We cannot specify which directory to download, so we have to use the --include filter.
205
+ # But `hf download --include path` has an issue that it cannot download a directory unless path ends with slash (and for files, there should be no slash).
206
+ # Adding "*" works for both files and directories. It's imperfect, but fine.
207
+ # Another problem: The files in the snapshot are actually symlinks.
208
+ # cp options:
209
+ # -H follow command-line symbolic links in SOURCE
210
+ # -l, --link hard link files instead of copying
211
+ # -L, --dereference always follow symbolic links in SOURCE
212
+ input_download_lines = [
213
+ f'mkdir -p "$(dirname "{container_path}")"'
214
+ f' && snapshot_dir=`uv run --with huggingface_hub[cli] hf download --repo-type "{hf_uri.repo_type}" "{hf_uri.repo_id}" --include "{hf_uri.path}*"`'
215
+ f' && cp -r -L "$snapshot_dir/{hf_uri.path}" "{container_path}"'
216
+ for hf_uri, container_path in (
217
+ (huggingface_repo_storage.HuggingFaceRepoUri.parse(uri), container_path)
218
+ for uri, container_path in download_input_uris.items()
219
+ )
220
+ ]
221
+ input_download_code = "\n".join(input_download_lines)
222
+
223
+ artifact_uploader_script = f"""
224
+ set -e -x -o pipefail
225
+ # Installing uv
226
+ url="https://astral.sh/uv/install.sh"
227
+ if command -v curl &>/dev/null; then
228
+ # -s: silent, -L: follow redirects
229
+ curl -s -L "$url" | sh
230
+ elif command -v wget &>/dev/null; then
231
+ wget -q -O - "$url" | sh
232
+ else
233
+ echo "Error: Neither curl nor wget found." >&2
234
+ exit 1
235
+ fi
236
+
237
+ export PATH="$HOME/.local/bin:$PATH"
238
+
239
+ uv run --with huggingface_hub[cli] hf version
240
+
241
+ # Downloading the input data
242
+ {input_download_code}
243
+
244
+ # Running the program
245
+ log_path='{container_log_path}'
246
+ exit_code_path='{exit_code_path}'
247
+ mkdir -p "$(dirname "$log_path")"
248
+ mkdir -p "$(dirname "$exit_code_path")"
249
+ # We need to capture the exit code while piping the stderr and stdout to a log file. Not all shells support `${{PIPEFAIL[0]}}`
250
+ set +e +x
251
+ {{ "$0" "$@"; echo $? >"$exit_code_path";}} 2>&1 | tee "$log_path"
252
+ set -e +x
253
+
254
+ exit_code=`cat "$exit_code_path"`
255
+
256
+ uv run --with huggingface_hub[cli] hf upload --repo-type '{hf_repo_uri.repo_type}' '{hf_repo_uri.repo_id}' '{container_outputs_root}' '{path_in_repo}'
257
+ exit "$exit_code"
258
+ """
259
+
260
+ container_env = container_spec.env or {}
261
+
262
+ # Passing HF token to the Job
263
+ secrets: dict[str, str] = {}
264
+ if self._hf_job_token:
265
+ secrets["HF_TOKEN"] = self._hf_job_token
266
+
267
+ command_line = list(resolved_cmd.command or []) + list(resolved_cmd.args or [])
268
+ command_line = ["sh", "-c", artifact_uploader_script] + command_line
269
+ job = self._api_client.run_job(
270
+ image=container_spec.image,
271
+ command=command_line,
272
+ env=dict(container_env),
273
+ timeout=self._job_timeout,
274
+ namespace=self._namespace,
275
+ secrets=secrets,
276
+ # flavor=...,
277
+ )
278
+
279
+ _logger.debug(f"Launched HF Job {job.id=}")
280
+ launched_container = LaunchedHuggingFaceJobContainer(
281
+ id=job.id,
282
+ namespace=namespace,
283
+ job=job,
284
+ output_uris=output_uris,
285
+ log_uri=log_uri,
286
+ )
287
+ return launched_container
288
+
289
+ def deserialize_launched_container_from_dict(
290
+ self, launched_container_dict: dict[str, Any]
291
+ ) -> "LaunchedHuggingFaceJobContainer":
292
+ launched_container = LaunchedHuggingFaceJobContainer.from_dict(
293
+ launched_container_dict, api_client=self._api_client
294
+ )
295
+ return launched_container
296
+
297
+ def get_refreshed_launched_container_from_dict(
298
+ self, launched_container_dict: dict[str, Any]
299
+ ) -> "LaunchedHuggingFaceJobContainer":
300
+ launched_container = LaunchedHuggingFaceJobContainer.from_dict(
301
+ launched_container_dict, api_client=self._api_client
302
+ )
303
+ job = self._api_client.inspect_job(
304
+ job_id=launched_container.id,
305
+ namespace=launched_container._namespace,
306
+ )
307
+ new_launched_container = copy.copy(launched_container)
308
+ new_launched_container._job = job
309
+ return new_launched_container
310
+
311
+
312
class LaunchedHuggingFaceJobContainer(interfaces.LaunchedContainer):
    """Handle to a container task launched as a HuggingFace Job."""

    def __init__(
        self,
        id: str,
        namespace: str,
        job: huggingface_hub.JobInfo,
        output_uris: dict[str, str],
        log_uri: str,
        api_client: huggingface_hub.HfApi | None = None,
    ):
        """Stores the job identity, its last known state and artifact locations.

        Args:
            id: HuggingFace Job id.
            namespace: Namespace the job runs in.
            job: Last fetched job info snapshot.
            output_uris: Maps output names to destination URIs.
            log_uri: URI where the execution log is uploaded.
            api_client: Optional API client. Required for operations that
                talk to the Jobs API (log streaming, termination, refresh).
        """
        self._id: str = id
        self._namespace: str = namespace
        self._job = job
        self._output_uris: dict[str, str] = output_uris
        self._log_uri: str = log_uri
        self._api_client: huggingface_hub.HfApi | None = api_client
328
+
329
+ def _get_api_client(self):
330
+ if not self._api_client:
331
+ raise interfaces.LauncherError(
332
+ "This action requires an API client, but this instance was constructed without one."
333
+ )
334
+ return self._api_client
335
+
336
    @property
    def id(self) -> str:
        # Unique identifier of the underlying HuggingFace Job.
        return self._id
339
+
340
+ @property
341
+ def status(self) -> interfaces.ContainerStatus:
342
+ status_str = self._job.status.stage
343
+ # status_message = self._job.status.message
344
+ if status_str == huggingface_hub.JobStage.RUNNING:
345
+ return interfaces.ContainerStatus.RUNNING
346
+ elif status_str == huggingface_hub.JobStage.COMPLETED:
347
+ return interfaces.ContainerStatus.SUCCEEDED
348
+ elif status_str == huggingface_hub.JobStage.ERROR:
349
+ return interfaces.ContainerStatus.FAILED
350
+ elif status_str == huggingface_hub.JobStage.CANCELED:
351
+ return interfaces.ContainerStatus.FAILED
352
+ else: # "DELETED"
353
+ return interfaces.ContainerStatus.ERROR
354
+
355
+ @property
356
+ def exit_code(self) -> Optional[int]:
357
+ # HF Jobs do not provide exit code
358
+ if not self.has_ended:
359
+ return None
360
+ return None
361
+
362
+ @property
363
+ def has_ended(self) -> bool:
364
+ return self.status in (
365
+ interfaces.ContainerStatus.SUCCEEDED,
366
+ interfaces.ContainerStatus.FAILED,
367
+ interfaces.ContainerStatus.ERROR,
368
+ )
369
+
370
    @property
    def has_succeeded(self) -> bool:
        # True only when the job completed successfully.
        return self.status == interfaces.ContainerStatus.SUCCEEDED

    @property
    def has_failed(self) -> bool:
        # True when the job errored or was canceled (both map to FAILED).
        return self.status == interfaces.ContainerStatus.FAILED

    @property
    def started_at(self) -> datetime.datetime | None:
        # HF Jobs do not provide started_at, so using created_at
        return self._job.created_at
382
+
383
+ @property
384
+ def ended_at(self) -> datetime.datetime | None:
385
+ # HF Jobs do not provide ended_at
386
+ # Fudging the value by returning the current time.
387
+ if self.has_ended:
388
+ return datetime.datetime.now(datetime.timezone.utc)
389
+ return None
390
+
391
+ @property
392
+ def launcher_error_message(self) -> str | None:
393
+ if self._job.status.message:
394
+ # TODO: Check what kind of messages this returns and when.
395
+ _logger.debug(f"{self._job.status.message=}")
396
+ return self._job.status.message
397
+ return None
398
+
399
+ def get_log(self) -> str:
400
+ if self.has_ended:
401
+ try:
402
+ return (
403
+ huggingface_repo_storage.HuggingFaceRepoStorageProvider()
404
+ .make_uri(self._log_uri)
405
+ .get_reader()
406
+ .download_as_text()
407
+ )
408
+ except Exception as ex:
409
+ _logger.warning(f"Error getting log from URI:", ex)
410
+ return "\n".join(
411
+ self._get_api_client().fetch_job_logs(
412
+ job_id=self._id,
413
+ )
414
+ )
415
+
416
    def upload_log(self):
        # No-op: the wrapper script generated by the launcher already uploads
        # the log file to the HF repo together with the output artifacts.
        pass
419
+
420
+ def stream_log_lines(self) -> typing.Iterator[str]:
421
+ return (
422
+ self._get_api_client()
423
+ .fetch_job_logs(
424
+ job_id=self._id,
425
+ namespace=self._namespace,
426
+ )
427
+ .__iter__()
428
+ )
429
+
430
    def terminate(self):
        # Cancels the underlying HF Job; requires an API client.
        self._get_api_client().cancel_job(job_id=self._id, namespace=self._namespace)
432
+
433
+ def to_dict(self) -> dict[str, Any]:
434
+ debug_job_info = dataclasses.asdict(self._job)
435
+ # Fix JSON serialization of datetime
436
+ del debug_job_info["created_at"]
437
+ return dict(
438
+ huggingface_job=dict(
439
+ id=self.id,
440
+ namespace=self._namespace,
441
+ output_uris=self._output_uris,
442
+ log_uri=self._log_uri,
443
+ # For debugging purposes, not needed otherwise
444
+ debug_job_info=debug_job_info,
445
+ )
446
+ )
447
+
448
+ @classmethod
449
+ def from_dict(
450
+ cls,
451
+ d: dict[str, Any],
452
+ api_client: huggingface_hub.HfApi | None = None,
453
+ ) -> "LaunchedHuggingFaceJobContainer":
454
+ container_dict = d["huggingface_job"]
455
+ job_info = huggingface_hub.JobInfo(
456
+ **container_dict["debug_job_info"],
457
+ )
458
+ return LaunchedHuggingFaceJobContainer(
459
+ id=container_dict["id"],
460
+ namespace=container_dict["namespace"],
461
+ job=job_info,
462
+ output_uris=container_dict["output_uris"],
463
+ log_uri=container_dict["log_uri"],
464
+ api_client=api_client,
465
+ )