Samfredoly committed on
Commit 8be7ee9 · verified · 1 Parent(s): f42fd31

Update app.py

Files changed (1):
  1. app.py (+704, -782)

app.py CHANGED
@@ -1,783 +1,705 @@
- import os
- import json
- import time
- import asyncio
- import aiohttp
- import zipfile
- import shutil
- from typing import Dict, List, Set, Optional, Tuple, Any
- from urllib.parse import quote
- from datetime import datetime
- from pathlib import Path
- import io
-
- from fastapi import FastAPI, BackgroundTasks, HTTPException, status
- from pydantic import BaseModel, Field
- from huggingface_hub import HfApi, hf_hub_download
-
- # --- Configuration ---
- AUTO_START_INDEX = 1# Hardcoded default start index if no progress is found
- FLOW_ID = os.getenv("FLOW_ID", "flow_default")
- FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
- HF_TOKEN = os.getenv("HF_TOKEN", "")
- HF_DATASET_ID = os.getenv("HF_DATASET_ID", "samfred2/BG4") # Source dataset for zip files
- HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samelias1/Data_TG") # Target dataset for captions
-
- # Progress and State Tracking
- PROGRESS_FILE = Path("processing_progress.json")
- HF_STATE_FILE = "processing_state_cursors.json" # State file in helium dataset
- LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
- LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
-
- # Directory within the HF dataset where the zip files are located
- ZIP_FILE_PREFIX = "frames_zips/"
-
- # Using the full list from the user's original code for actual deployment
- CAPTION_SERVERS = [
-     # "https://Son4live-ajax-1.hf.space/track_cursor",
-     # "https://Son4live-ajax-2.hf.space/track_cursor",
-     # "https://Son4live-ajax-4.hf.space/track_cursor",
-     # "https://Son4live-ajax-5.hf.space/track_cursor",
-     # "https://Son4live-ajax-6.hf.space/track_cursor",
-     # "https://Son4live-ajax-7.hf.space/track_cursor",
-     # "https://Son4live-ajax-8.hf.space/track_cursor",
-     # "https://Son4live-ajax-9.hf.space/track_cursor",
-     # "https://Son4live-ajax-10.hf.space/track_cursor",
-     # "https://Son4live-ajax-11.hf.space/track_cursor",
-     # "https://Son4live-ajax-12.hf.space/track_cursor",
-     # "https://Son4live-ajax-13.hf.space/track_cursor",
-     # "https://Son4live-ajax-14.hf.space/track_cursor",
-     # "https://Son4live-ajax-15.hf.space/track_cursor",
-     # "https://Son4live-ajax-16.hf.space/track_cursor",
-     # "https://Son4live-ajax-17.hf.space/track_cursor",
-     # "https://Son4live-ajax-18.hf.space/track_cursor",
-     # "https://Son4live-ajax-19.hf.space/track_cursor",
-     # "https://Son4live-ajax-20.hf.space/track_cursor",
-     "https://jirehlove-jaypq-1.hf.space/track_cursor",
-     "https://jirehlove-jaypq-2.hf.space/track_cursor",
-     "https://jirehlove-jaypq-3.hf.space/track_cursor",
-     "https://jirehlove-jaypq-4.hf.space/track_cursor",
-     "https://jirehlove-jaypq-5.hf.space/track_cursor",
-     "https://jirehlove-jaypq-6.hf.space/track_cursor",
-     "https://jirehlove-jaypq-7.hf.space/track_cursor",
-     "https://jirehlove-jaypq-8.hf.space/track_cursor",
-     "https://jirehlove-jaypq-9.hf.space/track_cursor",
-     "https://jirehlove-jaypq-10.hf.space/track_cursor",
-     "https://jirehlove-jaypq-11.hf.space/track_cursor",
-     "https://jirehlove-jaypq-12.hf.space/track_cursor",
-     "https://jirehlove-jaypq-13.hf.space/track_cursor",
-     "https://jirehlove-jaypq-14.hf.space/track_cursor",
-     "https://jirehlove-jaypq-15.hf.space/track_cursor",
-     "https://jirehlove-jaypq-16.hf.space/track_cursor",
-     "https://jirehlove-jaypq-17.hf.space/track_cursor",
-     "https://jirehlove-jaypq-18.hf.space/track_cursor",
-     "https://jirehlove-jaypq-19.hf.space/track_cursor",
-     "https://jirehlove-jaypq-20.hf.space/track_cursor",
-     "https://lovyone-ones-1.hf.space/track_cursor",
-     "https://lovyone-ones-2.hf.space/track_cursor",
-     "https://lovyone-ones-3.hf.space/track_cursor",
-     "https://lovyone-ones-4.hf.space/track_cursor",
-     "https://lovyone-ones-5.hf.space/track_cursor",
-     "https://lovyone-ones-6.hf.space/track_cursor",
-     "https://lovyone-ones-7.hf.space/track_cursor",
-     "https://lovyone-ones-8.hf.space/track_cursor",
-     "https://lovyone-ones-9.hf.space/track_cursor",
-     "https://lovyone-ones-10.hf.space/track_cursor",
-     "https://lovyone-ones-11.hf.space/track_cursor",
-     "https://lovyone-ones-12.hf.space/track_cursor",
-     "https://lovyone-ones-13.hf.space/track_cursor",
-     "https://lovyone-ones-14.hf.space/track_cursor",
-     "https://lovyone-ones-15.hf.space/track_cursor",
-     "https://lovyone-ones-16.hf.space/track_cursor",
-     "https://lovyone-ones-17.hf.space/track_cursor",
-     "https://lovyone-ones-18.hf.space/track_cursor",
-     "https://lovyone-ones-19.hf.space/track_cursor",
-     "https://lovyone-ones-20.hf.space/track_cursor",
-     "https://eliason1-ones-1.hf.space/track_cursor",
-     "https://eliason1-ones-2.hf.space/track_cursor",
-     "https://eliason1-ones-3.hf.space/track_cursor",
-     "https://eliason1-ones-4.hf.space/track_cursor",
-     "https://eliason1-ones-6.hf.space/track_cursor",
-     "https://eliason1-ones-7.hf.space/track_cursor",
-     "https://eliason1-ones-8.hf.space/track_cursor",
-     "https://eliason1-ones-9.hf.space/track_cursor",
-     "https://eliason1-ones-10.hf.space/track_cursor",
-     "https://eliason1-ones-11.hf.space/track_cursor",
-     "https://eliason1-ones-12.hf.space/track_cursor",
-     "https://eliason1-ones-13.hf.space/track_cursor",
-     "https://eliason1-ones-14.hf.space/track_cursor",
-     "https://eliason1-ones-15.hf.space/track_cursor",
-     "https://eliason1-ones-17.hf.space/track_cursor",
-     "https://eliason1-ones-18.hf.space/track_cursor",
-     "https://eliason1-ones-19.hf.space/track_cursor",
-     "https://eliason1-ones-20.hf.space/track_cursor",
-     "https://elias5584-ones-1.hf.space/track_cursor",
-     "https://elias5584-ones-2.hf.space/track_cursor",
-     "https://elias5584-ones-3.hf.space/track_cursor",
-     "https://elias5584-ones-4.hf.space/track_cursor",
-     "https://elias5584-ones-5.hf.space/track_cursor",
-     "https://elias5584-ones-6.hf.space/track_cursor",
-     "https://elias5584-ones-7.hf.space/track_cursor",
-     "https://elias5584-ones-8.hf.space/track_cursor",
-     "https://elias5584-ones-9.hf.space/track_cursor",
-     "https://elias5584-ones-10.hf.space/track_cursor",
-     "https://elias5584-ones-11.hf.space/track_cursor",
-     "https://elias5584-ones-12.hf.space/track_cursor",
-     "https://elias5584-ones-13.hf.space/track_cursor",
-     "https://elias5584-ones-14.hf.space/track_cursor",
-     "https://elias5584-ones-15.hf.space/track_cursor",
-     "https://elias5584-ones-16.hf.space/track_cursor",
-     "https://elias5584-ones-17.hf.space/track_cursor",
-     "https://elias5584-ones-18.hf.space/track_cursor",
-     "https://elias5584-ones-19.hf.space/track_cursor",
-     "https://elias5584-ones-20.hf.space/track_cursor",
-     # "https://Son4live-ajax-1.hf.space/track_cursor",
-     # "https://Son4live-ajax-2.hf.space/track_cursor",
-     # "https://Son4live-ajax-4.hf.space/track_cursor",
-     # "https://Son4live-ajax-5.hf.space/track_cursor",
-     # "https://Son4live-ajax-6.hf.space/track_cursor",
-     # "https://Son4live-ajax-7.hf.space/track_cursor",
-     # "https://Son4live-ajax-8.hf.space/track_cursor",
-     # "https://Son4live-ajax-9.hf.space/track_cursor",
-     # "https://Son4live-ajax-10.hf.space/track_cursor",
-     # "https://Son4live-ajax-11.hf.space/track_cursor",
-     # "https://Son4live-ajax-12.hf.space/track_cursor",
-     # "https://Son4live-ajax-13.hf.space/track_cursor",
-     # "https://Son4live-ajax-14.hf.space/track_cursor",
-     # "https://Son4live-ajax-15.hf.space/track_cursor",
-     # "https://Son4live-ajax-16.hf.space/track_cursor",
-     # "https://Son4live-ajax-17.hf.space/track_cursor",
-     # "https://Son4live-ajax-18.hf.space/track_cursor",
-     # "https://Son4live-ajax-19.hf.space/track_cursor",
-     # "https://Son4live-ajax-20.hf.space/track_cursor",
-     "https://jirehlove-jaypq-1.hf.space/track_cursor",
-     "https://jirehlove-jaypq-2.hf.space/track_cursor",
-     "https://jirehlove-jaypq-3.hf.space/track_cursor",
-     "https://jirehlove-jaypq-4.hf.space/track_cursor",
-     "https://jirehlove-jaypq-5.hf.space/track_cursor",
-     "https://jirehlove-jaypq-6.hf.space/track_cursor",
-     "https://jirehlove-jaypq-7.hf.space/track_cursor",
-     "https://jirehlove-jaypq-8.hf.space/track_cursor",
-     "https://jirehlove-jaypq-9.hf.space/track_cursor",
-     "https://jirehlove-jaypq-10.hf.space/track_cursor",
-     "https://jirehlove-jaypq-11.hf.space/track_cursor",
-     "https://jirehlove-jaypq-12.hf.space/track_cursor",
-     "https://jirehlove-jaypq-13.hf.space/track_cursor",
-     "https://jirehlove-jaypq-14.hf.space/track_cursor",
-     "https://jirehlove-jaypq-15.hf.space/track_cursor",
-     "https://jirehlove-jaypq-16.hf.space/track_cursor",
-     "https://jirehlove-jaypq-17.hf.space/track_cursor",
-     "https://jirehlove-jaypq-18.hf.space/track_cursor",
-     "https://jirehlove-jaypq-19.hf.space/track_cursor",
-     "https://jirehlove-jaypq-20.hf.space/track_cursor",
-     "https://lovyone-ones-1.hf.space/track_cursor",
-     "https://lovyone-ones-2.hf.space/track_cursor",
-     "https://lovyone-ones-3.hf.space/track_cursor",
-     "https://lovyone-ones-4.hf.space/track_cursor",
-     "https://lovyone-ones-5.hf.space/track_cursor",
-     "https://lovyone-ones-6.hf.space/track_cursor",
-     "https://lovyone-ones-7.hf.space/track_cursor",
-     "https://lovyone-ones-8.hf.space/track_cursor",
-     "https://lovyone-ones-9.hf.space/track_cursor",
-     "https://lovyone-ones-10.hf.space/track_cursor",
-     "https://lovyone-ones-11.hf.space/track_cursor",
-     "https://lovyone-ones-12.hf.space/track_cursor",
-     "https://lovyone-ones-13.hf.space/track_cursor",
-     "https://lovyone-ones-14.hf.space/track_cursor",
-     "https://lovyone-ones-15.hf.space/track_cursor",
-     "https://lovyone-ones-16.hf.space/track_cursor",
-     "https://lovyone-ones-17.hf.space/track_cursor",
-     "https://lovyone-ones-18.hf.space/track_cursor",
-     "https://lovyone-ones-19.hf.space/track_cursor",
-     "https://lovyone-ones-20.hf.space/track_cursor",
- ]
- MODEL_TYPE = "Florence-2-large"
-
- # Temporary storage for images
- TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
- TEMP_DIR.mkdir(exist_ok=True)
-
- # --- Models ---
- class ProcessStartRequest(BaseModel):
-     start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
-
- class CaptionServer:
-     def __init__(self, url):
-         self.url = url
-         self.busy = False
-         self.total_processed = 0
-         self.total_time = 0
-         self.model = MODEL_TYPE
-
-     @property
-     def fps(self):
-         return self.total_processed / self.total_time if self.total_time > 0 else 0
-
- # Global state for caption servers
- servers = [CaptionServer(url) for url in CAPTION_SERVERS]
- server_index = 0
-
- # --- Progress and State Management Functions ---
-
- def load_progress() -> Dict:
-     """Loads the local processing progress from the JSON file."""
-     if PROGRESS_FILE.exists():
-         try:
-             with PROGRESS_FILE.open('r') as f:
-                 return json.load(f)
-         except json.JSONDecodeError:
-             print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
-             # Fall through to return default structure
-
-     # Default structure
-     return {
-         "last_processed_index": 0,
-         "processed_files": {}, # {index: repo_path}
-         "file_list": [] # Full list of all zip files found in the dataset
-     }
-
- def save_progress(progress_data: Dict):
-     """Saves the local processing progress to the JSON file."""
-     try:
-         with PROGRESS_FILE.open('w') as f:
-             json.dump(progress_data, f, indent=4)
-     except Exception as e:
-         print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
-
- def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
-     """Load state from JSON file with migration logic for new structure."""
-     if os.path.exists(file_path):
-         try:
-             with open(file_path, "r") as f:
-                 data = json.load(f)
-
-             # Migration Logic
-             if "file_states" not in data or not isinstance(data["file_states"], dict):
-                 print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
-                 data["file_states"] = {}
-
-             if "next_download_index" not in data:
-                 data["next_download_index"] = 0
-
-             return data
-         except json.JSONDecodeError:
-             print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
-     return default_value
-
- def save_json_state(file_path: str, data: Dict[str, Any]):
-     """Save state to JSON file"""
-     with open(file_path, "w") as f:
-         json.dump(data, f, indent=2)
-
- async def download_hf_state() -> Dict[str, Any]:
-     """Downloads the state file from Hugging Face or returns a default state."""
-     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
-     default_state = {"next_download_index": 0, "file_states": {}}
-
-     try:
-         # Check if the file exists in the helium repo
-         files = HfApi(token=HF_TOKEN).list_repo_files(
-             repo_id=HF_OUTPUT_DATASET_ID,
-             repo_type="dataset"
-         )
-
-         if HF_STATE_FILE not in files:
-             print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
-             return default_state
-
-         # Download the file
-         hf_hub_download(
-             repo_id=HF_OUTPUT_DATASET_ID,
-             filename=HF_STATE_FILE,
-             repo_type="dataset",
-             local_dir=LOCAL_STATE_FOLDER,
-             local_dir_use_symlinks=False,
-             token=HF_TOKEN
-         )
-
-         print(f"[{FLOW_ID}] Successfully downloaded state file.")
-         return load_json_state(str(local_path), default_state)
-
-     except Exception as e:
-         print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
-         return default_state
-
- async def upload_hf_state(state: Dict[str, Any]) -> bool:
-     """Uploads the state file to Hugging Face."""
-     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
-
-     try:
-         # Save state locally first
-         save_json_state(str(local_path), state)
-
-         # Upload to helium dataset
-         HfApi(token=HF_TOKEN).upload_file(
-             path_or_fileobj=str(local_path),
-             path_in_repo=HF_STATE_FILE,
-             repo_id=HF_OUTPUT_DATASET_ID,
-             repo_type="dataset",
-             commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
-         )
-         print(f"[{FLOW_ID}] Successfully uploaded state file.")
-         return True
-     except Exception as e:
-         print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
-         return False
-
- async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
-     """Marks a file as 'processing' in the state file and uploads the lock."""
-     print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
-
-     # Update state locally
-     state["file_states"][zip_filename] = "processing"
-
-     # Upload the updated state file immediately to establish the lock
-     if await upload_hf_state(state):
-         print(f"[{FLOW_ID}] Successfully locked file: {zip_filename}")
-         return True
-     else:
-         print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
-         # Revert local state
-         if zip_filename in state["file_states"]:
-             del state["file_states"][zip_filename]
-         return False
-
- async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
-     """Marks a file as 'processed', updates the index, and uploads the state."""
-     print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
-
-     # Update state locally
-     state["file_states"][zip_filename] = "processed"
-     state["next_download_index"] = next_index
-
-     # Upload the updated state
-     if await upload_hf_state(state):
-         print(f"[{FLOW_ID}] Successfully marked as processed: {zip_filename}")
-         return True
-     else:
-         print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
-         return False
-
- # --- Hugging Face Utility Functions ---
-
- async def get_zip_file_list(progress_data: Dict) -> List[str]:
-     """
-     Fetches the list of all zip files from the dataset, or uses the cached list.
-     Updates the progress_data with the file list if a new list is fetched.
-     """
-     if progress_data['file_list']:
-         print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
-         return progress_data['file_list']
-
-     print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
-     try:
-         api = HfApi(token=HF_TOKEN)
-         repo_files = api.list_repo_files(
-             repo_id=HF_DATASET_ID,
-             repo_type="dataset"
-         )
-
-         # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
-         zip_files = sorted([
-             f for f in repo_files
-             if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
-         ])
-
-         if not zip_files:
-             raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
-
-         print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
-
-         # Update and save the progress data
-         progress_data['file_list'] = zip_files
-         save_progress(progress_data)
-
-         return zip_files
-
-     except Exception as e:
-         print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
-         return []
-
- async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
-     """Downloads the zip file for the given index and extracts its contents."""
-
-     # Extract the base name for the extraction directory
-     zip_full_name = Path(repo_file_full_path).name
-     course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
-
-     print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
-
-     try:
-         # Use hf_hub_download to get the file path
-         zip_path = hf_hub_download(
-             repo_id=HF_DATASET_ID,
-             filename=repo_file_full_path, # Use the full path in the repo
-             repo_type="dataset",
-             token=HF_TOKEN,
-         )
-
-         print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
-
-         # Create a temporary directory for extraction
-         extract_dir = TEMP_DIR / course_name
-         # Ensure a clean directory for extraction
-         if extract_dir.exists():
-             shutil.rmtree(extract_dir)
-         extract_dir.mkdir(exist_ok=True)
-
-         with zipfile.ZipFile(zip_path, 'r') as zip_ref:
-             zip_ref.extractall(extract_dir)
-
-         print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
-
-         # Clean up the downloaded zip file to save space
-         os.remove(zip_path)
-
-         return extract_dir
-
-     except Exception as e:
-         print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
-         return None
-
- async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
-     """Uploads the final captions JSON file to the output dataset."""
-     # Use the full zip name, replacing the extension with .json
-     caption_filename = Path(zip_full_name).with_suffix('.json').name
-
-     try:
-         print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
-
-         # Create JSON content in memory
-         json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
-
-         api = HfApi(token=HF_TOKEN)
-         api.upload_file(
-             path_or_fileobj=io.BytesIO(json_content),
-             path_in_repo=caption_filename,
-             repo_id=HF_OUTPUT_DATASET_ID,
-             repo_type="dataset",
-             commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
-         )
-
-         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
-         return True
-
-     except Exception as e:
-         print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
-         return False
-
- # --- Core Processing Functions (Modified) ---
-
- async def get_available_server(timeout: float = 300.0) -> CaptionServer:
-     """Round-robin selection of an available caption server."""
-     global server_index
-     start_time = time.time()
-     while True:
-         # Round-robin check for an available server
-         for _ in range(len(servers)):
-             server = servers[server_index]
-             server_index = (server_index + 1) % len(servers)
-             if not server.busy:
-                 return server
-
-         # If all servers are busy, wait for a short period and check again
-         await asyncio.sleep(0.5)
-
-         # Check if timeout has been reached
-         if time.time() - start_time > timeout:
-             raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
-
- async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
-     """Sends a single image to a caption server for processing."""
-     # This function now handles server selection and retries internally
-     MAX_RETRIES = 3
-     for attempt in range(MAX_RETRIES):
-         server = None
-         try:
-             # 1. Get an available server (will wait if all are busy, with a timeout)
-             server = await get_available_server()
-             server.busy = True
-             start_time = time.time()
-
-             # Print a less verbose message only on the first attempt
-             if attempt == 0:
-                 print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
-
-             # 2. Prepare request data
-             form_data = aiohttp.FormData()
-             form_data.add_field('file',
-                                 image_path.open('rb'),
-                                 filename=image_path.name,
-                                 content_type='image/jpeg')
-             form_data.add_field('model_choice', MODEL_TYPE)
-
-             # 3. Send request
-             async with aiohttp.ClientSession() as session:
-                 # Increased timeout to 10 minutes (600s) as requested by user's problem description
-                 async with session.post(server.url, data=form_data, timeout=600) as resp:
-                     if resp.status == 200:
-                         result = await resp.json()
-
-                         # Handle cursor detection response format
-                         if result.get('cursor_active') is not None: # Check if it's a valid cursor detection response
-                             # Update progress counter
-                             progress_tracker['completed'] += 1
-                             if progress_tracker['completed'] % 50 == 0:
-                                 print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} detections completed.")
-
-                             # Log success only if it's not a progress report interval
-                             if progress_tracker['completed'] % 50 != 0:
-                                 print(f"[{FLOW_ID}] Success: {image_path.name} processed by {server.url}")
-
-                             # Store the full cursor detection result
-                             return {
-                                 "course": course_name,
-                                 "image_path": image_path.name,
-                                 "cursor_active": result.get('cursor_active', False),
-                                 "x": result.get('x'),
-                                 "y": result.get('y'),
-                                 "confidence": result.get('confidence'),
-                                 "template": result.get('template'),
-                                 "image_shape": result.get('image_shape'),
-                                 "server_url": server.url,
-                                 "timestamp": datetime.now().isoformat()
-                             }
-                         else:
-                             print(f"[{FLOW_ID}] Server {server.url} returned invalid response format for {image_path.name}. Response: {result}")
-                             continue # Retry with a different server
-                     else:
-                         error_text = await resp.text()
-                         print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
-                         continue # Retry with a different server
-
-         except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
-             print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
-             continue # Retry with a different server
-         except Exception as e:
-             print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
-             continue # Retry with a different server
-         finally:
-             if server:
-                 end_time = time.time()
-                 server.busy = False
-                 server.total_processed += 1
-                 server.total_time += (end_time - start_time)
-
-     print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
-     return None
-
- async def process_dataset_task(start_index: int):
-     """Main task to process the dataset sequentially starting from a given index."""
-
-     # Load both local progress and HF state
-     progress = load_progress()
-     current_state = await download_hf_state()
-     file_list = await get_zip_file_list(progress)
-
-     if not file_list:
-         print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
-         return False
-
-     # Ensure start_index is within bounds
-     if start_index > len(file_list):
-         print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
-         return True
-
-     # Determine the actual starting index in the 0-indexed list
-     start_list_index = start_index - 1
-
-     print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
-
-     global_success = True
-
-     for i in range(start_list_index, len(file_list)):
-         file_index = i + 1 # 1-indexed for user display and progress tracking
-         repo_file_full_path = file_list[i]
-         zip_full_name = Path(repo_file_full_path).name
-         course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
-
-         # Check file state in both local and HF state
-         file_state = current_state["file_states"].get(zip_full_name)
-         if file_state == "processed":
-             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
-             continue
-         elif file_state == "processing":
-             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
-             continue
-
-         # Try to lock the file
-         if not await lock_file_for_processing(zip_full_name, current_state):
-             print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
-             continue
-
-         extract_dir = None
-         current_file_success = False
-
-         try:
-             # 1. Download and Extract
-             extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
-
-             if not extract_dir:
-                 raise Exception("Failed to download or extract zip file.")
-
-             # 2. Find Images
-             # Use recursive glob to find images in subdirectories
-             image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
-             print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
-
-             if not image_paths:
-                 print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
-                 current_file_success = True
-             else:
-                 # 3. Process Images (Captioning)
-                 progress_tracker = {
-                     'total': len(image_paths),
-                     'completed': 0
-                 }
-                 print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
-
-                 # Create a semaphore to limit concurrent tasks to the number of available servers
-                 semaphore = asyncio.Semaphore(len(servers))
-
-                 async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
-                     async with semaphore:
-                         return await send_image_for_captioning(image_path, course_name, progress_tracker)
-
-                 # Create a list of tasks for parallel captioning
-                 caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
-
-                 # Run all captioning tasks concurrently
-                 results = await asyncio.gather(*caption_tasks)
-
-                 # Filter out failed results
-                 all_captions = [r for r in results if r is not None]
-
-                 # Final progress report for the current file
-                 if len(all_captions) == len(image_paths):
-                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully processed all {len(all_captions)} images.")
-                 else:
-                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} images.")
-
-                 # Calculate success statistics
-                 cursor_detected = sum(1 for c in all_captions if c.get('cursor_active', False))
-                 print(f"[{FLOW_ID}] Detection Statistics:")
-                 print(f"- Total processed: {len(all_captions)}")
-                 print(f"- Cursors detected: {cursor_detected}")
-                 print(f"- Detection rate: {(cursor_detected/len(all_captions)*100):.2f}%")
-
-                 # Consider the file successful if we have any captions at all
-                 current_file_success = len(all_captions) > 0
-
-                 # 4. Upload Results
-                 if all_captions:
-                     print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
-                     if await upload_captions_to_hf(zip_full_name, all_captions):
-                         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
-                         # Mark as success if we have any captions and successfully uploaded them
-                         current_file_success = True
-                     else:
-                         print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
-                         current_file_success = False
-                 else:
-                     print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
-                     current_file_success = False
-
-         except Exception as e:
-             print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
-             current_file_success = False
-             global_success = False # Mark overall task as failed if any file fails critically
-
-         finally:
-             # 5. Cleanup and Update Progress
-             if extract_dir and extract_dir.exists():
-                 print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
-                 shutil.rmtree(extract_dir, ignore_errors=True)
-
-             if current_file_success:
-                 # Update both local progress and HF state
-                 progress['last_processed_index'] = file_index
-                 progress['processed_files'][str(file_index)] = repo_file_full_path
-                 save_progress(progress)
-
-                 # Update HF state and unlock the file
-                 if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
-                     print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
-                 else:
-                     print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
-             else:
-                 # Mark as failed in the state and continue with next file
-                 current_state["file_states"][zip_full_name] = "failed"
-                 await upload_hf_state(current_state)
-                 print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
-                 global_success = False
-
-     print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
-     return global_success
-
- # --- FastAPI App and Endpoints ---
-
- app = FastAPI(
-     title=f"Flow Server {FLOW_ID} API",
-     description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
-     version="1.0.0"
- )
-
- @app.on_event("startup")
- async def startup_event():
-     print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
-
-     # Get both local progress and HF state
-     progress = load_progress()
-     current_state = await download_hf_state()
-
-     # Get the next_download_index from HF state if available
-     hf_next_index = current_state.get("next_download_index", 0)
-
-     # If HF state has a higher index, use that instead of local progress
-     if hf_next_index > 0:
-         start_index = hf_next_index
-         print(f"[{FLOW_ID}] Using next_download_index from HF state: {start_index}")
-     else:
-         # Fall back to local progress if HF state doesn't have a meaningful index
-         start_index = progress.get('last_processed_index', 0) + 1
-         if start_index < AUTO_START_INDEX:
-             start_index = AUTO_START_INDEX
-
-     # Use a dummy BackgroundTasks object for the startup task
-     # Note: FastAPI's startup events can't directly use BackgroundTasks, but we can use asyncio.create_task
-     # to run the long-running process in the background without blocking the server startup.
-     print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
-     asyncio.create_task(process_dataset_task(start_index))
-
- @app.get("/")
- async def root():
-     progress = load_progress()
-     return {
-         "flow_id": FLOW_ID,
-         "status": "ready",
-         "last_processed_index": progress['last_processed_index'],
-         "total_files_in_list": len(progress['file_list']),
-         "processed_files_count": len(progress['processed_files']),
-         "total_servers": len(servers),
-         "busy_servers": sum(1 for s in servers if s.busy),
-     }
-
- @app.post("/start_processing")
- async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
-     """
-     Starts the sequential processing of zip files from the given index in the background.
-     """
-     start_index = request.start_index
-
-     print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
-
-     # Start the heavy processing in a background task so the API call returns immediately
-     # Note: The server is already auto-starting, but this allows for manual restart/override.
-     background_tasks.add_task(process_dataset_task, start_index)
-
-     return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
-
- if __name__ == "__main__":
-     import uvicorn
-     # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
      uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
 
+ import os
+ import json
+ import time
+ import asyncio
+ import aiohttp
+ import zipfile
+ import shutil
+ from typing import Dict, List, Set, Optional, Tuple, Any
+ from urllib.parse import quote
+ from datetime import datetime
+ from pathlib import Path
+ import io
+
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
+ from pydantic import BaseModel, Field
+ from huggingface_hub import HfApi, hf_hub_download
+
+ # --- Configuration ---
+ AUTO_START_INDEX = 1  # Hardcoded default start index if no progress is found
+ FLOW_ID = os.getenv("FLOW_ID", "flow_default")
+ FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
+ HF_DATASET_ID = os.getenv("HF_DATASET_ID", "samfred2/BG4") # Source dataset for zip files
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samelias1/Data_TG") # Target dataset for captions
+
+ # Progress and State Tracking
+ PROGRESS_FILE = Path("processing_progress.json")
+ HF_STATE_FILE = "processing_state_cursors.json" # State file in helium dataset
+ LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
+ LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
+
+ # Directory within the HF dataset where the zip files are located
+ ZIP_FILE_PREFIX = "frames_zips/"
+
+ # Using the full list from the user's original code for actual deployment
+ CAPTION_SERVERS = [
+     "https://elias5584-supreme-1.hf.space/track_cursor",
+     "https://elias5584-supreme-2.hf.space/track_cursor",
+     "https://elias5584-supreme-3.hf.space/track_cursor",
+     "https://elias5584-supreme-4.hf.space/track_cursor",
+     "https://elias5584-supreme-5.hf.space/track_cursor",
+     "https://elias5584-supreme-6.hf.space/track_cursor",
+     "https://elias5584-supreme-7.hf.space/track_cursor",
+     "https://elias5584-supreme-8.hf.space/track_cursor",
+     "https://elias5584-supreme-9.hf.space/track_cursor",
+     "https://elias5584-supreme-10.hf.space/track_cursor",
+     "https://elias5584-supreme-11.hf.space/track_cursor",
+     "https://elias5584-supreme-12.hf.space/track_cursor",
+     "https://elias5584-supreme-13.hf.space/track_cursor",
+     "https://elias5584-supreme-14.hf.space/track_cursor",
+     "https://elias5584-supreme-15.hf.space/track_cursor",
+     "https://elias5584-supreme-16.hf.space/track_cursor",
+     "https://elias5584-supreme-17.hf.space/track_cursor",
+     "https://elias5584-supreme-18.hf.space/track_cursor",
+     "https://elias5584-supreme-19.hf.space/track_cursor",
+     "https://elias5584-supreme-20.hf.space/track_cursor",
+     "https://lovyone-elite-1.hf.space/track_cursor",
+     "https://lovyone-elite-2.hf.space/track_cursor",
+     "https://lovyone-elite-3.hf.space/track_cursor",
+     "https://lovyone-elite-4.hf.space/track_cursor",
+     "https://lovyone-elite-5.hf.space/track_cursor",
+     "https://lovyone-elite-6.hf.space/track_cursor",
+     "https://lovyone-elite-7.hf.space/track_cursor",
+     "https://lovyone-elite-8.hf.space/track_cursor",
+     "https://lovyone-elite-9.hf.space/track_cursor",
+     "https://lovyone-elite-10.hf.space/track_cursor",
+     "https://lovyone-elite-11.hf.space/track_cursor",
+     "https://lovyone-elite-12.hf.space/track_cursor",
+     "https://lovyone-elite-13.hf.space/track_cursor",
+     "https://lovyone-elite-14.hf.space/track_cursor",
+     "https://lovyone-elite-15.hf.space/track_cursor",
+     "https://lovyone-elite-16.hf.space/track_cursor",
+     "https://lovyone-elite-17.hf.space/track_cursor",
+     "https://lovyone-elite-18.hf.space/track_cursor",
+     "https://lovyone-elite-19.hf.space/track_cursor",
+     "https://lovyone-elite-20.hf.space/track_cursor",
+     "https://eliason1-pia-3.hf.space/track_cursor",
+     "https://eliason1-pia-4.hf.space/track_cursor",
+     "https://eliason1-pia-5.hf.space/track_cursor",
+     "https://eliason1-pia-6.hf.space/track_cursor",
+     "https://eliason1-pia-7.hf.space/track_cursor",
+     "https://eliason1-pia-8.hf.space/track_cursor",
+     "https://eliason1-pia-9.hf.space/track_cursor",
+     "https://eliason1-pia-10.hf.space/track_cursor",
+     "https://eliason1-pia-11.hf.space/track_cursor",
+     "https://eliason1-pia-12.hf.space/track_cursor",
+     "https://eliason1-pia-13.hf.space/track_cursor",
+     "https://eliason1-pia-14.hf.space/track_cursor",
+     "https://eliason1-pia-15.hf.space/track_cursor",
+     "https://eliason1-pia-16.hf.space/track_cursor",
+     "https://eliason1-pia-17.hf.space/track_cursor",
+     "https://eliason1-pia-18.hf.space/track_cursor",
+     "https://eliason1-pia-19.hf.space/track_cursor",
+     "https://eliason1-pia-20.hf.space/track_cursor",
+     "https://jirehlove-clearly-1.hf.space/track_cursor",
+     "https://jirehlove-clearly-2.hf.space/track_cursor",
+     "https://jirehlove-clearly-3.hf.space/track_cursor",
+     "https://jirehlove-clearly-4.hf.space/track_cursor",
+     "https://jirehlove-clearly-5.hf.space/track_cursor",
+     "https://jirehlove-clearly-6.hf.space/track_cursor",
+     "https://jirehlove-clearly-7.hf.space/track_cursor",
+     "https://jirehlove-clearly-8.hf.space/track_cursor",
+     "https://jirehlove-clearly-9.hf.space/track_cursor",
+     "https://jirehlove-clearly-10.hf.space/track_cursor",
+     "https://jirehlove-clearly-11.hf.space/track_cursor",
+     "https://jirehlove-clearly-12.hf.space/track_cursor",
+     "https://jirehlove-clearly-13.hf.space/track_cursor",
+     "https://jirehlove-clearly-14.hf.space/track_cursor",
+     "https://jirehlove-clearly-15.hf.space/track_cursor",
+     "https://jirehlove-clearly-16.hf.space/track_cursor",
+     "https://jirehlove-clearly-17.hf.space/track_cursor",
+     "https://jirehlove-clearly-18.hf.space/track_cursor",
+     "https://jirehlove-clearly-19.hf.space/track_cursor",
+     "https://jirehlove-clearly-20.hf.space/track_cursor"
+ ]
+ MODEL_TYPE = "Florence-2-large"
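# The four pools above follow a strict "{owner}-{name}-{n}.hf.space" pattern, so the
# same list could be generated instead of hand-maintained. A minimal sketch, with the
# owners, pool names, and index ranges transcribed from the list above (note the
# eliason1-pia pool starts at 3):
def build_caption_servers() -> list:
    pools = [
        ("elias5584", "supreme", range(1, 21)),
        ("lovyone", "elite", range(1, 21)),
        ("eliason1", "pia", range(3, 21)),
        ("jirehlove", "clearly", range(1, 21)),
    ]
    return [
        f"https://{owner}-{name}-{n}.hf.space/track_cursor"
        for owner, name, nums in pools
        for n in nums
    ]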
+
+ # Temporary storage for images
+ TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
+ TEMP_DIR.mkdir(exist_ok=True)
+
+ # --- Models ---
+ class ProcessStartRequest(BaseModel):
+     start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
+
+ class CaptionServer:
+     def __init__(self, url):
+         self.url = url
+         self.busy = False
+         self.total_processed = 0
+         self.total_time = 0
+         self.model = MODEL_TYPE
+
+     @property
+     def fps(self):
+         return self.total_processed / self.total_time if self.total_time > 0 else 0
+
+ # Global state for caption servers
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
+ server_index = 0
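# A small usage sketch of the throughput bookkeeping: fps is derived from the two
# counters updated in send_image_for_captioning's finally block, and returns 0
# before any request has completed, avoiding division by zero. Numbers are invented:
_demo = CaptionServer("https://example.hf.space/track_cursor")
assert _demo.fps == 0                          # no requests yet
_demo.total_processed, _demo.total_time = 30, 12.0
assert _demo.fps == 2.5                        # 30 images in 12s -> 2.5 img/s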
+
+ # --- Progress and State Management Functions ---
+
+ def load_progress() -> Dict:
+     """Loads the local processing progress from the JSON file."""
+     if PROGRESS_FILE.exists():
+         try:
+             with PROGRESS_FILE.open('r') as f:
+                 return json.load(f)
+         except json.JSONDecodeError:
+             print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
+             # Fall through to return default structure
+
+     # Default structure
+     return {
+         "last_processed_index": 0,
+         "processed_files": {}, # {index: repo_path}
+         "file_list": [] # Full list of all zip files found in the dataset
+     }
+
+ def save_progress(progress_data: Dict):
+     """Saves the local processing progress to the JSON file."""
+     try:
+         with PROGRESS_FILE.open('w') as f:
+             json.dump(progress_data, f, indent=4)
+     except Exception as e:
+         print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
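# For reference, the progress file round-tripped by the two functions above has
# this shape on disk. Paths are illustrative, not taken from the real dataset:
EXAMPLE_PROGRESS = {
    "last_processed_index": 2,
    "processed_files": {
        "1": "frames_zips/example_course_01.zip",   # {1-based index: repo path}
        "2": "frames_zips/example_course_02.zip",
    },
    "file_list": [
        "frames_zips/example_course_01.zip",
        "frames_zips/example_course_02.zip",
    ],
}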
+
+ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
+     """Load state from JSON file with migration logic for new structure."""
+     if os.path.exists(file_path):
+         try:
+             with open(file_path, "r") as f:
+                 data = json.load(f)
+
+             # Migration Logic
+             if "file_states" not in data or not isinstance(data["file_states"], dict):
+                 print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
+                 data["file_states"] = {}
+
+             if "next_download_index" not in data:
+                 data["next_download_index"] = 0
+
+             return data
+         except json.JSONDecodeError:
+             print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
+     return default_value
+
+ def save_json_state(file_path: str, data: Dict[str, Any]):
+     """Save state to JSON file"""
+     with open(file_path, "w") as f:
+         json.dump(data, f, indent=2)
+
+ async def download_hf_state() -> Dict[str, Any]:
+     """Downloads the state file from Hugging Face or returns a default state."""
+     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+     default_state = {"next_download_index": 0, "file_states": {}}
+
+     try:
+         # Check if the file exists in the helium repo
+         files = HfApi(token=HF_TOKEN).list_repo_files(
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset"
+         )
+
+         if HF_STATE_FILE not in files:
+             print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
+             return default_state
+
+         # Download the file
+         hf_hub_download(
+             repo_id=HF_OUTPUT_DATASET_ID,
+             filename=HF_STATE_FILE,
+             repo_type="dataset",
+             local_dir=LOCAL_STATE_FOLDER,
+             local_dir_use_symlinks=False,
+             token=HF_TOKEN
+         )
+
+         print(f"[{FLOW_ID}] Successfully downloaded state file.")
+         return load_json_state(str(local_path), default_state)
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
+         return default_state
+
+ async def upload_hf_state(state: Dict[str, Any]) -> bool:
+     """Uploads the state file to Hugging Face."""
+     local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
+
+     try:
+         # Save state locally first
+         save_json_state(str(local_path), state)
+
+         # Upload to helium dataset
+         HfApi(token=HF_TOKEN).upload_file(
+             path_or_fileobj=str(local_path),
+             path_in_repo=HF_STATE_FILE,
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset",
+             commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
+         )
+         print(f"[{FLOW_ID}] Successfully uploaded state file.")
+         return True
+     except Exception as e:
+         print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
+         return False
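# The shared state file round-tripped by the two functions above has this shape
# (filenames illustrative). next_download_index is the 1-based index the next
# worker should resume from; file_states is the cross-worker lock table:
EXAMPLE_STATE = {
    "next_download_index": 3,
    "file_states": {
        "example_course_01.zip": "processed",
        "example_course_02.zip": "processing",
    },
}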
+
+ async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
+     """Marks a file as 'processing' in the state file and uploads the lock."""
+     print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
+
+     # Update state locally
+     state["file_states"][zip_filename] = "processing"
+
+     # Upload the updated state file immediately to establish the lock
+     if await upload_hf_state(state):
+         print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
+         return True
+     else:
+         print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
+         # Revert local state
+         if zip_filename in state["file_states"]:
+             del state["file_states"][zip_filename]
+         return False
+
+ async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
+     """Marks a file as 'processed', updates the index, and uploads the state."""
+     print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
+
+     # Update state locally
+     state["file_states"][zip_filename] = "processed"
+     state["next_download_index"] = next_index
+
+     # Upload the updated state
+     if await upload_hf_state(state):
+         print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
+         return True
+     else:
+         print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
+         return False
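# The lock above is optimistic: a worker writes "processing" into the shared state
# file and wins if its upload lands first. Because the read (download_hf_state) and
# the write (upload_hf_state) are separate commits, two workers that read the state
# at the same moment can both believe they hold the lock. A pure in-memory sketch
# of that interleaving (no network involved):
def _race_sketch():
    remote = {"file_states": {}}                        # stands in for the HF-hosted state
    state_a = json.loads(json.dumps(remote))            # worker A downloads the state
    state_b = json.loads(json.dumps(remote))            # worker B downloads the same state
    state_a["file_states"]["job.zip"] = "processing"    # A locks locally...
    state_b["file_states"]["job.zip"] = "processing"    # ...and so does B
    remote.update(state_a)                              # A's upload lands first
    remote.update(state_b)                              # B's upload overwrites it; both proceed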
+
+ # --- Hugging Face Utility Functions ---
+
+ async def get_zip_file_list(progress_data: Dict) -> List[str]:
+     """
+     Fetches the list of all zip files from the dataset, or uses the cached list.
+     Updates the progress_data with the file list if a new list is fetched.
+     """
+     if progress_data['file_list']:
+         print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
+         return progress_data['file_list']
+
+     print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
+     try:
+         api = HfApi(token=HF_TOKEN)
+         repo_files = api.list_repo_files(
+             repo_id=HF_DATASET_ID,
+             repo_type="dataset"
+         )
+
+         # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
+         zip_files = sorted([
+             f for f in repo_files
+             if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
+         ])
+
+         if not zip_files:
+             raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
+
+         print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
+
+         # Update and save the progress data
+         progress_data['file_list'] = zip_files
+         save_progress(progress_data)
+
+         return zip_files
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
+         return []
+
+ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
+     """Downloads the zip file for the given index and extracts its contents."""
+
+     # Extract the base name for the extraction directory
+     zip_full_name = Path(repo_file_full_path).name
+     course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
+
+     print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
+
+     try:
+         # Use hf_hub_download to get the file path
+         zip_path = hf_hub_download(
+             repo_id=HF_DATASET_ID,
+             filename=repo_file_full_path, # Use the full path in the repo
+             repo_type="dataset",
+             token=HF_TOKEN,
+         )
+
+         print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
+
+         # Create a temporary directory for extraction
+         extract_dir = TEMP_DIR / course_name
+         # Ensure a clean directory for extraction
+         if extract_dir.exists():
+             shutil.rmtree(extract_dir)
+         extract_dir.mkdir(exist_ok=True)
+
+         with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+             zip_ref.extractall(extract_dir)
+
+         print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
+
+         # Clean up the downloaded zip file to save space
+         os.remove(zip_path)
+
+         return extract_dir
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
+         return None
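# extractall() trusts the member paths inside the archive. If the zips ever come
# from an untrusted source, a path-safety check before extraction is cheap. A
# minimal sketch (not part of the commit) that skips entries escaping the target
# directory (requires Python 3.9+ for Path.is_relative_to):
def safe_extract(zip_ref: zipfile.ZipFile, dest: Path):
    root = dest.resolve()
    for member in zip_ref.namelist():
        target = (dest / member).resolve()
        if not target.is_relative_to(root):
            print(f"Skipping suspicious zip member: {member}")
            continue
        zip_ref.extract(member, dest)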
+
+ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
+     """Uploads the final captions JSON file to the output dataset."""
+     # Use the full zip name, replacing the extension with .json
+     caption_filename = Path(zip_full_name).with_suffix('.json').name
+
+     try:
+         print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
+
+         # Create JSON content in memory
+         json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
+
+         api = HfApi(token=HF_TOKEN)
+         api.upload_file(
+             path_or_fileobj=io.BytesIO(json_content),
+             path_in_repo=caption_filename,
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset",
+             commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
+         )
+
+         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
+         return True
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
+         return False
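# Each element of the uploaded JSON array is one detection record as returned by
# send_image_for_captioning below; an illustrative entry (all values invented):
EXAMPLE_CAPTION_RECORD = {
    "course": "example_course_01",
    "image_path": "frame_000123.jpg",
    "cursor_active": True,
    "x": 512,
    "y": 384,
    "confidence": 0.91,
    "template": "arrow",
    "image_shape": [720, 1280, 3],
    "server_url": "https://elias5584-supreme-1.hf.space/track_cursor",
    "timestamp": "2024-01-01T00:00:00",
}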
+
+ # --- Core Processing Functions (Modified) ---
+
+ async def get_available_server(timeout: float = 300.0) -> CaptionServer:
+     """Round-robin selection of an available caption server."""
+     global server_index
+     start_time = time.time()
+     while True:
+         # Round-robin check for an available server
+         for _ in range(len(servers)):
+             server = servers[server_index]
+             server_index = (server_index + 1) % len(servers)
+             if not server.busy:
+                 return server
+
+         # If all servers are busy, wait for a short period and check again
+         await asyncio.sleep(0.5)
+
+         # Check if timeout has been reached
+         if time.time() - start_time > timeout:
+             raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
+
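# A self-contained sketch of the selection loop's timeout behaviour: when every
# server in the pool is busy, the coroutine polls twice per second and eventually
# raises TimeoutError, which send_image_for_captioning treats as a retryable
# error. Simulation only; run it with asyncio.run(_timeout_demo()):
async def _timeout_demo():
    for s in servers:
        s.busy = True                  # pretend the whole pool is saturated
    try:
        await get_available_server(timeout=1.0)
    except TimeoutError as e:
        print(e)                       # Timeout (1.0s) waiting for an available caption server.
    finally:
        for s in servers:
            s.busy = False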
+ async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
+     """Sends a single image to a caption server for processing."""
+     # This function now handles server selection and retries internally
+     MAX_RETRIES = 3
+     for attempt in range(MAX_RETRIES):
+         server = None
+         try:
+             # 1. Get an available server (will wait if all are busy, with a timeout)
+             server = await get_available_server()
+             server.busy = True
+             start_time = time.time()
+
+             # Print a less verbose message only on the first attempt
+             if attempt == 0:
+                 print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
+
+             # 2. Prepare request data
+             form_data = aiohttp.FormData()
+             form_data.add_field('file',
+                                 image_path.open('rb'),
+                                 filename=image_path.name,
+                                 content_type='image/jpeg')
+             form_data.add_field('model_choice', MODEL_TYPE)
+
+             # 3. Send request
+             async with aiohttp.ClientSession() as session:
+                 # Increased timeout to 10 minutes (600s) as requested by user's problem description
+                 async with session.post(server.url, data=form_data, timeout=600) as resp:
+                     if resp.status == 200:
+                         result = await resp.json()
+
+                         # Handle cursor detection response format
+                         if result.get('cursor_active') is not None: # Check if it's a valid cursor detection response
+                             # Update progress counter
+                             progress_tracker['completed'] += 1
+                             if progress_tracker['completed'] % 50 == 0:
+                                 print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} detections completed.")
+
+                             # Log success only if it's not a progress report interval
+                             if progress_tracker['completed'] % 50 != 0:
+                                 print(f"[{FLOW_ID}] Success: {image_path.name} processed by {server.url}")
+
+                             # Store the full cursor detection result
+                             return {
+                                 "course": course_name,
+                                 "image_path": image_path.name,
+                                 "cursor_active": result.get('cursor_active', False),
+                                 "x": result.get('x'),
+                                 "y": result.get('y'),
+                                 "confidence": result.get('confidence'),
+                                 "template": result.get('template'),
+                                 "image_shape": result.get('image_shape'),
+                                 "server_url": server.url,
+                                 "timestamp": datetime.now().isoformat()
+                             }
+                         else:
+                             print(f"[{FLOW_ID}] Server {server.url} returned invalid response format for {image_path.name}. Response: {result}")
+                             continue # Retry with a different server
+                     else:
+                         error_text = await resp.text()
+                         print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
+                         continue # Retry with a different server
+
+         except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
+             print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
+             continue # Retry with a different server
+         except Exception as e:
+             print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
+             continue # Retry with a different server
+         finally:
+             if server:
+                 end_time = time.time()
+                 server.busy = False
+                 server.total_processed += 1
+                 server.total_time += (end_time - start_time)
+
+     print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
+     return None
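# One thing to watch in the request setup above: image_path.open('rb') hands the
# file object to FormData without an explicit close, so the handle stays open
# until garbage collection. A variant (not part of the commit) that reads the
# bytes eagerly and so leaves no handle lingering across retries:
def build_form_data(image_path: Path) -> aiohttp.FormData:
    """Sketch: eager read instead of passing an open file object."""
    form_data = aiohttp.FormData()
    form_data.add_field('file',
                        image_path.read_bytes(),
                        filename=image_path.name,
                        content_type='image/jpeg')
    form_data.add_field('model_choice', MODEL_TYPE)
    return form_data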
490
+
491
+ async def process_dataset_task(start_index: int):
492
+ """Main task to process the dataset sequentially starting from a given index."""
493
+
494
+ # Load both local progress and HF state
495
+ progress = load_progress()
496
+ current_state = await download_hf_state()
497
+ file_list = await get_zip_file_list(progress)
498
+
499
+ if not file_list:
500
+ print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
501
+ return False
502
+
503
+ # Ensure start_index is within bounds
504
+ if start_index > len(file_list):
505
+ print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
506
+ return True
507
+
508
+ # Determine the actual starting index in the 0-indexed list
509
+ start_list_index = start_index - 1
510
+
511
+ print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
512
+
513
+     global_success = True
+ 
+     for i in range(start_list_index, len(file_list)):
+         file_index = i + 1  # 1-indexed for user display and progress tracking
+         repo_file_full_path = file_list[i]
+         zip_full_name = Path(repo_file_full_path).name
+         course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
+ 
+         # Check the file's state in the shared HF state (see the example state layout after this function)
+         file_state = current_state["file_states"].get(zip_full_name)
+         if file_state == "processed":
+             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
+             continue
+         elif file_state == "processing":
+             print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
+             continue
+ 
+         # Try to lock the file so no other worker picks it up
+         if not await lock_file_for_processing(zip_full_name, current_state):
+             print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
+             continue
+ 
+         extract_dir = None
+         current_file_success = False
+ 
+         try:
+             # 1. Download and Extract
+             extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
+ 
+             if not extract_dir:
+                 raise Exception("Failed to download or extract zip file.")
+ 
+             # 2. Find Images
+             # Use a recursive glob to find images in subdirectories as well
+             image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
+             print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
+ 
+             if not image_paths:
+                 print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
+                 current_file_success = True
+             else:
+                 # 3. Process Images (Captioning)
+                 progress_tracker = {
+                     'total': len(image_paths),
+                     'completed': 0
+                 }
+                 print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
+ 
+                 # Create a semaphore to limit concurrent tasks to the number of available servers
+                 semaphore = asyncio.Semaphore(len(servers))
+ 
+                 async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
+                     async with semaphore:
+                         return await send_image_for_captioning(image_path, course_name, progress_tracker)
+ 
+                 # Run all captioning tasks concurrently (bounded by the semaphore)
+                 caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
+                 results = await asyncio.gather(*caption_tasks)
+ 
+                 # Filter out failed results
+                 all_captions = [r for r in results if r is not None]
+ 
+                 # Final progress report for the current file
+                 if len(all_captions) == len(image_paths):
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully processed all {len(all_captions)} images.")
+                 else:
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} images.")
+ 
+                 # Detection statistics (guarded against division by zero when every caption failed)
+                 if all_captions:
+                     cursor_detected = sum(1 for c in all_captions if c.get('cursor_active', False))
+                     print(f"[{FLOW_ID}] Detection Statistics:")
+                     print(f"- Total processed: {len(all_captions)}")
+                     print(f"- Cursors detected: {cursor_detected}")
+                     print(f"- Detection rate: {(cursor_detected / len(all_captions) * 100):.2f}%")
+ 
+                 # Consider the file successful if we obtained any captions at all
+                 current_file_success = len(all_captions) > 0
+ 
+                 # 4. Upload Results
+                 if all_captions:
+                     print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
+                     if await upload_captions_to_hf(zip_full_name, all_captions):
+                         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
+                         current_file_success = True
+                     else:
+                         print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
+                         current_file_success = False
+                 else:
+                     print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
+                     current_file_success = False
+ 
+         except Exception as e:
+             print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
+             current_file_success = False
+             global_success = False  # Mark the overall task as failed if any file fails critically
+ 
+         finally:
+             # 5. Cleanup and Update Progress
+             if extract_dir and extract_dir.exists():
+                 shutil.rmtree(extract_dir, ignore_errors=True)
+                 print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
+ 
+             if current_file_success:
+                 # Update both local progress and the shared HF state
+                 progress['last_processed_index'] = file_index
+                 progress['processed_files'][str(file_index)] = repo_file_full_path
+                 save_progress(progress)
+ 
+                 # Update the HF state and unlock the file; file_index + 1 becomes the next download index
+                 if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
+                     print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
+                 else:
+                     print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
+             else:
+                 # Mark the file as failed in the shared state and continue with the next file
+                 current_state["file_states"][zip_full_name] = "failed"
+                 await upload_hf_state(current_state)
+                 print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
+                 global_success = False
+ 
+     print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
+     return global_success
+ 
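+ # For orientation, the shared state file (HF_STATE_FILE) exchanged by download_hf_state()
+ # and upload_hf_state() is assumed to look roughly like the sketch below -- the file names
+ # are made up; only "file_states" (values "processed" / "processing" / "failed") and
+ # "next_download_index" are read by the code above:
+ #
+ #   {
+ #       "next_download_index": 42,
+ #       "file_states": {
+ #           "course_001.zip": "processed",
+ #           "course_002.zip": "processing",
+ #           "course_003.zip": "failed"
+ #       }
+ #   }
+ 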
+ # --- FastAPI App and Endpoints ---
+ 
+ app = FastAPI(
+     title=f"Flow Server {FLOW_ID} API",
+     description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
+     version="1.0.0"
+ )
+ 
+ @app.on_event("startup")
648
+ async def startup_event():
649
+ print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
650
+
651
+ # Get both local progress and HF state
652
+ progress = load_progress()
653
+ current_state = await download_hf_state()
654
+
655
+ # Get the next_download_index from HF state if available
656
+ hf_next_index = current_state.get("next_download_index", 0)
657
+
658
+ # If HF state has a higher index, use that instead of local progress
659
+ if hf_next_index > 0:
660
+ start_index = hf_next_index
661
+ print(f"[{FLOW_ID}] Using next_download_index from HF state: {start_index}")
662
+ else:
663
+ # Fall back to local progress if HF state doesn't have a meaningful index
664
+ start_index = progress.get('last_processed_index', 0) + 1
665
+ if start_index < AUTO_START_INDEX:
666
+ start_index = AUTO_START_INDEX
667
+
668
+ # Use a dummy BackgroundTasks object for the startup task
669
+ # Note: FastAPI's startup events can't directly use BackgroundTasks, but we can use asyncio.create_task
670
+ # to run the long-running process in the background without blocking the server startup.
671
+ print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
672
+ asyncio.create_task(process_dataset_task(start_index))
673
+
674
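+ # A quick worked example of the resume logic above (hypothetical values):
+ #   - HF state has next_download_index = 42              -> start_index = 42
+ #   - HF state has 0, local last_processed_index = 10    -> start_index = 11
+ #   - Neither has a meaningful value                     -> start_index = AUTO_START_INDEX
+ 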
+ @app.get("/")
675
+ async def root():
676
+ progress = load_progress()
677
+ return {
678
+ "flow_id": FLOW_ID,
679
+ "status": "ready",
680
+ "last_processed_index": progress['last_processed_index'],
681
+ "total_files_in_list": len(progress['file_list']),
682
+ "processed_files_count": len(progress['processed_files']),
683
+ "total_servers": len(servers),
684
+ "busy_servers": sum(1 for s in servers if s.busy),
685
+ }
686
+
687
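+ # Illustrative health response from "/" (the numbers are made up):
+ #   {"flow_id": "flow_default", "status": "ready", "last_processed_index": 12,
+ #    "total_files_in_list": 300, "processed_files_count": 12,
+ #    "total_servers": 15, "busy_servers": 3}
+ 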
+ @app.post("/start_processing")
688
+ async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
689
+ """
690
+ Starts the sequential processing of zip files from the given index in the background.
691
+ """
692
+ start_index = request.start_index
693
+
694
+ print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
695
+
696
+ # Start the heavy processing in a background task so the API call returns immediately
697
+ # Note: The server is already auto-starting, but this allows for manual restart/override.
698
+ background_tasks.add_task(process_dataset_task, start_index)
699
+
700
+ return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
701
+
702
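+ # Example manual trigger from another process (hypothetical host; FLOW_PORT defaults to
+ # 8001, and ProcessStartRequest is expected to carry a single "start_index" field,
+ # matching request.start_index above):
+ #
+ #   import requests
+ #   requests.post("http://localhost:8001/start_processing", json={"start_index": 5})
+ 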
+ if __name__ == "__main__":
703
+ import uvicorn
704
+ # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
+     uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)