factorstudios committed on
Commit
85db345
Β·
verified Β·
1 Parent(s): 0a4682f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +60 -52
app.py CHANGED
@@ -79,19 +79,25 @@ server_lock = asyncio.Lock() # Lock for thread-safe server state access
79
 
80
  def load_progress() -> Dict:
81
  """Loads the local processing progress from the JSON file."""
 
 
 
 
 
 
82
  if PROGRESS_FILE.exists():
83
  try:
84
  with PROGRESS_FILE.open('r') as f:
85
- return json.load(f)
 
 
 
 
 
86
  except json.JSONDecodeError:
87
  print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
88
 
89
- return {
90
- "last_processed_index": 0,
91
- "processed_files": {}, # {index: repo_path}
92
- "file_list": [], # Full list of all zip files found in the dataset
93
- "uploaded_count": 0
94
- }
95
 
96
  def save_progress(progress_data: Dict):
97
  """Saves the local processing progress to the JSON file."""
@@ -159,7 +165,7 @@ async def upload_hf_state(state: Dict[str, Any]) -> bool:
159
  # --- Hugging Face Utility Functions ---
160
 
161
  async def get_audio_file_list(progress_data: Dict) -> List[str]:
162
- if progress_data['file_list']:
163
  return progress_data['file_list']
164
  try:
165
  api = HfApi(token=HF_TOKEN)
@@ -228,7 +234,6 @@ async def process_file_task(wav_file: str, state: Dict, progress: Dict):
228
  state["file_states"][wav_file] = "processed"
229
  progress["uploaded_count"] = progress.get("uploaded_count", 0) + 1
230
  print(f"[{FLOW_ID}] βœ… Success: {wav_file}")
231
- # Note: In a real scenario, you'd save the 'result' (transcription) somewhere
232
  else:
233
  state["file_states"][wav_file] = "failed_transcription"
234
  print(f"[{FLOW_ID}] ❌ Failed: {wav_file}")
@@ -246,51 +251,54 @@ async def main_processing_loop():
246
  print(f"[{FLOW_ID}] Starting main processing loop...")
247
 
248
  while True:
249
- state = await download_hf_state()
250
- progress = load_progress()
251
- file_list = await get_audio_file_list(progress)
252
-
253
- if not file_list:
254
- await asyncio.sleep(60)
255
- continue
256
-
257
- # 1. Handpick failed_transcription files
258
- failed_files = [f for f, s in state.get("file_states", {}).items() if s == "failed_transcription"]
259
-
260
- # 2. Also check for new files based on next_download_index
261
- next_idx = state.get("next_download_index", 0)
262
- new_files = file_list[next_idx:next_idx + 100] # Take a chunk of new files
263
-
264
- # Combine: Prioritize failed files, then add new ones
265
- files_to_process = failed_files + [f for f in new_files if f not in state["file_states"]]
266
-
267
- if not files_to_process:
268
- print(f"[{FLOW_ID}] No files to process. Sleeping...")
269
- await asyncio.sleep(60)
270
- continue
271
 
272
- print(f"[{FLOW_ID}] Processing {len(files_to_process)} files ({len(failed_files)} failed, {len(files_to_process)-len(failed_files)} new)...")
273
-
274
- # Process in batches of server count
275
- batch_size = len(servers)
276
- for i in range(0, len(files_to_process), batch_size):
277
- batch = files_to_process[i:i + batch_size]
278
- tasks = [process_file_task(f, state, progress) for f in batch]
279
- await asyncio.gather(*tasks)
280
 
281
- # Update next_download_index if we processed new files
282
- processed_new = [f for f in batch if f in new_files]
283
- if processed_new:
284
- # This is a simple way to update index; in reality, you'd want to be more precise
285
- # but for this fix, we'll just increment based on what we found
286
- last_new_file = processed_new[-1]
287
- state["next_download_index"] = file_list.index(last_new_file) + 1
288
 
289
- # Save and upload state after each batch
290
- await upload_hf_state(state)
291
- save_progress(progress)
292
 
293
- await asyncio.sleep(10)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
294
 
295
  # --- FastAPI App ---
296
 
@@ -310,12 +318,12 @@ async def root():
310
  "status": "running",
311
  "next_download_index": state.get("next_download_index", 0),
312
  "failed_transcriptions": failed_count,
313
- "uploaded_count": progress.get("uploaded_count", 0)
 
314
  }
315
 
316
  @app.post("/start_processing")
317
  async def start_processing(request: ProcessStartRequest):
318
- # This endpoint can be used to manually reset the index if needed
319
  state = await download_hf_state()
320
  state["next_download_index"] = request.start_index - 1
321
  await upload_hf_state(state)
 
79
 
