Spaces:
Running
Running
| import json | |
| from pathlib import Path | |
| from typing import Any, Dict, Iterable, Optional | |
| import pandas as pd | |
| from .contracts import default_run_id | |
def make_run_dir(run_id: Optional[str] = None, base_dir: str = "runs") -> Path:
    """Create the directory ``<base_dir>/<run id>`` and return its path.

    Args:
        run_id: Name of the run directory. When falsy (``None`` or an empty
            string) the name is taken from ``default_run_id("run")``.
        base_dir: Directory under which the run directory is placed.

    Returns:
        The path of the run directory. Missing parents are created and an
        already existing directory is not treated as an error.
    """
    if run_id:
        dir_name = run_id
    else:
        dir_name = default_run_id("run")

    run_dir = Path(base_dir).joinpath(dir_name)
    run_dir.mkdir(parents=True, exist_ok=True)
    return run_dir
def write_json(path: Path, data: Any) -> Path:
    """Write ``data`` to ``path`` as JSON indented by two spaces.

    The parent directory of ``path`` is created if it does not exist, and
    the file is written as UTF-8, replacing any previous content.

    Returns:
        The ``path`` that was written.
    """
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)

    # Serialize only after the directory exists, matching the original order.
    serialized = json.dumps(data, indent=2)
    path.write_text(serialized, encoding="utf-8")
    return path
def write_markdown(path: Path, text: str) -> Path:
    """Write ``text`` to ``path`` as UTF-8, creating parent directories.

    A falsy ``text`` (for example ``None`` or ``""``) produces an empty file.

    Returns:
        The ``path`` that was written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    content = text if text else ""
    path.write_text(content, encoding="utf-8")
    return path
def append_jsonl(path: Path, row: Dict[str, Any]) -> Path:
    """Append ``row`` to ``path`` as a single JSON line.

    The parent directory is created if missing and the file is opened in
    append mode with UTF-8 encoding. Non-ASCII characters are escaped
    (``ensure_ascii=True``) and the line is terminated with ``"\\n"``.

    Returns:
        The ``path`` that was appended to.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        encoded = json.dumps(row, ensure_ascii=True)
        handle.write(f"{encoded}\n")
    return path
def write_jsonl(path: Path, rows: Iterable[Dict[str, Any]]) -> Path:
    """Write ``rows`` to ``path`` as JSON Lines, replacing existing content.

    The parent directory is created if missing. Each row is serialized with
    ``ensure_ascii=True`` and followed by ``"\\n"``; rows are consumed lazily
    in iteration order. An empty iterable yields an empty file.

    Returns:
        The ``path`` that was written.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(record, ensure_ascii=True) + "\n" for record in rows
        )
    return path
def write_dataframe_csv(path: Path, df: pd.DataFrame) -> Path:
    """Write ``df`` to ``path`` as CSV without the index column.

    The parent directory of ``path`` is created if it does not exist.

    Returns:
        The ``path`` that was written.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    df.to_csv(path_or_buf=path, index=False)
    return path
def load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the parsed JSON value."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)