File size: 8,705 Bytes
5d8d23e
 
c13c2a7
5d8d23e
 
1ca1144
5d8d23e
 
1ca1144
5d8d23e
 
6a3762e
 
 
5d8d23e
 
6a3762e
5d8d23e
 
 
 
 
 
6a3762e
5d8d23e
 
 
 
 
1ca1144
 
 
 
 
 
5d8d23e
1ca1144
 
5d8d23e
 
 
 
 
31f0064
5d8d23e
 
 
1ca1144
 
 
 
 
 
 
5d8d23e
6a3762e
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ca1144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
1ca1144
5d8d23e
 
1ca1144
 
 
 
 
5d8d23e
 
 
1ca1144
 
 
 
 
 
5d8d23e
 
 
 
 
1ca1144
 
 
 
 
 
5d8d23e
 
 
1ca1144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a3762e
5d8d23e
 
 
6a3762e
5d8d23e
 
 
 
6a3762e
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a3762e
5d8d23e
 
 
1ca1144
 
5d8d23e
6a3762e
5d8d23e
 
6a3762e
5d8d23e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
#!/usr/bin/env python3
"""
HuggingFlow state sync — backup/restore DeerFlow runtime data to/from HF Dataset.

Syncs:
  - deerflow.db      (SQLite thread/session database — WAL-checkpointed before archive)
  - config.yaml      (generated config, may contain user edits)
  - workspace/       (agent-created files in the sandbox workspace)
  - .secrets/        (persisted AUTH_JWT_SECRET and other generated secrets)

Usage:
  deerflow-sync.py restore    — restore from HF Dataset on startup
  deerflow-sync.py sync-once  — push current state to HF Dataset
  deerflow-sync.py loop       — sync-once on an interval (reads SYNC_INTERVAL env)
"""

import json
import os
import sys
import time
import tarfile
import tempfile
import logging
from datetime import datetime, timezone
from pathlib import Path

# Log bare messages (no level or timestamp prefix) at INFO and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
log = logging.getLogger(__name__)

# --- Settings read from the environment once, at import time ---------------
# Hugging Face access token; restore() and sync_once() do nothing without it.
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Dataset name, either bare ("name") or fully qualified ("owner/name").
BACKUP_REPO = os.getenv("BACKUP_DATASET_NAME", "huggingflow-backup")
# Optional owner used to qualify a bare BACKUP_REPO.
HF_USERNAME = os.getenv("HF_USERNAME", "")
# Root directory holding the runtime state that gets backed up.
DATA_DIR = Path(os.getenv("DEER_FLOW_HOME", "/app/data"))
# Defaults to config.yaml inside DATA_DIR when no explicit path is configured.
CONFIG_PATH = Path(os.getenv("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml"))
# Seconds between periodic syncs; a non-integer value raises ValueError here.
SYNC_INTERVAL = int(os.getenv("SYNC_INTERVAL", "600"))

# Name of the archive inside the dataset repo, and the local status file.
ARCHIVE_NAME = "deerflow-state.tar.gz"
SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json"

# Files/dirs to include in the backup archive.
# ".secrets" persists AUTH_JWT_SECRET across restarts.
BACKUP_TARGETS = [
    *(DATA_DIR / entry for entry in ("deerflow.db", "workspace", ".secrets")),
    CONFIG_PATH,
]

# SQLite auxiliary files that must NOT be archived (WAL/SHM are runtime-only).
_SQLITE_AUX_SUFFIXES = {".db-shm", ".db-wal"}

# Upload retry settings: up to four attempts in total, with the pause between
# attempts starting at the base value and doubling each time (3 s, 6 s, 12 s).
_UPLOAD_MAX_ATTEMPTS = 4
_UPLOAD_BACKOFF_BASE = 3

def _write_status(status: str, message: str):
    """Record the latest sync outcome as JSON in SYNC_STATUS_FILE.

    Best-effort: any failure while building or writing the payload is logged
    at debug level and otherwise ignored, so status reporting never interrupts
    the caller.

    Args:
        status: Short state label stored under the "status" key.
        message: Human-readable detail stored under the "message" key.
    """
    try:
        now_utc = datetime.now(timezone.utc)
        serialized = json.dumps(
            {
                "status": status,
                "message": message,
                "timestamp": now_utc.isoformat(),
            }
        )
        Path(SYNC_STATUS_FILE).write_text(serialized)
    except Exception as err:
        log.debug("Could not write sync status: %s", err)


