Fred808 committed on
Commit
f6601b5
·
verified ·
1 Parent(s): f5f8b6b

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +465 -474
app.py CHANGED
@@ -1,475 +1,466 @@
1
- import os
2
- import json
3
- import time
4
- import asyncio
5
- import aiohttp
6
- import zipfile
7
- from typing import Dict, List, Set, Optional
8
- from urllib.parse import quote
9
- from datetime import datetime
10
- from pathlib import Path
11
- import io
12
-
13
- from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
- from pydantic import BaseModel, Field
15
- from huggingface_hub import HfApi, hf_hub_download
16
- import uvicorn
17
-
18
- # --- Configuration ---
19
- # Flow Server ID and Port will be set via environment variables for easy deployment
20
FLOW_ID = os.getenv("FLOW_ID", "flow_default")
# os.getenv returns strings, so the default is given as a string too.
FLOW_PORT = int(os.getenv("FLOW_PORT", "8001"))  # Default to 8001 for flow1

# Manager Server Configuration
MANAGER_URL = os.getenv("MANAGER_URL", "https://fred808-fcord.hf.space")
# Drop any trailing slash before joining; otherwise an override such as
# "https://host/" would produce "https://host//task/complete".
MANAGER_COMPLETE_TASK_URL = f"{MANAGER_URL.rstrip('/')}/task/complete"

# Hugging Face Configuration
HF_TOKEN = os.getenv("HF_TOKEN", "")  # User provided token
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")  # Target dataset for captions
31
-
32
- # Using the full list from the user's original code for actual deployment
33
- CAPTION_SERVERS = [
34
- "https://fred808-pil-4-1.hf.space/analyze",
35
- "https://fred808-pil-4-2.hf.space/analyze",
36
- "https://fred808-pil-4-3.hf.space/analyze",
37
- "https://fred1012-fred1012-gw0j2h.hf.space/analyze",
38
- "https://fred1012-fred1012-wqs6c2.hf.space/analyze",
39
- "https://fred1012-fred1012-oncray.hf.space/analyze",
40
- "https://fred1012-fred1012-4goge7.hf.space/analyze",
41
- "https://fred1012-fred1012-z0eh7m.hf.space/analyze",
42
- "https://fred1012-fred1012-u95rte.hf.space/analyze",
43
- "https://fred1012-fred1012-igje22.hf.space/analyze",
44
- "https://fred1012-fred1012-ibkuf8.hf.space/analyze",
45
- "https://fred1012-fred1012-nwqthy.hf.space/analyze",
46
- "https://fred1012-fred1012-4ldqj4.hf.space/analyze",
47
- "https://fred1012-fred1012-pivlzg.hf.space/analyze",
48
- "https://fred1012-fred1012-ptlc5u.hf.space/analyze",
49
- "https://fred1012-fred1012-u7lh57.hf.space/analyze",
50
- "https://fred1012-fred1012-q8djv1.hf.space/analyze",
51
- "https://fredalone-fredalone-ozugrp.hf.space/analyze",
52
- "https://fredalone-fredalone-9brxj2.hf.space/analyze",
53
- "https://fredalone-fredalone-p8vq9a.hf.space/analyze",
54
- "https://fredalone-fredalone-vbli2y.hf.space/analyze",
55
- "https://fredalone-fredalone-uggger.hf.space/analyze",
56
- "https://fredalone-fredalone-nmi7e8.hf.space/analyze",
57
- "https://fredalone-fredalone-d1f26d.hf.space/analyze",
58
- "https://fredalone-fredalone-461jp2.hf.space/analyze",
59
- "https://fredalone-fredalone-3enfg4.hf.space/analyze",
60
- "https://fredalone-fredalone-dqdbpv.hf.space/analyze",
61
- "https://fredalone-fredalone-ivtjua.hf.space/analyze",
62
- "https://fredalone-fredalone-6bezt2.hf.space/analyze",
63
- "https://fredalone-fredalone-e0wfnk.hf.space/analyze",
64
- "https://fredalone-fredalone-zu2t7j.hf.space/analyze",
65
- "https://fredalone-fredalone-dqtv1o.hf.space/analyze",
66
- "https://fredalone-fredalone-wclyog.hf.space/analyze",
67
- "https://fredalone-fredalone-t27vig.hf.space/analyze",
68
- "https://fredalone-fredalone-gahbxh.hf.space/analyze",
69
- "https://fredalone-fredalone-kw2po4.hf.space/analyze",
70
- "https://fredalone-fredalone-8h285h.hf.space/analyze"
71
- ]
72
# Model name sent to the caption servers as the 'model_choice' form field.
MODEL_TYPE = "Florence-2-large"

# Temporary storage for images
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
# parents=True: FLOW_ID comes from the environment, so the resulting path may
# be nested; without it mkdir would fail at import time in that case.
TEMP_DIR.mkdir(parents=True, exist_ok=True)
77
-
78
- # --- Models ---
79
class ProcessCourseRequest(BaseModel):
    """Request payload carrying an optional course name."""

    # May be omitted or null; defaults to None.
    course_name: Optional[str] = None
