Spaces:
Sleeping
Sleeping
Antigravity AI Agent
Optimize performance, caching, payload transfer, and complete dark mode visual redesign with Dubai Customs custom features
e565aa7 | import json | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from threading import RLock | |
| class ProgressService: | |
| def __init__(self, path: Path): | |
| self.path = path | |
| self.lock = RLock() | |
| self._cache = None | |
| self._mtime = 0.0 | |
| def get(self) -> dict: | |
| try: | |
| mtime = self.path.stat().st_mtime | |
| except FileNotFoundError: | |
| mtime = 0.0 | |
| if self._cache is None or mtime > self._mtime: | |
| with self.lock: | |
| try: | |
| self._cache = json.loads(self.path.read_text(encoding="utf-8")) | |
| except FileNotFoundError: | |
| self._cache = {} | |
| self._mtime = mtime | |
| return self._cache | |
| def update(self, **changes) -> dict: | |
| with self.lock: | |
| data = dict(self.get()) | |
| data.update(changes) | |
| data["last_updated"] = datetime.now(timezone.utc).isoformat() | |
| self.path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| self._cache = data | |
| try: | |
| self._mtime = self.path.stat().st_mtime | |
| except FileNotFoundError: | |
| self._mtime = 0.0 | |
| return data | |