Fred808 committed on
Commit
d01e81b
·
verified ·
1 Parent(s): 344574d

Upload 3 files

Browse files
Files changed (3) hide show
  1. Dockerfile +19 -0
  2. app.py +639 -0
  3. requirements.txt +9 -0
Dockerfile ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
# you will also find guides on how best to write your Dockerfile

FROM python:3.9

# Run as a non-root user, as required by Hugging Face Spaces.
RUN useradd -m -u 1000 user
USER user
ENV PATH="/home/user/.local/bin:$PATH"

WORKDIR /app

# Install dependencies first so Docker layer caching skips reinstalls
# when only the application code changes.
COPY --chown=user ./requirements.txt requirements.txt
RUN pip install --no-cache-dir --upgrade -r requirements.txt

# Copy the application code, THEN loosen permissions so the app can
# create its temp/state directories at runtime. Previously chmod ran
# before this COPY, so the copied files were never affected by it.
COPY --chown=user . /app
RUN chmod -R 777 /app

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
app.py ADDED
@@ -0,0 +1,639 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
+ import aiohttp
6
+ import zipfile
7
+ import shutil
8
+ from typing import Dict, List, Set, Optional, Tuple, Any
9
+ from urllib.parse import quote
10
+ from datetime import datetime
11
+ from pathlib import Path
12
+ import io
13
+
14
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
+ from pydantic import BaseModel, Field
16
+ from huggingface_hub import HfApi, hf_hub_download
17
+
18
# --- Configuration ---
# All settings are overridable via environment variables; the defaults
# below are what a bare container falls back to.
AUTO_START_INDEX = 0 # Hardcoded default start index if no progress is found
FLOW_ID = os.getenv("FLOW_ID", "flow_default")  # identifies this worker in logs and temp paths
FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
HF_TOKEN = os.getenv("HF_TOKEN", "")  # NOTE(review): empty default — Hub calls will fail without it; confirm it is always set in deployment
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3") # Source dataset for zip files
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium") # Target dataset for captions

# Progress and State Tracking
# Local progress (this worker only) vs. shared state pushed to the output
# dataset so multiple workers can coordinate via file locks.
PROGRESS_FILE = Path("processing_progress.json")
HF_STATE_FILE = "processing_state_captions.json" # State file in helium dataset
LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
LOCAL_STATE_FOLDER.mkdir(exist_ok=True)

# Directory within the HF dataset where the zip files are located
ZIP_FILE_PREFIX = "frames_zips/"

# Using the full list from the user's original code for actual deployment
# Each entry is an /analyze endpoint of a captioning Space; work is
# distributed across them round-robin (see get_available_server).
CAPTION_SERVERS = [
    "https://fred808-pil-4-1.hf.space/analyze",
    "https://fred808-pil-4-2.hf.space/analyze",
    "https://fred808-pil-4-3.hf.space/analyze",
    "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
    "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
    "https://fred1012-fred1012-oncray.hf.space/analyze",
    "https://fred1012-fred1012-4goge7.hf.space/analyze",
    "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
    "https://fred1012-fred1012-u95rte.hf.space/analyze",
    "https://fred1012-fred1012-igje22.hf.space/analyze",
    "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
    "https://fred1012-fred1012-nwqthy.hf.space/analyze",
    "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
    "https://fred1012-fred1012-pivlzg.hf.space/analyze",
    "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
    "https://fred1012-fred1012-u7lh57.hf.space/analyze",
    "https://fred1012-fred1012-q8djv1.hf.space/analyze",
    "https://fredalone-fredalone-ozugrp.hf.space/analyze",
    "https://fredalone-fredalone-9brxj2.hf.space/analyze",
    "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
    "https://fredalone-fredalone-vbli2y.hf.space/analyze",
    "https://fredalone-fredalone-uggger.hf.space/analyze",
    "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
    "https://fredalone-fredalone-d1f26d.hf.space/analyze",
    "https://fredalone-fredalone-461jp2.hf.space/analyze",
    "https://fredalone-fredalone-3enfg4.hf.space/analyze",
    "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
    "https://fredalone-fredalone-ivtjua.hf.space/analyze",
    "https://fredalone-fredalone-6bezt2.hf.space/analyze",
    "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
    "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
    "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
    "https://fredalone-fredalone-wclyog.hf.space/analyze",
    "https://fredalone-fredalone-t27vig.hf.space/analyze",
    "https://fredalone-fredalone-gahbxh.hf.space/analyze",
    "https://fredalone-fredalone-kw2po4.hf.space/analyze",
    "https://fredalone-fredalone-8h285h.hf.space/analyze"
]
MODEL_TYPE = "Florence-2-large"  # sent to the caption servers as 'model_choice'