def _get_api():
    """Build a Hugging Face Hub client authenticated with HF_TOKEN.

    huggingface_hub is imported lazily, and only when a token is present.

    Returns:
        An ``HfApi`` instance bound to HF_TOKEN.

    Raises:
        RuntimeError: If HF_TOKEN is empty.
    """
    if HF_TOKEN:
        from huggingface_hub import HfApi
        return HfApi(token=HF_TOKEN)
    raise RuntimeError("HF_TOKEN not set")


def _resolve_repo_id(api) -> str:
    """Return the fully qualified ``owner/name`` id of the backup dataset.

    Resolution order:
      1. BACKUP_REPO as-is when it already contains a "/".
      2. HF_USERNAME as the owner when it is set.
      3. The "name" field of ``api.whoami()`` otherwise (a network call).

    Args:
        api: Hub client; only consulted in case 3.
    """
    if "/" in BACKUP_REPO:
        return BACKUP_REPO
    # `or` short-circuits, so whoami() is only called without a configured owner.
    owner = HF_USERNAME or api.whoami()["name"]
    return f"{owner}/{BACKUP_REPO}"


def _ensure_repo(api, repo_id: str):
    """Create the private backup dataset repo if it does not exist yet.

    Best-effort: a failure is logged as a warning and not propagated, leaving
    it to the subsequent download/upload to surface a real problem.

    Args:
        api: Unused here; creation goes through the module-level
            ``create_repo`` helper with HF_TOKEN.
        repo_id: Fully qualified ``owner/name`` of the dataset.
    """
    from huggingface_hub import create_repo

    settings = {
        "repo_id": repo_id,
        "repo_type": "dataset",
        "private": True,
        "token": HF_TOKEN,
        "exist_ok": True,  # an already-existing repo is not an error
    }
    try:
        create_repo(**settings)
    except Exception as err:
        log.warning("Could not ensure dataset repo: %s", err)


def _checkpoint_sqlite():
    """WAL-checkpoint the SQLite DB before archiving to get a clean snapshot.

    Runs ``PRAGMA wal_checkpoint(TRUNCATE)`` against ``DATA_DIR/deerflow.db``
    so that committed data sitting in the write-ahead log is moved into the
    main database file, which is the only SQLite file that gets archived.

    Does nothing when the database file does not exist. Best-effort: every
    failure is logged as a warning and swallowed so the backup continues.
    """
    db_path = DATA_DIR / "deerflow.db"
    if not db_path.exists():
        return
    try:
        import sqlite3

        conn = sqlite3.connect(str(db_path), timeout=10)
        try:
            # The pragma yields one row: (busy, wal_frames, checkpointed_frames).
            outcome = conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").fetchone()
        finally:
            # A sqlite3 connection used as a context manager only commits or
            # rolls back; it never closes, so close it explicitly.
            conn.close()

        if outcome is not None and outcome[0]:
            # busy != 0: another connection blocked the checkpoint, so the
            # main file may not yet contain the most recent commits.
            log.warning(
                "SQLite WAL checkpoint incomplete (database busy); "
                "archived snapshot may miss recent writes."
            )
        else:
            log.debug("SQLite WAL checkpoint complete.")
    except Exception as exc:
        log.warning("SQLite checkpoint failed (continuing anyway): %s", exc)


def _should_exclude(path: Path) -> bool:
    """Tell whether *path* is a SQLite WAL/SHM companion file.

    The decision is based solely on the final suffix of the file name, compared
    against _SQLITE_AUX_SUFFIXES.

    Returns:
        True if the file must be left out of the archive, False otherwise.
    """
    extension = path.suffix
    return any(extension == aux for aux in _SQLITE_AUX_SUFFIXES)


def _make_archive(dest: Path):
    """Write a gzip-compressed tar of every existing backup target to *dest*.

    The SQLite database is WAL-checkpointed first. Each BACKUP_TARGETS entry
    that is a file is added directly. Each entry that is a directory is walked
    recursively in sorted order, and only its regular files are added, so
    empty directories are not recorded. Member names are paths relative to
    ``DATA_DIR.parent``, the directory _extract_archive unpacks into.

    Files that are skipped rather than archived:
      * SQLite WAL/SHM companions (see _should_exclude).
      * Paths outside ``DATA_DIR.parent``, e.g. a CONFIG_PATH overridden to
        live elsewhere. They cannot be given a relative member name, and
        previously the resulting ValueError aborted the whole backup.
      * Files that disappear between discovery and archiving.

    Args:
        dest: Path of the archive to create.
    """
    _checkpoint_sqlite()
    archive_root = DATA_DIR.parent
    with tarfile.open(dest, "w:gz") as tar:
        for target in BACKUP_TARGETS:
            if target.is_file():
                candidates = [target]
            elif target.is_dir():
                candidates = sorted(p for p in target.rglob("*") if p.is_file())
            else:
                # Missing target (or not a file/dir): nothing to back up.
                continue

            for path in candidates:
                if _should_exclude(path):
                    continue
                try:
                    arcname = path.relative_to(archive_root)
                except ValueError:
                    log.warning(
                        "Skipping %s: outside %s, so it cannot be restored from the archive",
                        path, archive_root,
                    )
                    continue
                try:
                    tar.add(path, arcname=str(arcname))
                except FileNotFoundError:
                    # Raised while stat-ing/opening the file, i.e. before any
                    # bytes for this member are written, so the archive stays valid.
                    log.warning("Skipping %s: file vanished during backup", path)
                    continue
                log.debug("  + %s", arcname)


