Fred808 commited on
Commit
a303362
·
verified ·
1 Parent(s): ef25d3a

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +434 -803
app.py CHANGED
@@ -4,79 +4,76 @@ import time
4
  import asyncio
5
  import aiohttp
6
  import zipfile
7
- from typing import Dict, List, Set, Optional
 
 
8
  from urllib.parse import quote
9
  from datetime import datetime
10
  from pathlib import Path
11
- import io
12
 
13
- from fastapi import FastAPI, BackgroundTasks, HTTPException, status
14
- from fastapi.responses import HTMLResponse, FileResponse
15
- from fastapi.staticfiles import StaticFiles
16
  from pydantic import BaseModel, Field
17
- from huggingface_hub import HfApi, hf_hub_download
18
  import uvicorn
19
 
20
  # --- Configuration ---
21
  FLOW_ID = os.getenv("FLOW_ID", "flow_default")
22
  FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
23
-
24
  MANAGER_URL = os.getenv("MANAGER_URL", "https://fred808-fcord.hf.space")
25
  MANAGER_COMPLETE_TASK_URL = f"{MANAGER_URL}/task/complete"
26
-
27
  HF_TOKEN = os.getenv("HF_TOKEN", "")
28
  HF_DATASET_ID = os.getenv("HF_DATASET_ID", "Fred808/BG3")
29
  HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "fred808/helium")
 
30
 
 
31
  CAPTION_SERVERS = [
32
- "https://favoredone-favoredone-tv88mp.hf.space/analyze",
33
- "https://favoredone-favoredone-7p1dcf.hf.space/analyze",
34
- "https://favoredone-favoredone-k7b4mf.hf.space/analyze",
35
- "https://favoredone-favoredone-mzlxc7.hf.space/analyze",
36
- "https://favoredone-favoredone-aomfwa.hf.space/analyze",
37
- "https://favoredone-favoredone-7g6v04.hf.space/analyze",
38
- "https://favoredone-favoredone-dk1skh.hf.space/analyze",
39
- "https://favoredone-favoredone-z4yo0y.hf.space/analyze",
40
- "https://favoredone-favoredone-f6czeq.hf.space/analyze",
41
- "https://favoredone-favoredone-5fo8ga.hf.space/analyze",
42
- "https://favoredone-favoredone-zde8x6.hf.space/analyze",
43
- "https://favoredone-favoredone-r0biih.hf.space/analyze",
44
- "https://favoredone-favoredone-ljdzkf.hf.space/analyze",
45
- "https://favoredone-favoredone-irrpe5.hf.space/analyze",
46
- "https://favoredone-favoredone-bh9rwz.hf.space/analyze",
47
- "https://favoredone-favoredone-u8c4dt.hf.space/analyze",
48
- "https://favoredone-favoredone-futwyd.hf.space/analyze",
49
- "https://favoredone-favoredone-hg2sot.hf.space/analyze",
50
- "https://favoredone-favoredone-pvweug.hf.space/analyze",
51
- "https://favoredone-favoredone-z6azk2.hf.space/analyze",
52
- "https://favoredone-favoredone-4zid9w.hf.space/analyze",
53
- "https://favoredone-favoredone-be7a1r.hf.space/analyze",
54
- "https://favoredone-favoredone-ayazxa.hf.space/analyze",
55
- "https://favoredone-favoredone-6ckj4m.hf.space/analyze",
56
- "https://favoredone-favoredone-whn0xu.hf.space/analyze",
57
- "https://favoredone-favoredone-t49exm.hf.space/analyze",
58
- "https://favoredone-favoredone-cgrh0a.hf.space/analyze",
59
- "https://favoredone-favoredone-r1kb5g.hf.space/analyze"
60
-
 
 
 
 
 
 
 
 
61
  ]
62
  MODEL_TYPE = "Florence-2-large"
63
 
 
64
  TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
65
  TEMP_DIR.mkdir(exist_ok=True)
66
 
67
- # --- Global State for Statistics ---
68
- processing_stats = {
69
- "current_course": None,
70
- "total_images": 0,
71
- "captions_completed": 0,
72
- "captions_failed": 0,
73
- "files_processed": 0,
74
- "start_time": None,
75
- "status": "idle",
76
- "error_message": None,
77
- "server_stats": {}
78
- }
79
-
80
  # --- Models ---
81
  class ProcessCourseRequest(BaseModel):
82
  course_name: Optional[str] = None
@@ -93,11 +90,126 @@ class CaptionServer:
93
  def fps(self):
94
  return self.total_processed / self.total_time if self.total_time > 0 else 0
95
 
96
- # Global state for caption servers
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  servers = [CaptionServer(url) for url in CAPTION_SERVERS]
98
  server_index = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
- # --- Core Processing Functions ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
 
102
  async def get_available_server(timeout: float = 300.0) -> CaptionServer:
103
  """Round-robin selection of an available caption server."""