80
def load_progress() -> Dict:
    """Load the local processing progress from the JSON progress file.

    Returns a dict guaranteed to contain the keys ``last_processed_index``,
    ``processed_files``, ``file_list`` and ``uploaded_count``. Missing keys
    in an existing file are filled in from the defaults; a missing,
    unreadable, corrupted, or non-object progress file falls back to a
    fresh default structure instead of crashing.
    """
    default_structure = {
        "last_processed_index": 0,
        "processed_files": {},  # {index: repo_path}
        "file_list": [],  # Full list of all zip files found in the dataset
        "uploaded_count": 0,
    }
    if PROGRESS_FILE.exists():
        try:
            with PROGRESS_FILE.open('r') as f:
                data = json.load(f)
            # Valid JSON whose root is not an object (e.g. a list) would make
            # the key merge below misbehave — treat it as corrupted instead.
            if not isinstance(data, dict):
                raise ValueError("progress file root must be a JSON object")
            # Ensure all expected keys exist (older files may lack some).
            for key, value in default_structure.items():
                data.setdefault(key, value)
            return data
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError (its subclass) and the
            # non-dict case above; OSError covers an unreadable file
            # (permissions, transient FS error), which previously crashed.
            print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
    return default_structure
 
 
 
 
 
101
 
102
  def save_progress(progress_data: Dict):
103
  """Saves the local processing progress to the JSON file."""
 
165
  # --- Hugging Face Utility Functions ---
166
 
167
  async def get_audio_file_list(progress_data: Dict) -> List[str]:
168
+ if progress_data.get('file_list'):
169
  return progress_data['file_list']
170
  try:
171
  api = HfApi(token=HF_TOKEN)
 
234
  state["file_states"][wav_file] = "processed"
235
  progress["uploaded_count"] = progress.get("uploaded_count", 0) + 1
236
  print(f"[{FLOW_ID}] βœ… Success: {wav_file}")
 
237
  else:
238
  state["file_states"][wav_file] = "failed_transcription"
239
  print(f"[{FLOW_ID}] ❌ Failed: {wav_file}")
 
251
async def main_processing_loop():
    """Continuously pull audio files from the dataset and process them.

    Each iteration: refresh the remote state and local progress, pick files
    to work on (files whose transcription previously failed first, then a
    chunk of up to 100 new files starting at ``next_download_index``), run
    them in batches sized to the number of servers, and persist state and
    progress after every batch so a crash loses at most one batch of
    bookkeeping. Any unexpected error is logged and the loop backs off for
    60 seconds instead of dying.
    """
    print(f"[{FLOW_ID}] Starting main processing loop...")

    while True:
        try:
            state = await download_hf_state()
            progress = load_progress()
            file_list = await get_audio_file_list(progress)

            if not file_list:
                print(f"[{FLOW_ID}] File list empty, retrying in 60s...")
                await asyncio.sleep(60)
                continue

            # Guard against a missing key so lookups below cannot raise
            # (the original mixed .get() and bare indexing on this key).
            file_states = state.setdefault("file_states", {})

            # 1. Retry files whose transcription previously failed.
            failed_files = [f for f, s in file_states.items() if s == "failed_transcription"]

            # 2. Also pick up a chunk of new files based on next_download_index.
            next_idx = state.get("next_download_index", 0)
            new_files = file_list[next_idx:next_idx + 100]  # Take a chunk of new files

            # Combine: prioritize failed files, then append unseen new ones.
            files_to_process = failed_files + [f for f in new_files if f not in file_states]

            if not files_to_process:
                print(f"[{FLOW_ID}] No files to process. Sleeping...")
                await asyncio.sleep(60)
                continue

            print(f"[{FLOW_ID}] Processing {len(files_to_process)} files ({len(failed_files)} failed, {len(files_to_process)-len(failed_files)} new)...")

            # Process in batches of server count. A zero batch size would make
            # range() raise ValueError, so fall back to one file at a time.
            batch_size = max(1, len(servers))
            for i in range(0, len(files_to_process), batch_size):
                batch = files_to_process[i:i + batch_size]
                await asyncio.gather(*(process_file_task(f, state, progress) for f in batch))

                # Advance next_download_index past the newest file we handled.
                processed_new = [f for f in batch if f in new_files]
                if processed_new:
                    last_new_file = processed_new[-1]
                    state["next_download_index"] = file_list.index(last_new_file) + 1

                # Save and upload state after each batch.
                await upload_hf_state(state)
                save_progress(progress)

            await asyncio.sleep(10)
        except Exception as e:
            # Keep the loop alive on unexpected errors; include the exception
            # type so failures are diagnosable from the log alone.
            print(f"[{FLOW_ID}] Error in main loop: {type(e).__name__}: {e}")
            await asyncio.sleep(60)
302
 
303
  # --- FastAPI App ---
304
 
 
318
  "status": "running",
319
  "next_download_index": state.get("next_download_index", 0),
320
  "failed_transcriptions": failed_count,
321
+ "uploaded_count": progress.get("uploaded_count", 0),
322
+ "total_files_in_list": len(progress.get('file_list', []))
323
  }
324
 
325
  @app.post("/start_processing")
326
  async def start_processing(request: ProcessStartRequest):
 
327
  state = await download_hf_state()
328
  state["next_download_index"] = request.start_index - 1
329
  await upload_hf_state(state)