Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import gzip | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any, Iterable, Iterator, Union, Optional, overload, List, Dict | |
| PathLike = Union[str, Path] | |
class FileIOError(Exception):
    """Base exception: covers read/write and parse errors raised by this module."""
class JSONDecodeError(FileIOError):
    """JSON parsing failed (file-level or line-level decode error)."""
class AtomicWriter:
    """Atomic text-file writer.

    Streams output to a temporary file in the *same directory* as the target,
    then promotes it with ``os.replace()`` on success — readers never observe
    a partially written target. With ``gzip_mode=True`` the payload is
    gzip-compressed transparently.

    Guarantees:
      * On any exception inside the ``with`` block the temp file is removed
        and the previous target (if any) is left untouched.
      * The final file gets the permissions a plain ``open()`` would produce
        (0o666 masked by the process umask), not mkstemp's private 0o600.
    """

    def __init__(self, target: Path, gzip_mode: bool = False,
                 encoding: str = "utf-8", newline: str = "\n"):
        self.target = target
        self.dir = target.parent
        self.encoding = encoding
        self.newline = newline
        self.gzip_mode = gzip_mode
        self._tmp_path: Optional[Path] = None
        self._fh: Optional[io.TextIOBase] = None

    def __enter__(self) -> io.TextIOBase:
        self.dir.mkdir(parents=True, exist_ok=True)
        # Temp file must live in the target's directory so os.replace() stays
        # on one filesystem and is therefore atomic.
        fd, tmp_name = tempfile.mkstemp(prefix=f".{self.target.name}.", dir=str(self.dir))
        os.close(fd)  # reopened below with a high-level file object
        self._tmp_path = Path(tmp_name)
        # BUGFIX: mkstemp creates the file with mode 0o600 and os.replace()
        # carries that mode onto the target; widen to what open() would give
        # (0o666 & ~umask) so the written file is not unexpectedly private.
        umask = os.umask(0)
        os.umask(umask)
        os.chmod(self._tmp_path, 0o666 & ~umask)
        try:
            if self.gzip_mode:
                f = gzip.open(self._tmp_path, mode="wt", encoding=self.encoding, newline=self.newline)
            else:
                f = open(self._tmp_path, mode="w", encoding=self.encoding, newline=self.newline)
        except Exception:
            # BUGFIX: don't leak the temp file if reopening it fails.
            self._cleanup_tmp()
            raise
        self._fh = f
        return f

    def __exit__(self, exc_type, exc, tb) -> None:
        if self._fh:
            self._fh.close()
            self._fh = None
        if exc_type is None:
            assert self._tmp_path is not None
            os.replace(self._tmp_path, self.target)
        else:
            # An error occurred inside the with-block: discard the temp file.
            self._cleanup_tmp()

    def _cleanup_tmp(self) -> None:
        """Best-effort removal of the temp file; never raises."""
        if self._tmp_path and self._tmp_path.exists():
            try:
                self._tmp_path.unlink()
            except Exception:
                pass  # cleanup failure is non-fatal
def _open_read(path: Path, encoding: str = "utf-8") -> io.TextIOBase:
    """Open *path* for text reading; ``.gz`` paths are decompressed on the fly."""
    if path.suffix != ".gz":
        return open(path, mode="r", encoding=encoding)
    return gzip.open(path, mode="rt", encoding=encoding)
def _needs_gzip(path: Path) -> bool:
    """Return True when *path* should be written gzip-compressed (suffix is ``.gz``)."""
    suffix = path.suffix
    return suffix == ".gz"