@@ -142,13 +254,14 @@ async def send_image_for_captioning(image_path: Path, course_name: str, progress
142
  caption = result.get("caption")
143
 
144
  if caption:
 
145
  progress_tracker['completed'] += 1
 
 
 
146
  if progress_tracker['completed'] % 50 == 0:
147
  print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
148
 
149
- if progress_tracker['completed'] % 50 != 0:
150
- print(f"[{FLOW_ID}] Success: {image_path.name} captioned by {server.url}")
151
-
152
  return {
153
  "course": course_name,
154
  "image_path": image_path.name,
@@ -177,62 +290,45 @@ async def send_image_for_captioning(image_path: Path, course_name: str, progress
177
  server.total_time += (end_time - start_time)
178
 
179
  print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
180
- processing_stats['captions_failed'] += 1
181
  return None
182
 
183
- async def download_and_extract_zip(course_name: str, processed_files: Set[str]) -> Optional[tuple[Path, str, str]]:
184
- """Downloads the zip file for the course and extracts its contents."""
185
- print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...")
186
 
 
 
 
187
  try:
188
- api = HfApi(token=HF_TOKEN)
189
-
190
- repo_files = api.list_repo_files(
191
- repo_id=HF_DATASET_ID,
192
- repo_type="dataset"
193
- )
194
-
195
- matching_files = [
196
- f for f in repo_files
197
- if f.startswith(f"frames/{course_name}") and f.endswith('.zip')
198
- ]
199
-
200
- if not matching_files:
201
- print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.")
202
- return None, None
203
-
204
- unprocessed_files = [f for f in matching_files if f not in processed_files]
205
-
206
- if not unprocessed_files:
207
- print(f"[{FLOW_ID}] No new zip files found for '{course_name}'.")
208
- return None, None, None
209
-
210
- repo_file_full_path = unprocessed_files[0]
211
- zip_full_name = Path(repo_file_full_path).name
212
- print(f"[{FLOW_ID}] Found new matching file: {repo_file_full_path}. Full name: {zip_full_name}")
213
 
 
214
  zip_path = hf_hub_download(
215
  repo_id=HF_DATASET_ID,
216
- filename=repo_file_full_path,
217
  repo_type="dataset",
218
  token=HF_TOKEN,
219
  )
220
 
221
  print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")
222
 
223
- extract_dir = TEMP_DIR / course_name
224
- extract_dir.mkdir(exist_ok=True)
 
225
 
226
  with zipfile.ZipFile(zip_path, 'r') as zip_ref:
227
  zip_ref.extractall(extract_dir)
228
 
229
  print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
230
 
231
- return extract_dir, zip_full_name, repo_file_full_path
 
 
 
 
232
 
233
  except Exception as e:
234
- print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}")
235
- return None, None, None
236
 
237
  async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
238
  """Uploads the final captions JSON file to the output dataset."""
@@ -243,7 +339,7 @@ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> boo
243
 
244
  json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
245
 
246
- api = HfApi(token=HF_TOKEN)
247
  api.upload_file(
248
  path_or_fileobj=io.BytesIO(json_content),
249
  path_in_repo=caption_filename,
@@ -259,782 +355,317 @@ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> boo
259
  print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
260
  return False
261
 
262
- async def process_course_task(course_name: str):
263
- """Main task to process a single course, looping until all files are processed."""
264
- global processing_stats
265
-
266
- processing_stats['current_course'] = course_name
267
- processing_stats['status'] = 'processing'
268
- processing_stats['start_time'] = datetime.now().isoformat()
269
- processing_stats['total_images'] = 0
270
- processing_stats['captions_completed'] = 0
271
- processing_stats['captions_failed'] = 0
272
- processing_stats['files_processed'] = 0
273
- processing_stats['error_message'] = None
274
 
275
- print(f"[{FLOW_ID}] Starting continuous processing for course: {course_name}")
276
-
277
- processed_files = set()
278
- all_processed_files_log = []
279
- global_success = True
280
-
281
- while True:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
282
  extract_dir = None
283
  zip_full_name = None
284
- repo_file_full_path = None
285
 
286
  try:
287
- download_result = await download_and_extract_zip(course_name, processed_files)
288
 
289
- if download_result is None or download_result[0] is None:
290
- if download_result is not None and download_result[0] is None and download_result[1] is None:
291
- print(f"[{FLOW_ID}] No new files found for {course_name}. Exiting loop.")
292
- break
293
- else:
294
- raise Exception("Failed to download or extract zip file.")
295
 
296
- extract_dir, zip_full_name, repo_file_full_path = download_result
297
-
298
- processed_files.add(repo_file_full_path)
299
- all_processed_files_log.append(repo_file_full_path)
300
- processing_stats['files_processed'] = len(all_processed_files_log)
301
 
 
302
  image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
303
- processing_stats['total_images'] += len(image_paths)
304
  print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
305
 
306
- current_file_success = False
307
-
308
  if not image_paths:
309
  print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
310
- current_file_success = True
311
  else:
 
312
  progress_tracker = {
313
  'total': len(image_paths),
314
  'completed': 0
315
  }
316
- print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
317
-
 
 
 
318
  semaphore = asyncio.Semaphore(len(servers))
319
-
320
  async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
321
  async with semaphore:
322
  return await send_image_for_captioning(image_path, course_name, progress_tracker)
323
 
324
- caption_tasks = []
325
- for image_path in image_paths:
326
- caption_tasks.append(limited_send_image_for_captioning(image_path, course_name, progress_tracker))
327
-
328
  results = await asyncio.gather(*caption_tasks)
329
-
330
  all_captions = [r for r in results if r is not None]
331
- processing_stats['captions_completed'] = sum(1 for r in results if r is not None)
332
 
 
333
  if len(all_captions) == len(image_paths):
334
  print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
335
- current_file_success = True
336
  else:
337
  print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
338
- current_file_success = False
339
 
 
340
  if all_captions and zip_full_name:
341
- print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
342
  if await upload_captions_to_hf(zip_full_name, all_captions):
343
  print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
344
- if not current_file_success:
345
- global_success = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
346
  else:
347
- print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
348
- current_file_success = False
349
- global_success = False
 
 
 
 
 
350
  else:
351
- print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}.")
352
- current_file_success = False
353
- global_success = False
 
 
 
354
 
355
  except Exception as e:
356
  error_message = str(e)
357
- print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
358
- processing_stats['error_message'] = error_message
359
- global_success = False
 
 
 
360
 
361
  finally:
 
362
  if extract_dir and extract_dir.exists():
363
  print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
364
- import shutil
365
  shutil.rmtree(extract_dir, ignore_errors=True)
366
 
367
- if download_result is None and extract_dir is None:
368
- break
369
-
370
- print(f"[{FLOW_ID}] All processing loops complete for {course_name}.")
371
- print(f"[{FLOW_ID}] Total files processed: {len(all_processed_files_log)}")
372
- print(f"[{FLOW_ID}] List of processed files: {all_processed_files_log}")
373
-
374
- processing_stats['status'] = 'completed' if global_success else 'failed'
375
-
376
- return global_success
377
-
378
- async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None):
379
- """Reports the task result back to the Manager Server."""
380
- print(f"[{FLOW_ID}] Reporting completion for {course_name} (Success: {success})...")
381
-
382
- payload = {
383
- "flow_id": FLOW_ID,
384
- "course_name": course_name,
385
- "success": success,
386
- "error_message": error_message
387
- }
388
-
389
- try:
390
- async with aiohttp.ClientSession() as session:
391
- async with session.post(MANAGER_COMPLETE_TASK_URL, json=payload) as resp:
392
- if resp.status != 200:
393
- print(f"[{FLOW_ID}] ERROR: Manager reported non-200 status: {resp.status} - {await resp.text()}")
394
- else:
395
- print(f"[{FLOW_ID}] Successfully reported completion to Manager.")
396
-
397
- except aiohttp.ClientError as e:
398
- print(f"[{FLOW_ID}] CRITICAL ERROR: Could not connect to Manager at {MANAGER_COMPLETE_TASK_URL}. Task completion not reported. Error: {e}")
399
- except Exception as e:
400
- print(f"[{FLOW_ID}] Unexpected error during reporting: {e}")
401
 
402
  # --- FastAPI App and Endpoints ---
403
 
404
  app = FastAPI(
405
  title=f"Flow Server {FLOW_ID} API",
406
  description="Fetches, extracts, and captions images for a given course.",
407
- version="1.0.0"
408
  )
409
 
410
- HTML_UI = """
411
- <!DOCTYPE html>
412
- <html lang="en">
413
- <head>
414
- <meta charset="UTF-8">
415
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
416
- <title>Course Processing Dashboard</title>
417
- <style>
418
- * {
419
- margin: 0;
420
- padding: 0;
421
- box-sizing: border-box;
422
- }
423
-
424
- body {
425
- font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
426
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
427
- min-height: 100vh;
428
- padding: 20px;
429
- }
430
-
431
- .container {
432
- max-width: 1200px;
433
- margin: 0 auto;
434
- }
435
-
436
- .header {
437
- text-align: center;
438
- color: white;
439
- margin-bottom: 40px;
440
- }
441
-
442
- .header h1 {
443
- font-size: 2.5em;
444
- margin-bottom: 10px;
445
- text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
446
- }
447
-
448
- .header p {
449
- font-size: 1.1em;
450
- opacity: 0.9;
451
- }
452
-
453
- .main-card {
454
- background: white;
455
- border-radius: 15px;
456
- padding: 40px;
457
- box-shadow: 0 20px 60px rgba(0,0,0,0.3);
458
- margin-bottom: 30px;
459
- }
460
-
461
- .input-section {
462
- display: flex;
463
- gap: 15px;
464
- margin-bottom: 30px;
465
- flex-wrap: wrap;
466
- }
467
-
468
- .input-group {
469
- flex: 1;
470
- min-width: 250px;
471
- }
472
-
473
- .input-group label {
474
- display: block;
475
- margin-bottom: 8px;
476
- font-weight: 600;
477
- color: #333;
478
- font-size: 0.95em;
479
- }
480
-
481
- .input-group input {
482
- width: 100%;
483
- padding: 12px 15px;
484
- border: 2px solid #e0e0e0;
485
- border-radius: 8px;
486
- font-size: 1em;
487
- transition: border-color 0.3s;
488
- }
489
-
490
- .input-group input:focus {
491
- outline: none;
492
- border-color: #667eea;
493
- box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1);
494
- }
495
-
496
- .button-group {
497
- display: flex;
498
- gap: 10px;
499
- flex-wrap: wrap;
500
- }
501
-
502
- button {
503
- padding: 12px 30px;
504
- border: none;
505
- border-radius: 8px;
506
- font-size: 1em;
507
- font-weight: 600;
508
- cursor: pointer;
509
- transition: all 0.3s;
510
- }
511
-
512
- .btn-primary {
513
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
514
- color: white;
515
- flex: 1;
516
- min-width: 150px;
517
- }
518
-
519
- .btn-primary:hover {
520
- transform: translateY(-2px);
521
- box-shadow: 0 10px 20px rgba(102, 126, 234, 0.3);
522
- }
523
-
524
- .btn-primary:active {
525
- transform: translateY(0);
526
- }
527
-
528
- .btn-primary:disabled {
529
- opacity: 0.6;
530
- cursor: not-allowed;
531
- transform: none;
532
- }
533
-
534
- .btn-secondary {
535
- background: #f0f0f0;
536
- color: #333;
537
- flex: 1;
538
- min-width: 150px;
539
- }
540
-
541
- .btn-secondary:hover {
542
- background: #e0e0e0;
543
- }
544
-
545
- .stats-grid {
546
- display: grid;
547
- grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
548
- gap: 20px;
549
- margin: 30px 0;
550
- }
551
-
552
- .stat-card {
553
- background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
554
- padding: 20px;
555
- border-radius: 12px;
556
- border-left: 5px solid #667eea;
557
- }
558
-
559
- .stat-card.success {
560
- border-left-color: #4caf50;
561
- background: linear-gradient(135deg, #e8f5e9 0%, #c8e6c9 100%);
562
- }
563
-
564
- .stat-card.warning {
565
- border-left-color: #ff9800;
566
- background: linear-gradient(135deg, #fff3e0 0%, #ffe0b2 100%);
567
- }
568
-
569
- .stat-card.error {
570
- border-left-color: #f44336;
571
- background: linear-gradient(135deg, #ffebee 0%, #ffcdd2 100%);
572
- }
573
-
574
- .stat-label {
575
- font-size: 0.9em;
576
- color: #666;
577
- margin-bottom: 8px;
578
- font-weight: 500;
579
- text-transform: uppercase;
580
- letter-spacing: 0.5px;
581
- }
582
-
583
- .stat-value {
584
- font-size: 2em;
585
- font-weight: 700;
586
- color: #333;
587
- }
588
-
589
- .progress-section {
590
- margin: 30px 0;
591
- }
592
-
593
- .progress-label {
594
- display: flex;
595
- justify-content: space-between;
596
- margin-bottom: 10px;
597
- font-weight: 600;
598
- color: #333;
599
- }
600
-
601
- .progress-bar {
602
- width: 100%;
603
- height: 25px;
604
- background: #e0e0e0;
605
- border-radius: 12px;
606
- overflow: hidden;
607
- box-shadow: inset 0 2px 4px rgba(0,0,0,0.1);
608
- }
609
-
610
- .progress-fill {
611
- height: 100%;
612
- background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
613
- width: 0%;
614
- transition: width 0.3s ease;
615
- display: flex;
616
- align-items: center;
617
- justify-content: center;
618
- color: white;
619
- font-size: 0.85em;
620
- font-weight: 600;
621
- }
622
-
623
- .status-badge {
624
- display: inline-block;
625
- padding: 6px 12px;
626
- border-radius: 20px;
627
- font-size: 0.9em;
628
- font-weight: 600;
629
- margin-top: 10px;
630
- }
631
-
632
- .status-badge.idle {
633
- background: #e0e0e0;
634
- color: #666;
635
- }
636
-
637
- .status-badge.processing {
638
- background: #fff3e0;
639
- color: #ff9800;
640
- animation: pulse 1.5s infinite;
641
- }
642
-
643
- .status-badge.completed {
644
- background: #e8f5e9;
645
- color: #4caf50;
646
- }
647
-
648
- .status-badge.failed {
649
- background: #ffebee;
650
- color: #f44336;
651
- }
652
-
653
- @keyframes pulse {
654
- 0%, 100% { opacity: 1; }
655
- 50% { opacity: 0.7; }
656
- }
657
-
658
- .error-message {
659
- background: #ffebee;
660
- color: #c62828;
661
- padding: 15px;
662
- border-radius: 8px;
663
- margin-top: 15px;
664
- border-left: 4px solid #f44336;
665
- display: none;
666
- }
667
-
668
- .error-message.show {
669
- display: block;
670
- }
671
-
672
- .server-stats {
673
- margin-top: 30px;
674
- padding-top: 30px;
675
- border-top: 2px solid #e0e0e0;
676
- }
677
-
678
- .server-stats h3 {
679
- color: #333;
680
- margin-bottom: 15px;
681
- font-size: 1.2em;
682
- }
683
-
684
- .server-list {
685
- display: grid;
686
- grid-template-columns: repeat(auto-fill, minmax(200px, 1fr));
687
- gap: 15px;
688
- }
689
-
690
- .server-item {
691
- background: #f5f5f5;
692
- padding: 12px;
693
- border-radius: 8px;
694
- font-size: 0.9em;
695
- border: 1px solid #e0e0e0;
696
- }
697
-
698
- .server-item.busy {
699
- background: #fff3e0;
700
- border-color: #ff9800;
701
- }
702
-
703
- .server-name {
704
- font-weight: 600;
705
- color: #333;
706
- margin-bottom: 5px;
707
- word-break: break-all;
708
- }
709
-
710
- .server-info {
711
- color: #666;
712
- font-size: 0.85em;
713
- }
714
-
715
- .loading-spinner {
716
- display: inline-block;
717
- width: 20px;
718
- height: 20px;
719
- border: 3px solid #f3f3f3;
720
- border-top: 3px solid #667eea;
721
- border-radius: 50%;
722
- animation: spin 1s linear infinite;
723
- margin-right: 10px;
724
- vertical-align: middle;
725
- }
726
-
727
- @keyframes spin {
728
- 0% { transform: rotate(0deg); }
729
- 100% { transform: rotate(360deg); }
730
- }
731
-
732
- .timestamp {
733
- color: #999;
734
- font-size: 0.9em;
735
- margin-top: 20px;
736
- text-align: right;
737
- }
738
-
739
- @media (max-width: 768px) {
740
- .header h1 {
741
- font-size: 1.8em;
742
- }
743
-
744
- .main-card {
745
- padding: 20px;
746
- }
747
-
748
- .input-section {
749
- flex-direction: column;
750
- }
751
-
752
- .button-group {
753
- flex-direction: column;
754
- }
755
-
756
- button {
757
- width: 100%;
758
- }
759
- }
760
- </style>
761
- </head>
762
- <body>
763
- <div class="container">
764
- <div class="header">
765
- <h1>📚 Course Processing Dashboard</h1>
766
- <p>Submit course index names for image captioning and processing</p>
767
- </div>
768
-
769
- <div class="main-card">
770
- <div class="input-section">
771
- <div class="input-group">
772
- <label for="courseInput">Course Index Name</label>
773
- <input
774
- type="text"
775
- id="courseInput"
776
- placeholder="e.g., COURSE_001, BIO101, etc."
777
- autocomplete="off"
778
- >
779
- </div>
780
- <div class="button-group">
781
- <button class="btn-primary" onclick="submitCourse()">
782
- <span id="submitText">Start Processing</span>
783
- </button>
784
- <button class="btn-secondary" onclick="refreshStats()">Refresh Stats</button>
785
- </div>
786
- </div>
787
-
788
- <div class="error-message" id="errorMessage"></div>
789
-
790
- <div id="statsContainer" style="display: none;">
791
- <div style="margin-bottom: 20px;">
792
- <h2 style="color: #333; margin-bottom: 10px;">Current Course: <span id="currentCourse" style="color: #667eea;"></span></h2>
793
- <span class="status-badge idle" id="statusBadge">Idle</span>
794
- </div>
795
-
796
- <div class="progress-section">
797
- <div class="progress-label">
798
- <span>Overall Progress</span>
799
- <span id="progressText">0%</span>
800
- </div>
801
- <div class="progress-bar">
802
- <div class="progress-fill" id="progressFill" style="width: 0%;">
803
- <span id="progressPercent">0%</span>
804
- </div>
805
- </div>
806
- </div>
807
-
808
- <div class="stats-grid">
809
- <div class="stat-card">
810
- <div class="stat-label">Total Images Found</div>
811
- <div class="stat-value" id="totalImages">0</div>
812
- </div>
813
- <div class="stat-card success">
814
- <div class="stat-label">Captions Completed</div>
815
- <div class="stat-value" id="captionsCompleted">0</div>
816
- </div>
817
- <div class="stat-card warning">
818
- <div class="stat-label">Captions Failed</div>
819
- <div class="stat-value" id="captionsFailed">0</div>
820
- </div>
821
- <div class="stat-card">
822
- <div class="stat-label">Files Processed</div>
823
- <div class="stat-value" id="filesProcessed">0</div>
824
- </div>
825
- <div class="stat-card">
826
- <div class="stat-label">Success Rate</div>
827
- <div class="stat-value" id="successRate">0%</div>
828
- </div>
829
- <div class="stat-card">
830
- <div class="stat-label">Processing Time</div>
831
- <div class="stat-value" id="processingTime">0s</div>
832
- </div>
833
- </div>
834
-
835
- <div class="server-stats">
836
- <h3>Caption Server Status</h3>
837
- <div class="server-list" id="serverList">
838
- <!-- Server items will be populated here -->
839
- </div>
840
- </div>
841
- </div>
842
-
843
- <div class="timestamp" id="timestamp"></div>
844
- </div>
845
- </div>
846
-
847
- <script>
848
- let processingInterval = null;
849
-
850
- async function submitCourse() {
851
- const courseInput = document.getElementById('courseInput');
852
- const courseName = courseInput.value.trim();
853
-
854
- if (!courseName) {
855
- showError('Please enter a course index name');
856
- return;
857
- }
858
-
859
- const submitBtn = event.target;
860
- submitBtn.disabled = true;
861
- document.getElementById('submitText').innerHTML = '<span class="loading-spinner"></span>Submitting...';
862
-
863
- try {
864
- const response = await fetch('/process_course', {
865
- method: 'POST',
866
- headers: {
867
- 'Content-Type': 'application/json',
868
- },
869
- body: JSON.stringify({ course_name: courseName })
870
- });
871
-
872
- if (!response.ok) {
873
- throw new Error('Failed to submit course');
874
- }
875
-
876
- const data = await response.json();
877
- clearError();
878
- document.getElementById('statsContainer').style.display = 'block';
879
- courseInput.value = '';
880
-
881
- // Start polling for stats
882
- if (processingInterval) clearInterval(processingInterval);
883
- processingInterval = setInterval(refreshStats, 2000);
884
-
885
- refreshStats();
886
-
887
- } catch (error) {
888
- showError('Error submitting course: ' + error.message);
889
- } finally {
890
- submitBtn.disabled = false;
891
- document.getElementById('submitText').textContent = 'Start Processing';
892
- }
893
- }
894
-
895
- async function refreshStats() {
896
- try {
897
- const response = await fetch('/stats');
898
- if (!response.ok) throw new Error('Failed to fetch stats');
899
-
900
- const stats = await response.json();
901
- updateStatsDisplay(stats);
902
-
903
- } catch (error) {
904
- console.error('Error fetching stats:', error);
905
- }
906
- }
907
-
908
- function updateStatsDisplay(stats) {
909
- document.getElementById('currentCourse').textContent = stats.current_course || 'None';
910
- document.getElementById('totalImages').textContent = stats.total_images;
911
- document.getElementById('captionsCompleted').textContent = stats.captions_completed;
912
- document.getElementById('captionsFailed').textContent = stats.captions_failed;
913
- document.getElementById('filesProcessed').textContent = stats.files_processed;
914
-
915
- // Calculate success rate
916
- const total = stats.captions_completed + stats.captions_failed;
917
- const successRate = total > 0 ? Math.round((stats.captions_completed / total) * 100) : 0;
918
- document.getElementById('successRate').textContent = successRate + '%';
919
-
920
- // Update progress bar
921
- const progress = stats.total_images > 0
922
- ? Math.round((stats.captions_completed / stats.total_images) * 100)
923
- : 0;
924
- document.getElementById('progressFill').style.width = progress + '%';
925
- document.getElementById('progressPercent').textContent = progress + '%';
926
- document.getElementById('progressText').textContent = progress + '%';
927
-
928
- // Update status badge
929
- const statusBadge = document.getElementById('statusBadge');
930
- statusBadge.className = 'status-badge ' + stats.status;
931
- statusBadge.textContent = stats.status.charAt(0).toUpperCase() + stats.status.slice(1);
932
-
933
- // Update processing time
934
- if (stats.start_time) {
935
- const startTime = new Date(stats.start_time);
936
- const elapsed = Math.floor((Date.now() - startTime.getTime()) / 1000);
937
- document.getElementById('processingTime').textContent = formatTime(elapsed);
938
- }
939
-
940
- // Update timestamp
941
- document.getElementById('timestamp').textContent = 'Last updated: ' + new Date().toLocaleTimeString();
942
-
943
- // Update server stats
944
- updateServerStats(stats.server_stats);
945
-
946
- // Show error if present
947
- if (stats.error_message) {
948
- showError('Error: ' + stats.error_message);
949
- }
950
- }
951
-
952
- function updateServerStats(serverStats) {
953
- const serverList = document.getElementById('serverList');
954
- serverList.innerHTML = '';
955
-
956
- for (const [url, stats] of Object.entries(serverStats)) {
957
- const serverItem = document.createElement('div');
958
- serverItem.className = 'server-item' + (stats.busy ? ' busy' : '');
959
- serverItem.innerHTML = `
960
- <div class="server-name">${url.split('/').slice(-3, -1).join('-')}</div>
961
- <div class="server-info">Processed: ${stats.total_processed}</div>
962
- <div class="server-info">FPS: ${stats.fps.toFixed(2)}</div>
963
- <div class="server-info">Status: ${stats.busy ? '🔴 Busy' : '🟢 Available'}</div>
964
- `;
965
- serverList.appendChild(serverItem);
966
- }
967
- }
968
-
969
- function formatTime(seconds) {
970
- if (seconds < 60) return seconds + 's';
971
- const minutes = Math.floor(seconds / 60);
972
- const secs = seconds % 60;
973
- return minutes + 'm ' + secs + 's';
974
- }
975
-
976
- function showError(message) {
977
- const errorDiv = document.getElementById('errorMessage');
978
- errorDiv.textContent = message;
979
- errorDiv.classList.add('show');
980
- }
981
-
982
- function clearError() {
983
- const errorDiv = document.getElementById('errorMessage');
984
- errorDiv.classList.remove('show');
985
- }
986
-
987
- // Allow Enter key to submit
988
- document.getElementById('courseInput').addEventListener('keypress', function(e) {
989
- if (e.key === 'Enter') submitCourse();
990
- });
991
-
992
- // Initial stats load
993
- refreshStats();
994
- </script>
995
- </body>
996
- </html>
997
- """
998
 
999
  @app.on_event("startup")
1000
  async def startup_event():
1001
  print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}. Manager URL: {MANAGER_URL}")
 
 
 
 
 
 
 
 
 
 
 
 
