somratpro Claude Sonnet 4.6 commited on
Commit
1ca1144
·
1 Parent(s): 31f0064

fix: robust backup — retry on HF 500, SQLite WAL checkpoint, fix tar deprecation

Browse files

- _upload_with_retry(): 4 attempts with 3/6/12s exponential backoff — handles
transient HF 500 errors (was failing permanently on single attempt)
- _checkpoint_sqlite(): PRAGMA wal_checkpoint(TRUNCATE) before archiving so the
snapshot is consistent; WAL/SHM files excluded from archive entirely
- _make_archive(): explicit per-file iteration with _should_exclude() filter
instead of blind tar.add() which dragged in .db-wal and .db-shm
- _extract_archive(): use tar.extractall(filter='data') for Python 3.12+
to suppress DeprecationWarning about unfiltered tar extraction

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Files changed (1) hide show
  1. deerflow-sync.py +81 -19
deerflow-sync.py CHANGED
@@ -3,9 +3,10 @@
3
  HuggingFlow state sync — backup/restore DeerFlow runtime data to/from HF Dataset.
4
 
5
  Syncs:
6
- - deerflow.db (SQLite thread/session database)
7
  - config.yaml (generated config, may contain user edits)
8
  - workspace/ (agent-created files in the sandbox workspace)
 
9
 
10
  Usage:
11
  deerflow-sync.py restore — restore from HF Dataset on startup
@@ -26,15 +27,15 @@ from pathlib import Path
26
  logging.basicConfig(level=logging.INFO, format="%(message)s")
27
  log = logging.getLogger(__name__)
28
 
