"""Back up and restore /root to a Hugging Face dataset repository.

On start-up the script either creates an initial backup (when the dataset
does not yet hold all three backup files) or restores the machine from the
dataset. Afterwards it re-uploads a fresh backup every SYNC_INTERVAL seconds.
"""

import os
import subprocess
import tarfile
import time

from huggingface_hub import HfApi, hf_hub_download, login

# ================= CONFIG =================
DATASET = "Vvbbnnn/Vps-tar"

ROOT_TAR = "/tmp/root.tar.gz"
ROOT_FILE = "root.tar.gz"
APT_FILE = "apt.txt"
PIP_FILE = "pip.txt"

# Local staging paths for the dependency lists.
APT_LOCAL = "/tmp/apt.txt"
PIP_LOCAL = "/tmp/pip.txt"

# Directory in which `npm install` is run after a restore.
NODE_APP_DIR = "/root/app"

SYNC_INTERVAL = 3600

# Substrings that mark a path as excluded from backup and from pruning.
# Matching is a case-insensitive substring test on the whole path, so it is
# deliberately broad (e.g. ".git" also matches ".github").
SKIP_MARKERS = ("node_modules", "npm", "node", "venv", "__pycache__", ".git")

# 🔐 Auth
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("❌ HF_TOKEN not set")

login(token=HF_TOKEN)
api = HfApi()


# ================= HELPERS =================

def exists(filename):
    """Return True if *filename* is present in the dataset repository.

    Uses the API's path lookup instead of downloading the file.

    Raises:
        Exception: any error from the lookup is logged and re-raised.
            Treating a failed lookup as "file missing" would make main()
            take the first-run branch and upload the current machine state
            over an existing backup.
    """
    try:
        info = api.get_paths_info(
            repo_id=DATASET,
            repo_type="dataset",
            paths=[filename],
        )
    except Exception as e:
        print(f"❌ Could not check {filename} in {DATASET}: {e}")
        raise
    return len(list(info)) > 0


# ---------- SAVE DEPS ----------

def save_apt():
    """Write the names of installed dpkg packages to APT_LOCAL."""
    print("📄 Saving apt...")
    # shell=True is required for the pipeline and the redirect; the command
    # is built only from constants.
    subprocess.run(
        "dpkg --get-selections | grep -w 'install$' | awk '{print $1}' > "
        + APT_LOCAL,
        shell=True,
    )


def save_pip():
    """Write `pip freeze` output of the /root/venv interpreter to PIP_LOCAL."""
    print("📄 Saving pip...")
    subprocess.run("/root/venv/bin/pip freeze > " + PIP_LOCAL, shell=True)


# ---------- FILTER ----------

def should_skip(path):
    """Return True if *path* contains any SKIP_MARKERS entry (case-insensitive)."""
    lowered = path.lower()
    return any(marker in lowered for marker in SKIP_MARKERS)


# ---------- CREATE TAR ----------

def create_tar():
    """Archive /root into ROOT_TAR (gzip), omitting paths that should_skip().

    Members are stored under the relative prefix "root".

    Returns:
        True once the archive has been written.
    """
    print("📦 Creating ROOT backup (strict clean)...")

    def filter_fn(tarinfo):
        # Returning None drops the member (and, for a directory, its subtree).
        if should_skip(tarinfo.name):
            return None
        return tarinfo

    with tarfile.open(ROOT_TAR, "w:gz") as tar:
        tar.add("/root", arcname="root", filter=filter_fn)

    size = os.path.getsize(ROOT_TAR) / 1024 / 1024
    print(f"✅ root.tar created ({size:.2f} MB)")
    return True


# ---------- UPLOAD ----------

def upload(local, remote):
    """Upload the local file *local* to the dataset as *remote*."""
    print(f"⬆️ Uploading {remote}")
    api.upload_file(
        path_or_fileobj=local,
        path_in_repo=remote,
        repo_id=DATASET,
        repo_type="dataset",
    )
    print(f"✅ Uploaded {remote}")


# ---------- DOWNLOAD ----------

def download(remote, local):
    """Download dataset file *remote* to the local path *local*.

    Returns:
        True on success, False if the download (or the final move) failed;
        the error is printed rather than raised.
    """
    print(f"⬇️ Downloading {remote}")
    try:
        fetched = hf_hub_download(
            repo_id=DATASET,
            repo_type="dataset",
            filename=remote,
            # An empty dirname (bare filename) must not be passed as
            # local_dir; fall back to the current directory.
            local_dir=os.path.dirname(local) or ".",
            local_dir_use_symlinks=False,
        )
        # The hub names the file after *remote*; make sure it ends up at
        # exactly *local* even when the two basenames differ.
        if os.path.abspath(fetched) != os.path.abspath(local):
            os.replace(fetched, local)
        return True
    except Exception as e:
        print(f"❌ Download failed for {remote}: {e}")
        return False


# ---------- SMART RESTORE ----------

def get_tar_paths(tar_path):
    """Return the set of absolute paths stored in the gzip tar at *tar_path*.

    Member names are relative (e.g. 'root/subdir/file'); each is turned into
    '/root/subdir/file' with any trailing slash removed.
    """
    with tarfile.open(tar_path, "r:gz") as tar:
        return {
            ("/" + member.name.lstrip("/")).rstrip("/")
            for member in tar.getmembers()
        }