1002
 
1003
  @app.get("/", response_class=HTMLResponse)
1004
- async def root():
1005
- return HTML_UI
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1006
 
1007
- @app.get("/stats")
1008
- async def get_stats():
1009
- """Returns current processing statistics."""
1010
- server_stats = {}
1011
- for server in servers:
1012
- server_stats[server.url] = {
1013
- "busy": server.busy,
1014
- "total_processed": server.total_processed,
1015
- "total_time": server.total_time,
1016
- "fps": server.fps
1017
- }
1018
 
1019
- processing_stats['server_stats'] = server_stats
1020
- return processing_stats
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1021
 
1022
- @app.post("/process_course")
1023
- async def process_course(request: ProcessCourseRequest, background_tasks: BackgroundTasks):
1024
- """
1025
- Receives a course name from the UI and starts processing in the background.
1026
- """
1027
- course_name = request.course_name
1028
 
1029
- if not course_name:
1030
- print(f"[{FLOW_ID}] Received empty course name. Stopping processing loop.")
1031
- return {"status": "stopped", "message": "No more courses to process."}
1032
-
1033
- print(f"[{FLOW_ID}] Received course: {course_name}. Starting background task.")
1034
-
1035
- background_tasks.add_task(process_course_task, course_name)
1036
 
1037
- return {"status": "processing", "course_name": course_name, "message": "Processing started in background."}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1038
 