81
-
82
class CaptionServer:
    """Bookkeeping record for one remote caption endpoint."""

    def __init__(self, url):
        self.url = url
        self.model = MODEL_TYPE
        # Availability flag plus the running totals that ``fps`` is derived from.
        self.busy = False
        self.total_processed = 0
        self.total_time = 0

    @property
    def fps(self):
        """Return items processed per unit of accumulated time, or 0 when no time is recorded."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
93
-
94
- # Global state for caption servers
95
# One tracker object per configured endpoint, and the index at which the next
# round-robin scan in get_available_server starts.
servers = list(map(CaptionServer, CAPTION_SERVERS))
server_index = 0
97
-
98
- # --- Core Processing Functions ---
99
-
100
async def get_available_server(timeout: float = 300.0) -> "CaptionServer":
    """Round-robin selection of an available caption server.

    Scans the global ``servers`` list starting at ``server_index`` and returns
    the first entry whose ``busy`` flag is false. The caller is responsible
    for setting ``busy`` on the returned server. When every server is busy the
    coroutine sleeps 0.5s and scans again.

    Args:
        timeout: Maximum number of seconds to keep waiting.

    Raises:
        TimeoutError: No server became available within ``timeout`` seconds.
    """
    global server_index
    # Monotonic clock: a wall-clock adjustment (NTP, DST) must not shorten or
    # stretch the wait.
    started = time.monotonic()
    while True:
        # One full pass over the list, advancing the shared cursor as we go.
        for _ in range(len(servers)):
            candidate = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not candidate.busy:
                return candidate

        # If all servers are busy, wait for a short period and check again
        await asyncio.sleep(0.5)

        if time.monotonic() - started > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
118
-
119
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Send a single image to a caption server, retrying on failure.

    Each attempt picks a server through ``get_available_server``, marks it
    busy, posts the image as multipart form data and releases the server
    again in ``finally``.

    Args:
        image_path: Image file to upload.
        course_name: Stored verbatim in the returned record under "course".
        progress_tracker: Dict with 'completed' and 'total' keys; 'completed'
            is incremented for every caption obtained.

    Returns:
        A dict with "course", "image_path", "caption" and "timestamp" keys,
        or None when all ``MAX_RETRIES`` attempts failed.
    """
    MAX_RETRIES = 3
    # 10 minutes (600s) per request, expressed as the ClientTimeout object
    # aiohttp expects instead of a bare number.
    request_timeout = aiohttp.ClientTimeout(total=600)

    for attempt in range(MAX_RETRIES):
        server = None
        try:
            # 1. Get an available server (will wait if all are busy, with a timeout)
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            # Print a less verbose message only on the first attempt
            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # 2. Prepare request data. The context manager closes the file
            # handle on every path, including attempts that fail before the
            # request is sent.
            # NOTE(review): content_type is fixed to 'image/jpeg' even though
            # callers may pass .png files; left unchanged pending confirmation
            # of what the caption servers accept.
            with image_path.open('rb') as image_file:
                form_data = aiohttp.FormData()
                form_data.add_field('file',
                                    image_file,
                                    filename=image_path.name,
                                    content_type='image/jpeg')
                form_data.add_field('model_choice', MODEL_TYPE)

                # 3. Send request
                async with aiohttp.ClientSession() as session:
                    async with session.post(server.url, data=form_data, timeout=request_timeout) as resp:
                        if resp.status != 200:
                            error_text = await resp.text()
                            print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                            continue  # Retry with a different server

                        result = await resp.json()
                        caption = result.get("caption")
                        if not caption:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue  # Retry with a different server

                        # Update progress counter; every 50th completion logs a
                        # progress line instead of the per-image success line.
                        progress_tracker['completed'] += 1
                        if progress_tracker['completed'] % 50 == 0:
                            print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
                        else:
                            print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")

                        return {
                            "course": course_name,
                            "image_path": image_path.name,
                            "caption": caption,
                            "timestamp": datetime.now().isoformat()
                        }

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue  # Retry with a different server
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue  # Retry with a different server
        finally:
            # Release the server and record the attempt, whatever its outcome.
            if server is not None:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
