Samfredoly commited on
Commit
ab820b4
·
verified ·
1 Parent(s): 563c37b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +60 -31
app.py CHANGED
@@ -396,10 +396,13 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
396
  """
397
  Processes a batch of WAV files in parallel using available servers.
398
  Batch size = number of servers. Each server gets one file, processes it, then gets the next.
 
399
  Returns (next_batch_index, uploaded_count)
400
  """
401
  batch_end = min(start_batch_index + batch_size, len(wav_files))
402
  uploaded_count = progress.get('uploaded_count', 0)
 
 
403
 
404
  print(f"[{FLOW_ID}] Processing batch from index {start_batch_index} to {batch_end - 1} ({batch_end - start_batch_index} files)")
405
 
@@ -408,8 +411,7 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
408
  state.setdefault("file_states", {})
409
  for idx in range(start_batch_index, batch_end):
410
  wav_file = wav_files[idx]
411
- wav_name = Path(wav_file).name
412
- state["file_states"][wav_name] = "processing"
413
 
414
  # Update next_download_index to the end of this batch (0-based)
415
  state["next_download_index"] = batch_end
@@ -424,39 +426,52 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
424
  print(f"[{FLOW_ID}] Error while setting up batch locks: {e}")
425
  return start_batch_index, uploaded_count
426
 
 
 
 
427
  # --- Assign files to servers and create tasks
428
- pending_tasks: Dict[asyncio.Task, Tuple[int, Path, WhisperServer, str]] = {}
429
 
430
  try:
431
- for idx in range(start_batch_index, batch_end):
432
- file_index = idx
433
- wav_file = wav_files[file_index]
434
- wav_filename = Path(wav_file).name
435
- server = servers[idx - start_batch_index] # Assign server in order (server 0 -> file 0, server 1 -> file 1, etc.)
436
-
437
- # Download the WAV file
438
- wav_path = await download_wav_file_by_index(file_index + 1, wav_file)
439
- if not wav_path:
440
- state["file_states"][wav_file] = "failed"
441
- await upload_hf_state(state)
442
- print(f"[{FLOW_ID}] Failed to download {wav_filename}")
443
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
444
 
445
- # Assign to server and create task
446
- await assign_file_to_server(file_index, server)
447
- task = asyncio.create_task(send_audio_to_whisper(wav_path, server))
448
- pending_tasks[task] = (file_index, wav_path, server, wav_file)
449
- print(f"[{FLOW_ID}] Assigned {wav_filename} to server {servers.index(server) + 1}")
450
-
451
- # --- Wait for all tasks in this batch to complete and process results
452
- while pending_tasks:
453
  done, pending = await asyncio.wait(
454
  pending_tasks.keys(),
455
  return_when=asyncio.FIRST_COMPLETED
456
  )
457
 
458
  for task in done:
459
- file_index, wav_path, server, wav_file = pending_tasks.pop(task)
460
  wav_filename = Path(wav_file).name
461
 
462
  try:
@@ -473,15 +488,29 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
473
  save_progress(progress)
474
  print(f"[{FLOW_ID}] ✅ {wav_filename} uploaded (#{uploaded_count})")
475
  else:
476
- state["file_states"][wav_file] = "failed"
477
- print(f"[{FLOW_ID}] Failed to upload {wav_filename}")
 
 
 
 
 
478
  else:
479
- state["file_states"][wav_file] = "failed"
480
- print(f"[{FLOW_ID}] Transcription failed for {wav_filename}")
 
 
 
 
 
481
 
482
  except Exception as e:
483
- print(f"[{FLOW_ID}] Error processing result for {wav_filename}: {e}")
484
- state["file_states"][wav_file] = "failed"
 
 
 
 
485
  finally:
486
  # Release the server
487
  await release_server(server)
 
396
  """
397
  Processes a batch of WAV files in parallel using available servers.
398
  Batch size = number of servers. Each server gets one file, processes it, then gets the next.
399
+ Includes retry mechanism for failed files.
400
  Returns (next_batch_index, uploaded_count)
401
  """
402
  batch_end = min(start_batch_index + batch_size, len(wav_files))
403
  uploaded_count = progress.get('uploaded_count', 0)
404
+ max_retries = 3
405
+ failed_files = [] # Track files that failed for retry
406
 
407
  print(f"[{FLOW_ID}] Processing batch from index {start_batch_index} to {batch_end - 1} ({batch_end - start_batch_index} files)")
408
 
 
411
  state.setdefault("file_states", {})
412
  for idx in range(start_batch_index, batch_end):