1039
  if __name__ == "__main__":
1040
  uvicorn.run(app, host="0.0.0.0", port=FLOW_PORT)
 
4
  import asyncio
5
  import aiohttp
6
  import zipfile
7
+ import io
8
+ import shutil
9
+ from typing import Dict, List, Set, Optional, Any
10
  from urllib.parse import quote
11
  from datetime import datetime
12
  from pathlib import Path
 
13
 
14
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status, Request
15
+ from fastapi.responses import HTMLResponse
16
+ from fastapi.templating import Jinja2Templates
17
  from pydantic import BaseModel, Field
18
+ from huggingface_hub import HfApi, hf_hub_download, HfFileSystem
19
  import uvicorn
20
 
# --- Configuration ---
# Every setting can be overridden through an environment variable of the
# same name; the second argument is the fallback value.
FLOW_ID = os.environ.get("FLOW_ID", "flow_default")
FLOW_PORT = int(os.environ.get("FLOW_PORT", 8001))
MANAGER_URL = os.environ.get("MANAGER_URL", "https://fred808-fcord.hf.space")
MANAGER_COMPLETE_TASK_URL = f"{MANAGER_URL}/task/complete"
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_DATASET_ID = os.environ.get("HF_DATASET_ID", "Fred808/BG3")
HF_OUTPUT_DATASET_ID = os.environ.get("HF_OUTPUT_DATASET_ID", "fred808/helium")
# Name of the JSON file in the output dataset that holds this flow's state.
STATE_FILE_NAME = f"{FLOW_ID}_state.json"