29
- HF_TOKEN = os.environ.get("HF_TOKEN", "")
30
- BACKUP_REPO = os.environ.get("BACKUP_DATASET_NAME", "huggingflow-backup")
31
- HF_USERNAME = os.environ.get("HF_USERNAME", "")
32
- DATA_DIR = Path(os.environ.get("DEER_FLOW_HOME", "/app/data"))
33
- CONFIG_PATH = Path(os.environ.get("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
34
- SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
35
 
36
- ARCHIVE_NAME = "deerflow-state.tar.gz"
37
- SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json"
38
 
39
  # Files/dirs to include in the backup archive
40
  BACKUP_TARGETS = [
@@ -44,6 +45,13 @@ BACKUP_TARGETS = [
44
  CONFIG_PATH,
45
  ]
46
 
 
 
 
 
 
 
 
47
 
48
  def _write_status(status: str, message: str):
49
  try:
@@ -87,23 +95,82 @@ def _ensure_repo(api, repo_id: str):
87
  log.warning("Could not ensure dataset repo: %s", exc)
88
 
89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  def _make_archive(dest: Path):
 
91
  with tarfile.open(dest, "w:gz") as tar:
92
  for target in BACKUP_TARGETS:
93
- if target.exists():
 
 
 
 
94
  arcname = target.relative_to(DATA_DIR.parent)
95
  tar.add(target, arcname=str(arcname))
96
  log.debug(" + %s", arcname)
 
 
 
 
 
 
97
 
98
 
99
  def _extract_archive(src: Path):
100
  extract_root = DATA_DIR.parent
101
  with tarfile.open(src, "r:gz") as tar:
102
- for member in tar.getmembers():
103
- tar.extract(member, path=extract_root)
 
 
 
 
104
  log.info("Extracted state to %s", extract_root)
105
 
106
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  def restore():
108
  if not HF_TOKEN:
109
  log.info("No HF_TOKEN — skipping restore.")
@@ -157,14 +224,9 @@ def sync_once():
157
  _write_status("synced", "Nothing to backup — skipping upload.")
158
  return
159
 
160
- api.upload_file(
161
- path_or_fileobj=str(archive),
162
- path_in_repo=ARCHIVE_NAME,
163
- repo_id=repo_id,
164
- repo_type="dataset",
165
- token=HF_TOKEN,
166
- )
167
  size_kb = archive.stat().st_size // 1024
 
 
168
  log.info("State synced to %s (%d KB)", repo_id, size_kb)
169
  _write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
170
  except Exception as exc:
 
3
  HuggingFlow state sync — backup/restore DeerFlow runtime data to/from HF Dataset.
4
 
5
  Syncs:
6
+ - deerflow.db (SQLite thread/session database — WAL-checkpointed before archive)
7
  - config.yaml (generated config, may contain user edits)
8
  - workspace/ (agent-created files in the sandbox workspace)
9
+ - .secrets/ (persisted AUTH_JWT_SECRET and other generated secrets)
10
 
11
  Usage:
12
  deerflow-sync.py restore — restore from HF Dataset on startup
 
27
  logging.basicConfig(level=logging.INFO, format="%(message)s")
28
  log = logging.getLogger(__name__)
29
 
30
+ HF_TOKEN = os.environ.get("HF_TOKEN", "")
31
+ BACKUP_REPO = os.environ.get("BACKUP_DATASET_NAME", "huggingflow-backup")
32
+ HF_USERNAME = os.environ.get("HF_USERNAME", "")
33
+ DATA_DIR = Path(os.environ.get("DEER_FLOW_HOME", "/app/data"))
34
+ CONFIG_PATH = Path(os.environ.get("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
35
+ SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600"))
36
 
37
+ ARCHIVE_NAME = "deerflow-state.tar.gz"
38
+ SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json"
39
 
40
  # Files/dirs to include in the backup archive
41
  BACKUP_TARGETS = [
 
45
  CONFIG_PATH,
46
  ]
47
 
48
+ # SQLite auxiliary files that must NOT be archived (WAL/SHM are runtime-only)
49
+ _SQLITE_AUX_SUFFIXES = {".db-wal", ".db-shm"}
50
+
51
+ # Upload retry settings
52
+ _UPLOAD_MAX_ATTEMPTS = 4
53
+ _UPLOAD_BACKOFF_BASE = 3 # seconds — doubles each attempt: 3, 6, 12
54
+
55
 
56
  def _write_status(status: str, message: str):
57
  try:
 
95
  log.warning("Could not ensure dataset repo: %s", exc)
96
 
97
 
98
+ def _checkpoint_sqlite():
99
+ """WAL-checkpoint the SQLite DB before archiving to get a clean snapshot."""
100
+ db_path = DATA_DIR / "deerflow.db"
101
+ if not db_path.exists():
102
+ return
103
+ try:
104
+ import sqlite3
105
+ with sqlite3.connect(str(db_path), timeout=10) as conn:
106
+ conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
107
+ log.debug("SQLite WAL checkpoint complete.")
108
+ except Exception as exc:
109
+ log.warning("SQLite checkpoint failed (continuing anyway): %s", exc)
110
+
111
+
112
+ def _should_exclude(path: Path) -> bool:
113
+ """Return True for SQLite WAL/SHM files that must not be archived."""
114
+ return path.suffix in _SQLITE_AUX_SUFFIXES
115
+
116
+
117
  def _make_archive(dest: Path):
118
+ _checkpoint_sqlite()
119
  with tarfile.open(dest, "w:gz") as tar:
120
  for target in BACKUP_TARGETS:
121
+ if not target.exists():
122
+ continue
123
+ if target.is_file():
124
+ if _should_exclude(target):
125
+ continue
126
  arcname = target.relative_to(DATA_DIR.parent)
127
  tar.add(target, arcname=str(arcname))
128
  log.debug(" + %s", arcname)
129
+ elif target.is_dir():
130
+ for child in sorted(target.rglob("*")):
131
+ if child.is_file() and not _should_exclude(child):
132
+ arcname = child.relative_to(DATA_DIR.parent)
133
+ tar.add(child, arcname=str(arcname))
134
+ log.debug(" + %s", arcname)
135
 
136
 
137
  def _extract_archive(src: Path):
138
  extract_root = DATA_DIR.parent
139
  with tarfile.open(src, "r:gz") as tar:
140
+ try:
141
+ # Python 3.12+: use filter='data' for safe extraction
142
+ tar.extractall(path=extract_root, filter="data")
143
+ except TypeError:
144
+ # Older Python without filter param
145
+ tar.extractall(path=extract_root) # noqa: S202
146
  log.info("Extracted state to %s", extract_root)
147
 
148
 
149
+ def _upload_with_retry(api, archive: Path, repo_id: str):
150
+ """Upload archive to HF Dataset with exponential-backoff retries."""
151
+ last_exc = None
152
+ for attempt in range(1, _UPLOAD_MAX_ATTEMPTS + 1):
153
+ try:
154
+ api.upload_file(
155
+ path_or_fileobj=str(archive),
156
+ path_in_repo=ARCHIVE_NAME,
157
+ repo_id=repo_id,
158
+ repo_type="dataset",
159
+ token=HF_TOKEN,
160
+ )
161
+ return # success
162
+ except Exception as exc:
163
+ last_exc = exc
164
+ if attempt < _UPLOAD_MAX_ATTEMPTS:
165
+ wait = _UPLOAD_BACKOFF_BASE * (2 ** (attempt - 1))
166
+ log.warning("Upload attempt %d/%d failed: %s — retrying in %ds",
167
+ attempt, _UPLOAD_MAX_ATTEMPTS, exc, wait)
168
+ time.sleep(wait)
169
+ else:
170
+ log.warning("Upload failed after %d attempts: %s", _UPLOAD_MAX_ATTEMPTS, exc)
171
+ raise last_exc
172
+
173
+
174
  def restore():
175
  if not HF_TOKEN:
176
  log.info("No HF_TOKEN — skipping restore.")
 
224
  _write_status("synced", "Nothing to backup — skipping upload.")
225
  return
226
 
 
 
 
 
 
 
 
227
  size_kb = archive.stat().st_size // 1024
228
+ log.info("Uploading state archive (%d KB) to %s...", size_kb, repo_id)
229
+ _upload_with_retry(api, archive, repo_id)
230
  log.info("State synced to %s (%d KB)", repo_id, size_kb)
231
  _write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
232
  except Exception as exc: