Spaces:
Running
Running
| import os | |
| import time | |
| import subprocess | |
| import datetime | |
| import threading | |
| import tarfile | |
| import tempfile | |
| from pathlib import Path | |
| # ========================= | |
| # ENV VARIABLES | |
| # ========================= | |
| BACKUP_REPO = os.environ.get("BACKUP_REPO") | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| BACKUP_INTERVAL = int(os.environ.get("BACKUP_INTERVAL", "5")) * 60 | |
| CONFIG_BACKUP_INTERVAL = int(os.environ.get("CONFIG_BACKUP_INTERVAL", "60")) * 60 | |
| CONFIG_KEEP_LIMIT = int(os.environ.get("CONFIG_KEEP_LIMIT", "5")) | |
| # ========================= | |
| # PATHS | |
| # ========================= | |
# Directory uploaded by the workspace backup.
WORKSPACE_PATH = "/home/vscode/workspace"

# Files and directories bundled into each config archive; entries that do
# not exist at backup time are skipped.
CONFIG_PATHS = [
    "/home/vscode/.claude",
    "/home/vscode/.config/code-server/config.yaml",
    "/home/vscode/.local/share/code-server/extensions",
    "/home/vscode/.local/share/code-server/User",
    "/home/vscode/.local/share/code-server/Machine",
    "/home/vscode/.local/share/code-server/coder.json",
]

# Glob patterns handed to `hf upload --exclude`. They are matched against
# each file's path relative to the uploaded folder, so a bare directory name
# such as "node_modules" would not exclude the files inside that directory.
# Each entry is therefore listed for the top level and for any nested level.
WORKSPACE_EXCLUDES = [
    "__pycache__/*",
    "*/__pycache__/*",
    ".git/*",
    "*/.git/*",
    "node_modules/*",
    "*/node_modules/*",
    ".DS_Store",
    "*/.DS_Store",
]
| # ========================= | |
| # HELPERS | |
| # ========================= | |
def hf_env():
    """Build the environment used for `hf` CLI subprocesses.

    Returns:
        A copy of the current process environment in which HF_HOME and
        XDG_CACHE_HOME point below /tmp, TMPDIR is /tmp, and HF_TOKEN is set
        from the module-level token when one is configured. The three
        directories are created if they do not exist yet.
    """
    env = os.environ.copy()
    env.update(
        HF_HOME="/tmp/hf_cache",
        XDG_CACHE_HOME="/tmp/xdg_cache",
        TMPDIR="/tmp",
    )
    # Environment values must be strings: storing None here would make every
    # subprocess launch fail with TypeError when no token is configured.
    if HF_TOKEN is not None:
        env["HF_TOKEN"] = HF_TOKEN
    for key in ("HF_HOME", "XDG_CACHE_HOME", "TMPDIR"):
        os.makedirs(env[key], exist_ok=True)
    return env
def upload_folder(local_path, remote_path, excludes=None):
    """Upload *local_path* to *remote_path* in BACKUP_REPO using `hf upload`.

    The combined stdout/stderr of the CLI is echoed line by line with a
    "[Backup]" prefix.

    Args:
        local_path: File or directory on disk to upload.
        remote_path: Destination path inside the dataset repository.
        excludes: Optional iterable of patterns, each passed as `--exclude`.

    Returns:
        The exit status of the `hf` process (0 means success).
    """
    cmd = ["hf", "upload", BACKUP_REPO, local_path, remote_path, "--repo-type", "dataset"]
    for pattern in excludes or ():
        cmd += ["--exclude", pattern]

    # The context manager closes the output pipe and reaps the child even if
    # printing is interrupted part-way through.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=hf_env(),
    ) as process:
        for line in process.stdout:
            print("[Backup]", line.strip())
        exit_code = process.wait()

    if exit_code != 0:
        print(f"[Backup] Upload of {local_path} failed with exit code {exit_code}.")
    return exit_code
def cleanup_old_backups():
    """Delete config archives beyond the newest CONFIG_KEEP_LIMIT.

    Lists the files of BACKUP_REPO through the `hf` CLI, selects the entries
    named ``configs/configs_*.tar.gz`` and deletes all but the most recent
    ones. Any exception is caught and reported; nothing is raised.
    """
    try:
        env = hf_env()
        # NOTE(review): confirm that the installed `hf` CLI provides
        # `repo-files list` and prints one path per line; the parsing below
        # assumes that output shape.
        listing = subprocess.run(
            ["hf", "repo-files", "list", BACKUP_REPO, "--repo-type", "dataset"],
            stdout=subprocess.PIPE,
            text=True,
            env=env,
        )
        if listing.returncode != 0:
            # Without a listing we cannot tell how many archives exist, so do
            # not claim that no cleanup is needed.
            print(f"[Cleanup] Failed: listing files exited with code {listing.returncode}")
            return

        # Strip before filtering so padded lines are recognised as well.
        names = (raw.strip() for raw in listing.stdout.splitlines())
        # The timestamp embedded in the file name sorts lexicographically in
        # chronological order, so a reverse sort puts the newest first.
        configs = sorted(
            (
                name
                for name in names
                if name.startswith("configs/configs_") and name.endswith(".tar.gz")
            ),
            reverse=True,
        )

        # A negative limit would turn the slice below into "the last N".
        keep = max(CONFIG_KEEP_LIMIT, 0)
        stale = configs[keep:]
        if not stale:
            print(f"[Cleanup] No cleanup needed. Total configs: {len(configs)}")
            return

        print(f"[Cleanup] Found {len(stale)} old config backups to delete.")
        for name in stale:
            print(f"[Cleanup] Deleting {name}")
            outcome = subprocess.run(
                ["hf", "repo-files", "delete", BACKUP_REPO, name, "--repo-type", "dataset"],
                env=env,
                check=False,
            )
            if outcome.returncode != 0:
                # Best effort: report and carry on with the remaining files.
                print(f"[Cleanup] Could not delete {name} (exit code {outcome.returncode})")
    except Exception as e:
        print("[Cleanup] Failed:", e)
