Samfredoly commited on
Commit
4884f03
·
verified ·
1 Parent(s): d1c663c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +28 -12
app.py CHANGED
@@ -387,28 +387,40 @@ def get_next_file_to_process(source_repo_id: str, state: Dict[str, Any]) -> Opti
387
  next_index = state.get("next_download_index", 0)
388
  file_states = state.get("file_states", {})
389
 
390
- # 3. Find the next file that hasn't been processed or is not currently being processed
391
- for i in range(next_index, len(audio_files)):
392
- filename = audio_files[i]
 
 
393
  status = file_states.get(filename, "unprocessed")
394
 
395
- # Skip files that are already processed or currently being processed
396
  if status in ["processed", "processing"]:
 
397
  continue
398
 
399
- # Found an unprocessed file
400
- file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
 
 
 
 
 
 
 
401
 
402
- log_message(f"Found next file at index {i}: {filename}", "INFO")
 
 
403
  return {
404
  "filename": filename,
405
  "url": file_url,
406
- "index": i
407
  }
408
-
409
- log_message("All files up to the current index have been processed or are locked.", "INFO")
410
 
411
- # If we reach the end, check from the beginning for any failed files
 
 
412
  for i in range(0, next_index):
413
  filename = audio_files[i]
414
  status = file_states.get(filename, "unprocessed")
@@ -421,7 +433,8 @@ def get_next_file_to_process(source_repo_id: str, state: Dict[str, Any]) -> Opti
421
  "url": file_url,
422
  "index": i
423
  }
424
-
 
425
  return None
426
 
427
  except Exception as e:
@@ -585,6 +598,9 @@ def main_processing_loop():
585
 
586
  try:
587
  # 2. Lock file by updating state on the API
 
 
 
588
  if not lock_file_for_processing(target_file, current_state):
589
  log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
590
  time.sleep(PROCESSING_DELAY)
 
387
  next_index = state.get("next_download_index", 0)
388
  file_states = state.get("file_states", {})
389
 
390
+ # 3. Skip forward past all processed and processing files starting from next_index
391
+ # This ensures we don't repeatedly find files that have already been handled
392
+ current_index = next_index
393
+ while current_index < len(audio_files):
394
+ filename = audio_files[current_index]
395
  status = file_states.get(filename, "unprocessed")
396
 
397
+ # If this file is processed or currently processing, skip it
398
  if status in ["processed", "processing"]:
399
+ current_index += 1
400
  continue
401
 
402
+ # If this file failed, we can retry it, so return it
403
+ if status == "failed":
404
+ file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
405
+ log_message(f"Found failed file for retry at index {current_index}: {filename}", "INFO")
406
+ return {
407
+ "filename": filename,
408
+ "url": file_url,
409
+ "index": current_index
410
+ }
411
 
412
+ # If this file is unprocessed, we found our next file
413
+ file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
414
+ log_message(f"Found next file at index {current_index}: {filename}", "INFO")
415
  return {
416
  "filename": filename,
417
  "url": file_url,
418
+ "index": current_index
419
  }
 
 
420
 
421
+ log_message("All files have been processed or are locked. Checking for any failed files from the start.", "INFO")
422
+
423
+ # 4. If we've processed all files from next_index to end, check from beginning for failed files
424
  for i in range(0, next_index):
425
  filename = audio_files[i]
426
  status = file_states.get(filename, "unprocessed")
 
433
  "url": file_url,
434
  "index": i
435
  }
436
+
437
+ log_message("All files have been processed. Waiting for new files...", "INFO")
438
  return None
439
 
440
  except Exception as e:
 
598
 
599
  try:
600
  # 2. Lock file by updating state on the API
601
+ # IMPORTANT: Update next_download_index when locking to prevent other workers from picking same file
602
+ current_state["next_download_index"] = target_index + 1
603
+
604
  if not lock_file_for_processing(target_file, current_state):
605
  log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
606
  time.sleep(PROCESSING_DELAY)