Samfredoly commited on
Commit
d0cb1d3
·
verified ·
1 Parent(s): 0735339

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +47 -3
app.py CHANGED
@@ -30,8 +30,13 @@ LOCAL_STATE_FOLDER = Path(".state")
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
32
  # Processing configuration
33
- MAX_UPLOADS_BEFORE_PAUSE = 120 # Pause uploading after 120 files
34
  UPLOAD_PAUSE_ENABLED = True
 
 
 
 
 
35
 
36
  # Directory within the HF dataset where the audio files are located
37
  AUDIO_FILE_PREFIX = "audio/"
@@ -305,6 +310,33 @@ async def upload_transcription_to_hf(wav_filename: str, transcription_data: Dict
305
  # Use the WAV filename, replacing the extension with .json
306
  json_filename = Path(wav_filename).with_suffix('.json').name
307
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  try:
309
  print(f"[{FLOW_ID}] Uploading transcription for {wav_filename} as {json_filename} to {HF_OUTPUT_DATASET_ID}...")
310
 
@@ -320,6 +352,11 @@ async def upload_transcription_to_hf(wav_filename: str, transcription_data: Dict
320
  commit_message=f"[{FLOW_ID}] Transcription for {wav_filename}"
321
  )
322
 
 
 
 
 
 
323
  print(f"[{FLOW_ID}] Successfully uploaded transcription for {wav_filename}.")
324
  return True
325
 
@@ -620,17 +657,24 @@ async def root():
620
  total_time = sum(s.total_time for s in servers)
621
  avg_fps = total_processed / total_time if total_time > 0 else 0
622
 
 
 
 
 
 
623
  return {
624
  "flow_id": FLOW_ID,
625
  "status": "ready",
626
  "last_processed_index": progress.get('last_processed_index', 0),
627
  "total_files_in_list": len(progress['file_list']),
628
  "uploaded_count": progress.get('uploaded_count', 0),
 
 
 
629
  "total_servers": len(servers),
630
  "processing_servers": sum(1 for s in servers if s.is_processing),
631
  "total_files_processed_by_servers": total_processed,
632
- "avg_files_per_second": avg_fps,
633
- "upload_limit_paused": progress.get('uploaded_count', 0) >= MAX_UPLOADS_BEFORE_PAUSE
634
  }
635
 
636
  @app.post("/start_processing")
 
30
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
 
32
  # Processing configuration
33
+ MAX_UPLOADS_BEFORE_PAUSE = 120 # Max uploads allowed per hour
34
  UPLOAD_PAUSE_ENABLED = True
35
+ UPLOAD_WINDOW_SECONDS = 3600 # 1 hour in seconds
36
+
37
+ # Upload rate limiting (per-hour tracking)
38
+ upload_timestamps: List[float] = [] # Timestamps of recent uploads (for rate limiting)
39
+ upload_lock = asyncio.Lock() # Lock for upload_timestamps access
40
 
41
  # Directory within the HF dataset where the audio files are located
42
  AUDIO_FILE_PREFIX = "audio/"
 
310
  # Use the WAV filename, replacing the extension with .json
311
  json_filename = Path(wav_filename).with_suffix('.json').name
312
 
313
+ # --- Rate limiting: check uploads per hour
314
+ global upload_timestamps
315
+ if UPLOAD_PAUSE_ENABLED:
316
+ async with upload_lock:
317
+ now = time.time()
318
+ # Remove uploads older than 1 hour
319
+ upload_timestamps = [ts for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS]
320
+
321
+ # Check if we've hit the limit
322
+ if len(upload_timestamps) >= MAX_UPLOADS_BEFORE_PAUSE:
323
+ oldest_upload = upload_timestamps[0]
324
+ time_until_available = UPLOAD_WINDOW_SECONDS - (now - oldest_upload)
325
+ print(f"[{FLOW_ID}] ⏸️ Upload limit reached ({len(upload_timestamps)}/{MAX_UPLOADS_BEFORE_PAUSE} in last hour). Pausing for {time_until_available:.0f}s...")
326
+
327
+ # Release lock and wait
328
+
329
+ # Wait until the oldest upload falls outside the window
330
+ while True:
331
+ async with upload_lock:
332
+ now = time.time()
333
+ upload_timestamps = [ts for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS]
334
+ if len(upload_timestamps) < MAX_UPLOADS_BEFORE_PAUSE:
335
+ print(f"[{FLOW_ID}] ✅ Upload limit lifted. Resuming uploads...")
336
+ break
337
+
338
+ await asyncio.sleep(5) # Check every 5 seconds
339
+
340
  try:
341
  print(f"[{FLOW_ID}] Uploading transcription for {wav_filename} as {json_filename} to {HF_OUTPUT_DATASET_ID}...")
342
 
 
352
  commit_message=f"[{FLOW_ID}] Transcription for {wav_filename}"
353
  )
354
 
355
+ # Record this upload timestamp
356
+ if UPLOAD_PAUSE_ENABLED:
357
+ async with upload_lock:
358
+ upload_timestamps.append(time.time())
359
+
360
  print(f"[{FLOW_ID}] Successfully uploaded transcription for {wav_filename}.")
361
  return True
362
 
 
657
  total_time = sum(s.total_time for s in servers)
658
  avg_fps = total_processed / total_time if total_time > 0 else 0
659
 
660
+ # Get current uploads in the last hour
661
+ now = time.time()
662
+ uploads_in_last_hour = sum(1 for ts in upload_timestamps if now - ts < UPLOAD_WINDOW_SECONDS)
663
+ is_paused = uploads_in_last_hour >= MAX_UPLOADS_BEFORE_PAUSE
664
+
665
  return {
666
  "flow_id": FLOW_ID,
667
  "status": "ready",
668
  "last_processed_index": progress.get('last_processed_index', 0),
669
  "total_files_in_list": len(progress['file_list']),
670
  "uploaded_count": progress.get('uploaded_count', 0),
671
+ "uploads_in_last_hour": uploads_in_last_hour,
672
+ "upload_limit": MAX_UPLOADS_BEFORE_PAUSE,
673
+ "upload_paused": is_paused,
674
  "total_servers": len(servers),
675
  "processing_servers": sum(1 for s in servers if s.is_processing),
676
  "total_files_processed_by_servers": total_processed,
677
+ "avg_files_per_second": avg_fps
 
678
  }
679
 
680
  @app.post("/start_processing")