"""
Common utilities: logging, jsonl I/O, atomic write, VRAM check.
"""
import gc
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
def setup_logger(name: str, log_file: Optional[Path] = None, level: int = logging.INFO) -> logging.Logger:
    """
    Configure a named logger that writes to stdout and, optionally, a file.

    Re-invoking with the same name reconfigures the logger without creating
    duplicate handlers. Existing handlers are closed before removal — the
    previous implementation replaced ``logger.handlers`` wholesale, leaking
    one open file descriptor per re-setup when a FileHandler was attached.

    Args:
        name: Logger name (usually the module name).
        log_file: Optional path of an append-mode log file; parent
            directories are created as needed.
        level: Logging level applied to the logger itself.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Close handlers before dropping them so re-setup does not leak fds.
    for handler in list(logger.handlers):
        handler.close()
        logger.removeHandler(handler)
    logger.propagate = False  # keep messages out of the root logger
    fmt = logging.Formatter(
        "[%(asctime)s] %(levelname)s %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(fmt)
    logger.addHandler(sh)
    if log_file is not None:
        log_file = Path(log_file)
        log_file.parent.mkdir(parents=True, exist_ok=True)
        fh = logging.FileHandler(log_file, mode="a", encoding="utf-8")
        fh.setFormatter(fmt)
        logger.addHandler(fh)
    return logger
def atomic_write_bytes(path: Path, data: bytes):
    """Write *data* to *path* atomically: stage to a .tmp file, then rename."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    staging = target.with_suffix(target.suffix + ".tmp")
    with open(staging, "wb") as fh:
        fh.write(data)
    # os.replace is atomic on POSIX and overwrites on Windows too.
    os.replace(staging, target)
def write_jsonl(records, path: Path):
    """Write an iterable of records as JSON Lines, atomically (tmp + rename)."""
    out = Path(path)
    out.parent.mkdir(parents=True, exist_ok=True)
    staging = out.with_suffix(out.suffix + ".tmp")
    with open(staging, "w", encoding="utf-8") as fh:
        fh.writelines(json.dumps(rec, ensure_ascii=False) + "\n" for rec in records)
    os.replace(staging, out)
def read_jsonl(path: Path):
    """Read a JSON Lines file into a list of objects, skipping blank lines."""
    with open(Path(path), "r", encoding="utf-8") as fh:
        # json.loads tolerates surrounding whitespace, so no strip needed.
        return [json.loads(line) for line in fh if line.strip()]
def append_jsonl(record, path: Path):
    """Append a single record to a JSON Lines file, creating parent dirs."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(record, ensure_ascii=False)
    with open(target, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
def write_json(obj, path: Path, indent=2):
    """Serialize *obj* as indented JSON, written atomically (tmp + rename)."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    staging = target.with_suffix(target.suffix + ".tmp")
    with open(staging, "w", encoding="utf-8") as fh:
        json.dump(obj, fh, ensure_ascii=False, indent=indent)
    os.replace(staging, target)
def read_json(path: Path):
    """Load and return the JSON document stored at *path*."""
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)
def get_vram_mb():
    """Currently-allocated CUDA memory in MB; 0.0 if torch/CUDA is unavailable."""
    try:
        import torch
        if torch.cuda.is_available():
            return torch.cuda.memory_allocated() / (1024 * 1024)
    except Exception:
        # Best-effort probe: any failure (no torch, broken CUDA) reads as zero.
        pass
    return 0.0
def cleanup_memory():
    """Release Python garbage and, when torch+CUDA exist, the CUDA cache.

    Intended to be called after each heavy processing step.
    """
    gc.collect()
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    except Exception:
        # Best-effort: missing torch or a CUDA hiccup must not crash cleanup.
        pass
def ts():
    """Current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
def compute_completed_ids(path: Path):
    """
    For resume support: scan an existing jsonl file and return the set of
    `idx` values already recorded.

    A missing file yields an empty set. Malformed lines (e.g. a torn
    trailing line from an interrupted run) are skipped rather than aborting
    the resume. The previous blanket ``except Exception`` is narrowed to
    ``json.JSONDecodeError`` so genuine bugs are no longer swallowed; the
    implicit "non-dicts have no idx" behavior is now an explicit check.

    Args:
        path: Path to the jsonl file written by a previous run.

    Returns:
        Set of `idx` values found in valid dict records.
    """
    path = Path(path)
    if not path.exists():
        return set()
    done = set()
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Skip partial/corrupt lines; best-effort resume.
                continue
            # Only dict records can carry an `idx` key.
            if isinstance(obj, dict) and "idx" in obj:
                done.add(obj["idx"])
    return done
|