Samfredoly committed on
Commit
34e34a0
·
verified ·
1 Parent(s): 99dce0a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +78 -46
app.py CHANGED
@@ -283,17 +283,19 @@ async def download_wav_file_by_index(file_index: int, repo_file_full_path: str)
283
  print(f"[{FLOW_ID}] Downloading file #{file_index}: {repo_file_full_path}")
284
 
285
  try:
286
- # Use hf_hub_download to get the file path
287
  wav_path = hf_hub_download(
288
  repo_id=HF_AUDIO_DATASET_ID,
289
  filename=repo_file_full_path,
290
  repo_type="dataset",
291
  token=HF_TOKEN,
 
 
292
  )
293
-
294
  print(f"[{FLOW_ID}] Downloaded WAV file to {wav_path}")
295
  return Path(wav_path)
296
-
297
  except Exception as e:
298
  print(f"[{FLOW_ID}] Error downloading WAV file {repo_file_full_path}: {e}")
299
  return None
@@ -336,34 +338,33 @@ async def send_audio_to_whisper(wav_path: Path, server: WhisperServer) -> Option
336
 
337
  # Prepare multipart form data
338
  form_data = aiohttp.FormData()
339
- form_data.add_field('file',
340
- wav_path.open('rb'),
341
- filename=wav_path.name,
342
- content_type='audio/wav')
343
-
344
- async with aiohttp.ClientSession() as session:
345
- # 10 minute timeout for transcription
346
- async with session.post(server.url, data=form_data, timeout=600) as resp:
347
- if resp.status == 200:
348
- result = await resp.json()
349
- end_time = time.time()
350
-
351
- # Update server stats
352
- server.total_processed += 1
353
- server.total_time += (end_time - start_time)
354
-
355
- print(f"[{FLOW_ID}] ✓ {wav_path.name} transcribed successfully by {server.url}")
356
-
357
- return {
358
- "file": wav_path.name,
359
- "transcription": result,
360
- "timestamp": datetime.now().isoformat(),
361
- "processing_time_seconds": end_time - start_time
362
- }
363
- else:
364
- error_text = await resp.text()
365
- print(f"[{FLOW_ID}] ✗ Error from {server.url}: {resp.status} - {error_text}")
366
- return None
367
 
368
  except asyncio.TimeoutError:
369
  print(f"[{FLOW_ID}] ✗ Timeout from {server.url} for {wav_path.name}")
@@ -404,6 +405,27 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
404
  pending_tasks: Dict[asyncio.Task, Tuple[int, Path, WhisperServer]] = {}
405
 
