| import os |
| import time |
| import tarfile |
| import subprocess |
| from huggingface_hub import HfApi, hf_hub_download, login |
|
|
| |
# Hugging Face dataset repository that holds the backup artefacts.
DATASET = "Vvbbnnn/Vps-tar"

# Local scratch path of the /root archive and its file name in the dataset.
ROOT_TAR = "/tmp/root.tar.gz"
ROOT_FILE = "root.tar.gz"

# File names of the package lists inside the dataset.
APT_FILE = "apt.txt"
PIP_FILE = "pip.txt"

# Seconds to wait between two periodic syncs (one hour).
SYNC_INTERVAL = 60 * 60

# The access token must come from the environment; refuse to start without it.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("β HF_TOKEN not set")

# Authenticate once and share a single API client with every helper below.
login(token=HF_TOKEN)
api = HfApi()
|
|
| |
|
|
| |
def exists(filename):
    """Report whether *filename* is present in the backup dataset.

    Args:
        filename: Path of the file inside the ``DATASET`` repository.

    Returns:
        ``True`` when the Hub returns at least one entry for the path,
        ``False`` when it returns none or when the lookup itself fails.
    """
    try:
        info = api.get_paths_info(
            repo_id=DATASET,
            repo_type="dataset",
            paths=[filename],
        )
        return len(list(info)) > 0
    except Exception as e:
        # Best-effort lookup: keep answering False, but say why, because a
        # failed lookup is otherwise indistinguishable from a missing file.
        # NOTE(review): main() treats False as "no backup yet" and uploads a
        # fresh one, so a transient Hub error here can overwrite a good backup.
        print(f"WARNING: could not check {filename} in dataset: {e}")
        return False
|
|
| |
|
|
def save_apt():
    """Write the names of apt packages selected for install to /tmp/apt.txt.

    The list is taken from ``dpkg --get-selections``, keeping only lines that
    end in the word ``install`` and printing their first column.
    A failure is reported but never raised, so the caller keeps running.
    """
    print("π Saving apt...")
    result = subprocess.run(
        "dpkg --get-selections | grep -w 'install$' | awk '{print $1}' > /tmp/apt.txt",
        shell=True,
    )
    if result.returncode != 0:
        # The shell reports the status of the last pipeline stage, so this
        # mainly catches a failed redirect or awk rather than dpkg itself.
        print(f"WARNING: saving the apt package list exited with status {result.returncode}")
|
|
def save_pip():
    """Write the output of ``/root/venv/bin/pip freeze`` to /tmp/pip.txt.

    A failure is reported but never raised, so the caller keeps running.
    """
    print("π Saving pip...")
    result = subprocess.run("/root/venv/bin/pip freeze > /tmp/pip.txt", shell=True)
    if result.returncode != 0:
        # The redirect still creates /tmp/pip.txt, so without this message a
        # failed freeze would look exactly like an empty environment.
        print(f"WARNING: pip freeze exited with status {result.returncode}")
|
|
| |
|
|
def should_skip(path):
    """Tell whether *path* is excluded from both backup and cleanup.

    The check is a case-insensitive substring match against the whole path,
    so a marker matches anywhere in it, not only as a full path component.

    Args:
        path: Filesystem path or archive member name.

    Returns:
        ``True`` if any marker occurs in the lower-cased path, else ``False``.
    """
    lowered = path.lower()
    markers = ("node_modules", "npm", "node", "venv", "__pycache__", ".git")
    return any(marker in lowered for marker in markers)
|
|
| |
|
|
def create_tar():
    """Archive /root into ``ROOT_TAR`` as a gzip-compressed tarball.

    Entries are stored under the top-level name ``root``. Any entry whose
    archive name matches ``should_skip`` is left out of the archive.

    Returns:
        ``True`` once the archive has been written.
    """
    print("π¦ Creating ROOT backup (strict clean)...")

    def filter_fn(tarinfo):
        # Returning None tells tarfile to exclude the member.
        return None if should_skip(tarinfo.name) else tarinfo

    with tarfile.open(ROOT_TAR, "w:gz") as tar:
        tar.add("/root", arcname="root", filter=filter_fn)

    # Report the archive size in MiB.
    size = os.path.getsize(ROOT_TAR) / 1024 / 1024
    print(f"β root.tar created ({size:.2f} MB)")
    return True
