File size: 11,161 Bytes
5d8d23e
 
c13c2a7
5d8d23e
 
1ca1144
5d8d23e
 
1ca1144
5d8d23e
 
6a3762e
 
 
5d8d23e
 
d89d9d5
6a3762e
5d8d23e
 
 
 
 
 
6a3762e
5d8d23e
 
 
 
 
1ca1144
 
 
 
 
 
5d8d23e
d89d9d5
 
 
5d8d23e
 
 
 
 
31f0064
5d8d23e
 
 
1ca1144
 
 
 
 
 
 
5d8d23e
d89d9d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a3762e
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ca1144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
1ca1144
5d8d23e
 
1ca1144
 
 
 
 
5d8d23e
 
 
1ca1144
 
 
 
 
 
5d8d23e
 
 
 
 
1ca1144
 
 
 
 
 
5d8d23e
 
 
1ca1144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a3762e
5d8d23e
 
 
6a3762e
5d8d23e
 
 
 
6a3762e
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
d89d9d5
 
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
 
 
 
 
 
6a3762e
5d8d23e
 
 
1ca1144
 
5d8d23e
6a3762e
d89d9d5
5d8d23e
 
6a3762e
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
#!/usr/bin/env python3
"""
HuggingFlow state sync — back up/restore DeerFlow runtime data to/from a Hugging Face Dataset.

Syncs:
  - deerflow.db      (SQLite thread/session database β€” WAL-checkpointed before archive)
  - config.yaml      (generated config, may contain user edits)
  - workspace/       (agent-created files in the sandbox workspace)
  - .secrets/        (persisted AUTH_JWT_SECRET and other generated secrets)

Usage:
  deerflow-sync.py restore    — restore from the HF Dataset on startup
  deerflow-sync.py sync-once  — push the current state to the HF Dataset (skipped if nothing changed)
  deerflow-sync.py loop       — run sync-once every SYNC_INTERVAL seconds (env var, default 600)
"""

import hashlib
import json
import os
import sys
import time
import tarfile
import tempfile
import logging
from datetime import datetime, timezone
from pathlib import Path