def _vetted_members(tar, extract_root):
    """Return the members of *tar* that are safe to unpack under *extract_root*.

    Used only when the interpreter has no tarfile extraction filters.

    Raises:
        tarfile.ExtractError: If a member's path, or the target of a symbolic
            or hard link, resolves to a location outside *extract_root*.
    """
    root = os.path.realpath(extract_root)

    def _inside_root(candidate):
        resolved = os.path.realpath(candidate)
        return os.path.commonpath([root, resolved]) == root

    vetted = []
    for member in tar.getmembers():
        # os.path.join discards `root` for an absolute member name, so absolute
        # paths fail the containment check just like "../" sequences do.
        destination = os.path.join(root, member.name)
        if not _inside_root(destination):
            raise tarfile.ExtractError(
                f"Refusing to extract {member.name!r}: path escapes {root}"
            )
        if member.issym() or member.islnk():
            # Symlink targets are relative to the link's own directory;
            # hard-link targets are member names relative to the archive root.
            base = os.path.dirname(destination) if member.issym() else root
            if not _inside_root(os.path.join(base, member.linkname)):
                raise tarfile.ExtractError(
                    f"Refusing to extract {member.name!r}: link target escapes {root}"
                )
        elif not (member.isfile() or member.isdir()):
            # Devices, FIFOs, etc. are never produced by the backup side.
            log.warning("Skipping special archive member: %s", member.name)
            continue
        vetted.append(member)
    return vetted


