File size: 1,910 Bytes
11ac7be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from __future__ import annotations

import json
import os
from dataclasses import asdict, is_dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, Optional


class ArtifactStore:
    def __init__(self, run_dir: Path) -> None:
        self.run_dir = run_dir
        self.run_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def create(base_dir: Path) -> "ArtifactStore":
        ts = datetime.now().strftime("%Y%m%d-%H%M%S")
        run_dir = base_dir / f"{ts}-{os.getpid()}"
        return ArtifactStore(run_dir)

    def path(self, name: str) -> Path:
        return self.run_dir / name

    def write_text(self, name: str, text: str) -> Path:
        p = self.path(name)
        p.write_text(text, encoding="utf-8")
        return p

    def write_json(self, name: str, obj: Any) -> Path:
        payload: Any = obj
        if is_dataclass(obj):
            payload = asdict(obj)
        p = self.path(name)
        p.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        return p

    def write_jsonl(self, name: str, rows: Iterable[Dict[str, Any]]) -> Path:
        p = self.path(name)
        with p.open("w", encoding="utf-8") as f:
            for row in rows:
                f.write(json.dumps(row, ensure_ascii=False) + "\n")
        return p

    def append_jsonl(self, name: str, row: Dict[str, Any]) -> Path:
        p = self.path(name)
        with p.open("a", encoding="utf-8") as f:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")
        return p

    def read_json(self, name: str) -> Dict[str, Any]:
        p = self.path(name)
        return json.loads(p.read_text(encoding="utf-8"))

    def maybe_read_text(self, name: str) -> Optional[str]:
        p = self.path(name)
        if not p.exists():
            return None
        return p.read_text(encoding="utf-8")