190
-
191
async def download_and_extract_zip(
    course_name: str, processed_files: Set[str]
) -> Optional[tuple[Optional[Path], Optional[str], Optional[str]]]:
    """Download the next unprocessed zip for the course and extract it.

    Looks in the input dataset for files named ``frames/<course_name>*.zip``,
    skips those already listed in ``processed_files``, downloads the first
    remaining one and extracts it under ``TEMP_DIR / course_name``.

    Args:
        course_name: Prefix the zip file names must start with.
        processed_files: Repo paths that were already handled.

    Returns:
        ``(extract_dir, zip_full_name, repo_file_full_path)`` on success;
        ``(None, None, None)`` when there is nothing (new) to process;
        ``None`` when listing, downloading or extracting failed. The caller
        tells "nothing to do" apart from an error by this difference.
    """
    print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...")

    try:
        api = HfApi(token=HF_TOKEN)

        # List every file in the dataset repo, then narrow down below.
        repo_files = api.list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )

        # Find zip files that start with the course name
        prefix = f"frames/{course_name}"
        matching_files = [
            f for f in repo_files
            if f.startswith(prefix) and f.endswith('.zip')
        ]

        if not matching_files:
            print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.")
            # Same 3-tuple shape as every other "nothing to do" return.
            return None, None, None

        # Filter out already processed files and select the first one
        unprocessed_files = [f for f in matching_files if f not in processed_files]

        if not unprocessed_files:
            print(f"[{FLOW_ID}] No new zip files found for '{course_name}'.")
            return None, None, None

        repo_file_full_path = unprocessed_files[0]  # e.g., frames/DAREEFSA_full_name.zip

        # Extract the full file name from the path (e.g., DAREEFSA_full_name.zip)
        zip_full_name = Path(repo_file_full_path).name
        print(f"[{FLOW_ID}] Found new matching file: {repo_file_full_path}. Full name: {zip_full_name}")

        # Use hf_hub_download to get the file path
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,  # Use the full path in the repo
            repo_type="dataset",
            token=HF_TOKEN,
        )

        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        # Create a temporary directory for extraction; parents=True so a
        # missing TEMP_DIR or a nested course name does not abort the run.
        extract_dir = TEMP_DIR / course_name
        extract_dir.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Return the extraction directory, the full zip file name, and the repo path
        return extract_dir, zip_full_name, repo_file_full_path

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}")
        # None (not a tuple of Nones) so the caller can report a failure
        # instead of mistaking it for "no new files".
        return None
252
-
253
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Upload the captions as a JSON file to the output dataset.

    The file in the repo is named after the zip file, with its extension
    replaced by ``.json``.

    Args:
        zip_full_name: File name of the processed zip.
        captions: Caption records to serialise.

    Returns:
        True when the upload succeeded, False when any exception occurred.
    """
    # Use the full zip name, replacing the extension with .json
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

        # Serialise in memory; nothing is written to local disk.
        serialized = json.dumps(captions, indent=2, ensure_ascii=False)
        payload = io.BytesIO(serialized.encode('utf-8'))

        hub = HfApi(token=HF_TOKEN)
        hub.upload_file(
            path_or_fileobj=payload,
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
282
-
283
async def process_course_task(course_name: str):
    """Process every zip file of a course, one file per loop iteration.

    Each iteration downloads and extracts the next unprocessed zip, captions
    all images found in it (recursively), uploads the captions and removes
    the extraction directory. The loop ends when no new file is found or when
    the download step fails.

    Args:
        course_name: Name prefix of the course's zip files.

    Returns:
        True when every file was fully captioned and uploaded, False when any
        file failed completely or partially.
    """
    import shutil  # function-scope import, as in the original cleanup code

    print(f"[{FLOW_ID}] Starting continuous processing for course: {course_name}")

    processed_files = set()
    all_processed_files_log = []
    global_success = True
    # Always bound, so the final report cannot hit an UnboundLocalError when
    # a failure happened without an exception (e.g. partial captions).
    error_message = None

    # Loop to continuously check for new files matching the course_name prefix
    while True:
        extract_dir = None
        zip_full_name = None
        repo_file_full_path = None
        download_result = None
        download_failed = False

        try:
            # Result is None on error, a tuple of Nones when there is nothing
            # new, or (extract_dir, zip_full_name, repo_file_full_path).
            download_result = await download_and_extract_zip(course_name, processed_files)

            if download_result is None or download_result[0] is None:
                if download_result is not None and download_result[1] is None:
                    print(f"[{FLOW_ID}] No new files found for {course_name}. Exiting loop.")
                    break
                # An error occurred during search/download
                download_failed = True
                raise Exception("Failed to download or extract zip file.")

            extract_dir, zip_full_name, repo_file_full_path = download_result

            # Add the file to the processed set immediately to avoid re-processing in the next loop
            processed_files.add(repo_file_full_path)
            all_processed_files_log.append(repo_file_full_path)

            # Recursive glob so images inside sub-directories are found too.
            image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                continue  # cleanup still runs in ``finally``

            progress_tracker = {
                'total': len(image_paths),
                'completed': 0
            }
            print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

            # Limit concurrent tasks to the number of caption servers. At
            # least 1, because Semaphore(0) would block every task forever.
            semaphore = asyncio.Semaphore(max(1, len(servers)))

            async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                async with semaphore:
                    return await send_image_for_captioning(image_path, course_name, progress_tracker)

            # Run all captioning tasks concurrently
            results = await asyncio.gather(*(
                limited_send_image_for_captioning(image_path, course_name, progress_tracker)
                for image_path in image_paths
            ))

            # Filter out failed results
            all_captions = [r for r in results if r is not None]

            # Final progress report for the current file
            if len(all_captions) == len(image_paths):
                print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
            else:
                print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
                # A partial result is still uploaded below, but the task as a
                # whole counts as failed.
                global_success = False
                error_message = f"Only {len(all_captions)}/{len(image_paths)} captions generated for {zip_full_name}."

            # Upload results
            if all_captions and zip_full_name:
                print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                if await upload_captions_to_hf(zip_full_name, all_captions):
                    print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                else:
                    print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                    global_success = False
                    error_message = f"Failed to upload captions for {zip_full_name}."
            else:
                print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}.")
                global_success = False
                error_message = f"No captions generated for {zip_full_name}."

        except Exception as e:
            error_message = str(e)
            print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
            global_success = False

        finally:
            # Cleanup temporary files for the current file; log only after the
            # removal has actually been attempted.
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")

        # Decided outside ``finally``: a ``break`` inside ``finally`` would
        # silently discard any exception still propagating (e.g. task
        # cancellation).
        if download_failed:
            break

    # --- Final Report after the loop is complete ---
    print(f"[{FLOW_ID}] All processing loops complete for {course_name}.")
    print(f"[{FLOW_ID}] Total files processed: {len(all_processed_files_log)}")
    print(f"[{FLOW_ID}] List of processed files: {all_processed_files_log}")

    # NOTE(review): reporting to the manager is disabled in the original code;
    # the message is still computed so the call below can be re-enabled as is.
    final_error_message = error_message if not global_success else None
    # await report_completion(course_name, global_success, final_error_message)

    return global_success
408
-
409
async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None):
    """Report the task result back to the Manager Server.

    Posts a JSON document with the flow id, course name, success flag and
    error message to ``MANAGER_COMPLETE_TASK_URL``. Failures are logged and
    never raised to the caller.
    """
    print(f"[{FLOW_ID}] Reporting completion for {course_name} (Success: {success})...")

    payload = dict(
        flow_id=FLOW_ID,
        course_name=course_name,
        success=success,
        error_message=error_message,
    )

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(MANAGER_COMPLETE_TASK_URL, json=payload) as resp:
                if resp.status == 200:
                    print(f"[{FLOW_ID}] Successfully reported completion to Manager.")
                else:
                    print(f"[{FLOW_ID}] ERROR: Manager reported non-200 status: {resp.status} - {await resp.text()}")

    except aiohttp.ClientError as e:
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not connect to Manager at {MANAGER_COMPLETE_TASK_URL}. Task completion not reported. Error: {e}")
    except Exception as e:
        print(f"[{FLOW_ID}] Unexpected error during reporting: {e}")
432
-
433
- # --- FastAPI App and Endpoints ---
434
-
435
app = FastAPI(
    title=f"Flow Server {FLOW_ID} API",
    description="Fetches, extracts, and captions images for a given course.",
    version="1.0.0",
)


# NOTE(review): FastAPI documents on_event as deprecated in favour of lifespan
# handlers; kept as is to preserve behaviour.
@app.on_event("startup")
async def startup_event():
    """Log the server identity, port and manager URL once at startup."""
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}. Manager URL: {MANAGER_URL}")
444
-
445
@app.get("/")
async def root():
    """Return this flow server's identity and caption-server occupancy."""
    busy_count = 0
    for caption_server in servers:
        if caption_server.busy:
            busy_count += 1

    return {
        "flow_id": FLOW_ID,
        "status": "ready",
        "manager_url": MANAGER_URL,
        "total_servers": len(servers),
        "busy_servers": busy_count,
    }