# Temporary storage for images
# Per-flow directory so multiple workers on one host don't collide.
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
TEMP_DIR.mkdir(exist_ok=True)
# --- Models ---
class ProcessStartRequest(BaseModel):
    """Request body for POST /start_processing.

    Attributes:
        start_index: 1-indexed position in the sorted zip-file list to
            start processing from.
    """
    # BUGFIX: the default used to be AUTO_START_INDEX (0), which violates
    # the ge=1 constraint. Pydantic does not validate defaults, so a request
    # without a body silently produced start_index=0 -> start_list_index=-1
    # downstream (a negative list index). Default to the first file instead.
    start_index: int = Field(1, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
class CaptionServer:
    """Availability flag and throughput statistics for one caption endpoint."""

    def __init__(self, url):
        self.url = url
        self.busy = False           # True while a request is in flight on this server
        self.total_processed = 0    # number of images attempted so far
        self.total_time = 0         # cumulative seconds spent on requests
        self.model = MODEL_TYPE

    @property
    def fps(self):
        """Average images handled per second; 0 before any work is recorded."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
# Global state for caption servers
# One CaptionServer wrapper per configured endpoint; shared by all tasks.
servers = [CaptionServer(url) for url in CAPTION_SERVERS]
# Round-robin cursor advanced by get_available_server().
server_index = 0

# --- Progress and State Management Functions ---
def load_progress() -> Dict:
    """Return the locally persisted progress dict, or a fresh default.

    Reads PROGRESS_FILE when present; a corrupted file is treated the
    same as a missing one so processing can restart cleanly.
    """
    try:
        if PROGRESS_FILE.exists():
            with PROGRESS_FILE.open('r') as fh:
                return json.load(fh)
    except json.JSONDecodeError:
        print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")

    # Fresh default structure.
    return {
        "last_processed_index": 0,
        "processed_files": {},  # {index: repo_path}
        "file_list": [],        # Full list of all zip files found in the dataset
    }
def save_progress(progress_data: Dict):
    """Persist *progress_data* to PROGRESS_FILE as indented JSON.

    Failures are logged rather than raised so a disk hiccup cannot kill
    the long-running processing task.
    """
    try:
        with PROGRESS_FILE.open('w') as fh:
            json.dump(progress_data, fh, indent=4)
    except Exception as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
    """Load persisted state from a JSON file, migrating older layouts.

    Guarantees the returned dict contains a "file_states" dict and a
    "next_download_index" key. Returns *default_value* when the file is
    missing, unreadable, or does not contain a JSON object.

    Args:
        file_path: Path of the JSON state file.
        default_value: Fallback state used when the file cannot be used.
    """
    if os.path.exists(file_path):
        try:
            with open(file_path, "r") as f:
                data = json.load(f)
        except (json.JSONDecodeError, OSError):
            # OSError was previously unhandled and would crash the caller.
            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
            return default_value

        # BUGFIX: a valid JSON file may hold a non-object (e.g. a list),
        # on which the migration assignments below raised TypeError.
        if not isinstance(data, dict):
            print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
            return default_value

        # Migration Logic: older state files may lack these keys.
        if "file_states" not in data or not isinstance(data["file_states"], dict):
            print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
            data["file_states"] = {}

        if "next_download_index" not in data:
            data["next_download_index"] = 0

        return data

    return default_value
def save_json_state(file_path: str, data: Dict[str, Any]):
    """Serialize *data* to *file_path* as pretty-printed (indent=2) JSON."""
    serialized = json.dumps(data, indent=2)
    with open(file_path, "w") as handle:
        handle.write(serialized)
async def download_hf_state() -> Dict[str, Any]:
    """Fetch the shared caption-processing state file from the output dataset.

    Returns the parsed state dict, or a fresh default when the file does
    not exist on the Hub or any Hub operation fails.
    """
    state_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    fresh_state = {"next_download_index": 0, "file_states": {}}

    try:
        # Only attempt the download if the file actually exists in the repo.
        repo_contents = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset"
        )
        if HF_STATE_FILE not in repo_contents:
            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
            return fresh_state

        # Materialize the file into LOCAL_STATE_FOLDER (no symlink into the cache).
        hf_hub_download(
            repo_id=HF_OUTPUT_DATASET_ID,
            filename=HF_STATE_FILE,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,
            token=HF_TOKEN
        )

        print(f"[{FLOW_ID}] Successfully downloaded state file.")
        return load_json_state(str(state_path), fresh_state)

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
        return fresh_state
async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Persist *state* locally, then push it to the output dataset.

    Returns True on success; on any failure the error is logged and
    False is returned so callers can react (e.g. revert a lock).
    """
    state_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    try:
        # Write locally first so the upload reads a consistent snapshot.
        save_json_state(str(state_path), state)

        hub = HfApi(token=HF_TOKEN)
        hub.upload_file(
            path_or_fileobj=str(state_path),
            path_in_repo=HF_STATE_FILE,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"Update caption processing state: next_index={state['next_download_index']}"
        )
        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
        return False
async def lock_file_for_processing(zip_filename: str, state: Dict[str, Any]) -> bool:
    """Mark *zip_filename* as 'processing' and publish the lock to the Hub.

    The lock only holds once the updated state file is uploaded. If the
    upload fails, the local entry is restored to exactly its previous
    value (the old code deleted the key outright, which lost a prior
    'failed'/'processed' marker).

    Returns:
        True if the lock was established, False otherwise.
    """
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {zip_filename}")

    # Remember the pre-lock value so a failed upload can restore it.
    _missing = object()
    previous = state["file_states"].get(zip_filename, _missing)

    # Update state locally
    state["file_states"][zip_filename] = "processing"

    # Upload the updated state file immediately to establish the lock
    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {zip_filename}")
        return True

    print(f"[{FLOW_ID}] ❌ Failed to lock file: {zip_filename}")
    # Revert local state to what it was before the attempt.
    if previous is _missing:
        del state["file_states"][zip_filename]
    else:
        state["file_states"][zip_filename] = previous
    return False
async def unlock_file_as_processed(zip_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Record *zip_filename* as 'processed', advance next_download_index,
    and publish the updated state to the Hub.

    Returns:
        True when the state upload succeeded, False otherwise.
    """
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {zip_filename}")

    # Local bookkeeping before publishing.
    state["file_states"][zip_filename] = "processed"
    state["next_download_index"] = next_index

    uploaded = await upload_hf_state(state)
    if uploaded:
        print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {zip_filename}")
    else:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {zip_filename}")
    return uploaded
# --- Hugging Face Utility Functions ---

async def get_zip_file_list(progress_data: Dict) -> List[str]:
    """Return the sorted list of source zip files, fetching it on first use.

    The list is cached in *progress_data* (and persisted via
    save_progress) so ordering — and therefore file indices — stays
    stable across restarts. Returns [] when the Hub listing fails.
    """
    cached = progress_data['file_list']
    if cached:
        print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
        return cached

    print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
    try:
        all_files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )

        # Keep only zips under ZIP_FILE_PREFIX; sort so indexing is deterministic.
        zip_files = sorted(
            name for name in all_files
            if name.startswith(ZIP_FILE_PREFIX) and name.endswith('.zip')
        )

        if not zip_files:
            raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")

        print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")

        # Cache the list and persist it alongside the rest of the progress.
        progress_data['file_list'] = zip_files
        save_progress(progress_data)
        return zip_files

    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []
async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Download one zip from the source dataset and extract it under TEMP_DIR.

    Args:
        file_index: 1-indexed position of the file (used only for logging).
        repo_file_full_path: Full path of the zip inside the source repo.

    Returns:
        The extraction directory, or None if download/extraction failed.
    """

    # Extract the base name for the extraction directory
    zip_full_name = Path(repo_file_full_path).name
    course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name

    print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")

    try:
        # Use hf_hub_download to get the file path (returns a path into the HF cache)
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path, # Use the full path in the repo
            repo_type="dataset",
            token=HF_TOKEN,
        )

        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        # Create a temporary directory for extraction
        extract_dir = TEMP_DIR / course_name
        # Ensure a clean directory for extraction (wipe leftovers from a prior run)
        if extract_dir.exists():
            shutil.rmtree(extract_dir)
        extract_dir.mkdir(exist_ok=True)

        # NOTE(review): extractall does not guard against path traversal in
        # member names (zip-slip); acceptable only if the source dataset is
        # fully trusted — confirm.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Clean up the downloaded zip file to save space
        # (this deletes the file out of the HF cache directory)
        os.remove(zip_path)

        return extract_dir

    except Exception as e:
        # Any failure (network, bad zip, disk) is logged and reported as None.
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        return None
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Upload the caption records for one zip as <name>.json to the output dataset.

    The JSON payload is built entirely in memory; nothing touches disk.
    Returns True on success, False after logging any failure.
    """
    # e.g. "frames_001.zip" -> "frames_001.json"
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

        payload = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')

        HfApi(token=HF_TOKEN).upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
# --- Core Processing Functions (Modified) ---

async def get_available_server(timeout: float = 300.0) -> CaptionServer:
    """Round-robin selection of an available caption server.

    Cycles through the global server list starting at the shared cursor;
    if every server is busy, sleeps briefly and tries again until
    *timeout* seconds have elapsed.

    Raises:
        TimeoutError: when no server frees up within *timeout* seconds.
    """
    global server_index
    waited_since = time.time()
    while True:
        # One full sweep of the server list from the current cursor.
        for _ in range(len(servers)):
            candidate = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not candidate.busy:
                return candidate

        # Everyone is busy — back off briefly before the next sweep.
        await asyncio.sleep(0.5)

        if time.time() - waited_since > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Send one image to a caption server, retrying across servers on failure.

    Handles server selection (round-robin with back-pressure) and up to
    MAX_RETRIES attempts. Increments progress_tracker['completed'] on
    success and updates the chosen server's throughput statistics.

    Returns:
        A caption record dict, or None after all retries failed.
    """
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        server = None
        try:
            # 1. Get an available server (will wait if all are busy, with a timeout)
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            # Print a less verbose message only on the first attempt
            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # 2. Prepare request data.
            # BUGFIX: read the file into memory instead of passing an open
            # handle — the previous image_path.open('rb') was never closed,
            # leaking a file descriptor on every request and retry.
            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                image_path.read_bytes(),
                                filename=image_path.name,
                                content_type='image/jpeg')
            form_data.add_field('model_choice', MODEL_TYPE)

            # 3. Send request
            async with aiohttp.ClientSession() as session:
                # Increased timeout to 10 minutes (600s) as requested by user's problem description
                async with session.post(server.url, data=form_data, timeout=600) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        caption = result.get("caption")

                        if caption:
                            # Update progress counter
                            progress_tracker['completed'] += 1
                            if progress_tracker['completed'] % 50 == 0:
                                print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")

                            # Log success only if it's not a progress report interval
                            if progress_tracker['completed'] % 50 != 0:
                                print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")

                            return {
                                "course": course_name,
                                "image_path": image_path.name,
                                "caption": caption,
                                "timestamp": datetime.now().isoformat()
                            }
                        else:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue  # Retry with a different server
                    else:
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                        continue  # Retry with a different server

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue  # Retry with a different server
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue  # Retry with a different server
        finally:
            # Release the server and record timing, whether or not the
            # attempt succeeded (failed attempts also count as processed).
            if server:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
