Spaces:
Sleeping
Sleeping
File size: 1,255 Bytes
ca4cce7 e565aa7 ca4cce7 e565aa7 ca4cce7 e565aa7 ca4cce7 e565aa7 ca4cce7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | import json
from datetime import datetime, timezone
from pathlib import Path
from threading import RLock
class ProgressService:
    """Read and update a JSON progress file, with an mtime-based cache.

    The file at ``path`` is parsed with ``json.loads`` and the result is kept
    in memory until the file's modification time changes. Both public methods
    run under a re-entrant lock, so calls on one instance are serialized
    within this process.
    """

    def __init__(self, path: Path):
        """Remember the target file; nothing is read until the first call.

        Args:
            path: Location of the JSON progress file. It may not exist yet.
        """
        self.path = path
        # Re-entrant because update() calls get() while holding the lock.
        self.lock = RLock()
        # Parsed file content; None means "never loaded".
        self._cache = None
        # st_mtime of the file when _cache was loaded/written (0.0 = missing).
        self._mtime = 0.0

    def get(self) -> dict:
        """Return the current progress data, reloading it if the file changed.

        A missing file yields an empty dict. The returned object is the
        internal cache itself, so callers should treat it as read-only and
        use ``update`` to make changes.

        Returns:
            The parsed content of the progress file, or ``{}`` when the file
            does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        with self.lock:
            try:
                mtime = self.path.stat().st_mtime
            except FileNotFoundError:
                mtime = 0.0

            # Reload on *any* timestamp change, not only a newer one, so a
            # deleted file (mtime falls back to 0.0) or a file restored with
            # an older timestamp does not leave a stale cache behind.
            if self._cache is None or mtime != self._mtime:
                try:
                    raw = self.path.read_text(encoding="utf-8")
                except FileNotFoundError:
                    # Covers a missing file and one removed after the stat.
                    self._cache = {}
                else:
                    self._cache = json.loads(raw)
                self._mtime = mtime
            return self._cache

    def update(self, **changes) -> dict:
        """Merge ``changes`` into the progress data and persist the result.

        A ``last_updated`` key holding the current UTC time in ISO 8601 form
        is always (re)written.

        Args:
            **changes: Key/value pairs to set; values must be JSON
                serializable.

        Returns:
            The full, updated progress data (which also becomes the cache).

        Raises:
            TypeError: If a value cannot be serialized to JSON. This happens
                before anything is written, so the file stays untouched.
        """
        with self.lock:
            # Work on a copy so a failure below leaves the cache unchanged.
            data = dict(self.get())
            data.update(changes)
            data["last_updated"] = datetime.now(timezone.utc).isoformat()
            payload = json.dumps(data, ensure_ascii=False, indent=2)

            # Write to a sibling file and move it over the target, so readers
            # never see a half-written (truncated) JSON document.
            tmp_path = self.path.with_name(self.path.name + ".tmp")
            tmp_path.write_text(payload, encoding="utf-8")
            tmp_path.replace(self.path)

            self._cache = data
            try:
                self._mtime = self.path.stat().st_mtime
            except FileNotFoundError:
                # The file vanished right after the write; 0.0 marks it as
                # missing so the next get() sees the mismatch and reloads.
                self._mtime = 0.0
            return data
|