Samfredoly commited on
Commit
fb8c428
·
verified ·
1 Parent(s): d0cb1d3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +21 -48
app.py CHANGED
@@ -30,13 +30,8 @@ LOCAL_STATE_FOLDER = Path(".state")
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
32
  # Processing configuration
33
- MAX_UPLOADS_BEFORE_PAUSE = 120 # Max uploads allowed per hour
34
  UPLOAD_PAUSE_ENABLED = True
35
- UPLOAD_WINDOW_SECONDS = 3600 # 1 hour in seconds
36
-
37
- # Upload rate limiting (per-hour tracking)
38
- upload_timestamps: List[float] = [] # Timestamps of recent uploads (for rate limiting)
39
- upload_lock = asyncio.Lock() # Lock for upload_timestamps access
40
 
41
  # Directory within the HF dataset where the audio files are located
42
  AUDIO_FILE_PREFIX = "audio/"
@@ -310,33 +305,6 @@ async def upload_transcription_to_hf(wav_filename: str, transcription_data: Dict
310
  # Use the WAV filename, replacing the extension with .json
311
  json_filename = Path(wav_filename).with_suffix('.json').name
312
 
313
- # --- Rate limiting: check uploads per hour
314
- global upload_timestamps
315
- if UPLOAD_PAUSE_ENABLED:
316
- async with upload_lock:
317
- now = time.time()
318
- # Remove uploads older than 1 hour
319
- upload_timestamps = [ts for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS]
320
-
321
- # Check if we've hit the limit
322
- if len(upload_timestamps) >= MAX_UPLOADS_BEFORE_PAUSE:
323
- oldest_upload = upload_timestamps[0]
324
- time_until_available = UPLOAD_WINDOW_SECONDS - (now - oldest_upload)
325
- print(f"[{FLOW_ID}] ⏸️ Upload limit reached ({len(upload_timestamps)}/{MAX_UPLOADS_BEFORE_PAUSE} in last hour). Pausing for {time_until_available:.0f}s...")
326
-
327
- # Release lock and wait
328
-
329
- # Wait until the oldest upload falls outside the window
330
- while True:
331
- async with upload_lock:
332
- now = time.time()
333
- upload_timestamps = [ts for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS]
334
- if len(upload_timestamps) < MAX_UPLOADS_BEFORE_PAUSE:
335
- print(f"[{FLOW_ID}] ✅ Upload limit lifted. Resuming uploads...")
336
- break
337
-
338
- await asyncio.sleep(5) # Check every 5 seconds
339
-
340
  try:
341
  print(f"[{FLOW_ID}] Uploading transcription for {wav_filename} as {json_filename} to {HF_OUTPUT_DATASET_ID}...")
342
 
@@ -352,11 +320,6 @@ async def upload_transcription_to_hf(wav_filename: str, transcription_data: Dict
352
  commit_message=f"[{FLOW_ID}] Transcription for {wav_filename}"
353
  )
354
 
355
- # Record this upload timestamp
356
- if UPLOAD_PAUSE_ENABLED:
357
- async with upload_lock:
358
- upload_timestamps.append(time.time())
359
-
360
  print(f"[{FLOW_ID}] Successfully uploaded transcription for {wav_filename}.")
361
  return True
362
 
@@ -577,9 +540,13 @@ async def process_dataset_task(start_index: int):
577
  global_success = True
578
  current_batch_index = start_list_index
579
  batch_size = len(servers) # Batch size = number of servers (20 files per batch)
 
580
 
581
  try:
 
582
  while current_batch_index < len(file_list):
 
 
583
  # Process a batch dynamically
584
  next_index, uploaded_count = await process_batch_dynamic(
585
  file_list,
@@ -589,6 +556,9 @@ async def process_dataset_task(start_index: int):
589
  progress
590
  )
591
 
 
 
 
592
  # Update progress
593
  progress['last_processed_index'] = next_index
594
  progress['uploaded_count'] = uploaded_count
@@ -596,6 +566,7 @@ async def process_dataset_task(start_index: int):
596
 
597
  # Update current batch index
598
  current_batch_index = next_index
 
599
 
600
  # Log statistics
601
  print(f"[{FLOW_ID}] Batch complete. Progress: {current_batch_index}/{len(file_list)}, Uploaded: {uploaded_count}")
@@ -604,8 +575,17 @@ async def process_dataset_task(start_index: int):
604
  print(f"[{FLOW_ID}] Server Statistics:")
605
  for i, server in enumerate(servers):
606
  print(f" Server {i+1}: {server.total_processed} files, {server.total_time:.2f}s total, {server.fps:.2f} files/sec")
 
 
 
 
 
 
 
 
 
607
 
608
- print(f"[{FLOW_ID}] All files processed successfully!")
609
  return True
610
 
611
  except Exception as e:
@@ -657,24 +637,17 @@ async def root():
657
  total_time = sum(s.total_time for s in servers)
658
  avg_fps = total_processed / total_time if total_time > 0 else 0
659
 
660
- # Get current uploads in the last hour
661
- now = time.time()
662
- uploads_in_last_hour = sum(1 for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS)
663
- is_paused = uploads_in_last_hour >= MAX_UPLOADS_BEFORE_PAUSE
664
-
665
  return {
666
  "flow_id": FLOW_ID,
667
  "status": "ready",
668
  "last_processed_index": progress.get('last_processed_index', 0),
669
  "total_files_in_list": len(progress['file_list']),
670
  "uploaded_count": progress.get('uploaded_count', 0),
671
- "uploads_in_last_hour": uploads_in_last_hour,
672
- "upload_limit": MAX_UPLOADS_BEFORE_PAUSE,
673
- "upload_paused": is_paused,
674
  "total_servers": len(servers),
675
  "processing_servers": sum(1 for s in servers if s.is_processing),
676
  "total_files_processed_by_servers": total_processed,
677
- "avg_files_per_second": avg_fps
 
678
  }
679
 
680
  @app.post("/start_processing")
 
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
32
  # Processing configuration
33
+ MAX_UPLOADS_BEFORE_PAUSE = 120 # Pause uploading after 120 files
34
  UPLOAD_PAUSE_ENABLED = True
 
 
 
 
 
35
 
36
  # Directory within the HF dataset where the audio files are located
37
  AUDIO_FILE_PREFIX = "audio/"
 
305
  # Use the WAV filename, replacing the extension with .json
306
  json_filename = Path(wav_filename).with_suffix('.json').name
307
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  try:
309
  print(f"[{FLOW_ID}] Uploading transcription for {wav_filename} as {json_filename} to {HF_OUTPUT_DATASET_ID}...")
310
 
 
320
  commit_message=f"[{FLOW_ID}] Transcription for {wav_filename}"
321
  )
