# FalconScan/backend/progress_service.py
# Antigravity AI Agent
# Optimize performance, caching, payload transfer, and complete dark mode visual redesign with Dubai Customs custom features
# e565aa7
# Raw
# History Blame Contribute Delete
# 1.26 kB
import json
from datetime import datetime, timezone
from pathlib import Path
from threading import RLock
class ProgressService:
    """File-backed store for a JSON progress document with an in-memory cache.

    The document is stored as UTF-8 JSON at ``path``. Reads are served from a
    cache that is refreshed whenever the file's modification time differs
    from the one recorded at the last load or write. All cache and file
    access inside this class is serialized by a reentrant lock.
    """

    def __init__(self, path: Path):
        """Bind the service to *path*; the file is not touched until first use."""
        self.path = path
        # Reentrant so update() can call get() while already holding the lock.
        self.lock = RLock()
        # Last document loaded or written; None until the first load.
        self._cache = None
        # st_mtime that corresponds to _cache; 0.0 stands for "no file".
        self._mtime = 0.0

    def get(self) -> dict:
        """Return the current progress document.

        The file is re-read when nothing is cached yet or when its
        modification time differs from the recorded one. A missing file
        yields an empty dict.

        Returns:
            The cached document. This is the cache object itself, not a copy,
            so callers should treat it as read-only.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        with self.lock:
            try:
                mtime = self.path.stat().st_mtime
            except FileNotFoundError:
                mtime = 0.0
            # Compare with != rather than >: a deleted file (mtime 0.0) or a
            # file replaced by one with an older timestamp must also
            # invalidate the cache instead of serving stale data forever.
            if self._cache is None or mtime != self._mtime:
                try:
                    self._cache = json.loads(
                        self.path.read_text(encoding="utf-8")
                    )
                except FileNotFoundError:
                    self._cache = {}
                self._mtime = mtime
            return self._cache

    def update(self, **changes) -> dict:
        """Merge *changes* into the document, persist it, and return it.

        A ``last_updated`` key holding the current UTC time in ISO 8601
        format is always set, overriding any value passed in *changes*.

        Args:
            **changes: Top-level keys to set or overwrite in the document.

        Returns:
            The updated document, which also becomes the cached value.

        Raises:
            OSError: If the document cannot be written.
            TypeError: If a value in *changes* is not JSON-serializable.
        """
        with self.lock:
            # Work on a copy so the cached dict is only swapped after the
            # write has succeeded.
            data = dict(self.get())
            data.update(changes)
            data["last_updated"] = datetime.now(timezone.utc).isoformat()
            payload = json.dumps(data, ensure_ascii=False, indent=2)

            # Write to a sibling temp file and rename it over the target so
            # an interrupted write can never leave a truncated JSON file.
            tmp_path = self.path.with_name(self.path.name + ".tmp")
            try:
                tmp_path.write_text(payload, encoding="utf-8")
                tmp_path.replace(self.path)
            except OSError:
                # Best-effort cleanup of the partial temp file; the original
                # error is what the caller needs to see.
                try:
                    tmp_path.unlink()
                except OSError:
                    pass
                raise

            self._cache = data
            try:
                self._mtime = self.path.stat().st_mtime
            except FileNotFoundError:
                self._mtime = 0.0
            return data