async def process_dataset_task(start_index: int):
    """Process the dataset's zip files sequentially starting at *start_index*.

    For each zip (1-indexed position in the sorted file list): acquire a
    distributed lock via the shared Hub state, download/extract, caption
    every image concurrently, upload the captions, then mark the file
    processed (or failed) and release the lock.

    Args:
        start_index: 1-indexed file position to start from.

    Returns:
        True if every attempted file succeeded, False otherwise.
    """

    # Load both local progress and HF state
    progress = load_progress()
    current_state = await download_hf_state()
    file_list = await get_zip_file_list(progress)

    if not file_list:
        print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
        return False

    # Ensure start_index is within bounds
    if start_index > len(file_list):
        print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
        return True

    # Determine the actual starting index in the 0-indexed list
    # NOTE(review): start_index <= 0 would yield a negative list index here;
    # callers are expected to pass >= 1.
    start_list_index = start_index - 1

    print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")

    global_success = True

    for i in range(start_list_index, len(file_list)):
        file_index = i + 1 # 1-indexed for user display and progress tracking
        repo_file_full_path = file_list[i]
        zip_full_name = Path(repo_file_full_path).name
        course_name = zip_full_name.replace('.zip', '') # Use the file name as the course/job name

        # Check file state in both local and HF state.
        # Only 'processed' and 'processing' are skipped — files previously
        # marked 'failed' are retried on the next run.
        file_state = current_state["file_states"].get(zip_full_name)
        if file_state == "processed":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Already processed in global state.")
            continue
        elif file_state == "processing":
            print(f"[{FLOW_ID}] Skipping {zip_full_name}: Currently being processed by another worker.")
            continue

        # Try to lock the file
        if not await lock_file_for_processing(zip_full_name, current_state):
            print(f"[{FLOW_ID}] Failed to lock {zip_full_name}. Skipping.")
            continue

        extract_dir = None
        current_file_success = False

        try:
            # 1. Download and Extract
            extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)

            if not extract_dir:
                raise Exception("Failed to download or extract zip file.")

            # 2. Find Images
            # Use recursive glob to find images in subdirectories
            image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                # An empty zip is treated as successfully handled.
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                current_file_success = True
            else:
                # 3. Process Images (Captioning)
                # Shared mutable counter updated by every captioning task.
                progress_tracker = {
                    'total': len(image_paths),
                    'completed': 0
                }
                print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

                # Create a semaphore to limit concurrent tasks to the number of available servers
                semaphore = asyncio.Semaphore(len(servers))

                async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                    # Bound concurrency so we never queue more requests than servers.
                    async with semaphore:
                        return await send_image_for_captioning(image_path, course_name, progress_tracker)

                # Create a list of tasks for parallel captioning
                caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]

                # Run all captioning tasks concurrently
                results = await asyncio.gather(*caption_tasks)

                # Filter out failed results (None entries)
                all_captions = [r for r in results if r is not None]

                # Final progress report for the current file
                if len(all_captions) == len(image_paths):
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
                    current_file_success = True
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
                    current_file_success = False

                # 4. Upload Results — partial caption sets are still uploaded.
                if all_captions:
                    print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                    if await upload_captions_to_hf(zip_full_name, all_captions):
                        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                        # Partial success in captioning is still a success for the upload step
                        pass
                    else:
                        print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                        current_file_success = False
                else:
                    print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
                    current_file_success = False

        except Exception as e:
            print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
            current_file_success = False
            global_success = False # Mark overall task as failed if any file fails critically

        finally:
            # 5. Cleanup and Update Progress
            # NOTE(review): the "Cleaned up" log line prints before the
            # removal actually runs.
            if extract_dir and extract_dir.exists():
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
                shutil.rmtree(extract_dir, ignore_errors=True)

            if current_file_success:
                # Update both local progress and HF state
                progress['last_processed_index'] = file_index
                progress['processed_files'][str(file_index)] = repo_file_full_path
                save_progress(progress)

                # Update HF state and unlock the file
                if await unlock_file_as_processed(zip_full_name, current_state, file_index + 1):
                    print(f"[{FLOW_ID}] Progress saved and file unlocked: {zip_full_name}")
                else:
                    print(f"[{FLOW_ID}] Warning: File processed but state update failed: {zip_full_name}")
            else:
                # Mark as failed in the state and continue with next file
                current_state["file_states"][zip_full_name] = "failed"
                await upload_hf_state(current_state)
                print(f"[{FLOW_ID}] File {zip_full_name} marked as failed. Continuing with next file.")
                global_success = False

    print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
    return global_success