| # ========================= | |
| # BACKUP ROUTINES | |
| # ========================= | |
def run_workspace_backup():
    """Upload the workspace folder in an endless loop.

    Each iteration uploads WORKSPACE_PATH to ``workspace/`` in the backup
    repository, skipping WORKSPACE_EXCLUDES, then sleeps BACKUP_INTERVAL
    seconds. Failures are printed and the loop continues; the function never
    returns.
    """
    while True:
        print("[Backup] Workspace backup started.")
        try:
            status = upload_folder(WORKSPACE_PATH, "workspace/", excludes=WORKSPACE_EXCLUDES)
            # A None result means the uploader reported no status; only an
            # explicit non-zero exit code is treated as a failure.
            if status not in (None, 0):
                raise RuntimeError(f"upload exited with code {status}")
            # datetime.utcnow() is deprecated; dropping tzinfo keeps the
            # printed ISO string in the same naive format as before.
            finished = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            print("[Backup] Workspace backup completed at", finished.isoformat(), "UTC")
        except Exception as e:
            print("[Backup] Workspace backup failed:", e)
        time.sleep(BACKUP_INTERVAL)
def run_config_backup():
    """Archive and upload the config paths in an endless loop.

    Each iteration packs every existing entry of CONFIG_PATHS into a
    temporary ``.tar.gz`` (member names relative to /home/vscode), uploads it
    as ``configs/configs_<UTC timestamp>.tar.gz``, prunes old archives, and
    then sleeps CONFIG_BACKUP_INTERVAL seconds. Failures are printed and the
    loop continues; the function never returns.
    """
    while True:
        print("[Backup] Config backup started.")
        tar_path = None
        try:
            # Only reserve a unique file name here; tarfile reopens the path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".tar.gz") as tmp:
                tar_path = tmp.name

            with tarfile.open(tar_path, "w:gz") as tar:
                for path in CONFIG_PATHS:
                    if not os.path.exists(path):
                        continue
                    arcname = os.path.relpath(path, "/home/vscode")
                    tar.add(path, arcname=arcname)
                    print(f"[Backup] Added {arcname}")

            # datetime.utcnow() is deprecated; the format string is unchanged
            # so remote file names keep sorting chronologically.
            timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
                "%Y-%m-%dT%H-%M-%SZ"
            )
            remote_path = f"configs/configs_{timestamp}.tar.gz"
            status = upload_folder(tar_path, remote_path)
            # A None result means the uploader reported no status; only an
            # explicit non-zero exit code is treated as a failure.
            if status not in (None, 0):
                raise RuntimeError(f"upload exited with code {status}")
            print("[Backup] Config backup completed at", timestamp)
            cleanup_old_backups()
        except Exception as e:
            print("[Backup] Config backup failed:", e)
        finally:
            # Always remove the temporary archive, including when archiving or
            # uploading failed, so repeated failures cannot fill up /tmp.
            if tar_path is not None:
                try:
                    os.remove(tar_path)
                except FileNotFoundError:
                    pass
                except OSError as e:
                    print("[Backup] Could not remove temporary archive:", e)
        time.sleep(CONFIG_BACKUP_INTERVAL)
| # ========================= | |
| # MAIN ENTRY | |
| # ========================= | |
def main():
    """Start both backup loops in daemon threads and keep the process alive.

    Exits immediately with status 0 when BACKUP_REPO or HF_TOKEN is not set.
    """
    if not BACKUP_REPO or not HF_TOKEN:
        print("[Backup] BACKUP_REPO or HF_TOKEN not set! Exiting.")
        # The bare exit() helper is installed by the `site` module and is not
        # guaranteed to exist; SystemExit always is. Status 0 is kept as in
        # the original. NOTE(review): presumably so a supervising start-up
        # script does not treat missing configuration as a failure - confirm.
        raise SystemExit(0)

    print("[Backup] Starting dual backup threads...")
    for worker in (run_workspace_backup, run_config_backup):
        threading.Thread(target=worker, daemon=True).start()

    # Daemon threads stop as soon as the main thread returns, so park it here.
    while True:
        time.sleep(3600)


if __name__ == "__main__":
    main()