454
-
455
@app.post("/process_course")
async def process_course(request: ProcessCourseRequest, background_tasks: BackgroundTasks):
    """
    Receives a course name from the Manager and starts processing in the background.

    An empty or missing course name is answered with a "stopped" status and
    no task is scheduled.
    """
    course_name = request.course_name

    if course_name:
        print(f"[{FLOW_ID}] Received course: {course_name}. Starting background task.")

        # Heavy work runs as a background task so this call returns immediately.
        background_tasks.add_task(process_course_task, course_name)

        return {"status": "processing", "course_name": course_name, "message": "Processing started in background."}

    print(f"[{FLOW_ID}] Received empty course name. Stopping processing loop.")
    return {"status": "stopped", "message": "No more courses to process."}
472
-
473
if __name__ == "__main__":
    # Listen on 0.0.0.0 so the port is exposed when running in the sandbox.
    uvicorn.run(app, port=FLOW_PORT, host="0.0.0.0")
 
1
+ import os
2
+ import json
3
+ import time
4
+ import asyncio
5
+ import aiohttp
6
+ import zipfile
7
+ from typing import Dict, List, Set, Optional
8
+ from urllib.parse import quote
9
+ from datetime import datetime
10
+ from pathlib import Path
11
+ import io
12
+
13
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
+ from pydantic import BaseModel, Field
15
+ from huggingface_hub import HfApi, hf_hub_download
16
+ import uvicorn
17
+
18
+ # --- Configuration ---
19
+ # Flow Server ID and Port will be set via environment variables for easy deployment
20
FLOW_ID = os.getenv("FLOW_ID", "flow_default")
# os.getenv returns strings, so the default is given as a string too.
FLOW_PORT = int(os.getenv("FLOW_PORT", "8001"))  # Default to 8001 for flow1