# --- FastAPI App and Endpoints ---

# Application object served by uvicorn (see Dockerfile CMD / __main__).
app = FastAPI(
    title=f"Flow Server {FLOW_ID} API",
    description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
    version="1.0.0"
)
# Strong references to fire-and-forget tasks: the event loop keeps only a
# weak reference to tasks, so an un-referenced task created with
# asyncio.create_task can be garbage-collected before it finishes.
_background_tasks = set()

@app.on_event("startup")
async def startup_event():
    """Auto-start the dataset-processing loop when the server boots."""
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")

    # Automatically start the processing task, resuming from the file
    # after the last locally recorded one.
    progress = load_progress()
    # Start from the last processed index + 1, or the hardcoded AUTO_START_INDEX if the progress file is new/empty
    start_index = progress.get('last_processed_index', 0) + 1
    if start_index < AUTO_START_INDEX:
        start_index = AUTO_START_INDEX

    # FastAPI's startup events can't use BackgroundTasks; asyncio.create_task
    # runs the long job without blocking server startup.
    print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
    task = asyncio.create_task(process_dataset_task(start_index))
    # BUGFIX: keep a reference so the task can't be garbage-collected mid-run.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
@app.get("/")
async def root():
    """Health/status endpoint: progress counters and caption-server load."""
    progress = load_progress()
    busy_count = sum(1 for s in servers if s.busy)
    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "last_processed_index": progress['last_processed_index'],
        "total_files_in_list": len(progress['file_list']),
        "processed_files_count": len(progress['processed_files']),
        "total_servers": len(servers),
        "busy_servers": busy_count,
    }
@app.post("/start_processing")
async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
    """Kick off sequential zip processing from the requested index.

    Returns immediately; the heavy lifting runs as a FastAPI background
    task. The server also auto-starts on boot, so this endpoint mainly
    exists for manual restarts/overrides.
    """
    requested_index = request.start_index
    print(f"[{FLOW_ID}] Received request to start processing from index: {requested_index}. Starting background task.")

    # Schedule the long-running job so this call can return at once.
    background_tasks.add_task(process_dataset_task, requested_index)

    return {"status": "processing", "start_index": requested_index, "message": "Dataset processing started in background."}
# Entry point for running directly (the Docker image instead launches
# uvicorn via CMD on port 7860; FLOW_PORT defaults to 8001 here).
if __name__ == "__main__":
    import uvicorn
    # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
    uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
requirements.txt ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ fastapi==0.104.1
2
+ uvicorn==0.24.0
3
+ aiofiles==23.2.1
4
+ python-multipart==0.0.6
5
+ huggingface-hub==0.18.0
6
+ aiohttp
7
+ jinja2
8
+ pydantic
9
+ datasets