406
  print(f"[{FLOW_ID}] Processing batch from index {start_batch_index} to {batch_end}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
407
 
408
  try:
409
  while current_index < batch_end or pending_tasks:
@@ -421,13 +443,12 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
421
  wav_file = wav_files[file_index]
422
  wav_filename = Path(wav_file).name
423
 
424
- # Mark file as processing in state
425
- state["file_states"][wav_filename] = "processing"
426
-
427
  # Download the WAV file
428
  wav_path = await download_wav_file_by_index(file_index + 1, wav_file)
429
  if not wav_path:
430
  state["file_states"][wav_filename] = "failed"
 
 
431
  current_index += 1
432
  continue
433
 
@@ -454,22 +475,20 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
454
  transcription_result = task.result()
455
 
456
  if transcription_result:
457
- # Check if we should pause uploading
458
- if UPLOAD_PAUSE_ENABLED and uploaded_count >= MAX_UPLOADS_BEFORE_PAUSE:
459
- print(f"[{FLOW_ID}] ⏸️ Upload limit reached ({uploaded_count}/{MAX_UPLOADS_BEFORE_PAUSE}). Pausing uploads but continuing processing...")
460
- # Mark as processed but don't upload
461
  state["file_states"][wav_filename] = "processed"
 
 
 
462
  else:
463
- # Upload transcription
464
- if await upload_transcription_to_hf(wav_filename, transcription_result):
465
- state["file_states"][wav_filename] = "processed"
466
- uploaded_count += 1
467
- progress['uploaded_count'] = uploaded_count
468
- save_progress(progress)
469
- else:
470
- state["file_states"][wav_filename] = "failed"
471
  else:
472
  state["file_states"][wav_filename] = "failed"
 
473
 
474
  except Exception as e:
475
  print(f"[{FLOW_ID}] Error processing result for {wav_filename}: {e}")
@@ -520,6 +539,19 @@ async def process_dataset_task(start_index: int):
520
  if 'uploaded_count' not in progress:
521
  progress['uploaded_count'] = 0
522
 
 
 
 
 
 
 
 
 
 
 
 
 
 
523
  global_success = True
524
  current_batch_index = start_list_index
525
  batch_size = len(servers) * 2 # Process 2 batches per server at a time
 
283
  print(f"[{FLOW_ID}] Downloading file #{file_index}: {repo_file_full_path}")
284
 
285
  try:
286
+ # Download the file into our TEMP_DIR (so we can safely delete it later)
287
  wav_path = hf_hub_download(
288
  repo_id=HF_AUDIO_DATASET_ID,
289
  filename=repo_file_full_path,
290
  repo_type="dataset",
291
  token=HF_TOKEN,
292
+ local_dir=str(TEMP_DIR),
293
+ local_dir_use_symlinks=False,
294
  )
295
+
296
  print(f"[{FLOW_ID}] Downloaded WAV file to {wav_path}")
297
  return Path(wav_path)
298
+
299
  except Exception as e:
300
  print(f"[{FLOW_ID}] Error downloading WAV file {repo_file_full_path}: {e}")
301
  return None
 
338
 
339
  # Prepare multipart form data
340
  form_data = aiohttp.FormData()
341
+ # Open the file in a context manager so the descriptor is closed after the request
342
+ with wav_path.open('rb') as f:
343
+ form_data.add_field('file', f, filename=wav_path.name, content_type='audio/wav')
344
+
345
+ async with aiohttp.ClientSession() as session:
346
+ # 10 minute timeout for transcription
347
+ async with session.post(server.url, data=form_data, timeout=600) as resp:
348
+ if resp.status == 200:
349
+ result = await resp.json()
350
+ end_time = time.time()
351
+
352
+ # Update server stats
353
+ server.total_processed += 1
354
+ server.total_time += (end_time - start_time)
355
+
356
+ print(f"[{FLOW_ID}] ✓ {wav_path.name} transcribed successfully by {server.url}")
357
+
358
+ return {
359
+ "file": wav_path.name,
360
+ "transcription": result,
361
+ "timestamp": datetime.now().isoformat(),
362
+ "processing_time_seconds": end_time - start_time
363
+ }
364
+ else:
365
+ error_text = await resp.text()
366
+ print(f"[{FLOW_ID}] Error from {server.url}: {resp.status} - {error_text}")
367
+ return None
 
368
 
369
  except asyncio.TimeoutError:
370
  print(f"[{FLOW_ID}] ✗ Timeout from {server.url} for {wav_path.name}")
 
405
  pending_tasks: Dict[asyncio.Task, Tuple[int, Path, WhisperServer]] = {}
406
 
407
  print(f"[{FLOW_ID}] Processing batch from index {start_batch_index} to {batch_end}")
408
+
409
+ # --- Batch-level locking: mark all files in this batch as 'processing' and upload state
410
+ try:
411
+ for idx in range(start_batch_index, batch_end):
412
+ wav_file = wav_files[idx]
413
+ wav_name = Path(wav_file).name
414
+ state.setdefault("file_states", {})
415
+ # Only set to processing if it's not already processed/processing
416
+ if state["file_states"].get(wav_name) not in ("processing", "processed"):
417
+ state["file_states"][wav_name] = "processing"
418
+
419
+ # Advance the next_download_index to the end of this batch (1-based index)
420
+ state["next_download_index"] = batch_end
421
+
422
+ # Upload HF state to establish locks for this batch
423
+ if await upload_hf_state(state):
424
+ print(f"[{FLOW_ID}] ✅ Batch lock uploaded for indices {start_batch_index}..{batch_end - 1}")
425
+ else:
426
+ print(f"[{FLOW_ID}] ❌ Failed to upload batch lock for indices {start_batch_index}..{batch_end - 1}")
427
+ except Exception as e:
428
+ print(f"[{FLOW_ID}] Error while setting up batch locks: {e}")
429
 
430
  try:
431
  while current_index < batch_end or pending_tasks:
 
443
  wav_file = wav_files[file_index]
444
  wav_filename = Path(wav_file).name
445
 
 
 
 
446
  # Download the WAV file
447
  wav_path = await download_wav_file_by_index(file_index + 1, wav_file)
448
  if not wav_path:
449
  state["file_states"][wav_filename] = "failed"
450
+ # Persist failure to HF
451
+ await upload_hf_state(state)
452
  current_index += 1
453
  continue
454
 
 
475
  transcription_result = task.result()
476
 
477
  if transcription_result:
478
+ # Upload transcription (immediately)
479
+ uploaded_ok = await upload_transcription_to_hf(wav_filename, transcription_result)
480
+ if uploaded_ok:
 
481
  state["file_states"][wav_filename] = "processed"
482
+ uploaded_count += 1
483
+ progress['uploaded_count'] = uploaded_count
484
+ save_progress(progress)
485
  else:
486
+ state["file_states"][wav_filename] = "failed"
487
+ # Persist state change for this file immediately
488
+ await upload_hf_state(state)
 
 
 
 
 
489
  else:
490
  state["file_states"][wav_filename] = "failed"
491
+ await upload_hf_state(state)
492
 
493
  except Exception as e:
494
  print(f"[{FLOW_ID}] Error processing result for {wav_filename}: {e}")
 
539
  if 'uploaded_count' not in progress:
540
  progress['uploaded_count'] = 0
541
 
542
+ # If there was no HF state in the repo, upload a fresh initial state file
543
+ try:
544
+ if not current_state.get("file_states") and current_state.get("next_download_index", 0) == 0:
545
+ print(f"[{FLOW_ID}] No HF state detected; uploading initial state file to {HF_OUTPUT_DATASET_ID}...")
546
+ # Ensure structure
547
+ current_state.setdefault("file_states", {})
548
+ current_state.setdefault("next_download_index", 0)
549
+ if await upload_hf_state(current_state):
550
+ print(f"[{FLOW_ID}] ✅ Initial HF state uploaded.")
551
+ else:
552
+ print(f"[{FLOW_ID}] ❌ Failed to upload initial HF state.")
553
+ except Exception as e:
554
+ print(f"[{FLOW_ID}] Error while uploading initial HF state: {e}")
555
  global_success = True
556
  current_batch_index = start_list_index
557
  batch_size = len(servers) * 2 # Process 2 batches per server at a time