# Manager Server Configuration
MANAGER_URL = os.getenv("MANAGER_URL", "https://fred808-fcord.hf.space")
# Drop any trailing slash before joining; otherwise an override such as
# "https://host/" would produce "https://host//task/complete".
MANAGER_COMPLETE_TASK_URL = f"{MANAGER_URL.rstrip('/')}/task/complete"

# Hugging Face Configuration
HF_TOKEN = os.getenv("HF_TOKEN", "")  # User provided token
HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")
HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")  # Target dataset for captions
31
+
32
+ # Using the full list from the user's original code for actual deployment
33
+ CAPTION_SERVERS = [
34
+ "https://favoredone-favoredone-tv88mp.hf.space/analyze",
35
+ "https://favoredone-favoredone-7p1dcf.hf.space/analyze",
36
+ "https://favoredone-favoredone-k7b4mf.hf.space/analyze",
37
+ "https://favoredone-favoredone-mzlxc7.hf.space/analyze",
38
+ "https://favoredone-favoredone-aomfwa.hf.space/analyze",
39
+ "https://favoredone-favoredone-7g6v04.hf.space/analyze",
40
+ "https://favoredone-favoredone-dk1skh.hf.space/analyze",
41
+ "https://favoredone-favoredone-z4yo0y.hf.space/analyze",
42
+ "https://favoredone-favoredone-f6czeq.hf.space/analyze",
43
+ "https://favoredone-favoredone-5fo8ga.hf.space/analyze",
44
+ "https://favoredone-favoredone-zde8x6.hf.space/analyze",
45
+ "https://favoredone-favoredone-r0biih.hf.space/analyze",
46
+ "https://favoredone-favoredone-ljdzkf.hf.space/analyze",
47
+ "https://favoredone-favoredone-irrpe5.hf.space/analyze",
48
+ "https://favoredone-favoredone-bh9rwz.hf.space/analyze",
49
+ "https://favoredone-favoredone-u8c4dt.hf.space/analyze",
50
+ "https://favoredone-favoredone-futwyd.hf.space/analyze",
51
+ "https://favoredone-favoredone-hg2sot.hf.space/analyze",
52
+ "https://favoredone-favoredone-pvweug.hf.space/analyze",
53
+ "https://favoredone-favoredone-z6azk2.hf.space/analyze",
54
+ "https://favoredone-favoredone-4zid9w.hf.space/analyze",
55
+ "https://favoredone-favoredone-be7a1r.hf.space/analyze",
56
+ "https://favoredone-favoredone-ayazxa.hf.space/analyze",
57
+ "https://favoredone-favoredone-6ckj4m.hf.space/analyze",
58
+ "https://favoredone-favoredone-whn0xu.hf.space/analyze",
59
+ "https://favoredone-favoredone-t49exm.hf.space/analyze",
60
+ "https://favoredone-favoredone-cgrh0a.hf.space/analyze",
61
+ "https://favoredone-favoredone-r1kb5g.hf.space/analyze"
62
+ ]
63
# Model name sent to the caption servers as the 'model_choice' form field.
MODEL_TYPE = "Florence-2-large"

# Temporary storage for images
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
# parents=True: FLOW_ID comes from the environment, so the resulting path may
# be nested; without it mkdir would fail at import time in that case.
TEMP_DIR.mkdir(parents=True, exist_ok=True)
68
+
69
+ # --- Models ---
70
class ProcessCourseRequest(BaseModel):
    """Request payload carrying an optional course name."""

    # May be omitted or null; defaults to None.
    course_name: Optional[str] = None
72
+
73
class CaptionServer:
    """Bookkeeping record for one remote caption endpoint."""

    def __init__(self, url):
        self.url = url
        self.model = MODEL_TYPE
        # Availability flag plus the running totals that ``fps`` is derived from.
        self.busy = False
        self.total_processed = 0
        self.total_time = 0

    @property
    def fps(self):
        """Return items processed per unit of accumulated time, or 0 when no time is recorded."""
        if self.total_time > 0:
            return self.total_processed / self.total_time
        return 0
84
+
85
+ # Global state for caption servers
86
# One tracker object per configured endpoint, and the index at which the next
# round-robin scan in get_available_server starts.
servers = list(map(CaptionServer, CAPTION_SERVERS))
server_index = 0
88
+
89
+ # --- Core Processing Functions ---
90
+
91
async def get_available_server(timeout: float = 300.0) -> "CaptionServer":
    """Round-robin selection of an available caption server.

    Scans the global ``servers`` list starting at ``server_index`` and returns
    the first entry whose ``busy`` flag is false. The caller is responsible
    for setting ``busy`` on the returned server. When every server is busy the
    coroutine sleeps 0.5s and scans again.

    Args:
        timeout: Maximum number of seconds to keep waiting.

    Raises:
        TimeoutError: No server became available within ``timeout`` seconds.
    """
    global server_index
    # Monotonic clock: a wall-clock adjustment (NTP, DST) must not shorten or
    # stretch the wait.
    started = time.monotonic()
    while True:
        # One full pass over the list, advancing the shared cursor as we go.
        for _ in range(len(servers)):
            candidate = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not candidate.busy:
                return candidate

        # If all servers are busy, wait for a short period and check again
        await asyncio.sleep(0.5)

        if time.monotonic() - started > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available caption server.")