# Using the full list from the user's original code for actual deployment.
# Each entry is the Space sub-domain; the endpoint URL is derived below.
_CAPTION_SPACES = (
    "fred808-pil-4-1",
    "fred808-pil-4-2",
    "fred808-pil-4-3",
    "fred1012-fred1012-gw0j2h",
    "fred1012-fred1012-wqs6c2",
    "fred1012-fred1012-oncray",
    "fred1012-fred1012-4goge7",
    "fred1012-fred1012-z0eh7m",
    "fred1012-fred1012-u95rte",
    "fred1012-fred1012-igje22",
    "fred1012-fred1012-ibkuf8",
    "fred1012-fred1012-nwqthy",
    "fred1012-fred1012-4ldqj4",
    "fred1012-fred1012-pivlzg",
    "fred1012-fred1012-ptlc5u",
    "fred1012-fred1012-u7lh57",
    "fred1012-fred1012-q8djv1",
    "fredalone-fredalone-ozugrp",
    "fredalone-fredalone-9brxj2",
    "fredalone-fredalone-p8vq9a",
    "fredalone-fredalone-vbli2y",
    "fredalone-fredalone-uggger",
    "fredalone-fredalone-nmi7e8",
    "fredalone-fredalone-d1f26d",
    "fredalone-fredalone-461jp2",
    "fredalone-fredalone-3enfg4",
    "fredalone-fredalone-dqdbpv",
    "fredalone-fredalone-ivtjua",
    "fredalone-fredalone-6bezt2",
    "fredalone-fredalone-e0wfnk",
    "fredalone-fredalone-zu2t7j",
    "fredalone-fredalone-dqtv1o",
    "fredalone-fredalone-wclyog",
    "fredalone-fredalone-t27vig",
    "fredalone-fredalone-gahbxh",
    "fredalone-fredalone-kw2po4",
    "fredalone-fredalone-8h285h",
)
CAPTION_SERVERS = [f"https://{space}.hf.space/analyze" for space in _CAPTION_SPACES]
MODEL_TYPE = "Florence-2-large"

