# study_prompt/fileio/jsonio.py
# Author: 陳聖勳 — initial commit (03189b2)
from __future__ import annotations
import io
import json
import os
import gzip
import tempfile
from pathlib import Path
from typing import Any, Iterable, Iterator, Union, Optional, overload, List, Dict
PathLike = Union[str, Path]
class FileIOError(Exception):
    """Base exception for this module, covering read/write and parse failures."""
class JSONDecodeError(FileIOError):
    """Raised when JSON (or JSONL) content fails to parse."""
class AtomicWriter:
    """
    Atomic text-file writer: write to a temp file in the target's directory,
    then ``os.replace()`` it over the target on success.

    ``os.replace`` is an atomic rename, so readers never observe a
    half-written file.  Pass ``gzip_mode=True`` to transparently gzip the
    stream.  On any exception inside the ``with`` block the temp file is
    removed and the target is left untouched.
    """

    def __init__(self, target: Path, gzip_mode: bool = False, encoding: str = "utf-8", newline: str = "\n"):
        self.target = target
        self.dir = target.parent
        self.encoding = encoding
        self.newline = newline
        self.gzip_mode = gzip_mode
        self._tmp_path: Optional[Path] = None
        self._fh: Optional[io.TextIOBase] = None

    def __enter__(self) -> io.TextIOBase:
        self.dir.mkdir(parents=True, exist_ok=True)
        fd, tmp_name = tempfile.mkstemp(prefix=f".{self.target.name}.", dir=str(self.dir))
        os.close(fd)  # we re-open below with a high-level file object
        self._tmp_path = Path(tmp_name)
        try:
            if self.gzip_mode:
                f = gzip.open(self._tmp_path, mode="wt", encoding=self.encoding, newline=self.newline)
            else:
                f = open(self._tmp_path, mode="w", encoding=self.encoding, newline=self.newline)
        except Exception:
            # BUGFIX: don't leak the mkstemp() file when re-opening it fails
            # (e.g. bad encoding name) — the original left it behind.
            self._cleanup_tmp()
            raise
        self._fh = f
        return f

    def __exit__(self, exc_type, exc, tb) -> None:
        # Close first so gzip trailers / text buffers are fully flushed to
        # the temp file before we decide what to do with it.
        if self._fh:
            self._fh.close()
            self._fh = None
        if exc_type is None:
            assert self._tmp_path is not None
            # BUGFIX: fsync the completed temp file before the rename so the
            # contents are durable once the target name exists; without this
            # a crash after replace() could leave an empty/partial target.
            fd = os.open(self._tmp_path, os.O_RDONLY)
            try:
                os.fsync(fd)
            finally:
                os.close(fd)
            os.replace(self._tmp_path, self.target)
            self._tmp_path = None
        else:
            # An error occurred inside the with-block: discard the temp file.
            self._cleanup_tmp()

    def _cleanup_tmp(self) -> None:
        """Best-effort removal of the temp file; cleanup failures are ignored."""
        if self._tmp_path and self._tmp_path.exists():
            try:
                self._tmp_path.unlink()
            except OSError:
                pass  # nothing useful to do if cleanup itself fails
        self._tmp_path = None
def _open_read(path: Path, encoding: str = "utf-8") -> io.TextIOBase:
if path.suffix == ".gz":
return gzip.open(path, mode="rt", encoding=encoding)
return open(path, mode="r", encoding=encoding)
def _needs_gzip(path: Path) -> bool:
return path.suffix == ".gz"
class JsonIO:
    """
    Single-file JSON storage.

    - Keeps non-ASCII text readable (``ensure_ascii=False`` by default).
    - Pretty-prints with ``indent=2`` by default (configurable).
    - Transparently reads/writes ``.gz`` paths.
    """

    def __init__(self, path: PathLike, encoding: str = "utf-8"):
        self.path = Path(path)
        self.encoding = encoding

    def load(self) -> Any:
        """Parse and return the file's JSON payload.

        Raises ``JSONDecodeError`` on malformed JSON and ``FileIOError`` on
        any other read failure.
        """
        try:
            with _open_read(self.path, encoding=self.encoding) as fh:
                return json.load(fh)
        except json.JSONDecodeError as exc:
            raise JSONDecodeError(f"JSON decode failed at {self.path}: {exc}") from exc
        except Exception as exc:
            raise FileIOError(f"Failed to read {self.path}: {exc}") from exc

    def save(self, obj: Any, indent: int = 2, ensure_ascii: bool = False) -> None:
        """Atomically serialize *obj*; gzip is used when the path ends in ``.gz``.

        Raises ``FileIOError`` on any write failure.
        """
        writer = AtomicWriter(self.path, gzip_mode=_needs_gzip(self.path), encoding=self.encoding)
        try:
            with writer as fh:
                json.dump(obj, fh, ensure_ascii=ensure_ascii, indent=indent)
                fh.write("\n")  # trailing newline so files end cleanly
        except Exception as exc:
            raise FileIOError(f"Failed to write {self.path}: {exc}") from exc