109
+
110
async def send_image_for_captioning(image_path: Path, course_name: str, progress_tracker: Dict) -> Optional[Dict]:
    """Send a single image to a caption server, retrying on failure.

    Each attempt picks a server through ``get_available_server``, marks it
    busy, posts the image as multipart form data and releases the server
    again in ``finally``.

    Args:
        image_path: Image file to upload.
        course_name: Stored verbatim in the returned record under "course".
        progress_tracker: Dict with 'completed' and 'total' keys; 'completed'
            is incremented for every caption obtained.

    Returns:
        A dict with "course", "image_path", "caption" and "timestamp" keys,
        or None when all ``MAX_RETRIES`` attempts failed.
    """
    MAX_RETRIES = 3
    # 10 minutes (600s) per request, expressed as the ClientTimeout object
    # aiohttp expects instead of a bare number.
    request_timeout = aiohttp.ClientTimeout(total=600)

    for attempt in range(MAX_RETRIES):
        server = None
        try:
            # 1. Get an available server (will wait if all are busy, with a timeout)
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            # Print a less verbose message only on the first attempt
            if attempt == 0:
                print(f"[{FLOW_ID}] Starting attempt on {image_path.name}...")

            # 2. Prepare request data. The context manager closes the file
            # handle on every path, including attempts that fail before the
            # request is sent.
            # NOTE(review): content_type is fixed to 'image/jpeg' even though
            # callers may pass .png files; left unchanged pending confirmation
            # of what the caption servers accept.
            with image_path.open('rb') as image_file:
                form_data = aiohttp.FormData()
                form_data.add_field('file',
                                    image_file,
                                    filename=image_path.name,
                                    content_type='image/jpeg')
                form_data.add_field('model_choice', MODEL_TYPE)

                # 3. Send request
                async with aiohttp.ClientSession() as session:
                    async with session.post(server.url, data=form_data, timeout=request_timeout) as resp:
                        if resp.status != 200:
                            error_text = await resp.text()
                            print(f"[{FLOW_ID}] Error from server {server.url} for {image_path.name}: {resp.status} - {error_text}. Retrying...")
                            continue  # Retry with a different server

                        result = await resp.json()
                        caption = result.get("caption")
                        if not caption:
                            print(f"[{FLOW_ID}] Server {server.url} returned success but no caption for {image_path.name}. Retrying...")
                            continue  # Retry with a different server

                        # Update progress counter; every 50th completion logs a
                        # progress line instead of the per-image success line.
                        progress_tracker['completed'] += 1
                        if progress_tracker['completed'] % 50 == 0:
                            print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
                        else:
                            print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")

                        return {
                            "course": course_name,
                            "image_path": image_path.name,
                            "caption": caption,
                            "timestamp": datetime.now().isoformat()
                        }

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {image_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue  # Retry with a different server
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during captioning for {image_path.name}: {e}. Retrying...")
            continue  # Retry with a different server
        finally:
            # Release the server and record the attempt, whatever its outcome.
            if server is not None:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
    return None
181
+
182
async def download_and_extract_zip(
    course_name: str, processed_files: Set[str]
) -> Optional[tuple[Optional[Path], Optional[str], Optional[str]]]:
    """Download the next unprocessed zip for the course and extract it.

    Looks in the input dataset for files named ``frames/<course_name>*.zip``,
    skips those already listed in ``processed_files``, downloads the first
    remaining one and extracts it under ``TEMP_DIR / course_name``.

    Args:
        course_name: Prefix the zip file names must start with.
        processed_files: Repo paths that were already handled.

    Returns:
        ``(extract_dir, zip_full_name, repo_file_full_path)`` on success;
        ``(None, None, None)`` when there is nothing (new) to process;
        ``None`` when listing, downloading or extracting failed. The caller
        tells "nothing to do" apart from an error by this difference.
    """
    print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...")

    try:
        api = HfApi(token=HF_TOKEN)

        # List every file in the dataset repo, then narrow down below.
        repo_files = api.list_repo_files(
            repo_id=HF_DATASET_ID,
            repo_type="dataset"
        )

        # Find zip files that start with the course name
        prefix = f"frames/{course_name}"
        matching_files = [
            f for f in repo_files
            if f.startswith(prefix) and f.endswith('.zip')
        ]

        if not matching_files:
            print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.")
            # Same 3-tuple shape as every other "nothing to do" return.
            return None, None, None

        # Filter out already processed files and select the first one
        unprocessed_files = [f for f in matching_files if f not in processed_files]

        if not unprocessed_files:
            print(f"[{FLOW_ID}] No new zip files found for '{course_name}'.")
            return None, None, None

        repo_file_full_path = unprocessed_files[0]  # e.g., frames/DAREEFSA_full_name.zip

        # Extract the full file name from the path (e.g., DAREEFSA_full_name.zip)
        zip_full_name = Path(repo_file_full_path).name
        print(f"[{FLOW_ID}] Found new matching file: {repo_file_full_path}. Full name: {zip_full_name}")

        # Use hf_hub_download to get the file path
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,  # Use the full path in the repo
            repo_type="dataset",
            token=HF_TOKEN,
        )

        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        # Create a temporary directory for extraction; parents=True so a
        # missing TEMP_DIR or a nested course name does not abort the run.
        extract_dir = TEMP_DIR / course_name
        extract_dir.mkdir(parents=True, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Return the extraction directory, the full zip file name, and the repo path
        return extract_dir, zip_full_name, repo_file_full_path

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}")
        # None (not a tuple of Nones) so the caller can report a failure
        # instead of mistaking it for "no new files".
        return None
