factorstudios committed on
Commit
14ccf3f
·
verified ·
1 Parent(s): f5e2cd2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +86 -13
app.py CHANGED
@@ -37,7 +37,7 @@ UPLOAD_PAUSE_ENABLED = True
37
  AUDIO_FILE_PREFIX = "audio/"
38
 
39
  WHISPER_SERVERS = [
40
- f"https://makeitfr-mineo-{i}.hf.space/transcribe" for i in range(1, 21)
41
  ]
42
 
43
  # Temporary storage for audio files
@@ -231,6 +231,19 @@ async def process_file_task(wav_file: str, state: Dict, progress: Dict):
231
  result = await transcribe_with_server(server, wav_path)
232
 
233
  if result:
 
 
 
 
 
 
 
 
 
 
 
 
 
234
  state["file_states"][wav_file] = "processed"
235
  progress["uploaded_count"] = progress.get("uploaded_count", 0) + 1
236
  print(f"[{FLOW_ID}] ✅ Success: {wav_file}")
@@ -261,41 +274,101 @@ async def main_processing_loop():
261
  await asyncio.sleep(60)
262
  continue
263
 
 
 
 
 
 
 
 
 
 
 
 
264
  # 1. Handpick failed_transcription files
265
  failed_files = [f for f, s in state.get("file_states", {}).items() if s == "failed_transcription"]
266
 
267
  # 2. Also check for new files based on next_download_index
268
  next_idx = state.get("next_download_index", 0)
269
- new_files = file_list[next_idx:next_idx + 100] # Take a chunk of new files
 
270
 
271
  # Combine: Prioritize failed files, then add new ones
272
- files_to_process = failed_files + [f for f in new_files if f not in state["file_states"]]
273
 
274
- if not files_to_process:
275
  print(f"[{FLOW_ID}] No files to process. Sleeping...")
276
  await asyncio.sleep(60)
277
  continue
278
 
279
- print(f"[{FLOW_ID}] Processing {len(files_to_process)} files ({len(failed_files)} failed, {len(files_to_process)-len(failed_files)} new)...")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280
 
281
- # Process in batches of server count
 
282
  batch_size = len(servers)
283
  for i in range(0, len(files_to_process), batch_size):
284
  batch = files_to_process[i:i + batch_size]
285
  tasks = [process_file_task(f, state, progress) for f in batch]
286
  await asyncio.gather(*tasks)
287
 
288
- # Update next_download_index if we processed new files
289
- processed_new = [f for f in batch if f in new_files]
290
- if processed_new:
291
- last_new_file = processed_new[-1]
292
- state["next_download_index"] = file_list.index(last_new_file) + 1
 
293
 
294
- # Save and upload state after each batch
295
  await upload_hf_state(state)
296
  save_progress(progress)
297
 
298
- await asyncio.sleep(10)
 
299
  except Exception as e:
300
  print(f"[{FLOW_ID}] Error in main loop: {e}")
301
  await asyncio.sleep(60)
 
37
  AUDIO_FILE_PREFIX = "audio/"
38
 
39
  WHISPER_SERVERS = [
40
+ f"https://eliasishere-makeitfr-mineo-{i}.hf.space/transcribe" for i in range(1, 21)
41
  ]
42
 
43
  # Temporary storage for audio files
 
231
  result = await transcribe_with_server(server, wav_path)
232
 
233
  if result:
234
+ # Upload transcription result to HF
235
+ json_filename = Path(wav_file).with_suffix('.json').name
236
+ json_content = json.dumps(result, indent=2, ensure_ascii=False).encode('utf-8')
237
+
238
+ api = HfApi(token=HF_TOKEN)
239
+ api.upload_file(
240
+ path_or_fileobj=io.BytesIO(json_content),
241
+ path_in_repo=json_filename,
242
+ repo_id=HF_OUTPUT_DATASET_ID,
243
+ repo_type="dataset",
244
+ commit_message=f"[{FLOW_ID}] Transcription for {wav_file}"
245
+ )
246
+
247
  state["file_states"][wav_file] = "processed"
248
  progress["uploaded_count"] = progress.get("uploaded_count", 0) + 1
249
  print(f"[{FLOW_ID}] ✅ Success: {wav_file}")
 
274
  await asyncio.sleep(60)
275
  continue
276
 