# Temporary storage for images (created at import time if missing).
TEMP_DIR = Path(f"temp_images_{FLOW_ID}")
TEMP_DIR.mkdir(exist_ok=True)
76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  # --- Models ---
78
  class ProcessCourseRequest(BaseModel):
79
  course_name: Optional[str] = None
 
90
  def fps(self):
91
  return self.total_processed / self.total_time if self.total_time > 0 else 0
92
 
class ServerState(BaseModel):
    """Persistent progress record for this flow server.

    The model is serialized to JSON and stored in the output dataset so that
    processing can resume after a restart.
    """

    # The list of all zip files in the dataset (frames/ directory)
    all_zip_files: List[str] = Field(default_factory=list)
    # The set of zip files that have been successfully processed and uploaded
    processed_files: Set[str] = Field(default_factory=set)
    # The index in all_zip_files from which the next download should start
    current_index: int = 0
    # Total number of files to process
    total_files: int = 0
    # Status of the current operation
    status: str = "Idle"
    # Name of the file currently being processed
    current_file: Optional[str] = None
    # Progress within the current file
    current_file_progress: str = "0/0"
    # Timestamp of the last update. A default_factory is used so the value is
    # computed per instance; a plain default would be evaluated only once,
    # when the class is defined.
    last_update: str = Field(default_factory=lambda: datetime.now().isoformat())
    # Flag to control the processing loop
    is_running: bool = False
112
+
# Global state for caption servers and the overall server state.
# One CaptionServer tracker is created per configured endpoint URL.
servers = list(map(CaptionServer, CAPTION_SERVERS))
server_index = 0
state = ServerState()
# Lock guarding access to the global `state` object.
state_lock = asyncio.Lock()
119
+
120
+ # --- Persistence Functions ---
121
+
def get_hf_api():
    """Build an HfApi client authenticated with HF_TOKEN."""
    client = HfApi(token=HF_TOKEN)
    return client
125
+
def get_hf_fs():
    """Build an HfFileSystem handle authenticated with HF_TOKEN."""
    filesystem = HfFileSystem(token=HF_TOKEN)
    return filesystem
129
+
async def load_state_from_hf():
    """Load the persisted state from the Hugging Face output dataset.

    Replaces the module-level ``state`` with the stored ServerState when the
    state file exists. If the file is missing the current state is kept; if
    reading or parsing fails, ``state`` is reset to a default ServerState.
    """
    global state
    fs = get_hf_fs()
    # HfFileSystem addresses dataset repositories as "datasets/<repo_id>/...".
    # Without the prefix the path is resolved against a *model* repository,
    # so the state file uploaded with repo_type="dataset" is never found.
    state_path = f"datasets/{HF_OUTPUT_DATASET_ID}/{STATE_FILE_NAME}"

    async with state_lock:
        try:
            if not fs.exists(state_path):
                print(f"[{FLOW_ID}] State file {state_path} not found. Starting with default state.")
                return

            print(f"[{FLOW_ID}] Loading state from {state_path}...")
            with fs.open(state_path, 'rb') as f:
                data = json.load(f)

            # JSON has no set type: processed files are stored as a list and
            # converted back to a set here.
            if isinstance(data.get('processed_files'), list):
                data['processed_files'] = set(data['processed_files'])

            state = ServerState.parse_obj(data)
            print(f"[{FLOW_ID}] State loaded successfully. Current index: {state.current_index}")
        except Exception as e:
            # Best effort: a broken or unreachable state file must not stop
            # the server from starting.
            print(f"[{FLOW_ID}] Error loading state from HF: {e}. Starting with default state.")
            state = ServerState()
152
 
async def save_state_to_hf():
    """Save the current state to the Hugging Face output dataset.

    Returns:
        True when the upload succeeded, False when it raised (the error is
        printed, not propagated).
    """
    api = get_hf_api()
    state_path = STATE_FILE_NAME

    # NOTE: state_lock is deliberately NOT acquired here. asyncio.Lock is not
    # reentrant and callers in this file await this function while already
    # holding the lock, which would block forever. The snapshot below has no
    # await in it, so no other coroutine can run while it is taken.
    state.last_update = datetime.now().isoformat()
    data_to_save = state.dict()
    # Sets are not JSON serializable; store a sorted list so the file content
    # is deterministic.
    data_to_save['processed_files'] = sorted(state.processed_files)
    saved_index = state.current_index

    json_content = json.dumps(data_to_save, indent=2, ensure_ascii=False).encode('utf-8')

    try:
        print(f"[{FLOW_ID}] Saving state to {state_path} in {HF_OUTPUT_DATASET_ID}...")
        api.upload_file(
            path_or_fileobj=io.BytesIO(json_content),
            path_in_repo=state_path,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Update server state. Index: {saved_index}"
        )
        print(f"[{FLOW_ID}] State saved successfully.")
        return True
    except Exception as e:
        # Persistence is best effort; report the failure to the caller.
        print(f"[{FLOW_ID}] Error saving state to HF: {e}")
        return False
181
+
async def update_file_list():
    """Refresh the list of zip files to process from the input dataset.

    Stores every repository file under ``frames/`` ending in ``.zip`` (sorted)
    in ``state.all_zip_files``, updates ``state.total_files``, and persists
    the state. Errors are recorded in ``state.status`` and printed.
    """
    api = get_hf_api()

    async with state_lock:
        try:
            state.status = "Updating file list..."
            print(f"[{FLOW_ID}] Fetching file list from {HF_DATASET_ID}...")
            repo_files = api.list_repo_files(
                repo_id=HF_DATASET_ID,
                repo_type="dataset"
            )

            # Filter for zip files in the 'frames/' directory
            zip_files = sorted(
                f for f in repo_files
                if f.startswith("frames/") and f.endswith('.zip')
            )

            state.all_zip_files = zip_files
            state.total_files = len(zip_files)
            state.status = "File list updated."
            print(f"[{FLOW_ID}] Found {state.total_files} zip files.")
        except Exception as e:
            state.status = f"Error updating file list: {e}"
            print(f"[{FLOW_ID}] Error updating file list: {e}")

    # Persist only after the lock is released: asyncio.Lock is not reentrant,
    # so saving while holding it would deadlock if the save takes the lock.
    await save_state_to_hf()
211
+
212
+ # --- Core Processing Functions (Modified) ---
213
 
214
  async def get_available_server(timeout: float = 300.0) -> CaptionServer:
215
  """Round-robin selection of an available caption server."""
 
254
  caption = result.get("caption")
255
 
256
  if caption:
257
+ # Update progress counter and global state
258
  progress_tracker['completed'] += 1
259
+ async with state_lock:
260
+ state.current_file_progress = f"{progress_tracker['completed']}/{progress_tracker['total']}"
261
+
262
  if progress_tracker['completed'] % 50 == 0:
263
  print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} captions completed.")
