Jaimodiji committed on
Commit
b49d3e6
·
1 Parent(s): d30571e

Upload hf_sync.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. hf_sync.py +74 -69
hf_sync.py CHANGED
@@ -13,73 +13,44 @@ REPO_ID = os.environ.get("DATASET_REPO_ID")
13
  HF_TOKEN = os.environ.get("HF_TOKEN")
14
  DATA_DIR = "data_repo"
15
  DB_FILE = os.path.join(DATA_DIR, "database.db")
16
- METADATA_FILE = os.path.join(DATA_DIR, "sync_metadata.json")
17
  LOCK_FILE = "/tmp/hf_sync.lock"
18
 
19
- def get_metadata():
20
- if os.path.exists(METADATA_FILE):
 
 
21
  try:
22
- with open(METADATA_FILE, 'r') as f:
23
  return json.load(f)
24
- except:
25
- pass
26
- return {"version": 0, "last_sync": None, "source": None, "last_db_hash": None}
27
 
28
- def get_dir_stats():
29
- """Returns total file count across data directories to detect new uploads."""
30
- count = 0
31
- for d in ['uploads', 'processed', 'output']:
32
- path = os.path.join(DATA_DIR, d)
33
- if os.path.exists(path):
34
- count += len(os.listdir(path))
35
- return count
36
 
37
  def get_file_hash(path):
38
  if not os.path.exists(path): return None
39
  hasher = hashlib.md5()
40
  with open(path, 'rb') as f:
41
- for chunk in iter(lambda: f.read(4096), b""):
42
- hasher.update(chunk)
43
  return hasher.hexdigest()
44
 
45
- def update_metadata(action, db_hash):
46
- meta = get_metadata()
47
- meta["version"] += 1
48
- meta["last_sync"] = datetime.now().isoformat()
49
- meta["source"] = os.environ.get("HOSTNAME", "local")
50
- meta["last_action"] = action
51
- meta["last_db_hash"] = db_hash
52
- meta["file_count"] = get_dir_stats()
53
- with open(METADATA_FILE, 'w') as f:
54
- json.dump(meta, f, indent=2)
55
-
56
  def safe_db_backup():
57
  if not os.path.exists(DB_FILE): return None
58
-
59
- # We backup to a temp file to get a consistent hash/file
60
  backup_db = DB_FILE + ".bak"
61
  try:
62
  source_conn = sqlite3.connect(DB_FILE)
63
  dest_conn = sqlite3.connect(backup_db)
64
- with dest_conn:
65
- source_conn.backup(dest_conn)
66
- source_conn.close()
67
- dest_conn.close()
68
  return backup_db
69
  except Exception as e:
70
  print(f"Database backup failed: {e}")
71
  return None
72
 
73
- def download():
74
- if not REPO_ID: return
75
- print(f"Downloading data from {REPO_ID}...")
76
- try:
77
- snapshot_download(repo_id=REPO_ID, repo_type="dataset", local_dir=DATA_DIR, token=HF_TOKEN, max_workers=8)
78
- print("Download successful.")
79
- update_metadata("restore", get_file_hash(DB_FILE))
80
- except Exception as e:
81
- print(f"Download failed: {e}")
82
-
83
  def upload():
84
  if not REPO_ID or not HF_TOKEN: return
85
  if os.path.exists(LOCK_FILE):
@@ -87,38 +58,72 @@ def upload():
87
 
88
  try:
89
  with open(LOCK_FILE, 'w') as f: f.write(str(os.getpid()))
90
-
91
- meta = get_metadata()
 
 
92
  backup_path = safe_db_backup()
93
- if not backup_path: return
 
 
 
 
 
 
 
 
 
94
 
95
- current_hash = get_file_hash(backup_path)
96
- current_count = get_dir_stats()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
 
98
- # ONLY UPLOAD IF CHANGED
99
- if current_hash == meta.get("last_db_hash") and current_count == meta.get("file_count", 0):
100
- print("No changes detected (DB hash and file count match). skipping upload.")
101
- os.remove(backup_path)
102
- return
 
 
 
103
 
