Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| from typing import Dict, Any, List | |
| from .logger import setup_logger | |
| logger = setup_logger(__name__) | |
| def _load_json(path: Path) -> Dict[str, Any]: | |
| return json.loads(path.read_text(encoding="utf-8")) | |
| def _write_json(path: Path, obj: Dict[str, Any]) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| path.write_text( | |
| json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8" | |
| ) | |
| def run_step_manifest(step_dir: Path, executor) -> Dict[str, Any]: | |
| manifest_path = step_dir / "manifest.json" | |
| if not manifest_path.exists(): | |
| raise FileNotFoundError(f"manifest.json not found in {step_dir}") | |
| manifest = _load_json(manifest_path) | |
| result = executor(step_dir, manifest) | |
| # Basic contract | |
| result_path = step_dir / "result.json" | |
| _write_json(result_path, result) | |
| return result | |
| def resolve_prev_output_uri(prev_step_dir: Path, output_name: str) -> str: | |
| result_path = prev_step_dir / "result.json" | |
| if not result_path.exists(): | |
| raise FileNotFoundError(f"Missing result.json in {prev_step_dir}") | |
| res = _load_json(result_path) | |
| outs: List[Dict[str, Any]] = res.get("outputs") or [] | |
| for o in outs: | |
| if o.get("name") == output_name: | |
| return str(o.get("uri")) | |
| raise RuntimeError(f"Output '{output_name}' not found in {result_path}") | |