264
 
 
 
 
265
  return {
266
  "course": course_name,
267
  "image_path": image_path.name,
 
290
  server.total_time += (end_time - start_time)
291
 
292
  print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
 
293
  return None
294
 
async def download_and_extract_zip(repo_file_full_path: str) -> Optional[tuple[Path, str]]:
    """Download one zip file from the input dataset and extract it.

    Args:
        repo_file_full_path: Path of the zip file inside the dataset repo.

    Returns:
        ``(extract_dir, zip_full_name)`` on success, or ``None`` when the
        download or the extraction failed (the error is printed).
    """
    zip_full_name = Path(repo_file_full_path).name
    course_name = zip_full_name.split('_')[0]  # Assuming course name is the prefix before the first underscore
    extract_dir = TEMP_DIR / course_name / zip_full_name.replace('.', '_')

    def _download_and_extract() -> None:
        """Blocking part: fetch the archive, unpack it, drop the archive."""
        zip_path = hf_hub_download(
            repo_id=HF_DATASET_ID,
            filename=repo_file_full_path,  # Use the full path in the repo
            repo_type="dataset",
            token=HF_TOKEN,
        )
        print(f"[{FLOW_ID}] Downloaded to {zip_path}. Extracting...")

        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")

        # Clean up the downloaded zip file. The returned path may be a link
        # into the download cache, so the resolved target is removed as well.
        # Removal is best effort: the images are already extracted, so a
        # failed delete must not fail the whole file.
        for cached_path in {zip_path, os.path.realpath(zip_path)}:
            try:
                os.remove(cached_path)
            except OSError as cleanup_error:
                print(f"[{FLOW_ID}] Could not remove {cached_path}: {cleanup_error}")

    try:
        print(f"[{FLOW_ID}] Downloading file: {repo_file_full_path}. Full name: {zip_full_name}")
        # The download and extraction are blocking; run them in a worker
        # thread so the event loop (status page, control endpoints) stays
        # responsive.
        await asyncio.to_thread(_download_and_extract)
        return extract_dir, zip_full_name
    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading or extracting zip for {repo_file_full_path}: {e}")
        # The caller only cleans up directories it receives, so remove any
        # partial extraction here.
        shutil.rmtree(extract_dir, ignore_errors=True)
        return None
332
 
333
  async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
334
  """Uploads the final captions JSON file to the output dataset."""
 
339
 
340
  json_content = json.dumps(captions, indent=2, ensure_ascii=False).encode('utf-8')
341
 
342
+ api = get_hf_api()
343
  api.upload_file(
344
  path_or_fileobj=io.BytesIO(json_content),
345
  path_in_repo=caption_filename,
 
355
  print(f"[{FLOW_ID}] Error uploading captions for {zip_full_name}: {e}")
356
  return False
357
 
async def process_next_file_task():
    """Process zip files one after another, starting at ``state.current_index``.

    The loop runs while ``state.is_running`` is true. For each file it
    downloads and extracts the archive, captions every image, uploads the
    captions, and then marks the file as processed and advances the index.
    A file that fails (download error, no captions, upload error) is not
    marked as processed and is retried after a pause.
    """
    if not state.is_running:
        print(f"[{FLOW_ID}] Processing loop is not running. Exiting task.")
        return

    while state.is_running:
        repo_file_full_path = None
        already_processed = False

        # --- Pick the next file (state mutation only; no awaits under the lock) ---
        async with state_lock:
            current_index = state.current_index
            if current_index >= state.total_files:
                state.status = "Finished processing all files."
                state.is_running = False
                print(f"[{FLOW_ID}] Reached end of file list. Stopping processing.")
            else:
                repo_file_full_path = state.all_zip_files[current_index]
                state.current_file = Path(repo_file_full_path).name
                already_processed = repo_file_full_path in state.processed_files
                if already_processed:
                    state.current_index += 1
                    state.status = f"Skipping processed file: {Path(repo_file_full_path).name}"
                    print(f"[{FLOW_ID}] Skipping already processed file: {repo_file_full_path}")
                else:
                    # Mark the file as in-progress in the state
                    state.status = f"Processing file {current_index + 1}/{state.total_files}"
                    state.current_file_progress = "0/0"

        # Persist outside the lock: asyncio.Lock is not reentrant, so saving
        # while holding it would deadlock if the save takes the lock.
        await save_state_to_hf()

        if repo_file_full_path is None:
            break
        if already_processed:
            continue

        # --- Start Processing ---
        extract_dir = None
        zip_full_name = None
        completed = False      # True when the file can be marked as processed
        retry_status = None    # Status text when the file has to be retried

        try:
            download_result = await download_and_extract_zip(repo_file_full_path)

            if download_result is None:
                raise Exception("Failed to download or extract zip file.")

            extract_dir, zip_full_name = download_result
            course_name = zip_full_name.split('_')[0]

            # Find images
            image_paths = [
                p for p in extract_dir.glob("**/*")
                if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']
            ]
            print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")

            if not image_paths:
                # Nothing to caption: the file is complete. Without this the
                # same empty archive would be downloaded again forever.
                print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
                completed = True
            else:
                # Initialize progress tracker
                progress_tracker = {
                    'total': len(image_paths),
                    'completed': 0
                }
                async with state_lock:
                    state.current_file_progress = f"0/{len(image_paths)}"
                await save_state_to_hf()

                # Create and run captioning tasks, at most one per server
                semaphore = asyncio.Semaphore(len(servers))

                async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
                    async with semaphore:
                        return await send_image_for_captioning(image_path, course_name, progress_tracker)

                caption_tasks = [
                    limited_send_image_for_captioning(p, course_name, progress_tracker)
                    for p in image_paths
                ]
                results = await asyncio.gather(*caption_tasks)
                all_captions = [r for r in results if r is not None]

                # Final progress report
                fully_captioned = len(all_captions) == len(image_paths)
                if fully_captioned:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
                else:
                    print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")

                # Upload results
                if not all_captions:
                    print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}. Will retry later.")
                    retry_status = f"No captions generated for {zip_full_name}. Retrying later."
                elif await upload_captions_to_hf(zip_full_name, all_captions):
                    print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
                    # A successful upload marks the file as processed even
                    # when only part of the images were captioned; the
                    # uploaded JSON reflects the actual number of captions.
                    if fully_captioned:
                        print(f"[{FLOW_ID}] Fully processed and uploaded: {zip_full_name}")
                    else:
                        print(f"[{FLOW_ID}] Partially processed but uploaded: {zip_full_name}. Needs manual review.")
                    completed = True
                else:
                    print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}. Will retry this file later.")
                    retry_status = f"Error uploading captions for {zip_full_name}. Retrying later."

            if completed:
                async with state_lock:
                    state.processed_files.add(repo_file_full_path)
                    # Advance only if the index was not changed in the
                    # meantime (e.g. set manually through /set_index).
                    if state.current_index == current_index:
                        state.current_index += 1
                    state.current_file = None
                    state.current_file_progress = "0/0"
                    state.status = "Idle"
                await save_state_to_hf()

        except Exception as e:
            error_message = str(e)
            print(f"[{FLOW_ID}] Critical error in process_next_file_task for {repo_file_full_path}: {error_message}")
            retry_status = f"CRITICAL ERROR for {Path(repo_file_full_path).name}. Retrying later. Error: {error_message[:50]}..."

        finally:
            # Cleanup temporary files
            if extract_dir and extract_dir.exists():
                shutil.rmtree(extract_dir, ignore_errors=True)
                print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")

        if retry_status is not None:
            # Do NOT increment the index or mark as processed, so the file is
            # retried; wait first to avoid hammering on a transient error.
            async with state_lock:
                state.status = retry_status
            await save_state_to_hf()
            await asyncio.sleep(60)

        # If the loop is still running, wait a short time before checking for the next file
        if state.is_running:
            await asyncio.sleep(5)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
