Fred808 committed on
Commit 9fdc8fb · verified · 1 Parent(s): b6388e3

Upload 3 files

Files changed (3)
  1. Dockerfile +19 -0
  2. app.py +506 -0
  3. requirements.txt +9 -0
Dockerfile ADDED
@@ -0,0 +1,19 @@
+ # Read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
+ # You will also find guides there on how best to write your Dockerfile.
+
+ FROM python:3.9
+
+ RUN useradd -m -u 1000 user
+ USER user
+ ENV PATH="/home/user/.local/bin:$PATH"
+
+ WORKDIR /app
+
+ COPY --chown=user ./requirements.txt requirements.txt
+ RUN pip install --no-cache-dir --upgrade -r requirements.txt
+
+ RUN chmod -R 777 /app
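+ # (Likely intent of the chmod above: let the non-root "user" write the progress
+ # file and temp image directories under /app at runtime.)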
+
+ COPY --chown=user . /app
+ CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
app.py ADDED
@@ -0,0 +1,506 @@
+ import os
+ import json
+ import time
+ import asyncio
+ import aiohttp
+ import zipfile
+ import shutil
+ from typing import Dict, List, Set, Optional, Tuple
+ from urllib.parse import quote
+ from datetime import datetime
+ from pathlib import Path
+ import io
+
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
+ from pydantic import BaseModel, Field
+ from huggingface_hub import HfApi, hf_hub_download
+
+ # --- Configuration ---
+ AUTO_START_INDEX = 20  # Hardcoded default start index if no progress is found
+ FLOW_ID = os.getenv("FLOW_ID", "flow_default")
+ FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
+ HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")  # Source dataset for zip files
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")  # Target dataset for captions
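+ # All of the above can be overridden via environment variables, e.g. (hypothetical values):
+ #   FLOW_ID=flow_01 FLOW_PORT=8001 HF_TOKEN=hf_xxx uvicorn app:app --port 7860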
+
+ # Progress tracking file
+ PROGRESS_FILE = Path("processing_progress.json")
+ # Directory within the HF dataset where the zip files are located
+ ZIP_FILE_PREFIX = "frames/"
+
+ # Full list of caption servers used for deployment
+ CAPTION_SERVERS = [
+     "https://fred808-pil-4-1.hf.space/analyze",
+     "https://fred808-pil-4-2.hf.space/analyze",
+     "https://fred808-pil-4-3.hf.space/analyze",
+     "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
+     "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
+     "https://fred1012-fred1012-oncray.hf.space/analyze",
+     "https://fred1012-fred1012-4goge7.hf.space/analyze",
+     "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
+     "https://fred1012-fred1012-u95rte.hf.space/analyze",
+     "https://fred1012-fred1012-igje22.hf.space/analyze",
+     "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
+     "https://fred1012-fred1012-nwqthy.hf.space/analyze",
+     "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
+     "https://fred1012-fred1012-pivlzg.hf.space/analyze",
+     "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
+     "https://fred1012-fred1012-u7lh57.hf.space/analyze",
+     "https://fred1012-fred1012-q8djv1.hf.space/analyze",
+     "https://fredalone-fredalone-ozugrp.hf.space/analyze",
+     "https://fredalone-fredalone-9brxj2.hf.space/analyze",
+     "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
+     "https://fredalone-fredalone-vbli2y.hf.space/analyze",
+     "https://fredalone-fredalone-uggger.hf.space/analyze",
+     "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
+     "https://fredalone-fredalone-d1f26d.hf.space/analyze",
+     "https://fredalone-fredalone-461jp2.hf.space/analyze",
+     "https://fredalone-fredalone-3enfg4.hf.space/analyze",
+     "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
+     "https://fredalone-fredalone-ivtjua.hf.space/analyze",
+     "https://fredalone-fredalone-6bezt2.hf.space/analyze",
+     "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
+     "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
+     "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
+     "https://fredalone-fredalone-wclyog.hf.space/analyze",
+     "https://fredalone-fredalone-t27vig.hf.space/analyze",
+     "https://fredalone-fredalone-gahbxh.hf.space/analyze",
+     "https://fredalone-fredalone-kw2po4.hf.space/analyze",
+     "https://fredalone-fredalone-8h285h.hf.space/analyze"
+ ]
+ MODEL_TYPE = "Florence-2-large"
+
+ # Temporary storage for images
+ TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
+ TEMP_DIR.mkdir(exist_ok=True)
+
+ # --- Models ---
+ class ProcessStartRequest(BaseModel):
+     start_index: int = Field(AUTO_START_INDEX, ge=1, description="The index number of the zip file to start processing from (1-indexed).")
+
+ class CaptionServer:
+     def __init__(self, url):
+         self.url = url
+         self.busy = False
+         self.total_processed = 0
+         self.total_time = 0
+         self.model = MODEL_TYPE
+
+     @property
+     def fps(self):
+         return self.total_processed / self.total_time if self.total_time > 0 else 0
+
+ # Global state for caption servers
+ servers = [CaptionServer(url) for url in CAPTION_SERVERS]
+ server_index = 0
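+ # These globals are mutated only from the single asyncio event loop (no threads),
+ # so no locking is used around busy/server_index.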
+
+ # --- Progress Tracking Functions ---
+
+ def load_progress() -> Dict:
+     """Loads the processing progress from the JSON file."""
+     if PROGRESS_FILE.exists():
+         try:
+             with PROGRESS_FILE.open('r') as f:
+                 return json.load(f)
+         except json.JSONDecodeError:
+             print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
+             # Fall through to return the default structure
+
+     # Default structure
+     return {
+         "last_processed_index": 0,
+         "processed_files": {},  # {index: repo_path}
+         "file_list": []  # Full list of all zip files found in the dataset
+     }
+
+ def save_progress(progress_data: Dict):
+     """Saves the processing progress to the JSON file."""
+     try:
+         with PROGRESS_FILE.open('w') as f:
+             json.dump(progress_data, f, indent=4)
+     except Exception as e:
+         print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
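+ # Example of the on-disk progress file (illustrative values):
+ # {"last_processed_index": 2,
+ #  "processed_files": {"1": "frames/part_001.zip", "2": "frames/part_002.zip"},
+ #  "file_list": ["frames/part_001.zip", "frames/part_002.zip", "..."]}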
+
+ # --- Hugging Face Utility Functions ---
+
+ async def get_zip_file_list(progress_data: Dict) -> List[str]:
+     """
+     Fetches the list of all zip files from the dataset, or uses the cached list.
+     Updates progress_data with the file list if a new list is fetched.
+     """
+     if progress_data['file_list']:
+         print(f"[{FLOW_ID}] Using cached file list with {len(progress_data['file_list'])} files.")
+         return progress_data['file_list']
+
+     print(f"[{FLOW_ID}] Fetching full list of zip files from {HF_DATASET_ID}...")
+     try:
+         api = HfApi(token=HF_TOKEN)
+         repo_files = api.list_repo_files(
+             repo_id=HF_DATASET_ID,
+             repo_type="dataset"
+         )
+
+         # Filter for zip files in the specified directory and sort them alphabetically for consistent indexing
+         zip_files = sorted([
+             f for f in repo_files
+             if f.startswith(ZIP_FILE_PREFIX) and f.endswith('.zip')
+         ])
+
+         if not zip_files:
+             raise FileNotFoundError(f"No zip files found in '{ZIP_FILE_PREFIX}' directory of dataset '{HF_DATASET_ID}'.")
+
+         print(f"[{FLOW_ID}] Found {len(zip_files)} zip files.")
+
+         # Update and save the progress data
+         progress_data['file_list'] = zip_files
+         save_progress(progress_data)
+
+         return zip_files
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
+         return []
+
+ async def download_and_extract_zip_by_index(file_index: int, repo_file_full_path: str) -> Optional[Path]:
+     """Downloads the zip file for the given index and extracts its contents."""
+
+     # Extract the base name for the extraction directory
+     zip_full_name = Path(repo_file_full_path).name
+     course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
+
+     print(f"[{FLOW_ID}] Processing file #{file_index}: {repo_file_full_path}. Full name: {zip_full_name}")
+
+     try:
+         # Use hf_hub_download to get the file path
+         zip_path = hf_hub_download(
+             repo_id=HF_DATASET_ID,
+             filename=repo_file_full_path,  # Use the full path in the repo
+             repo_type="dataset",
+             token=HF_TOKEN,
+         )
+
+         print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
+
+         # Create a temporary directory for extraction
+         extract_dir = TEMP_DIR / course_name
+         # Ensure a clean directory for extraction
+         if extract_dir.exists():
+             shutil.rmtree(extract_dir)
+         extract_dir.mkdir(exist_ok=True)
+
+         with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+             zip_ref.extractall(extract_dir)
+
+         print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
+
+         # Clean up the downloaded zip file to save space
+         os.remove(zip_path)
+
+         return extract_dir
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
+         return None
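+ # Caution: hf_hub_download typically returns a path inside the local HF cache (often a
+ # symlink to a blob), so os.remove above may delete only the link rather than the cached blob.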
+
+ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
+     """Uploads the final captions JSON file to the output dataset."""
+     # Use the full zip name, replacing the extension with .json
+     caption_filename = Path(zip_full_name).with_suffix('.json').name
+
+     try:
+         print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")
+
+         # Create JSON content in memory
+         json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
+
+         api = HfApi(token=HF_TOKEN)
+         api.upload_file(
+             path_or_fileobj=io.BytesIO(json_content),
+             path_in_repo=caption_filename,
+             repo_id=HF_OUTPUT_DATASET_ID,
+             repo_type="dataset",
+             commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
+         )
+
+         print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
+         return True
+
+     except Exception as e:
+         print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
+         return False
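+ # Each uploaded JSON file is a list of caption records shaped like (illustrative values):
+ # [{"course": "part_001", "image_path": "frame_0001.jpg", "caption": "...", "timestamp": "2024-01-01T00:00:00"}]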
+
+ # --- Core Processing Functions ---
+
+ async def get_available_server(timeout: float = 300.0) -> CaptionServer:
+     """Round-robin selection of an available caption server."""
+     global server_index
+     start_time = time.time()
+     while True:
+         # Round-robin check for an available server
+         for _ in range(len(servers)):
+             server = servers[server_index]
+             server_index = (server_index + 1) % len(servers)
+             if not server.busy:
+                 return server
+
+         # If all servers are busy, wait for a short period and check again
+         await asyncio.sleep(0.5)
+
+         # Check if the timeout has been reached
+         if time.time() - start_time > timeout:
+             raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
+
+ async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
+     """Sends a single image to a caption server, handling server selection and retries internally."""
+     MAX_RETRIES = 3
+     for attempt in range(MAX_RETRIES):
+         server = None
+         try:
+             # 1. Get an available server (will wait if all are busy, with a timeout)
+             server = await get_available_server()
+             server.busy = True
+             start_time = time.time()
+
+             # Log only on the first attempt to keep output readable
+             if attempt == 0:
+                 print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")
+
+             # 2. Prepare request data
+             form_data = aiohttp.FormData()
+             form_data.add_field('file',
+                                 image_path.open('rb'),
+                                 filename=image_path.name,
+                                 content_type='image/jpeg')
+             form_data.add_field('model_choice', MODEL_TYPE)
+
+             # 3. Send request (10-minute timeout to accommodate slow caption servers)
+             async with aiohttp.ClientSession() as session:
+                 async with session.post(server.url, data=form_data, timeout=600) as resp:
+                     if resp.status == 200:
+                         result = await resp.json()
+                         caption = result.get("caption")
+
+                         if caption:
+                             # Update the progress counter; report every 50 completions,
+                             # otherwise log the individual success
+                             progress_tracker['completed'] += 1
+                             if progress_tracker['completed'] % 50 == 0:
+                                 print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
+                             else:
+                                 print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
+
+                             return {
+                                 "course": course_name,
+                                 "image_path": image_path.name,
+                                 "caption": caption,
+                                 "timestamp": datetime.now().isoformat()
+                             }
+                         else:
+                             print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
+                             continue  # Retry with a different server
+                     else:
+                         error_text = await resp.text()
+                         print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
+                         continue  # Retry with a different server
+
+         except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
+             print(f"[{FLOW_ID}] Connection/timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
+             continue  # Retry with a different server
+         except Exception as e:
+             print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
+             continue  # Retry with a different server
+         finally:
+             if server:
+                 end_time = time.time()
+                 server.busy = False
+                 server.total_processed += 1
+                 server.total_time += (end_time - start_time)
+
+     print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
+     return None
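+ # Note: the finally block above runs on every attempt, so total_processed/total_time
+ # (and the fps property) count failed attempts as well as successes.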
+
+ async def process_dataset_task(start_index: int):
+     """Main task to process the dataset sequentially, starting from a given index."""
+
+     progress = load_progress()
+     file_list = await get_zip_file_list(progress)
+
+     if not file_list:
+         print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
+         return False
+
+     # Ensure start_index is within bounds
+     if start_index > len(file_list):
+         print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
+         return True
+
+     # Determine the actual starting index in the 0-indexed list
+     start_list_index = start_index - 1
+
+     print(f"[{FLOW_ID}] Starting dataset processing from file index: {start_index} out of {len(file_list)}.")
+
+     global_success = True
+
+     for i in range(start_list_index, len(file_list)):
+         file_index = i + 1  # 1-indexed for user display and progress tracking
+         repo_file_full_path = file_list[i]
+         zip_full_name = Path(repo_file_full_path).name
+         course_name = zip_full_name.replace('.zip', '')  # Use the file name as the course/job name
+
+         # Skip files already marked as processed in the progress file
+         if str(file_index) in progress['processed_files']:
+             print(f"[{FLOW_ID}] Skipping file #{file_index} ({zip_full_name}): already processed according to progress file.")
+             progress['last_processed_index'] = file_index
+             save_progress(progress)
+             continue
+
+         extract_dir = None
+         current_file_success = False
+
+         try:
+             # 1. Download and extract
+             extract_dir = await download_and_extract_zip_by_index(file_index, repo_file_full_path)
+
+             if not extract_dir:
+                 raise Exception("Failed to download or extract zip file.")
+
+             # 2. Find images (recursive glob to cover subdirectories)
+             image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
+             print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
+
+             if not image_paths:
+                 print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
+                 current_file_success = True
+             else:
+                 # 3. Process images (captioning)
+                 progress_tracker = {
+                     'total': len(image_paths),
+                     'completed': 0
+                 }
+                 print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
+
+                 # Semaphore limits concurrent tasks to the number of available servers
+                 semaphore = asyncio.Semaphore(len(servers))
+
+                 async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
+                     async with semaphore:
+                         return await send_image_for_captioning(image_path, course_name, progress_tracker)
+
+                 # Run all captioning tasks concurrently
+                 caption_tasks = [limited_send_image_for_captioning(p, course_name, progress_tracker) for p in image_paths]
+                 results = await asyncio.gather(*caption_tasks)
+
+                 # Filter out failed results
+                 all_captions = [r for r in results if r is not None]
+
+                 # Final progress report for the current file
+                 if len(all_captions) == len(image_paths):
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: successfully completed all {len(all_captions)} captions.")
+                     current_file_success = True
+                 else:
+                     print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: partial result, {len(all_captions)}/{len(image_paths)} captions. Marking as partial failure.")
+                     current_file_success = False
+
+                 # 4. Upload results (a partial caption set is still uploaded)
+                 if all_captions:
+                     if not await upload_captions_to_hf(zip_full_name, all_captions):
+                         print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
+                         current_file_success = False
+                 else:
+                     print(f"[{FLOW_ID}] No captions generated. Skipping upload for {zip_full_name}.")
+                     current_file_success = False
+
+         except Exception as e:
+             print(f"[{FLOW_ID}] Critical error in process_dataset_task for file #{file_index} ({zip_full_name}): {e}")
+             current_file_success = False
+             global_success = False  # Mark the overall task as failed if any file fails critically
+
+         finally:
+             # 5. Cleanup and update progress
+             if extract_dir and extract_dir.exists():
+                 shutil.rmtree(extract_dir, ignore_errors=True)
+                 print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
+
+             if current_file_success:
+                 # Update progress only on successful completion of the file
+                 progress['last_processed_index'] = file_index
+                 progress['processed_files'][str(file_index)] = repo_file_full_path
+                 save_progress(progress)
+                 print(f"[{FLOW_ID}] Progress saved: file #{file_index} marked as processed.")
+             else:
+                 # If a file fails, stop the continuous loop to allow for manual intervention or a fresh start
+                 print(f"[{FLOW_ID}] File #{file_index} failed. Stopping continuous processing.")
+                 global_success = False
+                 break
+
+     print(f"[{FLOW_ID}] All processing loops complete. Overall success: {global_success}")
+     return global_success
+
+ # --- FastAPI App and Endpoints ---
+
+ app = FastAPI(
+     title=f"Flow Server {FLOW_ID} API",
+     description="Sequentially processes zip files from a dataset, captions images, and tracks progress.",
+     version="1.0.0"
+ )
+
+ @app.on_event("startup")
+ async def startup_event():
+     print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}.")
+
+     # Automatically start the processing task.
+     progress = load_progress()
+     # Resume from the last processed index + 1, or from the hardcoded AUTO_START_INDEX
+     # if the progress file is new/empty.
+     start_index = progress.get('last_processed_index', 0) + 1
+     if start_index < AUTO_START_INDEX:
+         start_index = AUTO_START_INDEX
+
+     # FastAPI startup events can't use BackgroundTasks, so asyncio.create_task runs the
+     # long-running process in the background without blocking server startup.
+     print(f"[{FLOW_ID}] Auto-starting processing from index: {start_index}...")
+     asyncio.create_task(process_dataset_task(start_index))
474
+
475
+ @app.get("/")
476
+ async def root():
477
+ progress = load_progress()
478
+ return {
479
+ "flow_id": FLOW_ID,
480
+ "status": "ready",
481
+ "last_processed_index": progress['last_processed_index'],
482
+ "total_files_in_list": len(progress['file_list']),
483
+ "processed_files_count": len(progress['processed_files']),
484
+ "total_servers": len(servers),
485
+ "busy_servers": sum(1 for s in servers if s.busy),
486
+ }
487
+
488
+ @app.post("/start_processing")
489
+ async def start_processing(request: ProcessStartRequest, background_tasks: BackgroundTasks):
490
+ """
491
+ Starts the sequential processing of zip files from the given index in the background.
492
+ """
493
+ start_index = request.start_index
494
+
495
+ print(f"[{FLOW_ID}] Received request to start processing from index: {start_index}. Starting background task.")
496
+
497
+ # Start the heavy processing in a background task so the API call returns immediately
498
+ # Note: The server is already auto-starting, but this allows for manual restart/override.
499
+ background_tasks.add_task(process_dataset_task, start_index)
500
+
501
+ return {"status": "processing", "start_index": start_index, "message": "Dataset processing started in background."}
502
+
503
+ if __name__ == "__main__":
504
+ import uvicorn
505
+ # Note: When running in the sandbox, we need to use 0.0.0.0 to expose the port.
506
+ uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
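
For reference, a minimal client sketch for the two endpoints above (not part of this commit; the base URL is a placeholder and `requests` would need to be installed separately):

    import requests

    BASE_URL = "http://localhost:7860"  # placeholder; use the deployed Space URL

    # Check current progress and server status via the root endpoint
    print(requests.get(f"{BASE_URL}/").json())

    # Manually (re)start processing from zip file #20 (1-indexed)
    resp = requests.post(f"{BASE_URL}/start_processing", json={"start_index": 20})
    print(resp.json())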
requirements.txt ADDED
@@ -0,0 +1,9 @@
+ fastapi==0.104.1
+ uvicorn==0.24.0
+ aiofiles==23.2.1
+ python-multipart==0.0.6
+ huggingface-hub==0.18.0
+ aiohttp
+ jinja2
+ pydantic
+ datasets