413
  wav_file = wav_files[idx]
414
+ state["file_states"][wav_file] = "processing"
 
415
 
416
  # Update next_download_index to the end of this batch (0-based)
417
  state["next_download_index"] = batch_end
 
426
  print(f"[{FLOW_ID}] Error while setting up batch locks: {e}")
427
  return start_batch_index, uploaded_count
428
 
429
+ # Create a queue of files to process with retry support
430
+ files_to_process = [(idx, wav_files[idx], 0) for idx in range(start_batch_index, batch_end)] # (idx, wav_file, retry_count)
431
+
432
  # --- Assign files to servers and create tasks
433
+ pending_tasks: Dict[asyncio.Task, Tuple[int, Path, WhisperServer, str, int]] = {}
434
 
435
  try:
436
+ while files_to_process or pending_tasks:
437
+ # Assign new files to available servers
438
+ while files_to_process:
439
+ available = await get_available_servers()
440
+ if not available:
441
+ break
442
+
443
+ file_idx, wav_file, retry_count = files_to_process.pop(0)
444
+ wav_filename = Path(wav_file).name
445
+ server = available[0]
446
+
447
+ # Download the WAV file
448
+ wav_path = await download_wav_file_by_index(file_idx + 1, wav_file)
449
+ if not wav_path:
450
+ if retry_count < max_retries:
451
+ print(f"[{FLOW_ID}] ⚠️ Download failed for {wav_filename} (retry {retry_count + 1}/{max_retries}), re-queueing...")
452
+ files_to_process.append((file_idx, wav_file, retry_count + 1))
453
+ else:
454
+ state["file_states"][wav_file] = "failed_download"
455
+ print(f"[{FLOW_ID}] ❌ Download failed permanently for {wav_filename} after {max_retries} retries")
456
+ continue
457
+
458
+ # Assign to server and create task
459
+ await assign_file_to_server(file_idx, server)
460
+ task = asyncio.create_task(send_audio_to_whisper(wav_path, server))
461
+ pending_tasks[task] = (file_idx, wav_path, server, wav_file, retry_count)
462
+ print(f"[{FLOW_ID}] Assigned {wav_filename} to server {servers.index(server) + 1}")
463
 
464
+ # Wait for at least one task to complete if there are pending tasks
465
+ if not pending_tasks:
466
+ break
467
+
 
 
 
 
468
  done, pending = await asyncio.wait(
469
  pending_tasks.keys(),
470
  return_when=asyncio.FIRST_COMPLETED
471
  )
472
 
473
  for task in done:
474
+ file_idx, wav_path, server, wav_file, retry_count = pending_tasks.pop(task)
475
  wav_filename = Path(wav_file).name
476
 
477
  try:
 
488
  save_progress(progress)
489
  print(f"[{FLOW_ID}] ✅ {wav_filename} uploaded (#{uploaded_count})")
490
  else:
491
+ # Retry failed upload
492
+ if retry_count < max_retries:
493
+ print(f"[{FLOW_ID}] ⚠️ Upload failed for {wav_filename} (retry {retry_count + 1}/{max_retries}), re-queueing...")
494
+ files_to_process.append((file_idx, wav_file, retry_count + 1))
495
+ else:
496
+ state["file_states"][wav_file] = "failed_upload"
497
+ print(f"[{FLOW_ID}] ❌ Upload failed permanently for {wav_filename} after {max_retries} retries")
498
  else:
499
+ # Retry failed transcription
500
+ if retry_count < max_retries:
501
+ print(f"[{FLOW_ID}] ⚠️ Transcription failed for {wav_filename} (retry {retry_count + 1}/{max_retries}), re-queueing...")
502
+ files_to_process.append((file_idx, wav_file, retry_count + 1))
503
+ else:
504
+ state["file_states"][wav_file] = "failed_transcription"
505
+ print(f"[{FLOW_ID}] ❌ Transcription failed permanently for {wav_filename} after {max_retries} retries")
506
 
507
  except Exception as e:
508
+ if retry_count < max_retries:
509
+ print(f"[{FLOW_ID}] ⚠️ Error processing {wav_filename}: {e} (retry {retry_count + 1}/{max_retries}), re-queueing...")
510
+ files_to_process.append((file_idx, wav_file, retry_count + 1))
511
+ else:
512
+ print(f"[{FLOW_ID}] ❌ Error processing {wav_filename}: {e} (failed after {max_retries} retries)")
513
+ state["file_states"][wav_file] = "failed_error"
514
  finally:
515
  # Release the server
516
  await release_server(server)