Spaces:
Running on Zero
Running on Zero
| """Dry-run execution for planned GPU task manifests.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Any | |
| from zsgdp.gpu.batching import batch_gpu_tasks | |
| from zsgdp.gpu.runtime import collect_gpu_runtime_status | |
| from zsgdp.gpu.worker import GPUWorker | |
| from zsgdp.utils import write_json | |
def load_gpu_tasks(path: str | Path) -> list[dict[str, Any]]:
    """Load GPU task records from a JSON Lines manifest.

    Args:
        path: Either the manifest file itself or a directory, in which case the
            manifest is looked up as ``gpu_tasks.jsonl`` inside it.

    Returns:
        One parsed JSON value per non-blank line, in file order (each line is
        expected to hold a JSON object describing one task).

    Raises:
        FileNotFoundError: If the resolved manifest path does not exist.
        json.JSONDecodeError: If a non-blank line is not valid JSON; the
            message names the manifest path and the offending line number.
    """
    task_path = _task_manifest_path(path)
    if not task_path.exists():
        raise FileNotFoundError(f"Missing GPU task manifest: {task_path}")
    tasks: list[dict[str, Any]] = []
    # Iterate the file object instead of using ``str.splitlines()``: JSON Lines
    # records are newline-separated, but ``splitlines()`` also breaks on
    # U+2028/U+2029/U+0085, which JSON permits unescaped inside strings and
    # which would therefore split a single valid record into broken halves.
    with task_path.open(encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue  # tolerate blank / whitespace-only lines
            try:
                tasks.append(json.loads(line))
            except json.JSONDecodeError as exc:
                # Re-raise the same exception type so existing handlers still
                # match, but point at the manifest line rather than char 0.
                raise json.JSONDecodeError(
                    f"{exc.msg} in {task_path} at manifest line {line_number}",
                    exc.doc,
                    exc.pos,
                ) from exc
    return tasks
def run_gpu_task_manifest(
    input_path: str | Path,
    *,
    config: dict[str, Any],
    output_path: str | Path | None = None,
    dry_run: bool = True,
) -> dict[str, Any]:
    """Load a GPU task manifest, evaluate it, and optionally persist the report.

    Args:
        input_path: Manifest file, or a directory holding ``gpu_tasks.jsonl``.
        config: Pipeline configuration forwarded to ``dry_run_gpu_tasks``.
        output_path: When given, the report is additionally written there
            via ``write_json``.
        dry_run: Forwarded unchanged to ``dry_run_gpu_tasks``.

    Returns:
        The report dictionary produced by ``dry_run_gpu_tasks``.
    """
    report = dry_run_gpu_tasks(
        load_gpu_tasks(input_path),
        config=config,
        dry_run=dry_run,
    )
    if output_path is None:
        return report
    write_json(output_path, report)
    return report
def dry_run_gpu_tasks(
    tasks: list[dict[str, Any]],
    *,
    config: dict[str, Any],
    dry_run: bool = True,
    runtime_status: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Batch GPU tasks, hand each batch to a ``GPUWorker``, and summarize.

    Args:
        tasks: Task records, e.g. as returned by ``load_gpu_tasks``.
        config: Pipeline configuration. ``config["gpu"]["max_batch_size"]``,
            when present, is passed to ``batch_gpu_tasks``; otherwise the
            runtime status's ``max_batch_size`` is used, then ``4``. The whole
            config is also passed to ``collect_gpu_runtime_status`` and
            ``GPUWorker``.
        dry_run: Forwarded to ``GPUWorker.run_batch`` and echoed in the report.
        runtime_status: Pre-collected runtime status. When ``None`` the status
            is collected from ``config``; any other value (including an empty
            dict) is used as-is.

    Returns:
        A report dict with task/batch counts, aggregated ``ready_count``,
        ``blocked_count``, ``executed_count`` and ``failed_count`` summed over
        the per-batch results, the runtime status, and the raw per-batch
        results under ``"batches"``.
    """
    # Explicit ``is None`` check: a caller-supplied (even empty) status must
    # not be discarded in favour of a fresh runtime probe.
    if runtime_status is None:
        runtime = collect_gpu_runtime_status(config).to_dict()
    else:
        runtime = runtime_status
    # Tolerate a ``gpu`` key whose value is ``None`` (presumably what an empty
    # section in a parsed config file produces) as "no overrides".
    gpu_config = config.get("gpu") or {}
    max_batch_size = int(gpu_config.get("max_batch_size", runtime.get("max_batch_size", 4)))

    batches = batch_gpu_tasks(tasks, max_batch_size=max_batch_size)
    worker = GPUWorker(config)
    batch_results = [worker.run_batch(batch, dry_run=dry_run) for batch in batches]

    return {
        "dry_run": dry_run,
        "task_count": len(tasks),
        "batch_count": len(batches),
        "ready_count": sum(result["ready_count"] for result in batch_results),
        "blocked_count": sum(result["blocked_count"] for result in batch_results),
        # executed/failed default to 0 — NOTE(review): presumably these keys are
        # absent from some batch results (e.g. dry runs); confirm in GPUWorker.
        "executed_count": sum(result.get("executed_count", 0) for result in batch_results),
        "failed_count": sum(result.get("failed_count", 0) for result in batch_results),
        "runtime": runtime,
        "batches": batch_results,
    }
def _task_manifest_path(path: str | Path) -> Path:
    """Resolve *path* to the concrete manifest file location.

    An existing directory is taken to contain the manifest under the fixed
    name ``gpu_tasks.jsonl``; any other path (including one that does not
    exist) is returned unchanged as a ``Path``.
    """
    candidate = Path(path)
    return candidate / "gpu_tasks.jsonl" if candidate.is_dir() else candidate