TESTINGDB / sync_manager.py
sarveshpatel's picture
Update sync_manager.py
57645c7 verified
#!/usr/bin/env python3
"""Sync PostgreSQL backups to HuggingFace Xet Dataset"""
import os
import sys
import time
import subprocess
import signal
from datetime import datetime
from pathlib import Path
# Seconds between consecutive backup/upload cycles (defaults to 300).
SYNC_INTERVAL = int(os.getenv('SYNC_INTERVAL', 300))
# Hugging Face access token; an empty string is treated as "not configured".
HF_TOKEN = os.getenv('HF_TOKEN', '')
# Target dataset repo id; an empty string is treated as "not configured".
XET_DATASET = os.getenv('XET_DATASET', '')
# Main-loop flag; the signal handler sets it to False to request shutdown.
running = True
def log(msg):
    """Print *msg* to stdout, prefixed with the local time as ``[HH:MM:SS]``.

    The stream is flushed on every call so lines appear immediately.
    """
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}", flush=True)
def wait_for_postgres(retries=30, *, host='localhost', port=5432, delay=2):
    """Poll ``pg_isready`` until PostgreSQL accepts connections.

    Args:
        retries: Maximum number of ``pg_isready`` probes to run.
        host: Server host passed to ``pg_isready -h``.
        port: Server port passed to ``pg_isready -p``.
        delay: Seconds to sleep between two consecutive failed probes.

    Returns:
        True as soon as one probe exits with status 0; False when every
        probe failed or ``retries`` is not positive.

    Note:
        ``subprocess.run`` raises ``FileNotFoundError`` if the ``pg_isready``
        executable cannot be found; that error is not handled here.
    """
    probe = ['pg_isready', '-h', host, '-p', str(port)]
    for attempt in range(1, retries + 1):
        if subprocess.run(probe, capture_output=True).returncode == 0:
            log("βœ… PostgreSQL ready")
            return True
        # Sleeping after the final failed probe would only delay the caller.
        if attempt < retries:
            time.sleep(delay)
    return False
def backup_and_upload():
    """Dump the ``appdb`` database with ``pg_dump`` and upload it to the dataset.

    The dump is written to ``/data/backup/pg_backup.sql`` and then uploaded
    to ``backup/pg_backup.sql`` in the ``XET_DATASET`` dataset repo using
    ``HF_TOKEN``. Nothing is done if either setting is empty.

    Returns:
        None. Failures (non-zero ``pg_dump`` exit, upload errors, ...) are
        logged rather than raised; ``SystemExit`` and ``KeyboardInterrupt``
        are allowed to propagate.
    """
    if not HF_TOKEN or not XET_DATASET:
        log("⚠️ HF_TOKEN or XET_DATASET not set")
        return
    try:
        backup_dir = Path('/data/backup')
        backup_dir.mkdir(parents=True, exist_ok=True)
        backup_file = backup_dir / 'pg_backup.sql'

        # Create backup
        result = subprocess.run(
            ['pg_dump', '-h', 'localhost', '-U', 'postgres', '-d', 'appdb',
             '--clean', '--if-exists', '-f', str(backup_file)],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            log(f"❌ Backup failed: {result.stderr}")
            return
        log(f"βœ… Backup created ({backup_file.stat().st_size} bytes)")

        # Upload to HuggingFace (imported lazily, only once a dump exists)
        from huggingface_hub import HfApi, create_repo
        api = HfApi(token=HF_TOKEN)
        try:
            create_repo(XET_DATASET, repo_type="dataset", private=True,
                        token=HF_TOKEN, exist_ok=True)
        except Exception as exc:
            # Repo creation stays best-effort: the upload below is still
            # attempted. Catching Exception (not a bare except) keeps
            # SystemExit/KeyboardInterrupt -- e.g. the sys.exit() raised by
            # the shutdown signal handler -- from being silently swallowed.
            log(f"⚠️ Could not create dataset repo (continuing): {exc}")
        api.upload_file(
            path_or_fileobj=str(backup_file),
            path_in_repo="backup/pg_backup.sql",
            repo_id=XET_DATASET,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Backup {datetime.now().isoformat()}"
        )
        log(f"βœ… Uploaded to {XET_DATASET}")
    except Exception as e:
        # Top-level boundary for one sync cycle: report and let the caller
        # carry on with the next cycle.
        log(f"❌ Error: {e}")
def signal_handler(signum, frame):
    """Shut down on a termination signal: final backup, then exit with 0.

    Args:
        signum: Number of the received signal (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, with code 0, once the final backup attempt for
            the first signal has returned.
    """
    global running
    if not running:
        # A shutdown is already in progress further up the stack. Returning
        # lets that final backup finish instead of starting a nested one
        # that would write the same dump file concurrently.
        return
    running = False
    log("πŸ›‘ Shutting down, final backup...")
    backup_and_upload()
    sys.exit(0)
def main():
    """Run the sync loop until a shutdown is requested.

    Logs the configuration, installs ``signal_handler`` for SIGTERM and
    SIGINT, waits for PostgreSQL (returning early if it never becomes
    ready), pauses 30 seconds, then alternates ``backup_and_upload()`` with
    a wait of ``SYNC_INTERVAL`` seconds for as long as ``running`` is true.
    """
    log("πŸš€ Sync Manager started")
    log(f"πŸ“¦ Dataset: {XET_DATASET or 'NOT SET'}")
    log(f"⏰ Interval: {SYNC_INTERVAL}s")

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, signal_handler)

    postgres_up = wait_for_postgres()
    if not postgres_up:
        log("❌ PostgreSQL not available")
        return

    time.sleep(30)  # Initial delay

    while running:
        backup_and_upload()
        # Wait in one-second slices so a cleared `running` flag is noticed
        # within about a second instead of after the whole interval.
        remaining = SYNC_INTERVAL
        while running and remaining > 0:
            time.sleep(1)
            remaining -= 1


if __name__ == '__main__':
    main()