Fred808 committed
Commit 34aa95c · verified · 1 Parent(s): 15d9901

Update app.py

Files changed (1)
  1. app.py +177 -53

app.py CHANGED
@@ -5,7 +5,7 @@ import asyncio
 import aiohttp
 import zipfile
 import shutil
-from typing import Dict, List, Set, Optional, Tuple
+from typing import Dict, List, Set, Optional, Tuple, Any
 from urllib.parse import quote
 from datetime import datetime
 from pathlib import Path
@@ -23,50 +23,45 @@ HF_TOKEN = os.getenv("HF_TOKEN", "")
 HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") # Source dataset for zip files
 HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium") # Target dataset for captions
 
-# Progress Tracking File
+# Progress and State Tracking
 PROGRESS_FILE = Path("processing_progress.json")
+HF_STATE_FILE = "processing_state_captions.json" # State file in helium dataset
+LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
+LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
+
 # Directory within the HF dataset where the zip files are located
-ZIP_FILE_PREFIX = "frames/"
+ZIP_FILE_PREFIX = "frames_zips/"
 
 # Using the full list from the user's original code for actual deployment
 CAPTION_SERVERS = [
-    "https://fred808-pil-4-1.hf.space/analyze",
-    "https://fred808-pil-4-2.hf.space/analyze",
-    "https://fred808-pil-4-3.hf.space/analyze",
-    "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
-    "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
-    "https://fred1012-fred1012-oncray.hf.space/analyze",
-    "https://fred1012-fred1012-4goge7.hf.space/analyze",
-    "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
-    "https://fred1012-fred1012-u95rte.hf.space/analyze",
-    "https://fred1012-fred1012-igje22.hf.space/analyze",
-    "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
-    "https://fred1012-fred1012-nwqthy.hf.space/analyze",
-    "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
-    "https://fred1012-fred1012-pivlzg.hf.space/analyze",
-    "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
-    "https://fred1012-fred1012-u7lh57.hf.space/analyze",
-    "https://fred1012-fred1012-q8djv1.hf.space/analyze",
-    "https://fredalone-fredalone-ozugrp.hf.space/analyze",
-    "https://fredalone-fredalone-9brxj2.hf.space/analyze",
-    "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
-    "https://fredalone-fredalone-vbli2y.hf.space/analyze",
-    "https://fredalone-fredalone-uggger.hf.space/analyze",
-    "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
-    "https://fredalone-fredalone-d1f26d.hf.space/analyze",
-    "https://fredalone-fredalone-461jp2.hf.space/analyze",
-    "https://fredalone-fredalone-3enfg4.hf.space/analyze",
-    "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
-    "https://fredalone-fredalone-ivtjua.hf.space/analyze",
-    "https://fredalone-fredalone-6bezt2.hf.space/analyze",
-    "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
-    "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
-    "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
-    "https://fredalone-fredalone-wclyog.hf.space/analyze",
-    "https://fredalone-fredalone-t27vig.hf.space/analyze",
-    "https://fredalone-fredalone-gahbxh.hf.space/analyze",
-    "https://fredalone-fredalone-kw2po4.hf.space/analyze",
-    "https://fredalone-fredalone-8h285h.hf.space/analyze"
+    "https://favoredone-favoredone-tv88mp.hf.space/analyze",
+    "https://favoredone-favoredone-7p1dcf.hf.space/analyze",
+    "https://favoredone-favoredone-k7b4mf.hf.space/analyze",
+    "https://favoredone-favoredone-mzlxc7.hf.space/analyze",
+    "https://favoredone-favoredone-aomfwa.hf.space/analyze",
+    "https://favoredone-favoredone-7g6v04.hf.space/analyze",
+    "https://favoredone-favoredone-dk1skh.hf.space/analyze",
+    "https://favoredone-favoredone-z4yo0y.hf.space/analyze",
+    "https://favoredone-favoredone-f6czeq.hf.space/analyze",
+    "https://favoredone-favoredone-5fo8ga.hf.space/analyze",
+    "https://favoredone-favoredone-zde8x6.hf.space/analyze",
+    "https://favoredone-favoredone-r0biih.hf.space/analyze",
+    "https://favoredone-favoredone-ljdzkf.hf.space/analyze",
+    "https://favoredone-favoredone-irrpe5.hf.space/analyze",
+    "https://favoredone-favoredone-bh9rwz.hf.space/analyze",
+    "https://favoredone-favoredone-u8c4dt.hf.space/analyze",
+    "https://favoredone-favoredone-futwyd.hf.space/analyze",
+    "https://favoredone-favoredone-hg2sot.hf.space/analyze",
+    "https://favoredone-favoredone-pvweug.hf.space/analyze",
+    "https://favoredone-favoredone-z6azk2.hf.space/analyze",
+    "https://favoredone-favoredone-4zid9w.hf.space/analyze",
+    "https://favoredone-favoredone-be7a1r.hf.space/analyze",
+    "https://favoredone-favoredone-ayazxa.hf.space/analyze",
+    "https://favoredone-favoredone-6ckj4m.hf.space/analyze",
+    "https://favoredone-favoredone-whn0xu.hf.space/analyze",
+    "https://favoredone-favoredone-t49exm.hf.space/analyze",
+    "https://favoredone-favoredone-cgrh0a.hf.space/analyze",
+    "https://favoredone-favoredone-r1kb5g.hf.space/analyze"
 ]
 MODEL_TYPE = "Florence-2-large"
 
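Note: this hunk swaps the entire caption-server pool (fred808/fred1012/fredalone Spaces) for a new favoredone pool. A quick liveness probe like the sketch below can confirm the new endpoints are awake before a long run; this helper is illustrative and not part of the commit, and the /analyze request schema is not shown in this diff.

import asyncio
import aiohttp

async def probe_server(url: str) -> bool:
    """Rough liveness check: GET the Space root (the /analyze path is stripped)."""
    base = url.rsplit("/", 1)[0]
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(base, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return resp.status == 200
    except Exception:
        return False

# Example: asyncio.run(probe_server(CAPTION_SERVERS[0]))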
@@ -94,10 +89,10 @@ class CaptionServer:
 servers = [CaptionServer(url) for url in CAPTION_SERVERS]
 server_index = 0
 
-# --- Progress Tracking Functions ---
+# --- Progress and State Management Functions ---
 
 def load_progress() -> Dict:
-    """Loads the processing progress from the JSON file."""
+    """Loads the local processing progress from the JSON file."""
     if PROGRESS_FILE.exists():
        try:
            with PROGRESS_FILE.open('r') as f:
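For context, the local progress file that load_progress()/save_progress() round-trip is a small JSON document. Judging from the fields used elsewhere in this diff, a representative processing_progress.json would look like the following (the index and paths are illustrative):

{
    "last_processed_index": 12,
    "processed_files": {
        "11": "frames_zips/course_011.zip",
        "12": "frames_zips/course_012.zip"
    }
}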
@@ -114,13 +109,127 @@ def load_progress() -> Dict:
     }
 
 def save_progress(progress_data: Dict):
-    """Saves the processing progress to the JSON file."""
+    """Saves the local processing progress to the JSON file."""
     try:
         with PROGRESS_FILE.open('w') as f:
             json.dump(progress_data, f, indent=4)
     except Exception as e:
         print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
 
+def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
+    """Load state from JSON file with migration logic for new structure."""
+    if os.path.exists(file_path):
+        try:
+            with open(file_path, "r") as f:
+                data = json.load(f)
+
+            # Migration Logic
+            if "file_states" not in data or not isinstance(data["file_states"], dict):
+                print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
+                data["file_states"] = {}
+
+            if "next_download_index" not in data:
+                data["next_download_index"] = 0
+
+            return data
+        except json.JSONDecodeError:
+            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
+    return default_value
+
+def save_json_state(file_path: str, data: Dict[str, Any]):
+    """Save state to JSON file."""
+    with open(file_path, "w") as f:
+        json.dump(data, f, indent=2)
+
+async def download_hf_state() -> Dict[str, Any]:
+    """Downloads the state file from Hugging Face or returns a default state."""
+    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+    default_state = {"next_download_index": 0, "file_states": {}}
+
+    try:
+        # Check if the file exists in the helium repo
+        files = HfApi(token=HF_TOKEN).list_repo_files(
+            repo_id=HF_OUTPUT_DATASET_ID,
+            repo_type="dataset"
+        )
+
+        if HF_STATE_FILE not in files:
+            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
+            return default_state
+
+        # Download the file
+        hf_hub_download(
+            repo_id=HF_OUTPUT_DATASET_ID,
+            filename=HF_STATE_FILE,
+            repo_type="dataset",
+            local_dir=LOCAL_STATE_FOLDER,
+            local_dir_use_symlinks=False,
+            token=HF_TOKEN
+        )
+
+        print(f"[{FLOW_ID}] Successfully downloaded state file.")
+        return load_json_state(str(local_path), default_state)
+
+    except Exception as e:
+        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
+        return default_state
+
+async def upload_hf_state(state: Dict[str, Any]) -> bool:
+    """Uploads the state file to Hugging Face."""
+    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+
+    try:
+        # Save state locally first
+        save_json_state(str(local_path), state)
+
+        # Upload to helium dataset
+        HfApi(token=HF_TOKEN).upload_file(
+            path_or_fileobj=str(local_path),
+            path_in_repo=HF_STATE_FILE,
+            repo_id=HF_OUTPUT_DATASET_ID,
+            repo_type="dataset",
+            commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
+        )
+        print(f"[{FLOW_ID}] Successfully uploaded state file.")
+        return True
+    except Exception as e:
+        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
+        return False
+
+async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
+    """Marks a file as 'processing' in the state file and uploads the lock."""
+    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
+
+    # Update state locally
+    state["file_states"][zip_filename] = "processing"
+
+    # Upload the updated state file immediately to establish the lock
+    if await upload_hf_state(state):
+        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
+        return True
+    else:
+        print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
+        # Revert local state
+        if zip_filename in state["file_states"]:
+            del state["file_states"][zip_filename]
+        return False
+
+async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
+    """Marks a file as 'processed', updates the index, and uploads the state."""
+    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
+
+    # Update state locally
+    state["file_states"][zip_filename] = "processed"
+    state["next_download_index"] = next_index
+
+    # Upload the updated state
+    if await upload_hf_state(state):
+        print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
+        return True
+    else:
+        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
+        return False
+
 # --- Hugging Face Utility Functions ---
 
 async def get_zip_file_list(progress_data: Dict) -> List[str]:
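The new global state file follows the shape established by the defaults and migration logic above: a next_download_index cursor plus a file_states map whose values are "processing", "processed", or "failed". A representative processing_state_captions.json stored in the helium dataset (file names illustrative):

{
    "next_download_index": 3,
    "file_states": {
        "course_001.zip": "processed",
        "course_002.zip": "processing",
        "course_003.zip": "failed"
    }
}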
@@ -325,7 +434,9 @@ async def send_image_for_captioning(image_path: Path, course_name: str, progress
 async def process_dataset_task(start_index: int):
     """Main task to process the dataset sequentially starting from a given index."""
 
+    # Load both local progress and HF state
     progress = load_progress()
+    current_state = await download_hf_state()
     file_list = await get_zip_file_list(progress)
 
     if not file_list:
@@ -350,11 +461,18 @@ async def process_dataset_task(start_index: int):
         zip_full_name = Path(repo_file_full_path).name
         course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
 
-        # Check if the file has already been successfully processed
-        if str(file_index) in progress['processed_files']:
-            print(f"[{FLOW_ID}] Skipping file #{file_index} ({zip_full_name}): Already processed according to progress file.")
-            progress['last_processed_index'] = file_index
-            save_progress(progress)
+        # Check file state in both local and HF state
+        file_state = current_state["file_states"].get(zip_full_name)
+        if file_state == "processed":
+            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
+            continue
+        elif file_state == "processing":
+            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
+            continue
+
+        # Try to lock the file
+        if not await lock_file_for_processing(zip_full_name, current_state):
+            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
             continue
 
         extract_dir = None
@@ -433,16 +551,22 @@ async def process_dataset_task(start_index: int):
             shutil.rmtree(extract_dir, ignore_errors=True)
 
         if current_file_success:
-            # Update progress only on successful completion of the file
+            # Update both local progress and HF state
             progress['last_processed_index'] = file_index
             progress['processed_files'][str(file_index)] = repo_file_full_path
             save_progress(progress)
-            print(f"[{FLOW_ID}] Progress saved: File #{file_index} marked as processed.")
+
+            # Update HF state and unlock the file
+            if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
+                print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
+            else:
+                print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
         else:
-            # If a file fails, we stop the continuous loop to allow for manual intervention or a fresh start
-            print(f"[{FLOW_ID}] File #{file_index} failed. Stopping continuous processing.")
+            # Mark as failed in the state and continue with next file
+            current_state["file_states"][zip_full_name] = "failed"
+            await upload_hf_state(current_state)
+            print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
             global_success = False
-            break
 
     print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
     return global_success
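Because a failure now marks the file "failed" and continues instead of breaking the loop, failed files stay skipped on later runs. A small follow-up pass, hypothetical and not part of this commit, could clear those markers so the main loop retries them:

async def requeue_failed() -> int:
    """Hypothetical helper: clear 'failed' markers so the main loop retries those files."""
    state = await download_hf_state()
    failed = [name for name, s in state["file_states"].items() if s == "failed"]
    for name in failed:
        del state["file_states"][name]  # forget the failure; the file becomes claimable again
    if failed:
        await upload_hf_state(state)
    return len(failed)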
 
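Taken together, the commit turns the single-worker progress file into an advisory multi-worker lock hosted in the helium dataset. A minimal sketch of the claim/process/release cycle, assuming app.py's functions are importable (the wrapper name and zip file name are hypothetical):

import asyncio
from app import download_hf_state, lock_file_for_processing, unlock_file_as_processed

async def claim_and_process(zip_name: str, index: int) -> bool:
    state = await download_hf_state()  # pull the shared state from the helium dataset
    if state["file_states"].get(zip_name) in ("processed", "processing"):
        return False  # another worker owns or already finished this file
    if not await lock_file_for_processing(zip_name, state):
        return False  # lock upload failed; another worker may have raced us
    # ... download, extract, and caption the zip here ...
    return await unlock_file_as_processed(zip_name, state, index + 1)

# asyncio.run(claim_and_process("course_001.zip", 0))

Note the lock is advisory: the state check and the state upload are separate Hub operations, so two workers that read the state at the same moment can both acquire the "lock". The scheme reduces duplicate work rather than guaranteeing mutual exclusion.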