def _extract_archive(src: Path):
    """Unpack the state archive *src* into the parent directory of DATA_DIR.

    Where the interpreter provides tarfile extraction filters, the built-in
    ``"data"`` filter is used. Support is detected up front instead of by
    catching TypeError, so an unrelated TypeError during extraction can no
    longer trigger a second, unfiltered extraction. On interpreters without
    filters, members are validated manually before unpacking.

    Args:
        src: Path to a gzip-compressed tar archive.

    Raises:
        tarfile.TarError: If the archive is unreadable or a member is rejected
            as unsafe.
        OSError: If reading the archive or writing the files fails.
    """
    extract_root = DATA_DIR.parent
    with tarfile.open(src, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Python 3.12+, and earlier releases that received the backport.
            tar.extractall(path=extract_root, filter="data")
        else:
            tar.extractall(  # noqa: S202 - members are vetted above
                path=extract_root,
                members=_vetted_members(tar, extract_root),
            )
    log.info("Extracted state to %s", extract_root)


def _upload_with_retry(api, archive: Path, repo_id: str):
    """Push *archive* to the dataset repo, retrying failed uploads.

    Up to _UPLOAD_MAX_ATTEMPTS uploads are tried. After a failed attempt that
    is not the last one, the function sleeps _UPLOAD_BACKOFF_BASE seconds,
    doubled for every attempt already made, before trying again. Every failure
    is logged as a warning.

    Args:
        api: Hub client exposing ``upload_file``.
        archive: Local archive to upload; stored in the repo as ARCHIVE_NAME.
        repo_id: Fully qualified ``owner/name`` of the dataset.

    Raises:
        Exception: The error from the final attempt, once all attempts failed.
    """
    failure = None
    attempt = 0
    while attempt < _UPLOAD_MAX_ATTEMPTS:
        attempt += 1
        try:
            api.upload_file(
                path_or_fileobj=str(archive),
                path_in_repo=ARCHIVE_NAME,
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except Exception as err:
            failure = err
            if attempt == _UPLOAD_MAX_ATTEMPTS:
                log.warning("Upload failed after %d attempts: %s", _UPLOAD_MAX_ATTEMPTS, err)
            else:
                delay = _UPLOAD_BACKOFF_BASE * (2 ** (attempt - 1))
                log.warning("Upload attempt %d/%d failed: %s β€” retrying in %ds",
                            attempt, _UPLOAD_MAX_ATTEMPTS, err, delay)
                time.sleep(delay)
        else:
            # Upload went through; nothing more to do.
            return
    raise failure


def restore():
    """Download the state archive from the HF dataset and unpack it locally.

    Without HF_TOKEN the function logs a notice and returns. Otherwise it
    resolves the dataset id, makes sure the repo exists, downloads ARCHIVE_NAME
    into a temporary directory and extracts it.

    An error whose text contains "404", "not found" or "does not exist" is
    taken to mean that no backup has been uploaded yet and is recorded with
    status "configured". A successful restore is recorded as "restored".

    Raises:
        Exception: Any other failure, after it has been logged and recorded
            with status "error".
    """
    if not HF_TOKEN:
        log.info("No HF_TOKEN β€” skipping restore.")
        return

    try:
        api = _get_api()
        repo_id = _resolve_repo_id(api)
        _ensure_repo(api, repo_id)

        from huggingface_hub import hf_hub_download
        with tempfile.TemporaryDirectory() as scratch:
            try:
                downloaded = hf_hub_download(
                    repo_id=repo_id,
                    filename=ARCHIVE_NAME,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    local_dir=scratch,
                )
                _extract_archive(Path(downloaded))
                log.info("State restored from %s", repo_id)
                _write_status("restored", f"State restored from {repo_id}")
            except Exception as inner:
                text = str(inner)
                lowered = text.lower()
                looks_missing = (
                    "404" in text
                    or "not found" in lowered
                    or "does not exist" in lowered
                )
                if not looks_missing:
                    # Hand real failures to the outer handler below.
                    raise
                log.info("No existing backup found in %s β€” starting fresh.", repo_id)
                _write_status("configured", f"No backup yet in {repo_id}. First sync in {SYNC_INTERVAL}s.")
    except Exception as outer:
        log.warning("Restore failed: %s", outer)
        _write_status("error", f"Restore failed: {outer}")
        raise


def _archive_is_empty(archive: Path) -> bool:
    """Return True when the gzip-compressed tar at *archive* has no members."""
    with tarfile.open(archive, "r:gz") as tar:
        return tar.next() is None


def sync_once():
    """Archive the current state and upload it to the HF dataset once.

    Silently does nothing without HF_TOKEN. Otherwise it builds the archive in
    a temporary directory and uploads it with retries. The outcome is recorded
    through _write_status: "synced" on success or when there was nothing to
    upload, "error" on failure.

    The upload is skipped when the archive contains no members. A member-less
    ``.tar.gz`` still has a non-zero size, so checking the size alone never
    caught this case and an empty archive would replace the existing backup.

    Never raises: every failure is logged as a warning and swallowed so that a
    periodic caller keeps running.
    """
    if not HF_TOKEN:
        return

    try:
        api = _get_api()
        repo_id = _resolve_repo_id(api)
        _ensure_repo(api, repo_id)

        with tempfile.TemporaryDirectory() as tmp:
            archive = Path(tmp) / ARCHIVE_NAME
            _make_archive(archive)

            nothing_to_upload = (
                not archive.exists()
                or archive.stat().st_size == 0
                or _archive_is_empty(archive)
            )
            if nothing_to_upload:
                log.info("Nothing to backup — skipping upload.")
                _write_status("synced", "Nothing to backup — skipping upload.")
                return

            size_kb = archive.stat().st_size // 1024
            log.info("Uploading state archive (%d KB) to %s...", size_kb, repo_id)
            _upload_with_retry(api, archive, repo_id)
            log.info("State synced to %s (%d KB)", repo_id, size_kb)
            _write_status("synced", f"Synced to {repo_id} ({size_kb} KB)")
    except Exception as exc:
        log.warning("Sync failed: %s", exc)
        _write_status("error", f"Sync failed: {exc}")

def loop():
    """Run sync_once() forever, pausing SYNC_INTERVAL seconds before each run.

    The pause comes first, so the initial sync happens one full interval after
    start-up. An exception escaping a sync is logged as a warning and the loop
    carries on. This function does not return.
    """

    def _run_cycle():
        # One guarded sync; a failure must not terminate the loop.
        try:
            sync_once()
        except Exception as err:
            log.warning("Periodic sync error: %s", err)

    log.info("Starting periodic sync (interval: %ds)", SYNC_INTERVAL)
    while True:
        time.sleep(SYNC_INTERVAL)
        _run_cycle()


if __name__ == "__main__":
    # CLI entry point: the first argument selects the action. A missing or
    # unknown argument prints the module usage text and exits with status 1.
    cmd = next(iter(sys.argv[1:]), "help")
    handlers = {
        "restore": restore,
        "sync-once": sync_once,
        "loop": loop,
    }
    if cmd not in handlers:
        print(__doc__)
        sys.exit(1)
    handlers[cmd]()