zeroshotGPU / zsgdp /gpu /runner.py
Arjunvir Singh
Initial commit: zeroshotGPU MVP with full eval surface
db06ffa
"""Dry-run execution for planned GPU task manifests."""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
from zsgdp.gpu.batching import batch_gpu_tasks
from zsgdp.gpu.runtime import collect_gpu_runtime_status
from zsgdp.gpu.worker import GPUWorker
from zsgdp.utils import write_json
def load_gpu_tasks(path: str | Path) -> list[dict[str, Any]]:
task_path = _task_manifest_path(path)
if not task_path.exists():
raise FileNotFoundError(f"Missing GPU task manifest: {task_path}")
return [
json.loads(line)
for line in task_path.read_text(encoding="utf-8").splitlines()
if line.strip()
]
def run_gpu_task_manifest(
input_path: str | Path,
*,
config: dict[str, Any],
output_path: str | Path | None = None,
dry_run: bool = True,
) -> dict[str, Any]:
tasks = load_gpu_tasks(input_path)
report = dry_run_gpu_tasks(tasks, config=config, dry_run=dry_run)
if output_path is not None:
write_json(output_path, report)
return report
def dry_run_gpu_tasks(
tasks: list[dict[str, Any]],
*,
config: dict[str, Any],
dry_run: bool = True,
runtime_status: dict[str, Any] | None = None,
) -> dict[str, Any]:
runtime = runtime_status or collect_gpu_runtime_status(config).to_dict()
max_batch_size = int(config.get("gpu", {}).get("max_batch_size", runtime.get("max_batch_size", 4)))
batches = batch_gpu_tasks(tasks, max_batch_size=max_batch_size)
worker = GPUWorker(config)
batch_results = [worker.run_batch(batch, dry_run=dry_run) for batch in batches]
ready_count = sum(result["ready_count"] for result in batch_results)
blocked_count = sum(result["blocked_count"] for result in batch_results)
executed_count = sum(result.get("executed_count", 0) for result in batch_results)
failed_count = sum(result.get("failed_count", 0) for result in batch_results)
report = {
"dry_run": dry_run,
"task_count": len(tasks),
"batch_count": len(batches),
"ready_count": ready_count,
"blocked_count": blocked_count,
"executed_count": executed_count,
"failed_count": failed_count,
"runtime": runtime,
"batches": batch_results,
}
return report
def _task_manifest_path(path: str | Path) -> Path:
path_obj = Path(path)
if path_obj.is_dir():
return path_obj / "gpu_tasks.jsonl"
return path_obj