#!/usr/bin/env python3 """ HuggingFlow state sync — backup/restore DeerFlow runtime data to/from HF Dataset. Syncs: - deerflow.db (SQLite thread/session database — WAL-checkpointed before archive) - config.yaml (generated config, may contain user edits) - workspace/ (agent-created files in the sandbox workspace) - .secrets/ (persisted AUTH_JWT_SECRET and other generated secrets) Usage: deerflow-sync.py restore — restore from HF Dataset on startup deerflow-sync.py sync-once — push current state to HF Dataset deerflow-sync.py loop — sync-once on an interval (reads SYNC_INTERVAL env) """ import json import os import sys import time import tarfile import tempfile import logging from datetime import datetime, timezone from pathlib import Path logging.basicConfig(level=logging.INFO, format="%(message)s") log = logging.getLogger(__name__) HF_TOKEN = os.environ.get("HF_TOKEN", "") BACKUP_REPO = os.environ.get("BACKUP_DATASET_NAME", "huggingflow-backup") HF_USERNAME = os.environ.get("HF_USERNAME", "") DATA_DIR = Path(os.environ.get("DEER_FLOW_HOME", "/app/data")) CONFIG_PATH = Path(os.environ.get("DEER_FLOW_CONFIG_PATH", DATA_DIR / "config.yaml")) SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL", "600")) ARCHIVE_NAME = "deerflow-state.tar.gz" SYNC_STATUS_FILE = "/tmp/huggingflow-sync-status.json" # Files/dirs to include in the backup archive BACKUP_TARGETS = [ DATA_DIR / "deerflow.db", DATA_DIR / "workspace", DATA_DIR / ".secrets", # persists AUTH_JWT_SECRET across restarts CONFIG_PATH, ] # SQLite auxiliary files that must NOT be archived (WAL/SHM are runtime-only) _SQLITE_AUX_SUFFIXES = {".db-wal", ".db-shm"} # Upload retry settings _UPLOAD_MAX_ATTEMPTS = 4 _UPLOAD_BACKOFF_BASE = 3 # seconds — doubles each attempt: 3, 6, 12 def _write_status(status: str, message: str): try: payload = { "status": status, "message": message, "timestamp": datetime.now(timezone.utc).isoformat(), } Path(SYNC_STATUS_FILE).write_text(json.dumps(payload)) except Exception as exc: log.debug("Could not write sync status: %s", exc) def _get_api(): if not HF_TOKEN: raise RuntimeError("HF_TOKEN not set") from huggingface_hub import HfApi return HfApi(token=HF_TOKEN) def _resolve_repo_id(api) -> str: if "/" in BACKUP_REPO: return BACKUP_REPO if HF_USERNAME: return f"{HF_USERNAME}/{BACKUP_REPO}" user = api.whoami() return f"{user['name']}/{BACKUP_REPO}" def _ensure_repo(api, repo_id: str): from huggingface_hub import create_repo try: create_repo( repo_id=repo_id, repo_type="dataset", private=True, token=HF_TOKEN, exist_ok=True, ) except Exception as exc: log.warning("Could not ensure dataset repo: %s", exc) def _checkpoint_sqlite(): """WAL-checkpoint the SQLite DB before archiving to get a clean snapshot.""" db_path = DATA_DIR / "deerflow.db" if not db_path.exists(): return try: import sqlite3 with sqlite3.connect(str(db_path), timeout=10) as conn: conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") log.debug("SQLite WAL checkpoint complete.") except Exception as exc: log.warning("SQLite checkpoint failed (continuing anyway): %s", exc) def _should_exclude(path: Path) -> bool: """Return True for SQLite WAL/SHM files that must not be archived.""" return path.suffix in _SQLITE_AUX_SUFFIXES def _make_archive(dest: Path): _checkpoint_sqlite() with tarfile.open(dest, "w:gz") as tar: for target in BACKUP_TARGETS: if not target.exists(): continue if target.is_file(): if _should_exclude(target): continue arcname = target.relative_to(DATA_DIR.parent) tar.add(target, arcname=str(arcname)) log.debug(" + %s", arcname) elif target.is_dir(): for child in sorted(target.rglob("*")): if child.is_file() and not _should_exclude(child): arcname = child.relative_to(DATA_DIR.parent) tar.add(child, arcname=str(arcname)) log.debug(" + %s", arcname) def _extract_archive(src: Path): extract_root = DATA_DIR.parent with tarfile.open(src, "r:gz") as tar: try: # Python 3.12+: use filter='data' for safe extraction tar.extractall(path=extract_root, filter="data") except TypeError: # Older Python without filter param tar.extractall(path=extract_root) # noqa: S202 log.info("Extracted state to %s", extract_root) def _upload_with_retry(api, archive: Path, repo_id: str): """Upload archive to HF Dataset with exponential-backoff retries.""" last_exc = None for attempt in range(1, _UPLOAD_MAX_ATTEMPTS + 1): try: api.upload_file( path_or_fileobj=str(archive), path_in_repo=ARCHIVE_NAME, repo_id=repo_id, repo_type="dataset", token=HF_TOKEN, ) return # success except Exception as exc: last_exc = exc if attempt < _UPLOAD_MAX_ATTEMPTS: wait = _UPLOAD_BACKOFF_BASE * (2 ** (attempt - 1)) log.warning("Upload attempt %d/%d failed: %s — retrying in %ds", attempt, _UPLOAD_MAX_ATTEMPTS, exc, wait) time.sleep(wait) else: log.warning("Upload failed after %d attempts: %s", _UPLOAD_MAX_ATTEMPTS, exc) raise last_exc def restore(): if not HF_TOKEN: log.info("No HF_TOKEN — skipping restore.") return try: api = _get_api() repo_id = _resolve_repo_id(api) _ensure_repo(api, repo_id) from huggingface_hub import hf_hub_download with tempfile.TemporaryDirectory() as tmp: try: local = hf_hub_download( repo_id=repo_id, filename=ARCHIVE_NAME, repo_type="dataset", token=HF_TOKEN, local_dir=tmp, ) _extract_archive(Path(local)) log.info("State restored from %s", repo_id) _write_status("restored", f"State restored from {repo_id}") except Exception as exc: if "404" in str(exc) or "not found" in str(exc).lower() or "does not exist" in str(exc).lower(): log.info("No existing backup found in %s — starting fresh.", repo_id) _write_status("configured", f"No backup yet in {repo_id}. First sync in {SYNC_INTERVAL}s.") else: raise except Exception as exc: log.warning("Restore failed: %s", exc) _write_status("error", f"Restore failed: {exc}") raise def sync_once(): if not HF_TOKEN: return try: api = _get_api() repo_id = _resolve_repo_id(api) _ensure_repo(api, repo_id) with tempfile.TemporaryDirectory() as tmp: archive = Path(tmp) / ARCHIVE_NAME _make_archive(archive) if not archive.exists() or archive.stat().st_size == 0: log.info("Nothing to backup — skipping upload.") _write_status("synced", "Nothing to backup — skipping upload.") return size_kb = archive.stat().st_size // 1024 log.info("Uploading state archive (%d KB) to %s...", size_kb, repo_id) _upload_with_retry(api, archive, repo_id) log.info("State synced to %s (%d KB)", repo_id, size_kb) _write_status("synced", f"Synced to {repo_id} ({size_kb} KB)") except Exception as exc: log.warning("Sync failed: %s", exc) _write_status("error", f"Sync failed: {exc}") def loop(): log.info("Starting periodic sync (interval: %ds)", SYNC_INTERVAL) while True: time.sleep(SYNC_INTERVAL) try: sync_once() except Exception as exc: log.warning("Periodic sync error: %s", exc) if __name__ == "__main__": cmd = sys.argv[1] if len(sys.argv) > 1 else "help" if cmd == "restore": restore() elif cmd == "sync-once": sync_once() elif cmd == "loop": loop() else: print(__doc__) sys.exit(1)