277
+ # Check HF_OUTPUT_DATASET_ID for existing JSON outputs
278
+ print(f"[{FLOW_ID}] Checking {HF_OUTPUT_DATASET_ID} for existing JSON outputs...")
279
+ try:
280
+ api = HfApi(token=HF_TOKEN)
281
+ existing_files = api.list_repo_files(repo_id=HF_OUTPUT_DATASET_ID, repo_type="dataset")
282
+ existing_json_files = {f for f in existing_files if f.endswith('.json')}
283
+ print(f"[{FLOW_ID}] Found {len(existing_json_files)} existing JSON files.")
284
+ except Exception as e:
285
+ print(f"[{FLOW_ID}] Warning: Could not fetch existing files: {e}")
286
+ existing_json_files = set()
287
+
288
  # 1. Handpick failed_transcription files
289
  failed_files = [f for f, s in state.get("file_states", {}).items() if s == "failed_transcription"]
290
 
291
  # 2. Also check for new files based on next_download_index
292
  next_idx = state.get("next_download_index", 0)
293
+ # We take a larger chunk to allow for more skipping without re-fetching the list
294
+ new_files_chunk = file_list[next_idx:next_idx + 500]
295
 
296
  # Combine: Prioritize failed files, then add new ones
297
+ files_to_check = failed_files + [f for f in new_files_chunk if f not in state["file_states"]]
298
 
299
+ if not files_to_check:
300
  print(f"[{FLOW_ID}] No files to process. Sleeping...")
301
  await asyncio.sleep(60)
302
  continue
303
 
304
+ files_to_process = []
305
+ state_changed_locally = False
306
+
307
+ print(f"[{FLOW_ID}] Scanning {len(files_to_check)} files for existing results...")
308
+
309
+ for f in files_to_check:
310
+ expected_json_name = Path(f).with_suffix('.json').name
311
+
312
+ if expected_json_name in existing_json_files:
313
+ # Mark locally but DO NOT upload yet
314
+ if state["file_states"].get(f) != "processed":
315
+ state["file_states"][f] = "processed"
316
+ state_changed_locally = True
317
+
318
+ # Update next_download_index if it's a new file
319
+ if f in new_files_chunk:
320
+ current_idx = file_list.index(f)
321
+ if current_idx >= state.get("next_download_index", 0):
322
+ state["next_download_index"] = current_idx + 1
323
+ continue
324
+
325
+ # If we reach here, we found an UNPROCESSED file
326
+ print(f"[{FLOW_ID}] Found unprocessed file: {f}")
327
+
328
+ # Before processing, if we have local changes (skips), upload the state once
329
+ if state_changed_locally:
330
+ print(f"[{FLOW_ID}] Synchronizing skipped files to HF state before processing...")
331
+ await upload_hf_state(state)
332
+ state_changed_locally = False
333
+
334
+ files_to_process.append(f)
335
+
336
+ # Once we find an unprocessed file, we stop the skip-scan and start processing
337
+ # This ensures we process files as soon as we find them
338
+ break
339
+
340
+ # If we scanned everything and only found skips, upload the state once at the end
341
+ if state_changed_locally and not files_to_process:
342
+ print(f"[{FLOW_ID}] Uploading final batch of skips to HF state...")
343
+ await upload_hf_state(state)
344
+
345
+ if not files_to_process:
346
+ # If we only found skips, the loop will restart and check the next chunk
347
+ continue
348
+
349
+ print(f"[{FLOW_ID}] Processing batch of {len(files_to_process)} unprocessed files...")
350
 
351
+ # Process the found unprocessed file(s)
352
+ # (In this logic, it's usually just 1 file at a time to ensure frequent state updates)
353
  batch_size = len(servers)
354
  for i in range(0, len(files_to_process), batch_size):
355
  batch = files_to_process[i:i + batch_size]
356
  tasks = [process_file_task(f, state, progress) for f in batch]
357
  await asyncio.gather(*tasks)
358
 
359
+ # Update next_download_index
360
+ for f in batch:
361
+ if f in file_list:
362
+ current_idx = file_list.index(f)
363
+ if current_idx >= state.get("next_download_index", 0):
364
+ state["next_download_index"] = current_idx + 1
365
 
366
+ # Save and upload state after processing the unprocessed file
367
  await upload_hf_state(state)
368
  save_progress(progress)
369
 
370
+ await asyncio.sleep(2) # Short sleep before looking for the next unprocessed file
371
+
372
  except Exception as e:
373
  print(f"[{FLOW_ID}] Error in main loop: {e}")
374
  await asyncio.sleep(60)