243
+
244
async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
    """Upload the captions as a JSON file to the output dataset.

    The file in the repo is named after the zip file, with its extension
    replaced by ``.json``.

    Args:
        zip_full_name: File name of the processed zip.
        captions: Caption records to serialise.

    Returns:
        True when the upload succeeded, False when any exception occurred.
    """
    # Use the full zip name, replacing the extension with .json
    caption_filename = Path(zip_full_name).with_suffix('.json').name

    try:
        print(f"[{FLOW_ID}] Uploading {len(captions)} captions for {zip_full_name} as {caption_filename} to {HF_OUTPUT_DATASET_ID}...")

        # Serialise in memory; nothing is written to local disk.
        serialized = json.dumps(captions, indent=2, ensure_ascii=False)
        payload = io.BytesIO(serialized.encode('utf-8'))

        hub = HfApi(token=HF_TOKEN)
        hub.upload_file(
            path_or_fileobj=payload,
            path_in_repo=caption_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Captions for {zip_full_name}"
        )

        print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
        return False
273
+
274
async def process_course_task(course_name: str) -> bool:
    """Process every zip archive of a course, one archive per loop iteration.

    Each iteration asks ``download_and_extract_zip`` for the next archive that
    is not yet in the processed set, captions all ``.jpg``/``.jpeg``/``.png``
    files found (recursively) in the extracted directory, uploads the captions
    with ``upload_captions_to_hf`` and removes the extracted directory.

    The loop ends when no new archive is returned or when fetching an archive
    fails. A failure while processing one archive is logged and the loop moves
    on to the next archive.

    Args:
        course_name: Course name passed to ``download_and_extract_zip`` and
            ``send_image_for_captioning``.

    Returns:
        True if every archive was fully captioned and uploaded, False if any
        step failed for any archive.
    """
    import shutil  # Function-scope import, as in the original implementation.

    print(f"[{FLOW_ID}] Starting continuous processing for course: {course_name}")

    processed_files = set()
    all_processed_files_log = []
    global_success = True
    # Bound up front so the final report never reads an unassigned name when
    # the task failed without an exception (partial captions, failed upload).
    error_message = None

    while True:
        extract_dir = None
        zip_full_name = None
        repo_file_full_path = None

        # --- Fetch the next archive ---
        # Any failure here ends the loop: the processed set has not changed,
        # so retrying would request the very same archive again, forever.
        try:
            # Expected result: (extract_dir, zip_full_name, repo_file_full_path)
            download_result = await download_and_extract_zip(course_name, processed_files)
            if download_result is None:
                raise RuntimeError("Failed to download or extract zip file.")
            extract_dir, zip_full_name, repo_file_full_path = download_result
            if extract_dir is None and zip_full_name is not None:
                raise RuntimeError("Failed to download or extract zip file.")
        except Exception as e:
            error_message = str(e)
            print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
            global_success = False
            break

        if extract_dir is None:
            # NOTE(review): download_and_extract_zip also returns
            # (None, None, None) from its own exception handler, so a download
            # error is indistinguishable from "nothing left to do" here.
            print(f"[{FLOW_ID}] No new files found for {course_name}. Exiting loop.")
            break

        # Record the archive immediately so it is never fetched again, even
        # if processing it fails below.
        processed_files.add(repo_file_full_path)
        all_processed_files_log.append(repo_file_full_path)

        # --- Process the single archive ---
        try:
            # Recursive glob: images may live in sub-directories of the zip.
            image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                # Nothing to caption and nothing to upload; not a failure.
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                continue

            progress_tracker = {
                'total': len(image_paths),
                'completed': 0
            }
            print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")

            # One in-flight request per caption server. A semaphore created
            # with 0 permits would block every task forever, hence max(1, ...).
            semaphore = asyncio.Semaphore(max(1, len(servers)))

            async def limited_send_image_for_captioning(image_path):
                async with semaphore:
                    return await send_image_for_captioning(image_path, course_name, progress_tracker)

            # return_exceptions=True: wait for *all* tasks before continuing,
            # so cleanup never deletes files that a sibling task still reads,
            # and one failing image does not discard the other captions.
            results = await asyncio.gather(
                *(limited_send_image_for_captioning(p) for p in image_paths),
                return_exceptions=True,
            )

            # Keep only successful captions (None and exceptions are failures).
            all_captions = [r for r in results if r is not None and not isinstance(r, BaseException)]

            if len(all_captions) == len(image_paths):
                print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
            else:
                print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
                # Partial results are still uploaded below, but the task as a
                # whole is reported as failed.
                error_message = f"Partial captions for {zip_full_name}: {len(all_captions)}/{len(image_paths)}"
                global_success = False

            # --- Upload results ---
            if all_captions and zip_full_name:
                print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
                if await upload_captions_to_hf(zip_full_name, all_captions):
                    print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                else:
                    print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
                    error_message = f"Failed to upload captions for {zip_full_name}"
                    global_success = False
            else:
                print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}.")
                error_message = f"No captions generated for {zip_full_name}"
                global_success = False

        except Exception as e:
            error_message = str(e)
            print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
            global_success = False

        finally:
            # Always remove the extracted files of the current archive.
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")

    # --- Final report after the loop is complete ---
    print(f"[{FLOW_ID}] All processing loops complete for {course_name}.")
    print(f"[{FLOW_ID}] Total files processed: {len(all_processed_files_log)}")
    print(f"[{FLOW_ID}] List of processed files: {all_processed_files_log}")

    # Message of the most recent failure, or None when everything succeeded.
    final_error_message = error_message if not global_success else None
    # NOTE(review): reporting to the Manager is disabled in this revision; the
    # original left the report_completion(course_name, global_success,
    # final_error_message) call switched off. Confirm whether that is intended
    # before re-enabling it.

    return global_success
