Spaces:
Running
Running
feat(sync): skip backup when no state changes detected
Browse filesAdd metadata marker (file count/size/mtime) and SHA-256 fingerprint
checks before uploading. State persisted to /tmp so subsequent sync
intervals within the same container run skip unnecessary HF uploads.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- deerflow-sync.py +73 -2
deerflow-sync.py
CHANGED
|
@@ -14,6 +14,7 @@ Usage:
|
|
| 14 |
deerflow-sync.py loop — sync-once on an interval (reads SYNC_INTERVAL env)
|
| 15 |
"""
|
| 16 |
|
|
|
|
| 17 |
import json
|
| 18 |
import os
|
| 19 |
import sys
|
|
@@ -34,8 +35,9 @@ DATA_DIR = Path(os.environ.get("DEER_FLOW_HOME", "/app/data"))
|
|
| 34 |
CONFIG_PATH = Path(os.environ.get("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
|
| 35 |
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
|
| 36 |
|
| 37 |
-
ARCHIVE_NAME
|
| 38 |
-
SYNC_STATUS_FILE
|
|
|
|
| 39 |
|
| 40 |
# Files/dirs to include in the backup archive
|
| 41 |
BACKUP_TARGETS = [
|
|
@@ -53,6 +55,61 @@ _UPLOAD_MAX_ATTEMPTS = 4
|
|
| 53 |
_UPLOAD_BACKOFF_BASE = 3 # seconds — doubles each attempt: 3, 6, 12
|
| 54 |
|
| 55 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 56 |
def _write_status(status: str, message: str):
|
| 57 |
try:
|
| 58 |
payload = {
|
|
@@ -215,6 +272,19 @@ def sync_once():
|
|
| 215 |
repo_id = _resolve_repo_id(api)
|
| 216 |
_ensure_repo(api, repo_id)
|
| 217 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 218 |
with tempfile.TemporaryDirectory() as tmp:
|
| 219 |
archive = Path(tmp) / ARCHIVE_NAME
|
| 220 |
_make_archive(archive)
|
|
@@ -229,6 +299,7 @@ def sync_once():
|
|
| 229 |
_upload_with_retry(api, archive, repo_id)
|
| 230 |
log.info("State synced to %s (%d KB)", repo_id, size_kb)
|
| 231 |
_write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
|
|
|
|
| 232 |
except Exception as exc:
|
| 233 |
log.warning("Sync failed: %s", exc)
|
| 234 |
_write_status("error", f"Sync failed: {exc}")
|
|
|
|
| 14 |
deerflow-sync.py loop — sync-once on an interval (reads SYNC_INTERVAL env)
|
| 15 |
"""
|
| 16 |
|
| 17 |
+
import hashlib
|
| 18 |
import json
|
| 19 |
import os
|
| 20 |
import sys
|
|
|
|
| 35 |
CONFIG_PATH = Path(os.environ.get("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
|
| 36 |
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
|
| 37 |
|
| 38 |
+
ARCHIVE_NAME = "deerflow-state.tar.gz"
|
| 39 |
+
SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json"
|
| 40 |
+
SYNC_STATE_FILE = Path("/tmp/huggingflow-sync-state.json")
|
| 41 |
|
| 42 |
# Files/dirs to include in the backup archive
|
| 43 |
BACKUP_TARGETS = [
|
|
|
|
| 55 |
_UPLOAD_BACKOFF_BASE = 3 # seconds — doubles each attempt: 3, 6, 12
|
| 56 |
|
| 57 |
|
| 58 |
+
def _metadata_marker() -> tuple[int, int, int]:
|
| 59 |
+
fc = ts = nm = 0
|
| 60 |
+
for target in BACKUP_TARGETS:
|
| 61 |
+
if not target.exists():
|
| 62 |
+
continue
|
| 63 |
+
paths = [target] if target.is_file() else list(target.rglob("*"))
|
| 64 |
+
for path in paths:
|
| 65 |
+
if not path.is_file() or _should_exclude(path):
|
| 66 |
+
continue
|
| 67 |
+
try:
|
| 68 |
+
st = path.stat()
|
| 69 |
+
fc += 1
|
| 70 |
+
ts += int(st.st_size)
|
| 71 |
+
nm = max(nm, int(st.st_mtime_ns))
|
| 72 |
+
except OSError:
|
| 73 |
+
continue
|
| 74 |
+
return (fc, ts, nm)
|
| 75 |
+
|
| 76 |
+
|
| 77 |
+
def _fingerprint_targets() -> str:
|
| 78 |
+
hasher = hashlib.sha256()
|
| 79 |
+
for target in BACKUP_TARGETS:
|
| 80 |
+
if not target.exists():
|
| 81 |
+
continue
|
| 82 |
+
paths = [target] if target.is_file() else sorted(p for p in target.rglob("*") if p.is_file())
|
| 83 |
+
for path in paths:
|
| 84 |
+
if _should_exclude(path):
|
| 85 |
+
continue
|
| 86 |
+
hasher.update(str(path).encode("utf-8"))
|
| 87 |
+
with path.open("rb") as f:
|
| 88 |
+
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
| 89 |
+
hasher.update(chunk)
|
| 90 |
+
return hasher.hexdigest()
|
| 91 |
+
|
| 92 |
+
|
| 93 |
+
def _load_sync_state() -> tuple[str | None, tuple | None]:
|
| 94 |
+
try:
|
| 95 |
+
if SYNC_STATE_FILE.exists():
|
| 96 |
+
s = json.loads(SYNC_STATE_FILE.read_text())
|
| 97 |
+
fp = s.get("fingerprint")
|
| 98 |
+
m = s.get("marker")
|
| 99 |
+
if m and len(m) == 3:
|
| 100 |
+
return fp, tuple(m)
|
| 101 |
+
except Exception:
|
| 102 |
+
pass
|
| 103 |
+
return None, None
|
| 104 |
+
|
| 105 |
+
|
| 106 |
+
def _save_sync_state(fingerprint: str, marker: tuple) -> None:
|
| 107 |
+
try:
|
| 108 |
+
SYNC_STATE_FILE.write_text(json.dumps({"fingerprint": fingerprint, "marker": list(marker)}))
|
| 109 |
+
except Exception as exc:
|
| 110 |
+
log.debug("Could not save sync state: %s", exc)
|
| 111 |
+
|
| 112 |
+
|
| 113 |
def _write_status(status: str, message: str):
|
| 114 |
try:
|
| 115 |
payload = {
|
|
|
|
| 272 |
repo_id = _resolve_repo_id(api)
|
| 273 |
_ensure_repo(api, repo_id)
|
| 274 |
|
| 275 |
+
last_fp, last_marker = _load_sync_state()
|
| 276 |
+
current_marker = _metadata_marker()
|
| 277 |
+
if last_marker is not None and current_marker == last_marker:
|
| 278 |
+
log.info("No state changes detected.")
|
| 279 |
+
_write_status("synced", "No state changes detected.")
|
| 280 |
+
return
|
| 281 |
+
|
| 282 |
+
current_fp = _fingerprint_targets()
|
| 283 |
+
if last_fp is not None and current_fp == last_fp:
|
| 284 |
+
log.info("No state changes detected.")
|
| 285 |
+
_write_status("synced", "No state changes detected.")
|
| 286 |
+
return
|
| 287 |
+
|
| 288 |
with tempfile.TemporaryDirectory() as tmp:
|
| 289 |
archive = Path(tmp) / ARCHIVE_NAME
|
| 290 |
_make_archive(archive)
|
|
|
|
| 299 |
_upload_with_retry(api, archive, repo_id)
|
| 300 |
log.info("State synced to %s (%d KB)", repo_id, size_kb)
|
| 301 |
_write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
|
| 302 |
+
_save_sync_state(current_fp, current_marker)
|
| 303 |
except Exception as exc:
|
| 304 |
log.warning("Sync failed: %s", exc)
|
| 305 |
_write_status("error", f"Sync failed: {exc}")
|