def smart_delete_root(tar_paths):
    """Delete entries under /root that are absent from *tar_paths*.

    Paths for which should_skip() is true are neither deleted nor descended
    into. A directory that is absent from the archive is removed as a whole
    (including anything inside it), so the walk does not descend into it
    either.

    Args:
        tar_paths: set of absolute paths, as produced by get_tar_paths().
    """
    print("🧹 Removing files not present in tar...")

    to_delete = []

    for dirpath, dirnames, filenames in os.walk("/root"):
        kept_dirs = []
        for dname in dirnames:
            full_path = os.path.join(dirpath, dname)
            if should_skip(full_path):
                # Protected: every descendant path contains the same marker,
                # so there is nothing to find below it.
                continue
            if full_path in tar_paths:
                kept_dirs.append(dname)
            else:
                to_delete.append(full_path)
        # Prune in place so os.walk only descends into directories we keep.
        dirnames[:] = kept_dirs

        for fname in filenames:
            full_path = os.path.join(dirpath, fname)
            if should_skip(full_path):
                continue
            if full_path not in tar_paths:
                to_delete.append(full_path)

    for path in to_delete:
        print(f"  🗑️  {path}")
        subprocess.run(["rm", "-rf", path])

    print(f"✅ Removed {len(to_delete)} items not in tar")


def restore_root():
    """Restore /root from ROOT_TAR without wiping it first.

    1. Read all paths stored in the archive.
    2. Delete only the /root entries that are not in the archive.
    3. Extract the archive over "/", overwriting changed files and adding
       missing ones.

    Raises:
        RuntimeError: if the archive has no "root" entry; pruning against
            such an archive would remove every unprotected file in /root.
    """
    print("🔄 Smart restoring /root...")

    tar_paths = get_tar_paths(ROOT_TAR)
    if "/root" not in tar_paths:
        raise RuntimeError(
            "❌ root.tar.gz has no 'root' entry — aborting restore to protect /root!"
        )

    smart_delete_root(tar_paths)

    print("📂 Extracting tar...")
    with tarfile.open(ROOT_TAR, "r:gz") as tar:
        # Detect extraction-filter support up front. Catching TypeError
        # around extractall() could also catch an unrelated TypeError raised
        # mid-extraction and silently re-extract without the filter.
        if hasattr(tarfile, "data_filter"):
            tar.extractall("/", filter="data")
        else:
            tar.extractall("/")

    print("✅ Root restore complete")


# ---------- RESTORE DEPS ----------

def restore_apt():
    """Install every package listed in APT_LOCAL with apt-get."""
    print("🔄 Restoring apt...")
    # shell=True keeps a missing binary a non-fatal exit status instead of
    # an exception; the commands are built only from constants.
    subprocess.run("apt-get update", shell=True)
    subprocess.run("xargs -a " + APT_LOCAL + " apt-get install -y", shell=True)


def restore_pip():
    """Install the requirements in PIP_LOCAL with the /root/venv pip."""
    print("🔄 Restoring pip...")
    subprocess.run("/root/venv/bin/pip install -r " + PIP_LOCAL, shell=True)


def restore_node():
    """Run `npm install --omit=dev` in NODE_APP_DIR.

    A missing app directory or a missing npm executable is reported and
    skipped, so it cannot abort start-up after /root has been restored.
    """
    print("🔄 Restoring node...")
    if not os.path.isdir(NODE_APP_DIR):
        print(f"⚠️ {NODE_APP_DIR} not found — skipping npm install")
        return
    try:
        subprocess.run(["npm", "install", "--omit=dev"], cwd=NODE_APP_DIR)
    except FileNotFoundError:
        print("⚠️ npm not found — skipping npm install")


# ================= SYNC =================

def sync():
    """Save the dependency lists and a /root archive, then upload all three.

    Temporary files are removed even when a step fails, so a failed upload
    does not leave the archive behind in /tmp.
    """
    print("🔄 Sync started")

    try:
        save_apt()
        save_pip()

        upload(APT_LOCAL, APT_FILE)
        upload(PIP_LOCAL, PIP_FILE)

        create_tar()
        upload(ROOT_TAR, ROOT_FILE)
    finally:
        # Clean up all temp files
        for tmp_file in (ROOT_TAR, APT_LOCAL, PIP_LOCAL):
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
                print(f"🧹 Cleaned up {tmp_file}")

    print("✅ Sync complete")


# ================= MAIN =================

def main():
    """Create or restore the backup once, then sync every SYNC_INTERVAL seconds."""
    print("🚀 Full ROOT Backup System Started")

    first_run = not (
        exists(ROOT_FILE)
        and exists(APT_FILE)
        and exists(PIP_FILE)
    )

    # ---------- FIRST RUN ----------
    if first_run:
        print("🆕 First run → creating initial backup")
        sync()

    # ---------- RESTORE ----------
    else:
        print("☁️ Restoring from dataset...")

        if not download(APT_FILE, APT_LOCAL):
            raise RuntimeError("❌ Failed to download apt.txt — aborting restore!")
        restore_apt()

        if not download(PIP_FILE, PIP_LOCAL):
            raise RuntimeError("❌ Failed to download pip.txt — aborting restore!")
        restore_pip()

        # Most critical check — never touch /root without a valid tar
        if not download(ROOT_FILE, ROOT_TAR):
            raise RuntimeError(
                "❌ Failed to download root.tar.gz — aborting restore to protect /root!"
            )
        restore_root()

        restore_node()

    # ---------- LOOP ----------
    while True:
        print(f"⏳ Waiting {SYNC_INTERVAL}s...")
        time.sleep(SYNC_INTERVAL)
        try:
            sync()
        except Exception as e:
            # Top-level boundary: one failed sync must not stop the periodic
            # backups; report it and try again next interval.
            print(f"❌ Sync failed: {e}")


if __name__ == "__main__":
    main()