factorstudios committed
Commit 6dfe7fa · verified · 1 Parent(s): 6f1e541

Update app.py

Files changed (1)
app.py: +68 -23
app.py CHANGED

@@ -183,6 +183,27 @@ class CaptionServer:
 servers = [CaptionServer(url) for url in CAPTION_SERVERS]
 server_index = 0
 
+# --- Centralized Rate Limit Handling ---
+
+async def check_rate_limit_pause():
+    """Checks if a rate limit pause is active and waits if necessary."""
+    global RATE_LIMIT_PAUSE_UNTIL
+    while True:
+        current_time = time.time()
+        if current_time < RATE_LIMIT_PAUSE_UNTIL:
+            wait_time = int(RATE_LIMIT_PAUSE_UNTIL - current_time)
+            print(f"[{FLOW_ID}] ⏳ Global rate limit pause active. Waiting {wait_time}s...")
+            # Wait in chunks to allow for potential updates or shutdowns
+            await asyncio.sleep(min(wait_time, 60))
+        else:
+            break
+
+def trigger_rate_limit_pause():
+    """Sets the global rate limit pause for 1 hour."""
+    global RATE_LIMIT_PAUSE_UNTIL
+    print(f"[{FLOW_ID}] 🛑 TRIGGERING 1-HOUR PAUSE due to 429 Rate Limit error.")
+    RATE_LIMIT_PAUSE_UNTIL = time.time() + 3600
+
 # --- Progress and State Management Functions ---
 
 def load_progress() -> Dict:
@@ -236,12 +257,14 @@ def save_json_state(file_path: str, data: Dict[str, Any]):
 
 async def download_hf_state() -> Dict[str, Any]:
     """Downloads the state file from Hugging Face or returns a default state."""
+    await check_rate_limit_pause()
     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
     default_state = {"next_download_index": 0, "file_states": {}}
 
     try:
         # Check if the file exists in the helium repo
-        files = HfApi(token=HF_TOKEN).list_repo_files(
+        api = HfApi(token=HF_TOKEN)
+        files = api.list_repo_files(
             repo_id=HF_OUTPUT_DATASET_ID,
             repo_type="dataset"
         )
@@ -264,11 +287,15 @@ async def download_hf_state() -> Dict[str, Any]:
         return load_json_state(str(local_path), default_state)
 
     except Exception as e:
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            return await download_hf_state()  # Retry after pause
         print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
         return default_state
 
 async def upload_hf_state(state: Dict[str, Any]) -> bool:
     """Uploads the state file to Hugging Face."""
+    await check_rate_limit_pause()
     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
 
     try:
@@ -286,7 +313,13 @@ async def upload_hf_state(state: Dict[str, Any]) -> bool:
         print(f"[{FLOW_ID}] Successfully uploaded state file.")
         return True
     except Exception as e:
-        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            # We don't retry here to avoid infinite loops in state management,
+            # but the next call will wait.
+            print(f"[{FLOW_ID}] Failed to upload state file due to 429. Pause triggered.")
+        else:
+            print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
         return False
 
 async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
@@ -334,6 +367,7 @@ async def get_zip_file_list(progress_data: Dict) -> List[str]:
         print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
         return progress_data['file_list']
 
+    await check_rate_limit_pause()
    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        api = HfApi(token=HF_TOKEN)
@@ -360,11 +394,15 @@ async def get_zip_file_list(progress_data: Dict) -> List[str]:
         return zip_files
 
     except Exception as e:
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            return await get_zip_file_list(progress_data)
         print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
         return []
 
 async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
     """Downloads the zip file for the given index and extracts its contents."""
+    await check_rate_limit_pause()
 
     # Extract the base name for the extraction directory
     zip_full_name = Path(repo_file_full_path).name
@@ -401,11 +439,15 @@ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
         return extract_dir
 
     except Exception as e:
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            return await download_and_extract_zip_by_index(file_index, repo_file_full_path)
         print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
         return None
 
 async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
     """Uploads the final captions JSON file to the output dataset."""
+    await check_rate_limit_pause()
     # Use the full zip name, replacing the extension with .json
     caption_filename = Path(zip_full_name).with_suffix('.json').name
 
@@ -428,6 +470,11 @@ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
         return True
 
     except Exception as e:
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            # Retry once after pause
+            await check_rate_limit_pause()
+            return await upload_captions_to_hf(zip_full_name, captions)
         print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
         return False
 
@@ -454,18 +501,10 @@ async def get_available_server(timeout: float = 300.0) -> CaptionServer:
 
 async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
     """Sends a single image to a caption server for processing."""
-    global RATE_LIMIT_PAUSE_UNTIL
-
     # This function now handles server selection and retries internally
     MAX_RETRIES = 3
     for attempt in range(MAX_RETRIES):
-        # Check if we are currently paused due to rate limiting
-        current_time = time.time()
-        if current_time < RATE_LIMIT_PAUSE_UNTIL:
-            wait_time = RATE_LIMIT_PAUSE_UNTIL - current_time
-            print(f"[{FLOW_ID}] ⏳ Rate limit pause active. Waiting {int(wait_time)} more seconds...")
-            await asyncio.sleep(min(wait_time, 60))  # Wait in chunks
-            continue
+        await check_rate_limit_pause()
 
         server = None
         try:
@@ -512,14 +551,17 @@ async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
                 else:
                     print(f"[{FLOW_ID}] Warning: Server returned 200 but no caption for {image_path.name}.")
             elif resp.status == 429:
-                print(f"[{FLOW_ID}] 🛑 HIT RATE LIMIT (429) on {server.url}. Pausing for 1 hour.")
-                RATE_LIMIT_PAUSE_UNTIL = time.time() + 3600  # Pause for 1 hour
-                # We don't return None immediately, we let the loop retry after the pause
+                print(f"[{FLOW_ID}] 🛑 HIT RATE LIMIT (429) on {server.url}.")
+                trigger_rate_limit_pause()
+                # Loop will retry after check_rate_limit_pause()
             else:
                 print(f"[{FLOW_ID}] Error: Server {server.url} returned status {resp.status} for {image_path.name}.")
 
         except Exception as e:
-            print(f"[{FLOW_ID}] Exception during captioning for {image_path.name} on {server.url if server else 'unknown server'}: {e}")
+            if "429" in str(e):
+                trigger_rate_limit_pause()
+            else:
+                print(f"[{FLOW_ID}] Exception during captioning for {image_path.name} on {server.url if server else 'unknown server'}: {e}")
         finally:
             if server:
                 server.busy = False
@@ -535,7 +577,6 @@ async def process_dataset_task(start_index: int):
     """
     Main background task that processes zip files sequentially.
     """
-    global RATE_LIMIT_PAUSE_UNTIL
    print(f"[{FLOW_ID}] Starting dataset processing task from index {start_index}...")
 
     # 1. Load progress and get file list
@@ -552,13 +593,21 @@ async def process_dataset_task(start_index: int):
     # --- NEW: Check helium_tg for existing JSON files to avoid redundant processing ---
     print(f"[{FLOW_ID}] Checking {HF_OUTPUT_DATASET_ID} for existing JSON outputs...")
     try:
+        await check_rate_limit_pause()
         api = HfApi(token=HF_TOKEN)
         existing_files = api.list_repo_files(repo_id=HF_OUTPUT_DATASET_ID, repo_type="dataset")
         existing_json_files = {f for f in existing_files if f.endswith('.json')}
         print(f"[{FLOW_ID}] Found {len(existing_json_files)} existing JSON files in {HF_OUTPUT_DATASET_ID}.")
     except Exception as e:
-        print(f"[{FLOW_ID}] Warning: Could not fetch existing files from {HF_OUTPUT_DATASET_ID}: {e}")
-        existing_json_files = set()
+        if "429" in str(e):
+            trigger_rate_limit_pause()
+            await check_rate_limit_pause()
+            # We'll just continue with an empty set if it fails again,
+            # the per-file check will catch it later.
+            existing_json_files = set()
+        else:
+            print(f"[{FLOW_ID}] Warning: Could not fetch existing files from {HF_OUTPUT_DATASET_ID}: {e}")
+            existing_json_files = set()
     # ----------------------------------------------------------------------------------
 
     global_success = True
@@ -568,11 +617,7 @@ async def process_dataset_task(start_index: int):
     start_idx = max(0, start_index - 1)
 
     for i in range(start_idx, len(file_list)):
-        # Check for rate limit pause at the start of each file
-        while time.time() < RATE_LIMIT_PAUSE_UNTIL:
-            wait_time = int(RATE_LIMIT_PAUSE_UNTIL - time.time())
-            print(f"[{FLOW_ID}] ⏳ Global rate limit pause. Waiting {wait_time}s before next file...")
-            await asyncio.sleep(min(wait_time, 60))
+        await check_rate_limit_pause()
 
         file_index = i + 1
         repo_file_full_path = file_list[i]
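The core of the change is the helper pair `check_rate_limit_pause()` / `trigger_rate_limit_pause()`, which coordinate every caller through a single module-level timestamp, `RATE_LIMIT_PAUSE_UNTIL`. Below is a minimal, self-contained sketch of that pattern; `FLOW_ID = "demo"` and the two-second pause are stand-ins for the app's own configuration (the commit itself always pauses for a fixed hour):

```python
import asyncio
import time

RATE_LIMIT_PAUSE_UNTIL = 0.0  # wall-clock deadline; all work waits until it passes
FLOW_ID = "demo"              # stand-in for the app's flow identifier

async def check_rate_limit_pause():
    """Wait, in chunks of at most 60 s, while a global pause is active."""
    while time.time() < RATE_LIMIT_PAUSE_UNTIL:
        remaining = RATE_LIMIT_PAUSE_UNTIL - time.time()
        print(f"[{FLOW_ID}] Pause active. ~{int(remaining)}s left...")
        await asyncio.sleep(min(remaining, 60))

def trigger_rate_limit_pause(seconds: float = 3600.0):
    """Push the shared deadline out; the commit uses a fixed one-hour window."""
    global RATE_LIMIT_PAUSE_UNTIL
    RATE_LIMIT_PAUSE_UNTIL = time.time() + seconds

async def main():
    trigger_rate_limit_pause(seconds=2)  # simulate a 429 with a short pause
    await check_rate_limit_pause()       # returns once the deadline passes
    print(f"[{FLOW_ID}] resumed")

asyncio.run(main())
```

Because the deadline is one shared global rather than per-server state, a 429 seen anywhere stalls every caller at its next `check_rate_limit_pause()` — what the commit's own log message calls a 'Global rate limit pause'.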
 
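Every Hugging Face Hub call in the diff then follows the same shape: await any active pause before the call, and on an exception whose text contains "429", trigger the pause and retry (`download_hf_state`, `get_zip_file_list`, `download_and_extract_zip_by_index`, and `upload_captions_to_hf` retry by calling themselves; `upload_hf_state` deliberately skips the retry to avoid loops in state management). A sketch of that shape as one reusable wrapper, reusing the two helpers above; the `remote_call` parameter is hypothetical, and the explicit retry bound is a substitution for the commit's unbounded recursion:

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

async def call_with_rate_limit_retry(remote_call: Callable[[], T],
                                     max_retries: int = 3) -> Optional[T]:
    """Hypothetical wrapper: on a 429, pause globally, then retry a bounded number of times."""
    for _ in range(max_retries):
        await check_rate_limit_pause()  # block while a global pause is active
        try:
            return remote_call()
        except Exception as e:
            if "429" in str(e):         # string sniffing, as in the commit
                trigger_rate_limit_pause()
                continue                # next iteration waits out the pause
            raise                       # other errors propagate unchanged
    return None
```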