499
 
500
  # --- FastAPI App and Endpoints ---
501
 
# The ASGI application; the title embeds this flow's identifier.
app = FastAPI(
    version="2.0.0",
    description="Fetches, extracts, and captions images for a given course.",
    title=f"Flow Server {FLOW_ID} API",
)

# Setup Jinja2 templates for the UI (loaded from the "templates" directory).
templates = Jinja2Templates(directory="templates")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
510
 
@app.on_event("startup")
async def startup_event():
    """Restore the persisted state and start the processing loop on boot."""
    print(f"Flow Server {FLOW_ID} started on port {FLOW_PORT}. Manager URL: {MANAGER_URL}")
    # 1. Load state from persistence (HF)
    await load_state_from_hf()
    # 2. Update the list of all files from the dataset
    await update_file_list()
    # 3. Start the continuous processing task if the index is valid
    if state.current_index < state.total_files:
        state.is_running = True
        # A BackgroundTasks object only runs its tasks when it is attached to
        # a response; a standalone instance created here is never executed.
        # Schedule the loop on the running event loop instead, and keep a
        # reference so the task is not garbage-collected while running.
        app.state.processing_task = asyncio.create_task(process_next_file_task())
    else:
        state.is_running = False
        print(f"[{FLOW_ID}] Index {state.current_index} is out of bounds. Starting in Idle mode.")
525
+
526
 
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the status page (index.html) with progress and server figures."""
    async with state_lock:
        done = len(state.processed_files)
        left = state.total_files - done

    # Per-server figures shown in the table.
    per_server = []
    for srv in servers:
        per_server.append({
            "url": srv.url,
            "busy": srv.busy,
            "processed": srv.total_processed,
            "fps": f"{srv.fps:.2f}",
        })

    # Overall throughput across every caption server.
    captions_done = sum(srv.total_processed for srv in servers)
    seconds_spent = sum(srv.total_time for srv in servers)
    if seconds_spent > 0:
        overall_fps = captions_done / seconds_spent
    else:
        overall_fps = 0

    current_file = state.current_file if state.current_file else "N/A"
    return templates.TemplateResponse("index.html", {
        "request": request,
        "flow_id": FLOW_ID,
        "status": state.status,
        "is_running": state.is_running,
        "total_files": state.total_files,
        "processed_count": done,
        "remaining_count": left,
        "current_index": state.current_index,
        "current_file": current_file,
        "current_file_progress": state.current_file_progress,
        "last_update": state.last_update,
        "overall_fps": f"{overall_fps:.2f}",
        "server_stats": per_server,
    })
565
 
@app.post("/set_index")
async def set_index(request: Request, background_tasks: BackgroundTasks):
    """Manually set the index of the next file to process.

    Reads ``start_index`` from the submitted form. An index inside the file
    list (re)starts processing from that file; an index equal to the number
    of files stops processing.

    Raises:
        HTTPException: 400 when the value is not an integer or is out of
            bounds.
    """
    form = await request.form()
    try:
        new_index = int(form.get("start_index"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid index value.")

    async with state_lock:
        if not 0 <= new_index <= state.total_files:
            raise HTTPException(status_code=400, detail=f"Index {new_index} is out of bounds (0 to {state.total_files}).")

        state.current_index = new_index
        at_end = new_index == state.total_files
        if at_end:
            state.is_running = False
            state.status = "Finished processing all files."
        else:
            state.status = f"Index set to {new_index}. Restarting processing."
            # If the loop is not running, start it
            if not state.is_running:
                state.is_running = True
                background_tasks.add_task(process_next_file_task)

    # Persist after releasing the lock: asyncio.Lock is not reentrant, so
    # saving while holding it would block this request forever if the save
    # takes the lock.
    await save_state_to_hf()

    if at_end:
        return {"status": "success", "message": "Index set to end of list. Processing stopped."}

    print(f"[{FLOW_ID}] Index manually set to {new_index}.")
    return {"status": "success", "message": f"Start index set to {new_index}. Processing will resume from this point."}
598
 
@app.post("/control_processing")
async def control_processing(request: Request, background_tasks: BackgroundTasks):
    """Start or stop the processing loop.

    Reads ``action`` ("start" or "stop") from the submitted form.

    Raises:
        HTTPException: 400 when the action is neither "start" nor "stop".
    """
    form = await request.form()
    action = form.get("action")

    if action not in ("start", "stop"):
        raise HTTPException(status_code=400, detail="Invalid action.")

    state_changed = False
    async with state_lock:
        if action == "start":
            if state.current_index >= state.total_files:
                result = {"status": "error", "message": "Cannot start. All files have been processed."}
            elif state.is_running:
                result = {"status": "info", "message": "Processing is already running."}
            else:
                state.is_running = True
                state.status = "Processing started."
                background_tasks.add_task(process_next_file_task)
                state_changed = True
                result = {"status": "success", "message": "Processing loop started."}
        elif state.is_running:
            state.is_running = False
            state.status = "Processing stopped by user."
            state_changed = True
            result = {"status": "success", "message": "Processing loop stopped."}
        else:
            result = {"status": "info", "message": "Processing is already stopped."}

    # Persist after releasing the lock: asyncio.Lock is not reentrant, so
    # saving while holding it would block this request forever if the save
    # takes the lock.
    if state_changed:
        await save_state_to_hf()
    return result
629
+
@app.get("/status")
async def get_status():
    """Return the current progress and per-server figures as a JSON object."""
    async with state_lock:
        done = len(state.processed_files)

    per_server = []
    for srv in servers:
        per_server.append({
            "url": srv.url,
            "busy": srv.busy,
            "processed": srv.total_processed,
            "fps": f"{srv.fps:.2f}",
        })

    # Overall throughput across every caption server.
    captions_done = sum(srv.total_processed for srv in servers)
    seconds_spent = sum(srv.total_time for srv in servers)
    if seconds_spent > 0:
        overall_fps = captions_done / seconds_spent
    else:
        overall_fps = 0

    payload = {
        "flow_id": FLOW_ID,
        "status": state.status,
        "is_running": state.is_running,
        "total_files": state.total_files,
        "processed_count": done,
        "remaining_count": state.total_files - done,
        "current_index": state.current_index,
        "current_file": state.current_file,
        "current_file_progress": state.current_file_progress,
        "last_update": state.last_update,
        "overall_fps": f"{overall_fps:.2f}",
        "server_stats": per_server,
    }
    return payload
663
+
664
+ # The original /process_course endpoint is now obsolete as the server manages its own queue
665
+ # @app.post("/process_course")
666
+ # async def process_course(request: ProcessCourseRequest, background_tasks: BackgroundTasks):
667
+ # return {"status": "obsolete", "message": "The server now manages its own processing queue based on the index."}
668
+
669
 
# Serve the API with uvicorn when this module is executed as a script.
if __name__ == "__main__":
    uvicorn.run(app, port=FLOW_PORT, host="0.0.0.0")