104
- print(f"Changes detected. Syncing to {REPO_ID}...")
105
- # Replace the real db with the consistent backup for upload
106
- shutil.move(backup_path, DB_FILE)
107
- update_metadata("backup", current_hash)
108
-
109
- api = HfApi(token=HF_TOKEN)
110
- api.upload_folder(
111
- folder_path=DATA_DIR,
112
- repo_id=REPO_ID,
113
- repo_type="dataset",
114
- commit_message=f"Auto-backup v{get_metadata()['version']}"
115
- )
116
- print("Upload successful.")
117
- except Exception as e:
118
- print(f"Upload failed: {e}")
119
  finally:
120
  if os.path.exists(LOCK_FILE): os.remove(LOCK_FILE)
121
 
 
 
 
 
 
 
 
 
122
  def init_local():
123
  for d in ['output', 'processed', 'uploads']: os.makedirs(f"{DATA_DIR}/{d}", exist_ok=True)
124
 
 
# Hugging Face access token; may be None — upload() refuses to run without it.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Local working copy of the dataset repository.
DATA_DIR = "data_repo"
DB_FILE = os.path.join(DATA_DIR, "database.db")
# Persisted sync bookkeeping (uploaded-file fingerprints, last DB hash, version).
STATE_FILE = os.path.join(DATA_DIR, "sync_state.json")
# Cross-process guard so only one sync runs at a time.
LOCK_FILE = "/tmp/hf_sync.lock"

# NOTE(review): client is built at import time with whatever HF_TOKEN was in
# the environment at that moment — confirm that is intended.
api = HfApi(token=HF_TOKEN)
21
def get_state():
    """Load the persisted sync state from STATE_FILE.

    Returns a dict guaranteed to contain the keys ``uploaded_files``
    (mapping of repo-relative path -> size/mtime fingerprint),
    ``last_db_hash`` and ``version``.  A missing or corrupt state file
    falls back to a fresh default so a sync can always proceed.
    """
    defaults = {"uploaded_files": {}, "last_db_hash": None, "version": 0}
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r') as f:
                loaded = json.load(f)
            # Merge over defaults so a state file written by an older
            # schema (missing keys) cannot crash callers that index
            # e.g. state["uploaded_files"].
            return {**defaults, **loaded}
        except (OSError, json.JSONDecodeError):
            # Unreadable/corrupt state: start fresh rather than abort.
            # (Original used a bare `except:`, which also swallowed
            # SystemExit/KeyboardInterrupt.)
            pass
    return defaults
 
28
 
29
def save_state(state):
    """Persist *state* to STATE_FILE as pretty-printed JSON.

    Side effect: stamps ``state["last_update"]`` with the current local
    time (ISO 8601) before writing.
    """
    state["last_update"] = datetime.now().isoformat()
    with open(STATE_FILE, 'w') as fh:
        json.dump(state, fh, indent=2)
 
 
 
 
33
 
34
def get_file_hash(path):
    """Return the MD5 hex digest of the file at *path*, or None if it
    does not exist.  Reads in 4 KiB chunks so large files are not
    loaded into memory at once.  (MD5 is used only as a cheap
    change-detection fingerprint, not for security.)
    """
    if not os.path.exists(path):
        return None
    digest = hashlib.md5()
    with open(path, 'rb') as fh:
        while chunk := fh.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
40
 
 
 
 
 
 
 
 
 
 
 
 
41
def safe_db_backup():
    """Snapshot the live SQLite database to DB_FILE + ".bak".

    Uses sqlite3's online backup API so the copy is consistent even if
    the live DB is being written concurrently.  Returns the backup path,
    or None when there is no database yet or the backup fails (failure
    is logged, never raised).
    """
    if not os.path.exists(DB_FILE):
        return None
    backup_db = DB_FILE + ".bak"
    source_conn = dest_conn = None
    try:
        source_conn = sqlite3.connect(DB_FILE)
        dest_conn = sqlite3.connect(backup_db)
        # `with dest_conn` commits the backup transaction on success.
        with dest_conn:
            source_conn.backup(dest_conn)
        return backup_db
    except Exception as e:
        print(f"Database backup failed: {e}")
        return None
    finally:
        # Original leaked both connections when backup() raised; always close.
        if source_conn is not None:
            source_conn.close()
        if dest_conn is not None:
            dest_conn.close()
