Samfredoly committed on
Commit
a26c743
·
verified ·
1 Parent(s): 8be7ee9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +76 -41
app.py CHANGED
@@ -7,13 +7,14 @@ import zipfile
7
  import shutil
8
  from typing import Dict, List, Set, Optional, Tuple, Any
9
  from urllib.parse import quote
10
- from datetime import datetime
11
  from pathlib import Path
12
  import io
13
 
14
  from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
  from pydantic import BaseModel, Field
16
  from huggingface_hub import HfApi, hf_hub_download
 
17
 
18
  # --- Configuration ---
19
  AUTO_START_INDEX = 1# Hardcoded default start index if no progress is found
@@ -191,6 +192,16 @@ def save_json_state(file_path: str, data: Dict[str, Any]):
191
  with open(file_path, "w") as f:
192
  json.dump(data, f, indent=2)
193
 
 
 
 
 
 
 
 
 
 
 
194
  async def download_hf_state() -> Dict[str, Any]:
195
  """Downloads the state file from Hugging Face or returns a default state."""
196
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
@@ -225,26 +236,38 @@ async def download_hf_state() -> Dict[str, Any]:
225
  return default_state
226
 
227
  async def upload_hf_state(state: Dict[str, Any]) -> bool:
228
- """Uploads the state file to Hugging Face."""
229
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
230
 
231
- try:
232
- # Save state locally first
233
- save_json_state(str(local_path), state)
234
-
235
- # Upload to helium dataset
236
- HfApi(token=HF_TOKEN).upload_file(
237
- path_or_fileobj=str(local_path),
238
- path_in_repo=HF_STATE_FILE,
239
- repo_id=HF_OUTPUT_DATASET_ID,
240
- repo_type="dataset",
241
- commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
242
- )
243
- print(f"[{FLOW_ID}] Successfully uploaded state file.")
244
- return True
245
- except Exception as e:
246
- print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
247
- return False
 
 
 
 
 
 
 
 
 
 
 
 
248
 
249
  async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
250
  """Marks a file as 'processing' in the state file and uploads the lock."""
@@ -362,31 +385,43 @@ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path
362
  return None
363
 
364
  async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
365
- """Uploads the final captions JSON file to the output dataset."""
366
  # Use the full zip name, replacing the extension with .json
367
  caption_filename = Path(zip_full_name).with_suffix('.json').name
368
 
369
- try:
370
- print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
371
-
372
- # Create JSON content in memory
373
- json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
374
-
375
- api = HfApi(token=HF_TOKEN)
376
- api.upload_file(
377
- path_or_fileobj=io.BytesIO(json_content),
378
- path_in_repo=caption_filename,
379
- repo_id=HF_OUTPUT_DATASET_ID,
380
- repo_type="dataset",
381
- commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
382
- )
383
-
384
- print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
385
- return True
386
-
387
- except Exception as e:
388
- print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
389
- return False
 
 
 
 
 
 
 
 
 
 
 
 
390
 
391
  # --- Core Processing Functions (Modified) ---
392
 
 
7
  import shutil
8
  from typing import Dict, List, Set, Optional, Tuple, Any
9
  from urllib.parse import quote
10
+ from datetime import datetime, timedelta
11
  from pathlib import Path
12
  import io
13
 
14
  from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
  from pydantic import BaseModel, Field
16
  from huggingface_hub import HfApi, hf_hub_download
17
+ from huggingface_hub.utils import HfHubHTTPError
18
 
19
  # --- Configuration ---
20
  AUTO_START_INDEX = 1# Hardcoded default start index if no progress is found
 
192
  with open(file_path, "w") as f:
193
  json.dump(data, f, indent=2)
194
 