322
 
 
 
 
 
 
323
  print(f"[{FLOW_ID}] Successfully uploaded transcription for {wav_filename}.")
324
  return True
325
 
 
540
  global_success = True
541
  current_batch_index = start_list_index
542
  batch_size = len(servers) # Batch size = number of servers (20 files per batch)
543
+ batch_interval_seconds = 600 # 600 seconds = 10 minutes (enforces max 6 batches per hour)
544
 
545
  try:
546
+ batch_count = 0
547
  while current_batch_index < len(file_list):
548
+ batch_start_time = time.time()
549
+
550
  # Process a batch dynamically
551
  next_index, uploaded_count = await process_batch_dynamic(
552
  file_list,
 
556
  progress
557
  )
558
 
559
+ batch_end_time = time.time()
560
+ batch_elapsed = batch_end_time - batch_start_time
561
+
562
  # Update progress
563
  progress['last_processed_index'] = next_index
564
  progress['uploaded_count'] = uploaded_count
 
566
 
567
  # Update current batch index
568
  current_batch_index = next_index
569
+ batch_count += 1
570
 
571
  # Log statistics
572
  print(f"[{FLOW_ID}] Batch complete. Progress: {current_batch_index}/{len(file_list)}, Uploaded: {uploaded_count}")
 
575
  print(f"[{FLOW_ID}] Server Statistics:")
576
  for i, server in enumerate(servers):
577
  print(f" Server {i+1}: {server.total_processed} files, {server.total_time:.2f}s total, {server.fps:.2f} files/sec")
578
+
579
+ # Rate limiting: enforce minimum 10 minutes between batch starts (max 6 batches per hour)
580
+ if current_batch_index < len(file_list): # Don't wait after the last batch
581
+ wait_time = batch_interval_seconds - batch_elapsed
582
+ if wait_time > 0:
583
+ print(f"[{FLOW_ID}] Rate limit: batch took {batch_elapsed:.1f}s. Waiting {wait_time:.1f}s before next batch (min 10 min interval)...")
584
+ await asyncio.sleep(wait_time)
585
+ else:
586
+ print(f"[{FLOW_ID}] Batch took {batch_elapsed:.1f}s (exceeded 10 min interval). Proceeding immediately to next batch.")
587
 
588
+ print(f"[{FLOW_ID}] All files processed successfully! Total batches: {batch_count}")
589
  return True
590
 
591
  except Exception as e:
 
637
  total_time = sum(s.total_time for s in servers)
638
  avg_fps = total_processed / total_time if total_time > 0 else 0
639
 
 
 
 
 
 
640
  return {
641
  "flow_id": FLOW_ID,
642
  "status": "ready",
643
  "last_processed_index": progress.get('last_processed_index', 0),
644
  "total_files_in_list": len(progress['file_list']),
645
  "uploaded_count": progress.get('uploaded_count', 0),
 
 
 
646
  "total_servers": len(servers),
647
  "processing_servers": sum(1 for s in servers if s.is_processing),
648
  "total_files_processed_by_servers": total_processed,
649
+ "avg_files_per_second": avg_fps,
650
+ "upload_limit_paused": progress.get('uploaded_count', 0) >= MAX_UPLOADS_BEFORE_PAUSE
651
  }
652
 
653
  @app.post("/start_processing")