sarveshpatel commited on
Commit
7b01c6f
Β·
verified Β·
1 Parent(s): 571b9ea

Create sync_manager.py

Browse files
Files changed (1) hide show
  1. sync_manager.py +195 -0
sync_manager.py ADDED
@@ -0,0 +1,195 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Sync Manager - Automatically backs up PostgreSQL to HuggingFace Xet Dataset
4
+ """
5
+
6
+ import os
7
+ import sys
8
+ import time
9
+ import subprocess
10
+ import tempfile
11
+ import signal
12
+ from datetime import datetime
13
+ from pathlib import Path
14
+
15
+ # Configuration
16
+ SYNC_INTERVAL = int(os.environ.get('SYNC_INTERVAL', 300)) # 5 minutes
17
+ HF_TOKEN = os.environ.get('HF_TOKEN', '')
18
+ XET_DATASET = os.environ.get('XET_DATASET', 'your-username/postgres-backup')
19
+ POSTGRES_HOST = 'localhost'
20
+ POSTGRES_PORT = 5432
21
+ POSTGRES_USER = 'postgres'
22
+ POSTGRES_DB = 'appdb'
23
+ PGPASSWORD = os.environ.get('POSTGRES_PASSWORD', 'postgres123')
24
+
25
+ # Global flag for graceful shutdown
26
+ running = True
27
+
28
+
29
+ def log(message: str):
30
+ """Print timestamped log message."""
31
+ timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
32
+ print(f"[{timestamp}] {message}", flush=True)
33
+
34
+
35
+ def wait_for_postgres(max_retries: int = 30) -> bool:
36
+ """Wait for PostgreSQL to be ready."""
37
+ import psycopg2
38
+
39
+ for i in range(max_retries):
40
+ try:
41
+ conn = psycopg2.connect(
42
+ host=POSTGRES_HOST,
43
+ port=POSTGRES_PORT,
44
+ user=POSTGRES_USER,
45
+ password=PGPASSWORD,
46
+ database='postgres'
47
+ )
48
+ conn.close()
49
+ log("βœ… PostgreSQL is ready")
50
+ return True
51
+ except psycopg2.OperationalError:
52
+ log(f"⏳ Waiting for PostgreSQL... ({i+1}/{max_retries})")
53
+ time.sleep(2)
54
+
55
+ log("❌ PostgreSQL did not become ready")
56
+ return False
57
+
58
+
59
+ def create_backup() -> str | None:
60
+ """Create a PostgreSQL backup using pg_dump."""
61
+ try:
62
+ backup_dir = Path('/data/backup')
63
+ backup_dir.mkdir(parents=True, exist_ok=True)
64
+
65
+ backup_file = backup_dir / 'pg_backup.sql'
66
+
67
+ env = os.environ.copy()
68
+ env['PGPASSWORD'] = PGPASSWORD
69
+
70
+ # Create backup
71
+ result = subprocess.run(
72
+ [
73
+ 'pg_dump',
74
+ '-h', POSTGRES_HOST,
75
+ '-p', str(POSTGRES_PORT),
76
+ '-U', POSTGRES_USER,
77
+ '-d', POSTGRES_DB,
78
+ '--clean',
79
+ '--if-exists',
80
+ '-f', str(backup_file)
81
+ ],
82
+ env=env,
83
+ capture_output=True,
84
+ text=True
85
+ )
86
+
87
+ if result.returncode == 0:
88
+ size = backup_file.stat().st_size
89
+ log(f"βœ… Backup created: {backup_file} ({size} bytes)")
90
+ return str(backup_file)
91
+ else:
92
+ log(f"❌ Backup failed: {result.stderr}")
93
+ return None
94
+
95
+ except Exception as e:
96
+ log(f"❌ Backup error: {e}")
97
+ return None
98
+
99
+
100
+ def upload_to_xet(backup_file: str) -> bool:
101
+ """Upload backup to HuggingFace Xet dataset."""
102
+ if not HF_TOKEN:
103
+ log("⚠️ No HF_TOKEN set, skipping upload")
104
+ return False
105
+
106
+ try:
107
+ from huggingface_hub import HfApi, create_repo
108
+
109
+ api = HfApi(token=HF_TOKEN)
110
+
111
+ # Create dataset repo if it doesn't exist
112
+ try:
113
+ create_repo(
114
+ repo_id=XET_DATASET,
115
+ repo_type="dataset",
116
+ private=True,
117
+ token=HF_TOKEN,
118
+ exist_ok=True
119
+ )
120
+ except Exception as e:
121
+ log(f"ℹ️ Repo creation note: {e}")
122
+
123
+ # Upload backup file
124
+ api.upload_file(
125
+ path_or_fileobj=backup_file,
126
+ path_in_repo="backup/pg_backup.sql",
127
+ repo_id=XET_DATASET,
128
+ repo_type="dataset",
129
+ token=HF_TOKEN,
130
+ commit_message=f"PostgreSQL backup - {datetime.now().isoformat()}"
131
+ )
132
+
133
+ log(f"βœ… Backup uploaded to {XET_DATASET}")
134
+ return True
135
+
136
+ except Exception as e:
137
+ log(f"❌ Upload failed: {e}")
138
+ return False
139
+
140
+
141
+ def sync_once():
142
+ """Perform one sync cycle."""
143
+ log("πŸ”„ Starting sync cycle...")
144
+
145
+ backup_file = create_backup()
146
+ if backup_file:
147
+ upload_to_xet(backup_file)
148
+
149
+ log("βœ… Sync cycle complete")
150
+
151
+
152
+ def signal_handler(signum, frame):
153
+ """Handle shutdown signals."""
154
+ global running
155
+ log("πŸ›‘ Shutdown signal received, performing final backup...")
156
+ running = False
157
+ sync_once()
158
+ log("πŸ‘‹ Sync manager shutting down")
159
+ sys.exit(0)
160
+
161
+
162
+ def main():
163
+ """Main sync loop."""
164
+ log("πŸš€ Sync Manager starting...")
165
+ log(f"πŸ“¦ Target dataset: {XET_DATASET}")
166
+ log(f"⏰ Sync interval: {SYNC_INTERVAL} seconds")
167
+
168
+ # Set up signal handlers
169
+ signal.signal(signal.SIGTERM, signal_handler)
170
+ signal.signal(signal.SIGINT, signal_handler)
171
+
172
+ # Wait for PostgreSQL
173
+ if not wait_for_postgres():
174
+ log("❌ Cannot start without PostgreSQL")
175
+ return
176
+
177
+ # Initial delay
178
+ time.sleep(30)
179
+
180
+ # Main loop
181
+ while running:
182
+ try:
183
+ sync_once()
184
+ except Exception as e:
185
+ log(f"❌ Sync error: {e}")
186
+
187
+ # Sleep with interrupt handling
188
+ for _ in range(SYNC_INTERVAL):
189
+ if not running:
190
+ break
191
+ time.sleep(1)
192
+
193
+
194
+ if __name__ == '__main__':
195
+ main()