|
|
| |
|
|
def upload(local, remote):
    """Upload the local file *local* to the dataset as *remote*.

    Args:
        local: Path of the file on disk.
        remote: Destination path inside the ``DATASET`` repository.

    Any exception raised by ``api.upload_file`` propagates to the caller.
    """
    print(f"β¬οΈ Uploading {remote}")
    api.upload_file(
        path_or_fileobj=local,
        path_in_repo=remote,
        repo_id=DATASET,
        repo_type="dataset",
    )
    print(f"β Uploaded {remote}")
|
|
| |
|
|
def download(remote, local):
    """Download *remote* from the dataset and store it at *local*.

    Args:
        remote: Path of the file inside the ``DATASET`` repository.
        local: Destination path on disk.

    Returns:
        ``True`` on success, ``False`` on any failure (the error is printed).
    """
    print(f"β¬οΈ Downloading {remote}")
    try:
        # dirname() is "" for a bare file name; fall back to the current dir.
        local_dir = os.path.dirname(local) or "."
        fetched = hf_hub_download(
            repo_id=DATASET,
            repo_type="dataset",
            filename=remote,
            local_dir=local_dir,
            local_dir_use_symlinks=False
        )
        # The file is written under local_dir using the *remote* name, so
        # move it when the caller asked for a different file name.
        if os.path.abspath(fetched) != os.path.abspath(local):
            os.replace(fetched, local)
        return True
    except Exception as e:
        print(f"β Download failed for {remote}: {e}")
        return False
|
|
| |
|
|
def get_tar_paths(tar_path):
    """Collect the absolute path of every member of a gzip tarball.

    Member names such as 'root/subdir/file' become '/root/subdir/file':
    each name gets exactly one leading slash and loses any trailing slash.

    Args:
        tar_path: Path of the ``.tar.gz`` archive to inspect.

    Returns:
        A set of the normalised absolute paths.
    """
    with tarfile.open(tar_path, "r:gz") as archive:
        names = archive.getnames()
    return {("/" + name.lstrip("/")).rstrip("/") for name in names}
|
|
def smart_delete_root(tar_paths):
    """Delete everything under /root that the backup archive does not contain.

    Paths matching ``should_skip`` are never touched. Files missing from
    *tar_paths* are removed one by one; directories missing from *tar_paths*
    are removed only once they are empty, so a directory that still holds a
    protected entry (node/venv/npm etc.) survives together with that entry.

    Args:
        tar_paths: Set of absolute paths present in the archive, as returned
            by ``get_tar_paths``.
    """
    print("π§Ή Removing files not present in tar...")

    removed = 0

    # Bottom-up walk: children are handled before their parent, so a parent
    # is already empty by the time it is examined if nothing in it is kept.
    for dirpath, dirnames, filenames in os.walk("/root", topdown=False):
        for fname in filenames:
            full_path = os.path.join(dirpath, fname)
            if should_skip(full_path) or full_path in tar_paths:
                continue
            try:
                os.remove(full_path)
            except OSError as e:
                print(f"WARNING: could not remove {full_path}: {e}")
                continue
            print(f" ποΈ {full_path}")
            removed += 1

        for dname in dirnames:
            full_path = os.path.join(dirpath, dname)
            if should_skip(full_path) or full_path in tar_paths:
                continue
            try:
                if os.path.islink(full_path):
                    # A symlink to a directory is listed here but is a file.
                    os.remove(full_path)
                else:
                    os.rmdir(full_path)
            except OSError:
                # Not empty: it still contains protected or kept entries.
                continue
            print(f" ποΈ {full_path}")
            removed += 1

    print(f"β Removed {removed} items not in tar")
|
|
def restore_root():
    """Restore /root from ``ROOT_TAR`` without wiping it first.

    Steps:
        1. Read all paths that exist inside the tar.
        2. Delete only the /root items that are not in the tar.
        3. Extract the tar over "/", overwriting changed files and adding
           missing ones.
    """
    print("π Smart restoring /root...")

    tar_paths = get_tar_paths(ROOT_TAR)
    smart_delete_root(tar_paths)

    print("π Extracting tar...")
    with tarfile.open(ROOT_TAR, "r:gz") as tar:
        # Detect extraction-filter support up front instead of catching
        # TypeError, which could also hide an unrelated error raised during
        # extraction and silently repeat it without a filter.
        if hasattr(tarfile, "data_filter"):
            tar.extractall("/", filter="data")
        else:
            tar.extractall("/")

    print("β Root restore complete")