logging.basicConfig(format="%(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Settings read from the environment (read once, at import time)
# ---------------------------------------------------------------------------
HF_TOKEN = os.getenv("HF_TOKEN", "")
BACKUP_REPO = os.getenv("BACKUP_DATASET_NAME", "huggingflow-backup")
HF_USERNAME = os.getenv("HF_USERNAME", "")
DATA_DIR = Path(os.getenv("DEER_FLOW_HOME", "/app/data"))
CONFIG_PATH = Path(os.getenv("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
SYNC_INTERVAL = int(os.getenv("SYNC_INTERVAL", "600"))  # seconds between loop() syncs

# ---------------------------------------------------------------------------
# Fixed names and locations
# ---------------------------------------------------------------------------
ARCHIVE_NAME = "deerflow-state.tar.gz"                       # file name inside the dataset repo
SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json"       # last status/message (JSON)
SYNC_STATE_FILE = Path("/tmp/huggingflow-sync-state.json")   # fingerprint + marker of last upload

# Everything that goes into the backup archive: single files or whole directories.
BACKUP_TARGETS = [
    DATA_DIR / "deerflow.db",
    DATA_DIR / "workspace",
    DATA_DIR / ".secrets",  # generated secrets such as AUTH_JWT_SECRET
    CONFIG_PATH,
]

# SQLite write-ahead-log / shared-memory companions; runtime-only, never archived.
_SQLITE_AUX_SUFFIXES = {".db-wal", ".db-shm"}

# Upload retries: after failed attempt n, wait _UPLOAD_BACKOFF_BASE * 2**(n-1)
# seconds, i.e. 3, 6, 12 for four attempts.
_UPLOAD_MAX_ATTEMPTS = 4
_UPLOAD_BACKOFF_BASE = 3


def _metadata_marker() -> tuple[int, int, int]:
    """Return a cheap ``(file_count, total_bytes, newest_change_ns)`` summary of the backup set.

    ``sync_once`` compares this with the marker saved after the last upload.
    When the two are equal it skips the full content fingerprint, so the marker
    must change whenever the archived content could have changed.

    * ``file_count`` and ``total_bytes`` cover every regular file in
      ``BACKUP_TARGETS`` that ``_should_exclude`` does not filter out.
    * ``newest_change_ns`` is the largest of three kinds of timestamp:
      - ``st_mtime_ns`` of those files;
      - ``st_ctime_ns`` of those files;
      - ``st_mtime_ns`` of the directory targets and all their subdirectories.

    File mtimes alone miss renames and moves: size and mtime stay the same,
    so the whole marker would stay the same. The containing directory's mtime
    changes whenever an entry is added, removed or renamed. ctime also catches
    files whose mtime was set into the past, e.g. by an archive extraction
    that preserves timestamps.

    Entries that vanish or cannot be stat'ed during the scan are ignored.
    """
    file_count = total_bytes = newest_ns = 0
    for target in BACKUP_TARGETS:
        if not target.exists():
            continue
        # A directory target is included itself so that top-level
        # adds/removes/renames are reflected by its mtime.
        paths = [target] if target.is_file() else [target, *target.rglob("*")]
        for path in paths:
            try:
                if path.is_dir():
                    newest_ns = max(newest_ns, path.stat().st_mtime_ns)
                    continue
                if not path.is_file() or _should_exclude(path):
                    continue
                st = path.stat()
            except OSError:
                continue
            file_count += 1
            total_bytes += st.st_size
            newest_ns = max(newest_ns, st.st_mtime_ns, st.st_ctime_ns)
    return (file_count, total_bytes, newest_ns)


def _fingerprint_targets() -> str:
    """Return a SHA-256 hex digest identifying the paths and contents of the backup set.

    Covers every regular file in ``BACKUP_TARGETS`` that ``_should_exclude``
    does not filter out. Directory targets are walked recursively in sorted
    order. Each file contributes ``path + NUL + sha256(content)``. A path
    cannot contain NUL and the per-file digest has a fixed length, so one
    file's bytes can never be confused with the next file's path. Renaming a
    file therefore changes the result.

    Paths are encoded with ``os.fsencode``, so undecodable file names (which
    ``str.encode("utf-8")`` rejects) do not abort the sync. Files that vanish
    between listing and reading are skipped rather than failing the sync.
    """
    digest = hashlib.sha256()
    for target in BACKUP_TARGETS:
        if not target.exists():
            continue
        if target.is_file():
            paths = [target]
        else:
            paths = sorted(p for p in target.rglob("*") if p.is_file())
        for path in paths:
            if _should_exclude(path):
                continue
            content = hashlib.sha256()
            try:
                with path.open("rb") as fh:
                    while chunk := fh.read(1024 * 1024):
                        content.update(chunk)
            except FileNotFoundError:
                # Deleted after listing; it will not be in the archive either.
                continue
            digest.update(os.fsencode(path))
            digest.update(b"\0")
            digest.update(content.digest())
    return digest.hexdigest()


def _load_sync_state() -> tuple[str | None, tuple[int, int, int] | None]:
    """Load the fingerprint and metadata marker saved after the last successful upload.

    Returns ``(fingerprint, marker)``. Each element is validated on its own:
    - the fingerprint must be a string;
    - the marker must be a list of three ints.
    An element that is missing or malformed comes back as ``None``.

    Both are ``None`` when the state file is absent, unreadable or not a JSON
    object. Problems are logged at debug level and never raised. Missing state
    only means the next sync re-fingerprints, and possibly re-uploads.
    """
    try:
        state = json.loads(SYNC_STATE_FILE.read_text())
    except FileNotFoundError:
        return None, None
    except (OSError, ValueError) as exc:  # ValueError covers JSON and decode errors
        log.debug("Could not load sync state: %s", exc)
        return None, None

    if not isinstance(state, dict):
        log.debug("Ignoring malformed sync state: %r", state)
        return None, None

    fingerprint = state.get("fingerprint")
    if not isinstance(fingerprint, str):
        fingerprint = None

    raw_marker = state.get("marker")
    marker = None
    if (
        isinstance(raw_marker, list)
        and len(raw_marker) == 3
        and all(isinstance(value, int) for value in raw_marker)
    ):
        marker = tuple(raw_marker)
    return fingerprint, marker


def _save_sync_state(fingerprint: str, marker: tuple) -> None:
    """Persist *fingerprint* and *marker* as JSON in SYNC_STATE_FILE.

    Best effort: any failure is logged at debug level and otherwise ignored.
    """
    try:
        serialized = json.dumps({"fingerprint": fingerprint, "marker": list(marker)})
        SYNC_STATE_FILE.write_text(serialized)
    except Exception as err:
        log.debug("Could not save sync state: %s", err)


def _write_status(status: str, message: str):
    """Record *status* and *message* as JSON in SYNC_STATUS_FILE.

    The JSON object has keys "status", "message" and "timestamp". The
    timestamp is the current UTC time in ISO-8601 form. Best effort: failures
    are logged at debug level and never propagate.
    """
    try:
        stamped_at = datetime.now(timezone.utc).isoformat()
        document = {"status": status, "message": message, "timestamp": stamped_at}
        Path(SYNC_STATUS_FILE).write_text(json.dumps(document))
    except Exception as err:
        log.debug("Could not write sync status: %s", err)


def _get_api():
    """Return a ``huggingface_hub.HfApi`` client authenticated with HF_TOKEN.

    Raises:
        RuntimeError: if HF_TOKEN is empty.
    """
    if HF_TOKEN:
        from huggingface_hub import HfApi

        return HfApi(token=HF_TOKEN)
    raise RuntimeError("HF_TOKEN not set")


def _resolve_repo_id(api) -> str:
    """Return the fully-qualified ``owner/name`` id of the backup dataset.

    BACKUP_REPO is returned verbatim if it already contains a "/". Otherwise
    it is prefixed with an owner:
    - HF_USERNAME when that is set;
    - else the ``name`` reported by ``api.whoami()``.
    """
    if "/" in BACKUP_REPO:
        return BACKUP_REPO
    owner = HF_USERNAME or api.whoami()["name"]
    return f"{owner}/{BACKUP_REPO}"


def _ensure_repo(api, repo_id: str):
    """Create the private dataset repo *repo_id* unless it already exists.

    Best effort: any failure is logged as a warning and swallowed. *api* is
    not used; the module-level ``huggingface_hub.create_repo`` is called with
    HF_TOKEN instead.
    """
    from huggingface_hub import create_repo

    repo_options = dict(
        repo_id=repo_id,
        repo_type="dataset",
        private=True,
        token=HF_TOKEN,
        exist_ok=True,
    )
    try:
        create_repo(**repo_options)
    except Exception as err:
        log.warning("Could not ensure dataset repo: %s", err)


def _checkpoint_sqlite():
    """WAL-checkpoint the SQLite DB so ``deerflow.db`` alone is a clean snapshot.

    Runs ``PRAGMA wal_checkpoint(TRUNCATE)`` on ``DATA_DIR / "deerflow.db"``.
    It does nothing if that file does not exist. Best effort: two cases are
    logged as warnings and the caller continues:
    - errors raised while connecting or running the pragma;
    - a checkpoint that another connection blocked from completing.
    """
    db_path = DATA_DIR / "deerflow.db"
    if not db_path.exists():
        return
    try:
        import sqlite3
        from contextlib import closing

        # sqlite3's own ``with conn:`` only commits/rolls back and leaves the
        # connection open; closing() guarantees it is released here.
        with closing(sqlite3.connect(str(db_path), timeout=10)) as conn:
            row = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
    except Exception as exc:
        log.warning("SQLite checkpoint failed (continuing anyway): %s", exc)
        return
    # The pragma returns (busy, log_pages, checkpointed_pages). busy == 1
    # means the TRUNCATE checkpoint was blocked and did not finish.
    if row and row[0]:
        log.warning("SQLite checkpoint incomplete (database busy); recent WAL writes may be missing.")
    else:
        log.debug("SQLite WAL checkpoint complete.")


def _should_exclude(path: Path) -> bool:
    """Tell whether *path* is an SQLite WAL/SHM companion file (never archived).

    The decision is made purely on ``path.suffix``.
    """
    suffix = path.suffix
    return suffix in _SQLITE_AUX_SUFFIXES


def _make_archive(dest: Path):
    """Write a gzip-compressed tar of the backup set to *dest*.

    The SQLite database is WAL-checkpointed first. Every regular file in
    BACKUP_TARGETS is added, except SQLite WAL/SHM files. Directory targets
    are walked recursively in sorted order. Member names are relative to
    ``DATA_DIR.parent``.

    Two situations are skipped instead of aborting the whole archive:

    * A target outside ``DATA_DIR.parent``, e.g. DEER_FLOW_CONFIG_PATH pointing
      elsewhere, where ``Path.relative_to`` raises ValueError. It is skipped
      with a warning.
    * A file that disappears between listing and archiving. It is skipped.
    """
    _checkpoint_sqlite()
    archive_root = DATA_DIR.parent
    with tarfile.open(dest, "w:gz") as tar:
        for target in BACKUP_TARGETS:
            if not target.exists():
                continue
            if target.is_file():
                candidates = [target]
            elif target.is_dir():
                candidates = [p for p in sorted(target.rglob("*")) if p.is_file()]
            else:
                continue
            for path in candidates:
                if _should_exclude(path):
                    continue
                try:
                    arcname = path.relative_to(archive_root)
                except ValueError:
                    log.warning("Skipping %s: not under %s, so it cannot be restored", path, archive_root)
                    continue
                try:
                    # tar.add stats and opens the file before writing the
                    # member header, so a vanished file leaves no partial entry.
                    tar.add(path, arcname=str(arcname))
                except FileNotFoundError:
                    log.debug("  - %s (vanished before it could be archived)", arcname)
                    continue
                log.debug("  + %s", arcname)


def _within_root(root: Path, member_name: str) -> bool:
    """Return True if archive member *member_name* resolves to a path inside *root* (already resolved)."""
    return (root / member_name).resolve().is_relative_to(root)


def _extract_archive(src: Path):
    """Extract a state archive into ``DATA_DIR.parent``, the root its member paths are relative to.

    First, any ``-wal``/``-shm`` file next to a ``*.db`` member of the archive
    is deleted. A leftover local write-ahead log would otherwise be applied by
    SQLite on top of the freshly restored database file. This assumes nothing
    has the database open yet; restore is meant to run at startup.

    Extraction uses tarfile's ``"data"`` filter when the interpreter has it
    (Python 3.12+ and the 3.8-3.11 security releases). Without it, only
    regular files and directories that resolve inside the extraction root are
    extracted; other members are skipped with a warning.

    Raises:
        tarfile.TarError or OSError: if the archive cannot be read or written
        out, or a member is rejected by the ``"data"`` filter.
    """
    extract_root = DATA_DIR.parent
    safe_root = extract_root.resolve()
    with tarfile.open(src, "r:gz") as tar:
        members = tar.getmembers()

        for member in members:
            if not (member.isfile() and member.name.endswith(".db")):
                continue
            for aux_suffix in ("-wal", "-shm"):
                aux_name = member.name + aux_suffix
                if _within_root(safe_root, aux_name):
                    (safe_root / aux_name).unlink(missing_ok=True)

        # Feature check recommended by the tarfile docs; unlike catching
        # TypeError, it cannot mask an unrelated error from the extraction.
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=extract_root, filter="data")
        else:
            safe_members = [
                m for m in members
                if (m.isfile() or m.isdir()) and _within_root(safe_root, m.name)
            ]
            skipped = len(members) - len(safe_members)
            if skipped:
                log.warning("Skipped %d unsafe archive member(s)", skipped)
            tar.extractall(path=extract_root, members=safe_members)  # noqa: S202
    log.info("Extracted state to %s", extract_root)


def _upload_with_retry(api, archive: Path, repo_id: str):
    """Upload *archive* as ARCHIVE_NAME into the dataset *repo_id*, retrying on failure.

    Up to _UPLOAD_MAX_ATTEMPTS attempts are made. After failed attempt n (all
    but the last) it sleeps _UPLOAD_BACKOFF_BASE * 2**(n-1) seconds. Every
    exception type is retried. When all attempts fail, the last exception is
    re-raised.
    """
    failure = None
    attempt = 0
    while attempt < _UPLOAD_MAX_ATTEMPTS:
        attempt += 1
        try:
            api.upload_file(
                path_or_fileobj=str(archive),
                path_in_repo=ARCHIVE_NAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as err:
            failure = err
            if attempt == _UPLOAD_MAX_ATTEMPTS:
                log.warning("Upload failed after %d attempts: %s", _UPLOAD_MAX_ATTEMPTS, err)
            else:
                delay = _UPLOAD_BACKOFF_BASE * 2 ** (attempt - 1)
                log.warning("Upload attempt %d/%d failed: %s β€” retrying in %ds",
                            attempt, _UPLOAD_MAX_ATTEMPTS, err, delay)
                time.sleep(delay)
        else:
            return
    raise failure


def restore():
    """Download the state archive from the backup dataset and extract it.

    Does nothing when HF_TOKEN is unset. If the download fails with a
    "missing" error, this is treated as a fresh start: status "configured",
    no exception. A missing error is one whose message contains "404",
    "not found" or "does not exist".

    Any other failure is logged, recorded in the status file as "error", and
    re-raised. That includes every failure while extracting a downloaded
    archive. Only the download is run through the "missing" classification.
    An extraction error whose message happens to contain one of those phrases
    must not be mistaken for "no backup yet". Otherwise the next sync would
    replace the remote backup with fresh state.
    """
    if not HF_TOKEN:
        log.info("No HF_TOKEN β€” skipping restore.")
        return

    try:
        api = _get_api()
        repo_id = _resolve_repo_id(api)
        _ensure_repo(api, repo_id)

        from huggingface_hub import hf_hub_download
        with tempfile.TemporaryDirectory() as tmp:
            try:
                local = hf_hub_download(
                    repo_id=repo_id,
                    filename=ARCHIVE_NAME,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    local_dir=tmp,
                )
            except Exception as exc:
                text = str(exc).lower()
                if "404" in str(exc) or "not found" in text or "does not exist" in text:
                    log.info("No existing backup found in %s β€” starting fresh.", repo_id)
                    _write_status("configured", f"No backup yet in {repo_id}. First sync in {SYNC_INTERVAL}s.")
                    return
                raise

            _extract_archive(Path(local))
            log.info("State restored from %s", repo_id)
            _write_status("restored", f"State restored from {repo_id}")
    except Exception as exc:
        log.warning("Restore failed: %s", exc)
        _write_status("error", f"Restore failed: {exc}")
        raise


def sync_once():
    """Upload the current state archive to the backup dataset if anything changed.

    Does nothing when HF_TOKEN is unset. Change detection has two stages:
    1. Compare the cheap ``_metadata_marker`` with the saved one.
    2. Only if it differs, compute the full ``_fingerprint_targets`` digest.
    The fingerprint and marker are saved after a successful upload. The
    marker is also saved when only metadata changed, so the next call can
    take the cheap path.

    If there are no files to back up, the upload is skipped rather than
    replacing the remote backup with an empty archive. All errors are logged
    and recorded in the status file; none propagate.
    """
    if not HF_TOKEN:
        return

    try:
        api = _get_api()
        repo_id = _resolve_repo_id(api)
        _ensure_repo(api, repo_id)

        # Fold the SQLite WAL into deerflow.db *before* detecting changes.
        # The -wal/-shm files are excluded from the marker and fingerprint,
        # so writes still sitting in the WAL would otherwise go unnoticed.
        _checkpoint_sqlite()

        last_fp, last_marker = _load_sync_state()
        current_marker = _metadata_marker()
        if last_marker is not None and current_marker == last_marker:
            log.info("No state changes detected.")
            _write_status("synced", "No state changes detected.")
            return

        # current_marker[0] is the number of files that would be archived.
        # A gzipped empty tar is never 0 bytes, so this has to be decided
        # from the file count, not from the archive size.
        if current_marker[0] == 0:
            log.info("Nothing to backup β€” skipping upload.")
            _write_status("synced", "Nothing to backup β€” skipping upload.")
            return

        current_fp = _fingerprint_targets()
        if last_fp is not None and current_fp == last_fp:
            # Metadata moved but content did not: remember the new marker so
            # the next cycle can skip re-hashing everything.
            _save_sync_state(current_fp, current_marker)
            log.info("No state changes detected.")
            _write_status("synced", "No state changes detected.")
            return

        with tempfile.TemporaryDirectory() as tmp:
            archive = Path(tmp) / ARCHIVE_NAME
            _make_archive(archive)

            size_kb = archive.stat().st_size // 1024
            log.info("Uploading state archive (%d KB) to %s...", size_kb, repo_id)
            _upload_with_retry(api, archive, repo_id)
            log.info("State synced to %s (%d KB)", repo_id, size_kb)
            _write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
            _save_sync_state(current_fp, current_marker)
    except Exception as exc:
        log.warning("Sync failed: %s", exc)
        _write_status("error", f"Sync failed: {exc}")


def loop():
    """Call sync_once() every SYNC_INTERVAL seconds, forever.

    Each iteration sleeps first, so the first sync happens one full interval
    after start-up. Any exception escaping sync_once() is logged as a warning
    and the loop carries on.
    """
    log.info("Starting periodic sync (interval: %ds)", SYNC_INTERVAL)
    while True:
        time.sleep(SYNC_INTERVAL)
        try:
            sync_once()
        except Exception as err:  # keep the background loop alive
            log.warning("Periodic sync error: %s", err)


if __name__ == "__main__":
    # Sub-command dispatch; anything unknown (or no argument) prints usage and exits 1.
    commands = {
        "restore": restore,
        "sync-once": sync_once,
        "loop": loop,
    }
    requested = sys.argv[1] if len(sys.argv) > 1 else "help"
    action = commands.get(requested)
    if action is None:
        print(__doc__)
        sys.exit(1)
    action()