class JsonlIO:
    """
    JSONL (NDJSON) line-oriented storage.

    - Streaming reads (memory-friendly).
    - Atomic overwrite via ``save_all``.
    - Non-atomic ``append`` (suited to continuously-growing log data).
    - Transparent ``.gz`` support.
    """

    def __init__(self, path: PathLike, encoding: str = "utf-8"):
        self.path = Path(path)
        self.encoding = encoding

    @staticmethod
    def _write_rows(f: io.TextIOBase, rows: Iterable[Dict[str, Any]], ensure_ascii: bool) -> None:
        """Serialize each row as one JSON object per line into open file *f*."""
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=ensure_ascii))
            f.write("\n")

    def iter_load(self) -> Iterator[Dict[str, Any]]:
        """Yield one parsed object per line, skipping blank lines.

        Raises ``JSONDecodeError`` (message carries the 1-based line number)
        on a malformed line and ``FileIOError`` on any other read failure.
        """
        try:
            with _open_read(self.path, encoding=self.encoding) as f:
                for i, line in enumerate(f, start=1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError as e:
                        raise JSONDecodeError(f"JSONL decode failed at line {i} in {self.path}: {e}") from e
        except FileIOError:
            # BUGFIX: our own JSONDecodeError (a FileIOError subclass) raised
            # above used to fall into the generic handler below and get
            # re-wrapped as a plain FileIOError, hiding the specific type and
            # line-number message from callers. Let it pass through unchanged.
            raise
        except Exception as e:
            raise FileIOError(f"Failed to read {self.path}: {e}") from e

    def load_all(self) -> List[Dict[str, Any]]:
        """Load every row at once (convenient for small files)."""
        return list(self.iter_load())

    def save_all(self, rows: Iterable[Dict[str, Any]], ensure_ascii: bool = False) -> None:
        """Atomically overwrite the file with *rows*, one JSON object per line.

        Raises ``FileIOError`` on any write failure.
        """
        gzip_mode = _needs_gzip(self.path)
        try:
            with AtomicWriter(self.path, gzip_mode=gzip_mode, encoding=self.encoding) as f:
                self._write_rows(f, rows, ensure_ascii)
        except Exception as e:
            raise FileIOError(f"Failed to write {self.path}: {e}") from e

    def append(self, rows: Iterable[Dict[str, Any]], ensure_ascii: bool = False) -> None:
        """Append *rows* (non-atomic; suited to continuously-appended logs).

        Raises ``FileIOError`` on any write failure.
        """
        try:
            # BUGFIX: create the parent directory for BOTH branches — the
            # original only did this for the plain-text path, so appending to
            # a .gz file in a missing directory failed.
            self.path.parent.mkdir(parents=True, exist_ok=True)
            if _needs_gzip(self.path):
                # gzip has no in-place random-access append; mode "at" streams
                # an additional gzip member onto the end of the file.
                with gzip.open(self.path, mode="at", encoding=self.encoding, newline="\n") as f:
                    self._write_rows(f, rows, ensure_ascii)
            else:
                with open(self.path, mode="a", encoding=self.encoding, newline="\n") as f:
                    self._write_rows(f, rows, ensure_ascii)
        except Exception as e:
            raise FileIOError(f"Failed to append {self.path}: {e}") from e