"""Dry-run execution for planned GPU task manifests.""" from __future__ import annotations import json from pathlib import Path from typing import Any from zsgdp.gpu.batching import batch_gpu_tasks from zsgdp.gpu.runtime import collect_gpu_runtime_status from zsgdp.gpu.worker import GPUWorker from zsgdp.utils import write_json def load_gpu_tasks(path: str | Path) -> list[dict[str, Any]]: task_path = _task_manifest_path(path) if not task_path.exists(): raise FileNotFoundError(f"Missing GPU task manifest: {task_path}") return [ json.loads(line) for line in task_path.read_text(encoding="utf-8").splitlines() if line.strip() ] def run_gpu_task_manifest( input_path: str | Path, *, config: dict[str, Any], output_path: str | Path | None = None, dry_run: bool = True, ) -> dict[str, Any]: tasks = load_gpu_tasks(input_path) report = dry_run_gpu_tasks(tasks, config=config, dry_run=dry_run) if output_path is not None: write_json(output_path, report) return report def dry_run_gpu_tasks( tasks: list[dict[str, Any]], *, config: dict[str, Any], dry_run: bool = True, runtime_status: dict[str, Any] | None = None, ) -> dict[str, Any]: runtime = runtime_status or collect_gpu_runtime_status(config).to_dict() max_batch_size = int(config.get("gpu", {}).get("max_batch_size", runtime.get("max_batch_size", 4))) batches = batch_gpu_tasks(tasks, max_batch_size=max_batch_size) worker = GPUWorker(config) batch_results = [worker.run_batch(batch, dry_run=dry_run) for batch in batches] ready_count = sum(result["ready_count"] for result in batch_results) blocked_count = sum(result["blocked_count"] for result in batch_results) executed_count = sum(result.get("executed_count", 0) for result in batch_results) failed_count = sum(result.get("failed_count", 0) for result in batch_results) report = { "dry_run": dry_run, "task_count": len(tasks), "batch_count": len(batches), "ready_count": ready_count, "blocked_count": blocked_count, "executed_count": executed_count, "failed_count": failed_count, "runtime": runtime, "batches": batch_results, } return report def _task_manifest_path(path: str | Path) -> Path: path_obj = Path(path) if path_obj.is_dir(): return path_obj / "gpu_tasks.jsonl" return path_obj