399
+
400
async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None):
    """Tell the Manager Server how a course task ended.

    Sends a JSON POST to ``MANAGER_COMPLETE_TASK_URL`` containing this flow's
    id, the course name, the success flag and the optional error message.
    Every failure is logged; nothing is raised to the caller.
    """
    print(f"[{FLOW_ID}] Reporting completion for {course_name} (Success: {success})...")

    body = dict(
        flow_id=FLOW_ID,
        course_name=course_name,
        success=success,
        error_message=error_message,
    )

    try:
        async with aiohttp.ClientSession() as http:
            async with http.post(MANAGER_COMPLETE_TASK_URL, json=body) as reply:
                if reply.status == 200:
                    print(f"[{FLOW_ID}] Successfully reported completion to Manager.")
                    return
                # Any other status is logged together with the response body.
                print(f"[{FLOW_ID}] ERROR: Manager reported non-200 status: {reply.status} - {await reply.text()}")

    except aiohttp.ClientError as exc:
        # Connection-level problems (DNS, refused connection, ...).
        print(f"[{FLOW_ID}] CRITICAL ERROR: Could not connect to Manager at {MANAGER_COMPLETE_TASK_URL}. Task completion not reported. Error: {exc}")
    except Exception as exc:
        print(f"[{FLOW_ID}] Unexpected error during reporting: {exc}")
423
+
424
+ # --- FastAPI App and Endpoints ---
425
+
426
# ASGI application object; its title embeds this flow server's id.
app = FastAPI(title=f"Flow Server {FLOW_ID} API", description="Fetches, extracts, and captions images for a given course.", version="1.0.0")
431
+
432
@app.on_event("startup")
async def startup_event():
    # Log the effective configuration once, when the application starts.
    banner = f"Flow Server {FLOW_ID} started on port {FLOW_PORT}. Manager URL: {MANAGER_URL}"
    print(banner)
435
+
436
@app.get("/")
async def root():
    # Status summary: identity of this flow server plus how many of the
    # configured caption servers are currently marked busy.
    busy_count = len([srv for srv in servers if srv.busy])
    return dict(
        flow_id=FLOW_ID,
        status="ready",
        manager_url=MANAGER_URL,
        total_servers=len(servers),
        busy_servers=busy_count,
    )
445
+
446
@app.post("/process_course")
async def process_course(request: ProcessCourseRequest, background_tasks: BackgroundTasks):
    """
    Receives a course name from the Manager and starts processing in the background.
    """
    course_name = request.course_name

    if course_name:
        print(f"[{FLOW_ID}] Received course: {course_name}. Starting background task.")

        # Queue the heavy work as a background task so this request can
        # return immediately.
        background_tasks.add_task(process_course_task, course_name)

        return {"status": "processing", "course_name": course_name, "message": "Processing started in background."}

    # An empty course name is the signal that nothing is left to process.
    print(f"[{FLOW_ID}] Received empty course name. Stopping processing loop.")
    return {"status": "stopped", "message": "No more courses to process."}
463
+
464
if __name__ == "__main__":
    # Listen on all interfaces (0.0.0.0) so the port is exposed when running
    # inside the sandbox.
    uvicorn.run(
        app,
        port=FLOW_PORT,
        host="0.0.0.0",
    )