Fred808 commited on
Commit
70f1393
·
verified ·
1 Parent(s): d01e81b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +662 -639
app.py CHANGED
@@ -1,639 +1,662 @@
1
- import os
2
- import json
3
- import time
4
- import asyncio
5
- import aiohttp
6
- import zipfile
7
- import shutil
8
- from typing import Dict, List, Set, Optional, Tuple, Any
9
- from urllib.parse import quote
10
- from datetime import datetime
11
- from pathlib import Path
12
- import io
13
-
14
- from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
- from pydantic import BaseModel, Field
16
- from huggingface_hub import HfApi, hf_hub_download
17
-
18
- # --- Configuration ---
19
- AUTO_START_INDEX = 0 # Hardcoded default start index if no progress is found
20
- FLOW_ID = os.getenv("FLOW_ID", "flow_default")
21
- FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
22
- HF_TOKEN = os.getenv("HF_TOKEN", "")
23
- HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") # Source dataset for zip files
24
- HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium") # Target dataset for captions
25
-
26
- # Progress and State Tracking
27
- PROGRESS_FILE = Path("processing_progress.json")
28
- HF_STATE_FILE = "processing_state_captions.json" # State file in helium dataset
29
- LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
30
- LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
-
32
- # Directory within the HF dataset where the zip files are located
33
- ZIP_FILE_PREFIX = "frames_zips/"
34
-
35
- # Using the full list from the user's original code for actual deployment
36
- CAPTION_SERVERS = [
37
- "https://fred808-pil-4-1.hf.space/analyze",
38
- "https://fred808-pil-4-2.hf.space/analyze",
39
- "https://fred808-pil-4-3.hf.space/analyze",
40
- "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
41
- "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
42
- "https://fred1012-fred1012-oncray.hf.space/analyze",
43
- "https://fred1012-fred1012-4goge7.hf.space/analyze",
44
- "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
45
- "https://fred1012-fred1012-u95rte.hf.space/analyze",
46
- "https://fred1012-fred1012-igje22.hf.space/analyze",
47
- "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
48
- "https://fred1012-fred1012-nwqthy.hf.space/analyze",
49
- "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
50
- "https://fred1012-fred1012-pivlzg.hf.space/analyze",
51
- "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
52
- "https://fred1012-fred1012-u7lh57.hf.space/analyze",
53
- "https://fred1012-fred1012-q8djv1.hf.space/analyze",
54
- "https://fredalone-fredalone-ozugrp.hf.space/analyze",
55
- "https://fredalone-fredalone-9brxj2.hf.space/analyze",
56
- "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
57
- "https://fredalone-fredalone-vbli2y.hf.space/analyze",
58
- "https://fredalone-fredalone-uggger.hf.space/analyze",
59
- "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
60
- "https://fredalone-fredalone-d1f26d.hf.space/analyze",
61
- "https://fredalone-fredalone-461jp2.hf.space/analyze",
62
- "https://fredalone-fredalone-3enfg4.hf.space/analyze",
63
- "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
64
- "https://fredalone-fredalone-ivtjua.hf.space/analyze",
65
- "https://fredalone-fredalone-6bezt2.hf.space/analyze",
66
- "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
67
- "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
68
- "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
69
- "https://fredalone-fredalone-wclyog.hf.space/analyze",
70
- "https://fredalone-fredalone-t27vig.hf.space/analyze",
71
- "https://fredalone-fredalone-gahbxh.hf.space/analyze",
72
- "https://fredalone-fredalone-kw2po4.hf.space/analyze",
73
- "https://fredalone-fredalone-8h285h.hf.space/analyze"
74
- ]
75
- MODEL_TYPE = "Florence-2-large"
76
-
77
- # Temporary storage for images
78
- TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
79
- TEMP_DIR.mkdir(exist_ok=True)
80
-
81
- # --- Models ---
82
- class ProcessStartRequest(BaseModel):
83
- start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
84
-
85
- class CaptionServer:
86
- def __init__(self, url):
87
- self.url = url
88
- self.busy = False
89
- self.total_processed = 0
90
- self.total_time = 0
91
- self.model = MODEL_TYPE
92
-
93
- @property
94
- def fps(self):
95
- return self.total_processed / self.total_time if self.total_time > 0 else 0
96
-
97
- # Global state for caption servers
98
- servers = [CaptionServer(url) for url in CAPTION_SERVERS]
99
- server_index = 0
100
-
101
- # --- Progress and State Management Functions ---
102
-
103
- def load_progress() -> Dict:
104
- """Loads the local processing progress from the JSON file."""
105
- if PROGRESS_FILE.exists():
106
- try:
107
- with PROGRESS_FILE.open('r') as f:
108
- return json.load(f)
109
- except json.JSONDecodeError:
110
- print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
111
- # Fall through to return default structure
112
-
113
- # Default structure
114
- return {
115
- "last_processed_index": 0,
116
- "processed_files": {}, # {index: repo_path}
117
- "file_list": [] # Full list of all zip files found in the dataset
118
- }
119
-
120
- def save_progress(progress_data: Dict):
121
- """Saves the local processing progress to the JSON file."""
122
- try:
123
- with PROGRESS_FILE.open('w') as f:
124
- json.dump(progress_data, f, indent=4)
125
- except Exception as e:
126
- print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
127
-
128
- def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
129
- """Load state from JSON file with migration logic for new structure."""
130
- if os.path.exists(file_path):
131
- try:
132
- with open(file_path, "r") as f:
133
- data = json.load(f)
134
-
135
- # Migration Logic
136
- if "file_states" not in data or not isinstance(data["file_states"], dict):
137
- print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
138
- data["file_states"] = {}
139
-
140
- if "next_download_index" not in data:
141
- data["next_download_index"] = 0
142
-
143
- return data
144
- except json.JSONDecodeError:
145
- print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
146
- return default_value
147
-
148
- def save_json_state(file_path: str, data: Dict[str, Any]):
149
- """Save state to JSON file"""
150
- with open(file_path, "w") as f:
151
- json.dump(data, f, indent=2)
152
-
153
- async def download_hf_state() -> Dict[str, Any]:
154
- """Downloads the state file from Hugging Face or returns a default state."""
155
- local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
156
- default_state = {"next_download_index": 0, "file_states": {}}
157
-
158
- try:
159
- # Check if the file exists in the helium repo
160
- files = HfApi(token=HF_TOKEN).list_repo_files(
161
- repo_id=HF_OUTPUT_DATASET_ID,
162
- repo_type="dataset"
163
- )
164
-
165
- if HF_STATE_FILE not in files:
166
- print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
167
- return default_state
168
-
169
- # Download the file
170
- hf_hub_download(
171
- repo_id=HF_OUTPUT_DATASET_ID,
172
- filename=HF_STATE_FILE,
173
- repo_type="dataset",
174
- local_dir=LOCAL_STATE_FOLDER,
175
- local_dir_use_symlinks=False,
176
- token=HF_TOKEN
177
- )
178
-
179
- print(f"[{FLOW_ID}] Successfully downloaded state file.")
180
- return load_json_state(str(local_path), default_state)
181
-
182
- except Exception as e:
183
- print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
184
- return default_state
185
-
186
- async def upload_hf_state(state: Dict[str, Any]) -> bool:
187
- """Uploads the state file to Hugging Face."""
188
- local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
189
-
190
- try:
191
- # Save state locally first
192
- save_json_state(str(local_path), state)
193
-
194
- # Upload to helium dataset
195
- HfApi(token=HF_TOKEN).upload_file(
196
- path_or_fileobj=str(local_path),
197
- path_in_repo=HF_STATE_FILE,
198
- repo_id=HF_OUTPUT_DATASET_ID,
199
- repo_type="dataset",
200
- commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
201
- )
202
- print(f"[{FLOW_ID}] Successfully uploaded state file.")
203
- return True
204
- except Exception as e:
205
- print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
206
- return False
207
-
208
- async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
209
- """Marks a file as 'processing' in the state file and uploads the lock."""
210
- print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
211
-
212
- # Update state locally
213
- state["file_states"][zip_filename] = "processing"
214
-
215
- # Upload the updated state file immediately to establish the lock
216
- if await upload_hf_state(state):
217
- print(f"[{FLOW_ID}] Successfully locked file: {zip_filename}")
218
- return True
219
- else:
220
- print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
221
- # Revert local state
222
- if zip_filename in state["file_states"]:
223
- del state["file_states"][zip_filename]
224
- return False
225
-
226
- async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
227
- """Marks a file as 'processed', updates the index, and uploads the state."""
228
- print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
229
-
230
- # Update state locally
231
- state["file_states"][zip_filename] = "processed"
232
- state["next_download_index"] = next_index
233
-
234
- # Upload the updated state
235
- if await upload_hf_state(state):
236
- print(f"[{FLOW_ID}] Successfully marked as processed: {zip_filename}")
237
- return True
238
- else:
239
- print(f"[{FLOW_ID}] Failed to update state for: {zip_filename}")
240
- return False
241
-
242
- # --- Hugging Face Utility Functions ---
243
-
244
- async def get_zip_file_list(progress_data: Dict) -> List[str]:
245
- """
246
- Fetches the list of all zip files from the dataset, or uses the cached list.
247
- Updates the progress_data with the file list if a new list is fetched.
248
- """
249
- if progress_data['file_list']:
250
- print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
251
- return progress_data['file_list']
252
-
253
- print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
254
- try:
255
- api = HfApi(token=HF_TOKEN)
256
- repo_files = api.list_repo_files(
257
- repo_id=HF_DATASET_ID,
258
- repo_type="dataset"
259
- )
260
-
261
- # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
262
- zip_files = sorted([
263
- f for f in repo_files
264
- if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
265
- ])
266
-
267
- if not zip_files:
268
- raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
269
-
270
- print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
271
-
272
- # Update and save the progress data
273
- progress_data['file_list'] = zip_files
274
- save_progress(progress_data)
275
-
276
- return zip_files
277
-
278
- except Exception as e:
279
- print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
280
- return []
281
-
282
- async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
283
- """Downloads the zip file for the given index and extracts its contents."""
284
-
285
- # Extract the base name for the extraction directory
286
- zip_full_name = Path(repo_file_full_path).name
287
- course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
288
-
289
- print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
290
-
291
- try:
292
- # Use hf_hub_download to get the file path
293
- zip_path = hf_hub_download(
294
- repo_id=HF_DATASET_ID,
295
- filename=repo_file_full_path, # Use the full path in the repo
296
- repo_type="dataset",
297
- token=HF_TOKEN,
298
- )
299
-
300
- print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
301
-
302
- # Create a temporary directory for extraction
303
- extract_dir = TEMP_DIR / course_name
304
- # Ensure a clean directory for extraction
305
- if extract_dir.exists():
306
- shutil.rmtree(extract_dir)
307
- extract_dir.mkdir(exist_ok=True)
308
-
309
- with zipfile.ZipFile(zip_path, 'r') as zip_ref:
310
- zip_ref.extractall(extract_dir)
311
-
312
- print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
313
-
314
- # Clean up the downloaded zip file to save space
315
- os.remove(zip_path)
316
-
317
- return extract_dir
318
-
319
- except Exception as e:
320
- print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
321
- return None
322
-
323
- async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
324
- """Uploads the final captions JSON file to the output dataset."""
325
- # Use the full zip name, replacing the extension with .json
326
- caption_filename = Path(zip_full_name).with_suffix('.json').name
327
-
328
- try:
329
- print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
330
-
331
- # Create JSON content in memory
332
- json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
333
-
334
- api = HfApi(token=HF_TOKEN)
335
- api.upload_file(
336
- path_or_fileobj=io.BytesIO(json_content),
337
- path_in_repo=caption_filename,
338
- repo_id=HF_OUTPUT_DATASET_ID,
339
- repo_type="dataset",
340
- commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
341
- )
342
-
343
- print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
344
- return True
345
-
346
- except Exception as e:
347
- print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
348
- return False
349
-
350
- # --- Core Processing Functions (Modified) ---
351
-
352
- async def get_available_server(timeout: float = 300.0) -> CaptionServer:
353
- """Round-robin selection of an available caption server."""
354
- global server_index
355
- start_time = time.time()
356
- while True:
357
- # Round-robin check for an available server
358
- for _ in range(len(servers)):
359
- server = servers[server_index]
360
- server_index = (server_index + 1) % len(servers)
361
- if not server.busy:
362
- return server
363
-
364
- # If all servers are busy, wait for a short period and check again
365
- await asyncio.sleep(0.5)
366
-
367
- # Check if timeout has been reached
368
- if time.time() - start_time > timeout:
369
- raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
370
-
371
- async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
372
- """Sends a single image to a caption server for processing."""
373
- # This function now handles server selection and retries internally
374
- MAX_RETRIES = 3
375
- for attempt in range(MAX_RETRIES):
376
- server = None
377
- try:
378
- # 1. Get an available server (will wait if all are busy, with a timeout)
379
- server = await get_available_server()
380
- server.busy = True
381
- start_time = time.time()
382
-
383
- # Print a less verbose message only on the first attempt
384
- if attempt == 0:
385
- print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
386
-
387
- # 2. Prepare request data
388
- form_data = aiohttp.FormData()
389
- form_data.add_field('file',
390
- image_path.open('rb'),
391
- filename=image_path.name,
392
- content_type='image/jpeg')
393
- form_data.add_field('model_choice', MODEL_TYPE)
394
-
395
- # 3. Send request
396
- async with aiohttp.ClientSession() as session:
397
- # Increased timeout to 10 minutes (600s) as requested by user's problem description
398
- async with session.post(server.url, data=form_data, timeout=600) as resp:
399
- if resp.status == 200:
400
- result = await resp.json()
401
- caption = result.get("caption")
402
-
403
- if caption:
404
- # Update progress counter
405
- progress_tracker['completed'] += 1
406
- if progress_tracker['completed'] % 50 == 0:
407
- print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
408
-
409
- # Log success only if it's not a progress report interval
410
- if progress_tracker['completed'] % 50 != 0:
411
- print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
412
-
413
- return {
414
- "course": course_name,
415
- "image_path": image_path.name,
416
- "caption": caption,
417
- "timestamp": datetime.now().isoformat()
418
- }
419
- else:
420
- print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
421
- continue # Retry with a different server
422
- else:
423
- error_text = await resp.text()
424
- print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
425
- continue # Retry with a different server
426
-
427
- except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
428
- print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
429
- continue # Retry with a different server
430
- except Exception as e:
431
- print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
432
- continue # Retry with a different server
433
- finally:
434
- if server:
435
- end_time = time.time()
436
- server.busy = False
437
- server.total_processed += 1
438
- server.total_time += (end_time - start_time)
439
-
440
- print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
441
- return None
442
-
443
- async def process_dataset_task(start_index: int):
444
- """Main task to process the dataset sequentially starting from a given index."""
445
-
446
- # Load both local progress and HF state
447
- progress = load_progress()
448
- current_state = await download_hf_state()
449
- file_list = await get_zip_file_list(progress)
450
-
451
- if not file_list:
452
- print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
453
- return False
454
-
455
- # Ensure start_index is within bounds
456
- if start_index > len(file_list):
457
- print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
458
- return True
459
-
460
- # Determine the actual starting index in the 0-indexed list
461
- start_list_index = start_index - 1
462
-
463
- print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
464
-
465
- global_success = True
466
-
467
- for i in range(start_list_index, len(file_list)):
468
- file_index = i + 1 # 1-indexed for user display and progress tracking
469
- repo_file_full_path = file_list[i]
470
- zip_full_name = Path(repo_file_full_path).name
471
- course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
472
-
473
- # Check file state in both local and HF state
474
- file_state = current_state["file_states"].get(zip_full_name)
475
- if file_state == "processed":
476
- print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
477
- continue
478
- elif file_state == "processing":
479
- print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
480
- continue
481
-
482
- # Try to lock the file
483
- if not await lock_file_for_processing(zip_full_name, current_state):
484
- print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
485
- continue
486
-
487
- extract_dir = None
488
- current_file_success = False
489
-
490
- try:
491
- # 1. Download and Extract
492
- extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
493
-
494
- if not extract_dir:
495
- raise Exception("Failed to download or extract zip file.")
496
-
497
- # 2. Find Images
498
- # Use recursive glob to find images in subdirectories
499
- image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
500
- print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
501
-
502
- if not image_paths:
503
- print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
504
- current_file_success = True
505
- else:
506
- # 3. Process Images (Captioning)
507
- progress_tracker = {
508
- 'total': len(image_paths),
509
- 'completed': 0
510
- }
511
- print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
512
-
513
- # Create a semaphore to limit concurrent tasks to the number of available servers
514
- semaphore = asyncio.Semaphore(len(servers))
515
-
516
- async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
517
- async with semaphore:
518
- return await send_image_for_captioning(image_path, course_name, progress_tracker)
519
-
520
- # Create a list of tasks for parallel captioning
521
- caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
522
-
523
- # Run all captioning tasks concurrently
524
- results = await asyncio.gather(*caption_tasks)
525
-
526
- # Filter out failed results
527
- all_captions = [r for r in results if r is not None]
528
-
529
- # Final progress report for the current file
530
- if len(all_captions) == len(image_paths):
531
- print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
532
- current_file_success = True
533
- else:
534
- print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
535
- current_file_success = False
536
-
537
- # 4. Upload Results
538
- if all_captions:
539
- print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
540
- if await upload_captions_to_hf(zip_full_name, all_captions):
541
- print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
542
- # Partial success in captioning is still a success for the upload step
543
- pass
544
- else:
545
- print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
546
- current_file_success = False
547
- else:
548
- print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
549
- current_file_success = False
550
-
551
- except Exception as e:
552
- print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
553
- current_file_success = False
554
- global_success = False # Mark overall task as failed if any file fails critically
555
-
556
- finally:
557
- # 5. Cleanup and Update Progress
558
- if extract_dir and extract_dir.exists():
559
- print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
560
- shutil.rmtree(extract_dir, ignore_errors=True)
561
-
562
- if current_file_success:
563
- # Update both local progress and HF state
564
- progress['last_processed_index'] = file_index
565
- progress['processed_files'][str(file_index)] = repo_file_full_path
566
- save_progress(progress)
567
-
568
- # Update HF state and unlock the file
569
- if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
570
- print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
571
- else:
572
- print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
573
- else:
574
- # Mark as failed in the state and continue with next file
575
- current_state["file_states"][zip_full_name] = "failed"
576
- await upload_hf_state(current_state)
577
- print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
578
- global_success = False
579
-
580
- print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
581
- return global_success
582
-
583
- # --- FastAPI App and Endpoints ---
584
-
585
- app = FastAPI(
586
- title=f"Flow Server {FLOW_ID} API",
587
- description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
588
- version="1.0.0"
589
- )
590
-
591
- @app.on_event("startup")
592
- async def startup_event():
593
- print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
594
-
595
- # Automatically start the processing task
596
- progress = load_progress()
597
- # Start from the last processed index + 1, or the hardcoded AUTO_START_INDEX if the progress file is new/empty
598
- start_index = progress.get('last_processed_index', 0) + 1
599
- if start_index < AUTO_START_INDEX:
600
- start_index = AUTO_START_INDEX
601
-
602
- # Use a dummy BackgroundTasks object for the startup task
603
- # Note: FastAPI's startup events can't directly use BackgroundTasks, but we can use asyncio.create_task
604
- # to run the long-running process in the background without blocking the server startup.
605
- print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
606
- asyncio.create_task(process_dataset_task(start_index))
607
-
608
- @app.get("/")
609
- async def root():
610
- progress = load_progress()
611
- return {
612
- "flow_id": FLOW_ID,
613
- "status": "ready",
614
- "last_processed_index": progress['last_processed_index'],
615
- "total_files_in_list": len(progress['file_list']),
616
- "processed_files_count": len(progress['processed_files']),
617
- "total_servers": len(servers),
618
- "busy_servers": sum(1 for s in servers if s.busy),
619
- }
620
-
621
- @app.post("/start_processing")
622
- async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
623
- """
624
- Starts the sequential processing of zip files from the given index in the background.
625
- """
626
- start_index = request.start_index
627
-
628
- print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
629
-
630
- # Start the heavy processing in a background task so the API call returns immediately
631
- # Note: The server is already auto-starting, but this allows for manual restart/override.
632
- background_tasks.add_task(process_dataset_task, start_index)
633
-
634
- return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
635
-
636
- if __name__ == "__main__":
637
- import uvicorn
638
- # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
639
- uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
+ import aiohttp
6
+ import zipfile
7
+ import shutil
8
+ from typing import Dict, List, Set, Optional, Tuple, Any
9
+ from urllib.parse import quote
10
+ from datetime import datetime
11
+ from pathlib import Path
12
+ import io
13
+
14
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
+ from pydantic import BaseModel, Field
16
+ from huggingface_hub import HfApi, hf_hub_download
17
+
18
+ # --- Configuration ---
19
+ AUTO_START_INDEX = 0 # Hardcoded default start index if no progress is found
20
+ FLOW_ID = os.getenv("FLOW_ID", "flow_default")
21
+ FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
22
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
23
+ HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") # Source dataset for zip files
24
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium") # Target dataset for captions
25
+
26
+ # Progress and State Tracking
27
+ PROGRESS_FILE = Path("processing_progress.json")
28
+ HF_STATE_FILE = "processing_state_captions.json" # State file in helium dataset
29
+ LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
30
+ LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
+
32
+ # Directory within the HF dataset where the zip files are located
33
+ ZIP_FILE_PREFIX = "frames_zips/"
34
+
35
+ # Using the full list from the user's original code for actual deployment
36
+ CAPTION_SERVERS = [
37
+ "https://Eliasishere-808-1.hf.space/analyze",
38
+ "https://Eliasishere-808-2.hf.space/analyze",
39
+ "https://Eliasishere-808-3.hf.space/analyze",
40
+ "https://Eliasishere-808-4.hf.space/analyze",
41
+ "https://Eliasishere-808-5.hf.space/analyze",
42
+ "https://Eliasishere-808-6.hf.space/analyze",
43
+ "https://Eliasishere-808-7.hf.space/analyze",
44
+ "https://Eliasishere-808-8.hf.space/analyze",
45
+ "https://Eliasishere-808-9.hf.space/analyze",
46
+ "https://Eliasishere-808-10.hf.space/analyze",
47
+ "https://Eliasishere-808-11.hf.space/analyze",
48
+ "https://Eliasishere-808-12.hf.space/analyze",
49
+ "https://Eliasishere-808-13.hf.space/analyze",
50
+ "https://Eliasishere-808-14.hf.space/analyze",
51
+ "https://Eliasishere-808-15.hf.space/analyze",
52
+ "https://Eliasishere-808-16.hf.space/analyze",
53
+ "https://Eliasishere-808-17.hf.space/analyze",
54
+ "https://Eliasishere-808-18.hf.space/analyze",
55
+ "https://Eliasishere-808-19.hf.space/analyze",
56
+ "https://Eliasishere-808-20.hf.space/analyze",
57
+ "https://Samfredoly-live-1.hf.space/analyze",
58
+ "https://Samfredoly-live-2.hf.space/analyze",
59
+ "https://Samfredoly-live-3.hf.space/analyze",
60
+ "https://Samfredoly-live-4.hf.space/analyze",
61
+ "https://Samfredoly-live-5.hf.space/analyze",
62
+ "https://Samfredoly-live-6.hf.space/analyze",
63
+ "https://Samfredoly-live-7.hf.space/analyze",
64
+ "https://Samfredoly-live-8.hf.space/analyze",
65
+ "https://Samfredoly-live-9.hf.space/analyze",
66
+ "https://Samfredoly-live-10.hf.space/analyze",
67
+ "https://Samfredoly-live-11.hf.space/analyze",
68
+ "https://Samfredoly-live-12.hf.space/analyze",
69
+ "https://Samfredoly-live-13.hf.space/analyze",
70
+ "https://Samfredoly-live-14.hf.space/analyze",
71
+ "https://Samfredoly-live-15.hf.space/analyze",
72
+ "https://Samfredoly-live-16.hf.space/analyze",
73
+ "https://Samfredoly-live-17.hf.space/analyze",
74
+ "https://Samfredoly-live-18.hf.space/analyze",
75
+ "https://Samfredoly-live-19.hf.space/analyze",
76
+ "https://Samfredoly-live-20.hf.space/analyze",
77
+ "https://wortif-wort-1.hf.space/analyze",
78
+ "https://wortif-wort-2.hf.space/analyze",
79
+ "https://wortif-wort-3.hf.space/analyze",
80
+ "https://wortif-wort-4.hf.space/analyze",
81
+ "https://wortif-wort-5.hf.space/analyze",
82
+ "https://wortif-wort-6.hf.space/analyze",
83
+ "https://wortif-wort-7.hf.space/analyze",
84
+ "https://wortif-wort-8.hf.space/analyze",
85
+ "https://wortif-wort-9.hf.space/analyze",
86
+ "https://wortif-wort-10.hf.space/analyze",
87
+ "https://wortif-wort-11.hf.space/analyze",
88
+ "https://wortif-wort-12.hf.space/analyze",
89
+ "https://wortif-wort-13.hf.space/analyze",
90
+ "https://wortif-wort-14.hf.space/analyze",
91
+ "https://wortif-wort-15.hf.space/analyze",
92
+ "https://wortif-wort-16.hf.space/analyze",
93
+ "https://wortif-wort-17.hf.space/analyze",
94
+ "https://wortif-wort-18.hf.space/analyze",
95
+ "https://wortif-wort-19.hf.space/analyze",
96
+ "https://wortif-wort-20.hf.space/analyze"
97
+ ]
98
+ MODEL_TYPE = "Florence-2-large"
99
+
100
+ # Temporary storage for images
101
+ TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
102
+ TEMP_DIR.mkdir(exist_ok=True)
103
+
104
+ # --- Models ---
105
+ class ProcessStartRequest(BaseModel):
106
+ start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
107
+
108
+ class CaptionServer:
109
+ def __init__(self, url):
110
+ self.url = url
111
+ self.busy = False
112
+ self.total_processed = 0
113
+ self.total_time = 0
114
+ self.model = MODEL_TYPE
115
+
116
+ @property
117
+ def fps(self):
118
+ return self.total_processed / self.total_time if self.total_time > 0 else 0
119
+
120
+ # Global state for caption servers
121
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
122
+ server_index = 0
123
+
124
+ # --- Progress and State Management Functions ---
125
+
126
+ def load_progress() -> Dict:
127
+ """Loads the local processing progress from the JSON file."""
128
+ if PROGRESS_FILE.exists():
129
+ try:
130
+ with PROGRESS_FILE.open('r') as f:
131
+ return json.load(f)
132
+ except json.JSONDecodeError:
133
+ print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
134
+ # Fall through to return default structure
135
+
136
+ # Default structure
137
+ return {
138
+ "last_processed_index": 0,
139
+ "processed_files": {}, # {index: repo_path}
140
+ "file_list": [] # Full list of all zip files found in the dataset
141
+ }
142
+
143
+ def save_progress(progress_data: Dict):
144
+ """Saves the local processing progress to the JSON file."""
145
+ try:
146
+ with PROGRESS_FILE.open('w') as f:
147
+ json.dump(progress_data, f, indent=4)
148
+ except Exception as e:
149
+ print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
150
+
151
+ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
152
+ """Load state from JSON file with migration logic for new structure."""
153
+ if os.path.exists(file_path):
154
+ try:
155
+ with open(file_path, "r") as f:
156
+ data = json.load(f)
157
+
158
+ # Migration Logic
159
+ if "file_states" not in data or not isinstance(data["file_states"], dict):
160
+ print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
161
+ data["file_states"] = {}
162
+
163
+ if "next_download_index" not in data:
164
+ data["next_download_index"] = 0
165
+
166
+ return data
167
+ except json.JSONDecodeError:
168
+ print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
169
+ return default_value
170
+
171
+ def save_json_state(file_path: str, data: Dict[str, Any]):
172
+ """Save state to JSON file"""
173
+ with open(file_path, "w") as f:
174
+ json.dump(data, f, indent=2)
175
+
176
+ async def download_hf_state() -> Dict[str, Any]:
177
+ """Downloads the state file from Hugging Face or returns a default state."""
178
+ local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
179
+ default_state = {"next_download_index": 0, "file_states": {}}
180
+
181
+ try:
182
+ # Check if the file exists in the helium repo
183
+ files = HfApi(token=HF_TOKEN).list_repo_files(
184
+ repo_id=HF_OUTPUT_DATASET_ID,
185
+ repo_type="dataset"
186
+ )
187
+
188
+ if HF_STATE_FILE not in files:
189
+ print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
190
+ return default_state
191
+
192
+ # Download the file
193
+ hf_hub_download(
194
+ repo_id=HF_OUTPUT_DATASET_ID,
195
+ filename=HF_STATE_FILE,
196
+ repo_type="dataset",
197
+ local_dir=LOCAL_STATE_FOLDER,
198
+ local_dir_use_symlinks=False,
199
+ token=HF_TOKEN
200
+ )
201
+
202
+ print(f"[{FLOW_ID}] Successfully downloaded state file.")
203
+ return load_json_state(str(local_path), default_state)
204
+
205
+ except Exception as e:
206
+ print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
207
+ return default_state
208
+
209
+ async def upload_hf_state(state: Dict[str, Any]) -> bool:
210
+ """Uploads the state file to Hugging Face."""
211
+ local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
212
+
213
+ try:
214
+ # Save state locally first
215
+ save_json_state(str(local_path), state)
216
+
217
+ # Upload to helium dataset
218
+ HfApi(token=HF_TOKEN).upload_file(
219
+ path_or_fileobj=str(local_path),
220
+ path_in_repo=HF_STATE_FILE,
221
+ repo_id=HF_OUTPUT_DATASET_ID,
222
+ repo_type="dataset",
223
+ commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
224
+ )
225
+ print(f"[{FLOW_ID}] Successfully uploaded state file.")
226
+ return True
227
+ except Exception as e:
228
+ print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
229
+ return False
230
+
231
+ async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
232
+ """Marks a file as 'processing' in the state file and uploads the lock."""
233
+ print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")
234
+
235
+ # Update state locally
236
+ state["file_states"][zip_filename] = "processing"
237
+
238
+ # Upload the updated state file immediately to establish the lock
239
+ if await upload_hf_state(state):
240
+ print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
241
+ return True
242
+ else:
243
+ print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
244
+ # Revert local state
245
+ if zip_filename in state["file_states"]:
246
+ del state["file_states"][zip_filename]
247
+ return False
248
+
249
+ async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
250
+ """Marks a file as 'processed', updates the index, and uploads the state."""
251
+ print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")
252
+
253
+ # Update state locally
254
+ state["file_states"][zip_filename] = "processed"
255
+ state["next_download_index"] = next_index
256
+
257
+ # Upload the updated state
258
+ if await upload_hf_state(state):
259
+ print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
260
+ return True
261
+ else:
262
+ print(f"[{FLOW_ID}] Failed to update state for: {zip_filename}")
263
+ return False
264
+
265
+ # --- Hugging Face Utility Functions ---
266
+
267
+ async def get_zip_file_list(progress_data: Dict) -> List[str]:
268
+ """
269
+ Fetches the list of all zip files from the dataset, or uses the cached list.
270
+ Updates the progress_data with the file list if a new list is fetched.
271
+ """
272
+ if progress_data['file_list']:
273
+ print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
274
+ return progress_data['file_list']
275
+
276
+ print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
277
+ try:
278
+ api = HfApi(token=HF_TOKEN)
279
+ repo_files = api.list_repo_files(
280
+ repo_id=HF_DATASET_ID,
281
+ repo_type="dataset"
282
+ )
283
+
284
+ # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
285
+ zip_files = sorted([
286
+ f for f in repo_files
287
+ if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
288
+ ])
289
+
290
+ if not zip_files:
291
+ raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
292
+
293
+ print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
294
+
295
+ # Update and save the progress data
296
+ progress_data['file_list'] = zip_files
297
+ save_progress(progress_data)
298
+
299
+ return zip_files
300
+
301
+ except Exception as e:
302
+ print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
303
+ return []
304
+
305
+ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
306
+ """Downloads the zip file for the given index and extracts its contents."""
307
+
308
+ # Extract the base name for the extraction directory
309
+ zip_full_name = Path(repo_file_full_path).name
310
+ course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
311
+
312
+ print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
313
+
314
+ try:
315
+ # Use hf_hub_download to get the file path
316
+ zip_path = hf_hub_download(
317
+ repo_id=HF_DATASET_ID,
318
+ filename=repo_file_full_path, # Use the full path in the repo
319
+ repo_type="dataset",
320
+ token=HF_TOKEN,
321
+ )
322
+
323
+ print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
324
+
325
+ # Create a temporary directory for extraction
326
+ extract_dir = TEMP_DIR / course_name
327
+ # Ensure a clean directory for extraction
328
+ if extract_dir.exists():
329
+ shutil.rmtree(extract_dir)
330
+ extract_dir.mkdir(exist_ok=True)
331
+
332
+ with zipfile.ZipFile(zip_path, 'r') as zip_ref:
333
+ zip_ref.extractall(extract_dir)
334
+
335
+ print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
336
+
337
+ # Clean up the downloaded zip file to save space
338
+ os.remove(zip_path)
339
+
340
+ return extract_dir
341
+
342
+ except Exception as e:
343
+ print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
344
+ return None
345
+
346
+ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
347
+ """Uploads the final captions JSON file to the output dataset."""
348
+ # Use the full zip name, replacing the extension with .json
349
+ caption_filename = Path(zip_full_name).with_suffix('.json').name
350
+
351
+ try:
352
+ print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
353
+
354
+ # Create JSON content in memory
355
+ json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
356
+
357
+ api = HfApi(token=HF_TOKEN)
358
+ api.upload_file(
359
+ path_or_fileobj=io.BytesIO(json_content),
360
+ path_in_repo=caption_filename,
361
+ repo_id=HF_OUTPUT_DATASET_ID,
362
+ repo_type="dataset",
363
+ commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
364
+ )
365
+
366
+ print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
367
+ return True
368
+
369
+ except Exception as e:
370
+ print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
371
+ return False
372
+
373
+ # --- Core Processing Functions (Modified) ---
374
+
375
+ async def get_available_server(timeout: float = 300.0) -> CaptionServer:
376
+ """Round-robin selection of an available caption server."""
377
+ global server_index
378
+ start_time = time.time()
379
+ while True:
380
+ # Round-robin check for an available server
381
+ for _ in range(len(servers)):
382
+ server = servers[server_index]
383
+ server_index = (server_index + 1) % len(servers)
384
+ if not server.busy:
385
+ return server
386
+
387
+ # If all servers are busy, wait for a short period and check again
388
+ await asyncio.sleep(0.5)
389
+
390
+ # Check if timeout has been reached
391
+ if time.time() - start_time > timeout:
392
+ raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
393
+
394
+ async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
395
+ """Sends a single image to a caption server for processing."""
396
+ # This function now handles server selection and retries internally
397
+ MAX_RETRIES = 3
398
+ for attempt in range(MAX_RETRIES):
399
+ server = None
400
+ try:
401
+ # 1. Get an available server (will wait if all are busy, with a timeout)
402
+ server = await get_available_server()
403
+ server.busy = True
404
+ start_time = time.time()
405
+
406
+ # Print a less verbose message only on the first attempt
407
+ if attempt == 0:
408
+ print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
409
+
410
+ # 2. Prepare request data
411
+ form_data = aiohttp.FormData()
412
+ form_data.add_field('file',
413
+ image_path.open('rb'),
414
+ filename=image_path.name,
415
+ content_type='image/jpeg')
416
+ form_data.add_field('model_choice', MODEL_TYPE)
417
+
418
+ # 3. Send request
419
+ async with aiohttp.ClientSession() as session:
420
+ # Increased timeout to 10 minutes (600s) as requested by user's problem description
421
+ async with session.post(server.url, data=form_data, timeout=600) as resp:
422
+ if resp.status == 200:
423
+ result = await resp.json()
424
+ caption = result.get("caption")
425
+
426
+ if caption:
427
+ # Update progress counter
428
+ progress_tracker['completed'] += 1
429
+ if progress_tracker['completed'] % 50 == 0:
430
+ print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
431
+
432
+ # Log success only if it's not a progress report interval
433
+ if progress_tracker['completed'] % 50 != 0:
434
+ print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
435
+
436
+ return {
437
+ "course": course_name,
438
+ "image_path": image_path.name,
439
+ "caption": caption,
440
+ "timestamp": datetime.now().isoformat()
441
+ }
442
+ else:
443
+ print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
444
+ continue # Retry with a different server
445
+ else:
446
+ error_text = await resp.text()
447
+ print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
448
+ continue # Retry with a different server
449
+
450
+ except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
451
+ print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
452
+ continue # Retry with a different server
453
+ except Exception as e:
454
+ print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
455
+ continue # Retry with a different server
456
+ finally:
457
+ if server:
458
+ end_time = time.time()
459
+ server.busy = False
460
+ server.total_processed += 1
461
+ server.total_time += (end_time - start_time)
462
+
463
+ print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
464
+ return None
465
+
466
+ async def process_dataset_task(start_index: int):
467
+ """Main task to process the dataset sequentially starting from a given index."""
468
+
469
+ # Load both local progress and HF state
470
+ progress = load_progress()
471
+ current_state = await download_hf_state()
472
+ file_list = await get_zip_file_list(progress)
473
+
474
+ if not file_list:
475
+ print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
476
+ return False
477
+
478
+ # Ensure start_index is within bounds
479
+ if start_index > len(file_list):
480
+ print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
481
+ return True
482
+
483
+ # Determine the actual starting index in the 0-indexed list
484
+ start_list_index = start_index - 1
485
+
486
+ print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
487
+
488
+ global_success = True
489
+
490
+ for i in range(start_list_index, len(file_list)):
491
+ file_index = i + 1 # 1-indexed for user display and progress tracking
492
+ repo_file_full_path = file_list[i]
493
+ zip_full_name = Path(repo_file_full_path).name
494
+ course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name
495
+
496
+ # Check file state in both local and HF state
497
+ file_state = current_state["file_states"].get(zip_full_name)
498
+ if file_state == "processed":
499
+ print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
500
+ continue
501
+ elif file_state == "processing":
502
+ print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
503
+ continue
504
+
505
+ # Try to lock the file
506
+ if not await lock_file_for_processing(zip_full_name, current_state):
507
+ print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
508
+ continue
509
+
510
+ extract_dir = None
511
+ current_file_success = False
512
+
513
+ try:
514
+ # 1. Download and Extract
515
+ extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
516
+
517
+ if not extract_dir:
518
+ raise Exception("Failed to download or extract zip file.")
519
+
520
+ # 2. Find Images
521
+ # Use recursive glob to find images in subdirectories
522
+ image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
523
+ print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
524
+
525
+ if not image_paths:
526
+ print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
527
+ current_file_success = True
528
+ else:
529
+ # 3. Process Images (Captioning)
530
+ progress_tracker = {
531
+ 'total': len(image_paths),
532
+ 'completed': 0
533
+ }
534
+ print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
535
+
536
+ # Create a semaphore to limit concurrent tasks to the number of available servers
537
+ semaphore = asyncio.Semaphore(len(servers))
538
+
539
+ async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
540
+ async with semaphore:
541
+ return await send_image_for_captioning(image_path, course_name, progress_tracker)
542
+
543
+ # Create a list of tasks for parallel captioning
544
+ caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
545
+
546
+ # Run all captioning tasks concurrently
547
+ results = await asyncio.gather(*caption_tasks)
548
+
549
+ # Filter out failed results
550
+ all_captions = [r for r in results if r is not None]
551
+
552
+ # Final progress report for the current file
553
+ if len(all_captions) == len(image_paths):
554
+ print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
555
+ current_file_success = True
556
+ else:
557
+ print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
558
+ current_file_success = False
559
+
560
+ # 4. Upload Results
561
+ if all_captions:
562
+ print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
563
+ if await upload_captions_to_hf(zip_full_name, all_captions):
564
+ print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
565
+ # Partial success in captioning is still a success for the upload step
566
+ pass
567
+ else:
568
+ print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
569
+ current_file_success = False
570
+ else:
571
+ print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
572
+ current_file_success = False
573
+
574
+ except Exception as e:
575
+ print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
576
+ current_file_success = False
577
+ global_success = False # Mark overall task as failed if any file fails critically
578
+
579
+ finally:
580
+ # 5. Cleanup and Update Progress
581
+ if extract_dir and extract_dir.exists():
582
+ print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
583
+ shutil.rmtree(extract_dir, ignore_errors=True)
584
+
585
+ if current_file_success:
586
+ # Update both local progress and HF state
587
+ progress['last_processed_index'] = file_index
588
+ progress['processed_files'][str(file_index)] = repo_file_full_path
589
+ save_progress(progress)
590
+
591
+ # Update HF state and unlock the file
592
+ if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
593
+ print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
594
+ else:
595
+ print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
596
+ else:
597
+ # Mark as failed in the state and continue with next file
598
+ current_state["file_states"][zip_full_name] = "failed"
599
+ await upload_hf_state(current_state)
600
+ print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
601
+ global_success = False
602
+
603
+ print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
604
+ return global_success
605
+
606
+ # --- FastAPI App and Endpoints ---
607
+
608
+ app = FastAPI(
609
+ title=f"Flow Server {FLOW_ID} API",
610
+ description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
611
+ version="1.0.0"
612
+ )
613
+
614
+ @app.on_event("startup")
615
+ async def startup_event():
616
+ print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
617
+
618
+ # Automatically start the processing task
619
+ progress = load_progress()
620
+ # Start from the last processed index + 1, or the hardcoded AUTO_START_INDEX if the progress file is new/empty
621
+ start_index = progress.get('last_processed_index', 0) + 1
622
+ if start_index < AUTO_START_INDEX:
623
+ start_index = AUTO_START_INDEX
624
+
625
+ # Use a dummy BackgroundTasks object for the startup task
626
+ # Note: FastAPI's startup events can't directly use BackgroundTasks, but we can use asyncio.create_task
627
+ # to run the long-running process in the background without blocking the server startup.
628
+ print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
629
+ asyncio.create_task(process_dataset_task(start_index))
630
+
631
+ @app.get("/")
632
+ async def root():
633
+ progress = load_progress()
634
+ return {
635
+ "flow_id": FLOW_ID,
636
+ "status": "ready",
637
+ "last_processed_index": progress['last_processed_index'],
638
+ "total_files_in_list": len(progress['file_list']),
639
+ "processed_files_count": len(progress['processed_files']),
640
+ "total_servers": len(servers),
641
+ "busy_servers": sum(1 for s in servers if s.busy),
642
+ }
643
+
644
+ @app.post("/start_processing")
645
+ async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
646
+ """
647
+ Starts the sequential processing of zip files from the given index in the background.
648
+ """
649
+ start_index = request.start_index
650
+
651
+ print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
652
+
653
+ # Start the heavy processing in a background task so the API call returns immediately
654
+ # Note: The server is already auto-starting, but this allows for manual restart/override.
655
+ background_tasks.add_task(process_dataset_task, start_index)
656
+
657
+ return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
658
+
659
+ if __name__ == "__main__":
660
+ import uvicorn
661
+ # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
662
+ uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)