Update app.py
Browse files
app.py
CHANGED
|
@@ -183,8 +183,8 @@ def download_state_from_api() -> Dict[str, Any]:
|
|
| 183 |
state_data["file_states"] = {}
|
| 184 |
if "next_download_index" not in state_data:
|
| 185 |
state_data["next_download_index"] = 0
|
| 186 |
-
|
| 187 |
-
log_message(f"β
|
| 188 |
return state_data
|
| 189 |
|
| 190 |
except requests.exceptions.RequestException as e:
|
|
@@ -214,11 +214,11 @@ def upload_state_to_api(state: Dict[str, Any]) -> bool:
|
|
| 214 |
return True
|
| 215 |
|
| 216 |
except requests.exceptions.HTTPError as e:
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
except requests.exceptions.RequestException as e:
|
| 223 |
log_message(f"❌ Failed to upload state file to API ({url}): {str(e)}", "ERROR")
|
| 224 |
return False
|
|
@@ -269,12 +269,12 @@ def lock_file_for_processing(wav_filename: str, state: Dict[str, Any]) -> bool:
|
|
| 269 |
del state["file_states"][wav_filename]
|
| 270 |
return False
|
| 271 |
|
| 272 |
-
def unlock_file_as_processed(wav_filename: str, state: Dict[str, Any]) -> bool:
|
| 273 |
"""Marks a file as 'processed', updates the index, and uploads the state via API."""
|
| 274 |
log_message(f"π Attempting to unlock file: {wav_filename} (Marking as 'processed')", "INFO")
|
| 275 |
|
| 276 |
state["file_states"][wav_filename] = "processed"
|
| 277 |
-
|
| 278 |
|
| 279 |
if upload_state_to_api(state):
|
| 280 |
log_message(f"✅ Successfully unlocked and marked as processed: {wav_filename} via API state upload", "INFO")
|
|
@@ -544,9 +544,7 @@ def process_audio_file(audio_path: str, reference_map: Dict[str, str], output_fi
|
|
| 544 |
return False
|
| 545 |
|
| 546 |
# 3. Upload transcription to API
|
| 547 |
-
|
| 548 |
-
|
| 549 |
-
if upload_success:
|
| 550 |
processing_status["transcribed_files"] += 1
|
| 551 |
# Clean up the local transcription file after successful upload
|
| 552 |
try:
|
|
@@ -581,7 +579,8 @@ def main_processing_loop():
|
|
| 581 |
while processing_status["is_running"]:
|
| 582 |
time.sleep(PROCESSING_DELAY)
|
| 583 |
|
| 584 |
-
# 1. Download state from the
|
|
|
|
| 585 |
current_state = download_state_from_api()
|
| 586 |
next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)
|
| 587 |
|
|
@@ -601,10 +600,9 @@ def main_processing_loop():
|
|
| 601 |
try:
|
| 602 |
# 2. Lock file by updating state on the API
|
| 603 |
# IMPORTANT: Update next_download_index when locking to prevent other workers from picking same file
|
| 604 |
-
|
| 605 |
-
|
| 606 |
-
# The index should be incremented before the lock attempt to prevent other workers from picking it up
|
| 607 |
current_state["next_download_index"] = target_index + 1
|
|
|
|
| 608 |
|
| 609 |
if not lock_file_for_processing(target_file, current_state):
|
| 610 |
log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
|
|
@@ -638,18 +636,17 @@ def main_processing_loop():
|
|
| 638 |
|
| 639 |
finally:
|
| 640 |
# 4. Unlock/Mark as processed by updating state on the API
|
| 641 |
-
# IMPORTANT:
|
| 642 |
-
# Instead, use the current_state which already has the correct next_download_index
|
| 643 |
|
| 644 |
if success:
|
| 645 |
-
# Mark as processed and
|
| 646 |
-
unlock_file_as_processed(target_file, current_state)
|
| 647 |
processing_status["processed_files"] += 1
|
| 648 |
else:
|
| 649 |
# Mark as failed but keep the incremented index so next worker can proceed
|
| 650 |
log_message(f"⚠️ File {target_file} failed. Marking as 'failed' and updating state.", "WARNING")
|
| 651 |
current_state["file_states"][target_file] = "failed"
|
| 652 |
-
#
|
| 653 |
upload_state_to_api(current_state)
|
| 654 |
|
| 655 |
# Clean up the downloaded audio file regardless of success
|
|
|
|
| 183 |
state_data["file_states"] = {}
|
| 184 |
if "next_download_index" not in state_data:
|
| 185 |
state_data["next_download_index"] = 0
|
| 186 |
+
|
| 187 |
+
log_message(f"✅ Downloaded state: next_download_index={state_data['next_download_index']}, processed_files={len([f for f,s in state_data['file_states'].items() if s=='processed'])}", "INFO")
|
| 188 |
return state_data
|
| 189 |
|
| 190 |
except requests.exceptions.RequestException as e:
|
|
|
|
| 214 |
return True
|
| 215 |
|
| 216 |
except requests.exceptions.HTTPError as e:
|
| 217 |
+
if hasattr(e, 'response') and e.response.status_code == 409:
|
| 218 |
+
log_message(f"⚠️ State file already exists on server (409 Conflict) - Treating as success.", "INFO")
|
| 219 |
+
return True
|
| 220 |
+
log_message(f"❌ Failed to upload state file to API ({url}): {str(e)}", "ERROR")
|
| 221 |
+
return False
|
| 222 |
except requests.exceptions.RequestException as e:
|
| 223 |
log_message(f"❌ Failed to upload state file to API ({url}): {str(e)}", "ERROR")
|
| 224 |
return False
|
|
|
|
| 269 |
del state["file_states"][wav_filename]
|
| 270 |
return False
|
| 271 |
|
| 272 |
+
def unlock_file_as_processed(wav_filename: str, state: Dict[str, Any], next_index: int) -> bool:
|
| 273 |
"""Marks a file as 'processed', updates the index, and uploads the state via API."""
|
| 274 |
log_message(f"π Attempting to unlock file: {wav_filename} (Marking as 'processed')", "INFO")
|
| 275 |
|
| 276 |
state["file_states"][wav_filename] = "processed"
|
| 277 |
+
state["next_download_index"] = next_index
|
| 278 |
|
| 279 |
if upload_state_to_api(state):
|
| 280 |
log_message(f"✅ Successfully unlocked and marked as processed: {wav_filename} via API state upload", "INFO")
|
|
|
|
| 544 |
return False
|
| 545 |
|
| 546 |
# 3. Upload transcription to API
|
| 547 |
+
if upload_transcription_to_api(final_json_path, final_json_filename):
|
|
|
|
|
|
|
| 548 |
processing_status["transcribed_files"] += 1
|
| 549 |
# Clean up the local transcription file after successful upload
|
| 550 |
try:
|
|
|
|
| 579 |
while processing_status["is_running"]:
|
| 580 |
time.sleep(PROCESSING_DELAY)
|
| 581 |
|
| 582 |
+
# 1. Download FRESH state from the API at the start of each iteration
|
| 583 |
+
# This ensures we respect the next_download_index that other workers may have set
|
| 584 |
current_state = download_state_from_api()
|
| 585 |
next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)
|
| 586 |
|
|
|
|
| 600 |
try:
|
| 601 |
# 2. Lock file by updating state on the API
|
| 602 |
# IMPORTANT: Update next_download_index when locking to prevent other workers from picking same file
|
| 603 |
+
old_index = current_state["next_download_index"]
|
|
|
|
|
|
|
| 604 |
current_state["next_download_index"] = target_index + 1
|
| 605 |
+
log_message(f"π Incrementing next_download_index from {old_index} to {current_state['next_download_index']}", "INFO")
|
| 606 |
|
| 607 |
if not lock_file_for_processing(target_file, current_state):
|
| 608 |
log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
|
|
|
|
| 636 |
|
| 637 |
finally:
|
| 638 |
# 4. Unlock/Mark as processed by updating state on the API
|
| 639 |
+
# IMPORTANT: Keep the incremented next_download_index from locking
|
|
|
|
| 640 |
|
| 641 |
if success:
|
| 642 |
+
# Mark as processed and keep the incremented index, then upload state
|
| 643 |
+
unlock_file_as_processed(target_file, current_state, current_state["next_download_index"])
|
| 644 |
processing_status["processed_files"] += 1
|
| 645 |
else:
|
| 646 |
# Mark as failed but keep the incremented index so next worker can proceed
|
| 647 |
log_message(f"⚠️ File {target_file} failed. Marking as 'failed' and updating state.", "WARNING")
|
| 648 |
current_state["file_states"][target_file] = "failed"
|
| 649 |
+
# Keep the incremented next_download_index - don't change it
|
| 650 |
upload_state_to_api(current_state)
|
| 651 |
|
| 652 |
# Clean up the downloaded audio file regardless of success
|