class JsonIO:
    """Single-file JSON persistence.

    * ``ensure_ascii=False`` by default, so non-ASCII text survives round trips.
    * Pretty-printed with ``indent=2`` by default (configurable).
    * ``.gz`` targets are read and written with transparent gzip compression.
    """

    def __init__(self, path: PathLike, encoding: str = "utf-8"):
        self.path = Path(path)
        self.encoding = encoding

    def load(self) -> Any:
        """Parse and return the file's JSON payload.

        Raises:
            JSONDecodeError: the file was readable but is not valid JSON.
            FileIOError: any other read failure (missing file, permissions, ...).
        """
        try:
            with _open_read(self.path, encoding=self.encoding) as fh:
                return json.load(fh)
        except json.JSONDecodeError as exc:
            raise JSONDecodeError(f"JSON decode failed at {self.path}: {exc}") from exc
        except Exception as exc:
            raise FileIOError(f"Failed to read {self.path}: {exc}") from exc

    def save(self, obj: Any, indent: int = 2, ensure_ascii: bool = False) -> None:
        """Serialize *obj* to the target path atomically, with a trailing newline.

        Raises:
            FileIOError: on any serialization or write failure.
        """
        gz = _needs_gzip(self.path)
        try:
            writer = AtomicWriter(self.path, gzip_mode=gz, encoding=self.encoding)
            with writer as fh:
                fh.write(json.dumps(obj, ensure_ascii=ensure_ascii, indent=indent))
                fh.write("\n")
        except Exception as exc:
            raise FileIOError(f"Failed to write {self.path}: {exc}") from exc
class JsonlIO:
    """JSONL (NDJSON) line-oriented persistence.

    * Streaming reads via ``iter_load`` (memory-friendly).
    * Atomic overwrite via ``save_all``.
    * Non-atomic ``append`` for log-style data.
    * ``.gz`` paths handled transparently.
    """

    def __init__(self, path: PathLike, encoding: str = "utf-8"):
        self.path = Path(path)
        self.encoding = encoding

    def iter_load(self) -> Iterator[Dict[str, Any]]:
        """Yield one parsed object per line, skipping blank lines.

        Raises:
            JSONDecodeError: a line is not valid JSON (message carries the line number).
            FileIOError: any other read failure.
        """
        try:
            with _open_read(self.path, encoding=self.encoding) as f:
                for i, line in enumerate(f, start=1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError as e:
                        raise JSONDecodeError(f"JSONL decode failed at line {i} in {self.path}: {e}") from e
        except FileIOError:
            # BUGFIX: the JSONDecodeError raised just above is a FileIOError
            # subclass and is raised inside this try block, so the generic
            # handler below used to catch and re-wrap it into a plain
            # FileIOError, hiding the specific type and line info from callers.
            raise
        except Exception as e:
            raise FileIOError(f"Failed to read {self.path}: {e}") from e

    def load_all(self) -> List[Dict[str, Any]]:
        """Load every row at once (convenient for small files)."""
        return list(self.iter_load())

    def save_all(self, rows: Iterable[Dict[str, Any]], ensure_ascii: bool = False) -> None:
        """Atomically overwrite the file with *rows*, one JSON object per line.

        Raises:
            FileIOError: on any serialization or write failure.
        """
        gzip_mode = _needs_gzip(self.path)
        try:
            with AtomicWriter(self.path, gzip_mode=gzip_mode, encoding=self.encoding) as f:
                for row in rows:
                    f.write(json.dumps(row, ensure_ascii=ensure_ascii))
                    f.write("\n")
        except Exception as e:
            raise FileIOError(f"Failed to write {self.path}: {e}") from e

    def append(self, rows: Iterable[Dict[str, Any]], ensure_ascii: bool = False) -> None:
        """Append *rows* (non-atomic; suited to continuously growing log data).

        Raises:
            FileIOError: on any serialization or write failure.
        """
        try:
            # BUGFIX: parent dirs were only created in the non-gzip branch.
            self.path.parent.mkdir(parents=True, exist_ok=True)
            if _needs_gzip(self.path):
                # gzip files have no random-access in-place append; instead a
                # fresh gzip member is streamed onto the end of the file.
                fh = gzip.open(self.path, mode="at", encoding=self.encoding, newline="\n")
            else:
                fh = open(self.path, mode="a", encoding=self.encoding, newline="\n")
            with fh as f:
                for row in rows:
                    f.write(json.dumps(row, ensure_ascii=ensure_ascii))
                    f.write("\n")
        except Exception as e:
            raise FileIOError(f"Failed to append {self.path}: {e}") from e