53
 
 
 
 
 
 
 
 
 
 
 
54
def upload():
    """Incrementally sync the local data directory to the HF dataset repo.

    Uploads the SQLite DB only when its consistent snapshot's hash
    changed, then walks the data sub-directories and uploads any file
    whose size/mtime fingerprint differs from the recorded state.  A
    /tmp lock file guards against overlapping runs.  All errors are
    logged, never raised.
    """
    if not REPO_ID or not HF_TOKEN:
        return
    if os.path.exists(LOCK_FILE):
        # Another sync appears to be in progress — bail out rather than race.
        # NOTE(review): the original guard body was lost in extraction; a
        # plain return matches the lock's evident intent — confirm vs. VCS.
        return
    try:
        with open(LOCK_FILE, 'w') as f:
            f.write(str(os.getpid()))
        state = get_state()
        # Guard against a state file from an older schema missing this key.
        state.setdefault("uploaded_files", {})
        changes_made = False

        # 1. Sync the database (granular: only when its snapshot changed).
        backup_path = safe_db_backup()
        if backup_path:
            db_hash = get_file_hash(backup_path)
            if db_hash != state.get("last_db_hash"):
                print("Syncing Database...")
                # Upload the consistent snapshot directly.  The original
                # shutil.move()d it over the live DB first, which discarded
                # any writes made after the snapshot was taken.
                api.upload_file(
                    path_or_fileobj=backup_path,
                    path_in_repo="database.db",
                    repo_id=REPO_ID,
                    repo_type="dataset"
                )
                state["last_db_hash"] = db_hash
                changes_made = True
            # The snapshot is temporary either way — clean it up.
            os.remove(backup_path)

        # 2. Sync files one by one (immune to whole-folder upload timeouts).
        for sub_dir in ['uploads', 'processed', 'output']:
            dir_path = os.path.join(DATA_DIR, sub_dir)
            if not os.path.exists(dir_path):
                continue
            for root, _, files in os.walk(dir_path):
                for file in files:
                    full_path = os.path.join(root, file)
                    rel_path = os.path.relpath(full_path, DATA_DIR)
                    # Fingerprint by size+mtime to avoid hashing thousands
                    # of (possibly large) files on every run.
                    mtime = os.path.getmtime(full_path)
                    size = os.path.getsize(full_path)
                    file_id = f"{rel_path}_{size}_{mtime}"
                    if state["uploaded_files"].get(rel_path) != file_id:
                        print(f"Syncing new file: {rel_path}")
                        try:
                            api.upload_file(
                                path_or_fileobj=full_path,
                                path_in_repo=rel_path,
                                repo_id=REPO_ID,
                                repo_type="dataset"
                            )
                            state["uploaded_files"][rel_path] = file_id
                            changes_made = True
                        except Exception as e:
                            # Best-effort: skip this file, keep syncing the rest.
                            print(f"Failed to upload {rel_path}: {e}")

        if changes_made:
            state["version"] += 1
            save_state(state)
            # Push the state file too so other hosts see the new bookkeeping.
            api.upload_file(
                path_or_fileobj=STATE_FILE,
                path_in_repo="sync_state.json",
                repo_id=REPO_ID,
                repo_type="dataset"
            )
            print(f"Sync complete. Version {state['version']} saved.")
        else:
            print("Everything up to date.")
    except Exception as e:
        # Top-level boundary: never let a sync failure kill the host app.
        print(f"Upload process failed: {e}")
    finally:
        if os.path.exists(LOCK_FILE):
            os.remove(LOCK_FILE)
118
 
119
def download():
    """Restore the full dataset repo into DATA_DIR.

    No-op when REPO_ID is unset; any failure is logged, never raised.
    """
    if not REPO_ID:
        return
    print(f"Downloading data from {REPO_ID}...")
    try:
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=DATA_DIR,
            token=HF_TOKEN,
            max_workers=8,
        )
        print("Download successful.")
    except Exception as e:
        print(f"Download failed: {e}")
126
+
127
def init_local():
    """Ensure the local data sub-directories exist (idempotent)."""
    for sub_dir in ('output', 'processed', 'uploads'):
        os.makedirs(f"{DATA_DIR}/{sub_dir}", exist_ok=True)
129