195
async def handle_rate_limit_error(e: Exception, context: str) -> bool:
    """Pause and signal a retry when *e* is an HTTP 429 rate-limit error.

    Args:
        e: Exception raised by a Hugging Face Hub call.
        context: Human-readable description of the operation, used in logs.

    Returns:
        True if the error was a 429 (after sleeping through the cooldown),
        False for any other exception (caller handles it normally).
    """
    # HfHubHTTPError.response can be None, so guard the attribute access
    # instead of dereferencing e.response.status_code directly — otherwise
    # this handler itself raises AttributeError inside an except block.
    if isinstance(e, HfHubHTTPError) and getattr(e.response, "status_code", None) == 429:
        pause_duration = 3600  # 1 hour in seconds
        print(f"[{FLOW_ID}] 🚨 RATE LIMIT (429) DETECTED during {context}. Pausing for {pause_duration // 3600} hour(s)...")
        print(f"[{FLOW_ID}] Resuming at: {datetime.now() + timedelta(seconds=pause_duration)}")
        await asyncio.sleep(pause_duration)
        return True
    return False
204
+
205
  async def download_hf_state() -> Dict[str, Any]:
206
  """Downloads the state file from Hugging Face or returns a default state."""
207
  local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
 
236
  return default_state
237
 
238
async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face with 429 rate limit handling.

    Args:
        state: State dict; must contain 'next_download_index' (embedded in
            the commit message).

    Returns:
        True on successful upload, False if the local save fails or all
        retries are exhausted.
    """
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE

    # The state payload and the API client are identical on every attempt,
    # so hoist them out of the retry loop instead of redoing them per retry.
    # The local save is still guarded so a disk failure returns False
    # (preserving the bool contract) rather than burning remote retries.
    try:
        # Save state locally first
        save_json_state(str(local_path), state)
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to save state file locally: {str(e)}")
        return False

    api = HfApi(token=HF_TOKEN)

    MAX_RETRIES = 5
    for attempt in range(MAX_RETRIES):
        try:
            # Upload to helium dataset
            api.upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=HF_STATE_FILE,
                repo_id=HF_OUTPUT_DATASET_ID,
                repo_type="dataset",
                commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
            )
            print(f"[{FLOW_ID}] Successfully uploaded state file.")
            return True
        except Exception as e:
            if await handle_rate_limit_error(e, "state file upload"):
                continue  # Retry after pause

            print(f"[{FLOW_ID}] Failed to upload state file on attempt {attempt + 1}: {str(e)}")
            # If it's not a rate limit error, or we've exhausted retries, break
            if attempt == MAX_RETRIES - 1:
                break
            # Add a small delay before non-429 retries
            await asyncio.sleep(5)

    print(f"[{FLOW_ID}] CRITICAL: Failed to upload state file after {MAX_RETRIES} attempts.")
    return False
271
 
272
  async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
273
  """Marks a file as 'processing' in the state file and uploads the lock."""
 
385
  return None
386
 
387
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Uploads the final captions JSON file to the output dataset with 429 rate limit handling.

    Args:
        zip_full_name: Name of the source zip; the uploaded file is named
            after it with a .json suffix.
        captions: Caption records to serialize and upload.

    Returns:
        True on successful upload, False after exhausting all retries.
    """
    # Use the full zip name, replacing the extension with .json
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    # Serialize once: the payload is identical on every attempt, so
    # re-encoding a potentially large captions list per retry is wasted work.
    # The API client is likewise loop-invariant.
    json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
    api = HfApi(token=HF_TOKEN)

    MAX_RETRIES = 5
    for attempt in range(MAX_RETRIES):
        try:
            print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

            # Fresh BytesIO per attempt so the stream position starts at 0
            # even after a partially-consumed failed upload.
            api.upload_file(
                path_or_fileobj=io.BytesIO(json_content),
                path_in_repo=caption_filename,
                repo_id=HF_OUTPUT_DATASET_ID,
                repo_type="dataset",
                commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
            )

            print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
            return True

        except Exception as e:
            if await handle_rate_limit_error(e, f"captions upload for {zip_full_name}"):
                continue  # Retry after pause

            print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name} on attempt {attempt + 1}: {e}")
            # If it's not a rate limit error, or we've exhausted retries, break
            if attempt == MAX_RETRIES - 1:
                break
            # Add a small delay before non-429 retries
            await asyncio.sleep(5)

    print(f"[{FLOW_ID}] CRITICAL: Failed to upload captions for {zip_full_name} after {MAX_RETRIES} attempts.")
    return False
425
 
426
  # --- Core Processing Functions (Modified) ---
427