samfred2 commited on
Commit
5b46870
·
verified ·
1 Parent(s): 855db7e

Upload 3 files

Browse files
Files changed (3) hide show
  1. Dockerfile +16 -0
  2. app.py +367 -0
  3. requirements.txt +9 -0
Dockerfile ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM python:3.9
2
+
3
+ RUN useradd -m -u 1000 user
4
+ USER user
5
+ ENV PATH="/home/user/.local/bin:$PATH"
6
+
7
+ WORKDIR /app
8
+
9
+ COPY --chown=user ./requirements.txt requirements.txt
10
+ RUN pip install --no-cache-dir --upgrade -r requirements.txt
11
+
12
+ RUN chmod -R 777 /app
13
+
14
+
15
+ COPY --chown=user . /app
16
+ CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
app.py ADDED
@@ -0,0 +1,367 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import asyncio
4
+ import logging
5
+ import requests
6
+ from datetime import datetime
7
+ from typing import Optional
8
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
9
+ from fastapi.responses import JSONResponse
10
+ from pydantic import BaseModel
11
+ from huggingface_hub import HfApi, hf_hub_url
12
+ from tqdm import tqdm
13
+
14
+ # --- Configuration ---
15
+ SOURCE_REPO_ID = "Fred808/data"
16
+ TARGET_REPO_ID = "samelias1/Data"
17
+ REPO_TYPE = "dataset"
18
+ REVISION = "main"
19
+ STATE_FILE = "sync_state.json"
20
+ TOKEN = os.environ.get("HF_TOKEN", "")
21
+ # Whether the service should automatically start syncing on FastAPI startup.
22
+ # Set AUTO_START env var to "0", "false" or "no" to disable.
23
+ AUTO_START = os.environ.get("AUTO_START", "true").lower() in ("1", "true", "yes")
24
+ # How long (seconds) before a remote "running" state is considered stale and ignored.
25
+ # Default: 1 hour. Set STATE_STALE_SECONDS env var to override.
26
+ try:
27
+ STATE_STALE_SECONDS = int(os.environ.get("STATE_STALE_SECONDS", "3600"))
28
+ except Exception:
29
+ STATE_STALE_SECONDS = 3600
30
+
31
+ # --- FastAPI App ---
32
+ app = FastAPI(title="HF Dataset Sync Service")
33
+
34
+ # Configure basic logging
35
+ logging.basicConfig(level=logging.INFO)
36
+ logger = logging.getLogger(__name__)
37
+
38
+
39
+ # --- Auto-start handler ---
40
+ @app.on_event("startup")
41
+ async def _maybe_start_sync_on_startup():
42
+ """If AUTO_START is enabled, start synchronization in the background on app startup.
43
+
44
+ This will not start a new sync if the saved state already shows a running sync.
45
+ """
46
+ try:
47
+ if not AUTO_START:
48
+ logger.info("AUTO_START disabled; not starting sync on startup")
49
+ return
50
+
51
+ # Attempt to download remote state first so we resume from the last-known state
52
+ try:
53
+ download_remote_state(TOKEN)
54
+ except Exception:
55
+ logger.exception("Failed to download remote state on startup; continuing with local state if present")
56
+
57
+ # If the downloaded/loaded state reports "running", it may be stale
58
+ # (service crashed or was stopped). In that case we consider it stale
59
+ # if last_updated is older than STATE_STALE_SECONDS and clear it so the
60
+ # sync can resume.
61
+ state = load_state()
62
+ if state.get("status") == "running":
63
+ last = state.get("last_updated")
64
+ stale = False
65
+ if last:
66
+ try:
67
+ last_dt = datetime.fromisoformat(last)
68
+ age = (datetime.utcnow() - last_dt).total_seconds()
69
+ if age > STATE_STALE_SECONDS:
70
+ stale = True
71
+ logger.info("Remote state marked running but last_updated is %.0f seconds old (>%s); treating as stale", age, STATE_STALE_SECONDS)
72
+ except Exception:
73
+ # If we can't parse last_updated, treat as stale to be safe
74
+ stale = True
75
+ logger.exception("Failed to parse last_updated from remote state; treating running state as stale")
76
+
77
+ if stale:
78
+ state["status"] = "idle"
79
+ state["current_file"] = None
80
+ # upload the corrected state back to the remote repo so other instances see it
81
+ try:
82
+ save_state(state, token=TOKEN)
83
+ except Exception:
84
+ logger.exception("Failed to upload corrected stale state; continuing with local corrected state")
85
+ else:
86
+ # Remote state indicates a sync was running recently and is not stale.
87
+ # We assume there's no other live process and resume that sync here.
88
+ logger.info("Remote state indicates an in-progress sync; resuming from that state")
89
+ # fall through to starting the sync below
90
+
91
+ # Launch the async synchronization as a background task without blocking startup
92
+ logger.info("AUTO_START enabled; launching dataset synchronization in background")
93
+ asyncio.create_task(synchronize_datasets(TOKEN))
94
+ except Exception as e:
95
+ logger.exception("Failed to auto-start synchronization: %s", e)
96
+
97
+
98
+ def download_remote_state(token: Optional[str] = None):
99
+ """Attempt to download `STATE_FILE` from the target repo and overwrite local state.
100
+
101
+ If no remote state exists (404) this function returns quietly. Any other
102
+ exceptions are raised for the caller to handle/log.
103
+ """
104
+ try:
105
+ download_url = hf_hub_url(
106
+ repo_id=TARGET_REPO_ID,
107
+ filename=STATE_FILE,
108
+ repo_type=REPO_TYPE,
109
+ revision=REVISION,
110
+ )
111
+ resp = requests.get(download_url)
112
+ if resp.status_code == 200:
113
+ with open(STATE_FILE, "wb") as f:
114
+ f.write(resp.content)
115
+ logger.info("Remote state downloaded from %s/%s", TARGET_REPO_ID, STATE_FILE)
116
+ else:
117
+ logger.info("Remote state not found (status %s) — continuing with local state if present", resp.status_code)
118
+ except Exception:
119
+ logger.exception("Failed to download remote state file")
120
+ raise
121
+
122
+ # --- Models ---
123
+ class SyncStatus(BaseModel):
124
+ status: str
125
+ total_files: int
126
+ synced_files: int
127
+ failed_files: int
128
+ current_file: Optional[str] = None
129
+ progress_percent: float
130
+ last_updated: str
131
+
132
+ class SyncRequest(BaseModel):
133
+ token: Optional[str] = None
134
+
135
+ # --- State Management ---
136
+
137
+ def load_state():
138
+ """Loads the synchronization state from a JSON file."""
139
+ if os.path.exists(STATE_FILE):
140
+ with open(STATE_FILE, "r") as f:
141
+ return json.load(f)
142
+ return {
143
+ "synced_files": [],
144
+ "failed_files": [],
145
+ "total_files": 0,
146
+ "status": "idle",
147
+ "last_updated": datetime.utcnow().isoformat(),
148
+ "current_file": None
149
+ }
150
+
151
+
152
+
153
+ def save_state(state, api: Optional[HfApi] = None, token: Optional[str] = None):
154
+ """Save the state locally and optionally upload it to the target repo.
155
+
156
+ If `api` is provided it will be used to upload the `STATE_FILE` to the
157
+ configured `TARGET_REPO_ID`. If `api` is None but `token` is provided,
158
+ a temporary HfApi will be created for upload.
159
+ """
160
+ state["last_updated"] = datetime.utcnow().isoformat()
161
+ with open(STATE_FILE, "w") as f:
162
+ json.dump(state, f, indent=4)
163
+
164
+ # Try to upload the state file to the target repo so restarts can resume
165
+ try:
166
+ upload_api = api
167
+ if upload_api is None and token:
168
+ upload_api = HfApi(token=token)
169
+
170
+ if upload_api is not None:
171
+ # upload the local state file to the target dataset
172
+ try:
173
+ upload_api.upload_file(
174
+ path_or_fileobj=STATE_FILE,
175
+ path_in_repo=STATE_FILE,
176
+ repo_id=TARGET_REPO_ID,
177
+ repo_type=REPO_TYPE,
178
+ commit_message=f"Sync: Update {STATE_FILE}",
179
+ )
180
+ logger.info("Uploaded state file to %s/%s", TARGET_REPO_ID, STATE_FILE)
181
+ except Exception:
182
+ # Log and continue — failing to upload state should not crash the service
183
+ logger.exception("Failed to upload state file to remote repo")
184
+ except Exception:
185
+ # Catch-all for unexpected errors constructing HfApi
186
+ logger.exception("Unexpected error while attempting to save/upload state")
187
+
188
+
189
+ # Helper to perform blocking download/upload in a thread
190
+ def _download_and_upload_file(api: HfApi, file_path: str):
191
+ """Download a file from the source repo and upload it to the target repo.
192
+
193
+ This function is synchronous and intended to be run with asyncio.to_thread
194
+ so that blocking I/O doesn't block the event loop.
195
+ """
196
+ # Build download URL and local filename
197
+ download_url = hf_hub_url(
198
+ repo_id=SOURCE_REPO_ID,
199
+ filename=file_path,
200
+ repo_type=REPO_TYPE,
201
+ revision=REVISION
202
+ )
203
+ local_path = os.path.basename(file_path)
204
+
205
+ # Download using requests (blocking)
206
+ response = requests.get(download_url, stream=True)
207
+ response.raise_for_status()
208
+
209
+ with open(local_path, "wb") as f:
210
+ for chunk in response.iter_content(chunk_size=8192):
211
+ if chunk:
212
+ f.write(chunk)
213
+
214
+ # Upload file to target repo
215
+ api.upload_file(
216
+ path_or_fileobj=local_path,
217
+ path_in_repo=file_path,
218
+ repo_id=TARGET_REPO_ID,
219
+ repo_type=REPO_TYPE,
220
+ commit_message=f"Sync: Add {file_path} from {SOURCE_REPO_ID}",
221
+ )
222
+
223
+ # Clean up local file
224
+ try:
225
+ os.remove(local_path)
226
+ except Exception:
227
+ # If cleanup fails, just continue; file may be left behind for inspection
228
+ logger.exception("Failed to remove local file %s", local_path)
229
+
230
+ # --- Synchronization Logic ---
231
+
232
+ async def synchronize_datasets(token: str):
233
+ """
234
+ Fetches file list from source, downloads files, and uploads them to target,
235
+ persisting state to resume progress.
236
+ """
237
+ state = load_state()
238
+
239
+ # Initialize HF API first so we can persist state remotely as we progress.
240
+ try:
241
+ api = HfApi(token=token)
242
+ except Exception as e:
243
+ state["status"] = "error"
244
+ state["error"] = f"Error initializing HfApi: {str(e)}"
245
+ save_state(state)
246
+ return
247
+
248
+ state["status"] = "running"
249
+ save_state(state, api=api)
250
+
251
+ synced_files = set(state["synced_files"])
252
+ failed_files = set(state.get("failed_files", []))
253
+
254
+ try:
255
+ repo_files = api.list_repo_files(
256
+ repo_id=SOURCE_REPO_ID,
257
+ repo_type=REPO_TYPE,
258
+ revision=REVISION
259
+ )
260
+ except Exception as e:
261
+ state["status"] = "error"
262
+ state["error"] = f"Error fetching file list: {str(e)}"
263
+ save_state(state)
264
+ return
265
+
266
+ state["total_files"] = len(repo_files)
267
+ files_to_sync = [f for f in repo_files if f not in synced_files and f not in failed_files]
268
+
269
+ for idx, file_path in enumerate(files_to_sync):
270
+ if file_path in synced_files:
271
+ continue
272
+
273
+ state["current_file"] = file_path
274
+ state["synced_files_count"] = len(synced_files)
275
+ state["progress_percent"] = (len(synced_files) / state["total_files"]) * 100 if state["total_files"] > 0 else 0
276
+ save_state(state, api=api)
277
+
278
+ # Skip .gitattributes
279
+ if file_path == ".gitattributes":
280
+ synced_files.add(file_path)
281
+ continue
282
+
283
+ try:
284
+ # Perform blocking download+upload in a thread to avoid blocking the event loop
285
+ await asyncio.to_thread(_download_and_upload_file, api, file_path)
286
+
287
+ # Mark as synced and persist (locally + remote)
288
+ synced_files.add(file_path)
289
+ state["synced_files"] = list(synced_files)
290
+ save_state(state, api=api)
291
+
292
+ except Exception as e:
293
+ logger.exception("Error syncing file %s: %s", file_path, e)
294
+ failed_files.add(file_path)
295
+ state["failed_files"] = list(failed_files)
296
+ save_state(state, api=api)
297
+
298
+ # Wait 2 minutes between processing files to throttle downloads/uploads
299
+ # Skip waiting after the last file
300
+ if idx != len(files_to_sync) - 1:
301
+ state["status"] = "running"
302
+ save_state(state, api=api)
303
+ logger.info("Waiting 120 seconds before processing next file")
304
+ await asyncio.sleep(120)
305
+
306
+ state["status"] = "completed"
307
+ state["current_file"] = None
308
+ state["synced_files_count"] = len(synced_files)
309
+ state["progress_percent"] = 100.0
310
+ save_state(state, api=api)
311
+
312
+ # --- Endpoints ---
313
+
314
+ @app.get("/")
315
+ async def root():
316
+ """Health check endpoint."""
317
+ return {"status": "ok", "service": "HF Dataset Sync Service"}
318
+
319
+ @app.get("/status", response_model=SyncStatus)
320
+ async def get_status():
321
+ """Get current synchronization status."""
322
+ state = load_state()
323
+ return SyncStatus(
324
+ status=state.get("status", "idle"),
325
+ total_files=state.get("total_files", 0),
326
+ synced_files=len(state.get("synced_files", [])),
327
+ failed_files=len(state.get("failed_files", [])),
328
+ current_file=state.get("current_file"),
329
+ progress_percent=state.get("progress_percent", 0.0),
330
+ last_updated=state.get("last_updated", datetime.utcnow().isoformat())
331
+ )
332
+
333
+ @app.post("/sync")
334
+ async def start_sync(request: SyncRequest, background_tasks: BackgroundTasks):
335
+ """Start the synchronization process."""
336
+ state = load_state()
337
+
338
+ if state.get("status") == "running":
339
+ raise HTTPException(status_code=409, detail="Sync is already running")
340
+
341
+ token = request.token or TOKEN
342
+ background_tasks.add_task(synchronize_datasets, token)
343
+
344
+ return {"message": "Sync started", "status": "running"}
345
+
346
+ @app.post("/reset")
347
+ async def reset_state():
348
+ """Reset the synchronization state."""
349
+ state = {
350
+ "synced_files": [],
351
+ "failed_files": [],
352
+ "total_files": 0,
353
+ "status": "idle",
354
+ "last_updated": datetime.utcnow().isoformat(),
355
+ "current_file": None
356
+ }
357
+ save_state(state)
358
+ return {"message": "State reset", "status": "idle"}
359
+
360
+ @app.get("/state")
361
+ async def get_full_state():
362
+ """Get the full synchronization state."""
363
+ return load_state()
364
+
365
+ if __name__ == "__main__":
366
+ import uvicorn
367
+ uvicorn.run(app, host="0.0.0.0", port=8000)
requirements.txt ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ fastapi>=0.95.0
2
+ uvicorn[standard]>=0.22.0
3
+ requests>=2.31.0
4
+ huggingface_hub>=0.16.4
5
+ pydantic>=1.10.7
6
+ tqdm>=4.65.0
7
+
8
+ # Optional / useful during development
9
+ # python-dotenv>=1.0.0 # if you want to load env vars from a .env file