|
|
| |
|
|
def restore_apt():
    """Refresh the apt index and install every package in /tmp/apt.txt.

    Both commands always run; a non-zero exit status is reported but never
    raised, so the remaining restore steps still execute.
    """
    print("π Restoring apt...")
    commands = (
        "apt-get update",
        "xargs -a /tmp/apt.txt apt-get install -y",
    )
    for command in commands:
        result = subprocess.run(command, shell=True)
        if result.returncode != 0:
            print(f"WARNING: '{command}' exited with status {result.returncode}")
|
|
def restore_pip():
    """Install the requirements in /tmp/pip.txt with /root/venv/bin/pip.

    A non-zero exit status is reported but never raised, so the remaining
    restore steps still execute.
    """
    print("π Restoring pip...")
    result = subprocess.run("/root/venv/bin/pip install -r /tmp/pip.txt", shell=True)
    if result.returncode != 0:
        print(f"WARNING: pip install exited with status {result.returncode}")
|
|
def restore_node():
    """Run ``npm install --omit=dev`` inside /root/app.

    The step is skipped with a warning when /root/app does not exist, and a
    non-zero npm exit status is reported rather than raised.
    """
    print("π Restoring node...")
    app_dir = "/root/app"
    if not os.path.isdir(app_dir):
        # subprocess.run raises for a missing cwd; skipping keeps the caller
        # alive when the restored tree has no app directory.
        print(f"WARNING: {app_dir} does not exist, skipping npm install")
        return
    result = subprocess.run(
        ["npm", "install", "--omit=dev"],
        cwd=app_dir,
    )
    if result.returncode != 0:
        print(f"WARNING: npm install exited with status {result.returncode}")
|
|
| |
|
|
def sync():
    """Create a fresh backup and upload it to the dataset.

    Saves and uploads the apt and pip package lists, then archives /root and
    uploads the tarball. The local temporary files are removed afterwards,
    whether or not every step succeeded. Exceptions from the individual
    steps propagate to the caller.
    """
    print("π Sync started")

    tmp_files = [ROOT_TAR, "/tmp/apt.txt", "/tmp/pip.txt"]
    try:
        save_apt()
        save_pip()

        upload("/tmp/apt.txt", APT_FILE)
        upload("/tmp/pip.txt", PIP_FILE)

        create_tar()
        upload(ROOT_TAR, ROOT_FILE)
    finally:
        # Clean up even after a failure so a large tarball is not left in /tmp.
        for tmp_file in tmp_files:
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
                print(f"π§Ή Cleaned up {tmp_file}")

    print("β Sync complete")
|
|
| |
|
|
def main():
    """Seed or restore the backup once, then sync every ``SYNC_INTERVAL`` seconds.

    When any of the three backup files is missing from the dataset, this is
    treated as a first run and an initial backup is uploaded. Otherwise the
    apt packages, pip packages, /root and node dependencies are restored in
    that order. The function then loops forever and never returns.

    Raises:
        RuntimeError: if a backup file cannot be downloaded during a restore.
    """
    print("π Full ROOT Backup System Started")

    first_run = not (
        exists(ROOT_FILE) and
        exists(APT_FILE) and
        exists(PIP_FILE)
    )

    if first_run:
        print("π First run β creating initial backup")
        sync()
    else:
        print("βοΈ Restoring from dataset...")

        if not download(APT_FILE, "/tmp/apt.txt"):
            raise RuntimeError("β Failed to download apt.txt β aborting restore!")
        restore_apt()

        if not download(PIP_FILE, "/tmp/pip.txt"):
            raise RuntimeError("β Failed to download pip.txt β aborting restore!")
        restore_pip()

        # /root is only touched once its archive is safely on disk.
        if not download(ROOT_FILE, ROOT_TAR):
            raise RuntimeError("β Failed to download root.tar.gz β aborting restore to protect /root!")
        restore_root()

        restore_node()

    while True:
        print(f"β³ Waiting {SYNC_INTERVAL}s...")
        time.sleep(SYNC_INTERVAL)
        try:
            sync()
        except Exception as e:
            # One failed upload must not end the service; try again later.
            print(f"WARNING: sync failed, retrying after the next interval: {e}")
|
|
# Start the backup service only when run as a script, not when imported.
if __name__ == "__main__":
    main()