Fred808 commited on
Commit
eb57683
Β·
verified Β·
1 Parent(s): 8580dc7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +93 -58
app.py CHANGED
@@ -28,6 +28,7 @@ def save_captions_to_file(course: str, captions: List[Dict]) -> None:
28
  file_path = get_caption_file_path(course)
29
  with open(file_path, 'w', encoding='utf-8') as f:
30
  json.dump(captions, f, indent=2, ensure_ascii=False)
 
31
  except Exception as e:
32
  print(f"Error saving captions for {course}: {e}")
33
 
@@ -37,7 +38,9 @@ def load_captions_from_file(course: str) -> List[Dict]:
37
  file_path = get_caption_file_path(course)
38
  if file_path.exists():
39
  with open(file_path, 'r', encoding='utf-8') as f:
40
- return json.load(f)
 
 
41
  except Exception as e:
42
  print(f"Error loading captions for {course}: {e}")
43
  return []
@@ -131,6 +134,7 @@ app = FastAPI(
131
  # Global state
132
  processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
133
  course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
 
134
  servers = []
135
  is_processing = False
136
  current_processing_task = None
@@ -223,13 +227,15 @@ async def get_processing_status():
223
  for course in processed_images:
224
  total = len(processed_images[course])
225
  processed = len(course_captions.get(course, []))
226
- status_info[course] = ProcessingStatus(
227
- course=course,
228
- total_images=total,
229
- processed_images=processed,
230
- progress_percent=(processed / total * 100) if total > 0 else 0,
231
- status="processing" if processed < total else "completed"
232
- )
 
 
233
  return status_info
234
 
235
  @app.post("/processing/start")
@@ -291,6 +297,8 @@ async def delete_captions(course: str):
291
  del processed_images[course]
292
  if course in course_captions:
293
  del course_captions[course]
 
 
294
  return {"message": f"Captions for {course} deleted"}
295
  else:
296
  raise HTTPException(status_code=404, detail=f"No captions found for {course}")
@@ -417,21 +425,24 @@ async def submit_to_dataset(course: str, metadata_list: List[Dict]):
417
  json=payload
418
  ) as resp:
419
  result = await resp.json()
420
- print(f"Dataset submission result for {course}: {result}")
421
  return result
422
  except Exception as e:
423
- print(f"Error submitting to dataset: {e}")
424
  return None
425
 
426
  async def process_course(course: str, servers: List[CaptionServer]):
427
- """Process all images in a course using available servers with retry logic"""
 
428
  if course not in processed_images:
429
  processed_images[course] = set()
430
  if course not in course_captions:
431
- # Load any existing captions from file
432
  course_captions[course] = load_captions_from_file(course)
433
  # Update processed images set from loaded captions
434
- processed_images[course].update(cap['image'] for cap in course_captions[course])
 
 
 
435
 
436
  # Get list of images
437
  images = await fetch_course_images(course)
@@ -442,18 +453,21 @@ async def process_course(course: str, servers: List[CaptionServer]):
442
  print(f"\nProcessing {len(images)} images for course {course}")
443
 
444
  # Track images that need processing with retry count (5 retries)
445
- pending_images = {
446
- img['filename']: {'image': img, 'retries': 0, 'max_retries': 5}
447
- for img in images
448
- if img['filename'] not in processed_images[course]
449
- }
450
 
451
  if not pending_images:
452
- print(f"All images already processed for course {course}")
 
453
  return
454
 
455
- failed_images = set()
456
- total_images = len(images)
 
 
457
 
458
  while pending_images and is_processing:
459
  # Create tasks for each available server
@@ -468,7 +482,7 @@ async def process_course(course: str, servers: List[CaptionServer]):
468
 
469
  # Assign this image to the server
470
  tasks.append(process_image(server, course, img))
471
- assigned_images.append((filename, img))
472
  # Remove from pending temporarily while it's being processed
473
  del pending_images[filename]
474
 
@@ -482,52 +496,63 @@ async def process_course(course: str, servers: List[CaptionServer]):
482
 
483
  # Handle results and retry logic
484
  has_new_results = False
485
- for (filename, img), result in zip(assigned_images, results):
486
  if result:
487
  # Success - image was processed
488
  processed_images[course].add(filename)
489
  course_captions[course].append(result)
490
  has_new_results = True
 
491
  print(f"βœ“ Successfully processed {filename}")
492
  else:
493
  # Failure - check if we should retry
494
- current_retries = assigned_images[0][0] if assigned_images else 0
495
- img_data = {'image': img, 'retries': current_retries + 1, 'max_retries': 5}
496
-
497
- if img_data['retries'] <= img_data['max_retries']:
498
- # Put back in pending for retry
499
- pending_images[filename] = img_data
500
- print(f"↻ Retry {img_data['retries']}/{img_data['max_retries']} for {filename}")
 
501
  else:
502
  # Max retries exceeded, mark as failed
503
- failed_images.add(filename)
504
- print(f"βœ— Failed to process {filename} after {img_data['max_retries']} retries")
505
 
506
  # Save progress after each batch with new results
507
  if has_new_results:
508
  save_captions_to_file(course, course_captions[course])
509
 
510
  # Show progress
 
511
  done = len(processed_images[course])
 
512
  pending_count = len(pending_images)
513
- failed_count = len(failed_images)
514
- progress_percent = (done / total_images * 100) if total_images > 0 else 0
515
- print(f"\rProgress: {done}/{total_images} images ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed", end="")
516
 
517
- # Check if we're done (all images either processed or permanently failed)
518
- if not pending_images:
519
- if failed_images:
520
- print(f"\nCourse {course} completed with {len(failed_images)} failed images: {list(failed_images)}")
521
- else:
522
- print(f"\nCourse {course} complete, submitting to dataset...")
523
- await submit_to_dataset(course, course_captions[course])
524
- break
525
-
526
  # Small delay to prevent overwhelming the servers
527
  await asyncio.sleep(0.5)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
528
 
529
  async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
530
- """Main processing loop - same as original main() function"""
531
  global is_processing
532
 
533
  # Get model information and verify Florence-2-large availability
@@ -566,14 +591,22 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
566
  print()
567
 
568
  start_time = time.time()
 
569
 
570
  while is_processing:
571
  try:
 
 
 
 
 
572
  # Get available courses
573
  if specific_courses:
574
  courses = specific_courses
 
575
  else:
576
  courses = await fetch_courses()
 
577
 
578
  if not courses:
579
  print("No courses found, waiting...")
@@ -582,24 +615,24 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
582
  await asyncio.sleep(10)
583
  continue
584
 
585
- print(f"Found {len(courses)} courses")
586
-
587
  # Process each course with all available servers
588
  for course in courses:
589
  if not is_processing:
590
  break
 
 
591
  await process_course(course, processing_servers)
592
 
593
- # Show server stats
594
- print("\nServer Stats:")
595
- total_processed = sum(s.total_processed for s in processing_servers)
596
- elapsed = time.time() - start_time
597
- if elapsed > 0:
598
- print(f"Total images processed: {total_processed}")
599
- print(f"Overall speed: {total_processed/elapsed:.2f} fps")
600
- for s in processing_servers:
601
- print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
602
- print()
603
 
604
  if not continuous:
605
  print("One-time processing completed")
@@ -613,7 +646,9 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
613
  print("Processing cancelled")
614
  break
615
  except Exception as e:
616
- print(f"Error in processing loop: {e}")
 
 
617
  await asyncio.sleep(10)
618
 
619
  is_processing = False
 
28
  file_path = get_caption_file_path(course)
29
  with open(file_path, 'w', encoding='utf-8') as f:
30
  json.dump(captions, f, indent=2, ensure_ascii=False)
31
+ print(f"βœ“ Saved {len(captions)} captions for {course}")
32
  except Exception as e:
33
  print(f"Error saving captions for {course}: {e}")
34
 
 
38
  file_path = get_caption_file_path(course)
39
  if file_path.exists():
40
  with open(file_path, 'r', encoding='utf-8') as f:
41
+ captions = json.load(f)
42
+ print(f"βœ“ Loaded {len(captions)} existing captions for {course}")
43
+ return captions
44
  except Exception as e:
45
  print(f"Error loading captions for {course}: {e}")
46
  return []
 
134
  # Global state
135
  processed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
136
  course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metadata}]}
137
+ failed_images: Dict[str, Set[str]] = {} # {course: set(image_names)}
138
  servers = []
139
  is_processing = False
140
  current_processing_task = None
 
227
  for course in processed_images:
228
  total = len(processed_images[course])
229
  processed = len(course_captions.get(course, []))
230
+ failed = len(failed_images.get(course, set()))
231
+ status_info[course] = {
232
+ "course": course,
233
+ "total_images": total,
234
+ "processed_images": processed,
235
+ "failed_images": failed,
236
+ "progress_percent": (processed / total * 100) if total > 0 else 0,
237
+ "status": "completed" if processed + failed >= total else "processing"
238
+ }
239
  return status_info
240
 
241
  @app.post("/processing/start")
 
297
  del processed_images[course]
298
  if course in course_captions:
299
  del course_captions[course]
300
+ if course in failed_images:
301
+ del failed_images[course]
302
  return {"message": f"Captions for {course} deleted"}
303
  else:
304
  raise HTTPException(status_code=404, detail=f"No captions found for {course}")
 
425
  json=payload
426
  ) as resp:
427
  result = await resp.json()
428
+ print(f"βœ“ Dataset submission result for {course}: {result}")
429
  return result
430
  except Exception as e:
431
+ print(f"βœ— Error submitting to dataset: {e}")
432
  return None
433
 
434
  async def process_course(course: str, servers: List[CaptionServer]):
435
+ """Process all images in a course using available servers with proper retry logic"""
436
+ # Initialize course tracking
437
  if course not in processed_images:
438
  processed_images[course] = set()
439
  if course not in course_captions:
 
440
  course_captions[course] = load_captions_from_file(course)
441
  # Update processed images set from loaded captions
442
+ for cap in course_captions[course]:
443
+ processed_images[course].add(cap['image'])
444
+ if course not in failed_images:
445
+ failed_images[course] = set()
446
 
447
  # Get list of images
448
  images = await fetch_course_images(course)
 
453
  print(f"\nProcessing {len(images)} images for course {course}")
454
 
455
  # Track images that need processing with retry count (5 retries)
456
+ pending_images = {}
457
+ for img in images:
458
+ filename = img['filename']
459
+ if filename not in processed_images[course] and filename not in failed_images[course]:
460
+ pending_images[filename] = {'image': img, 'retries': 0, 'max_retries': 5}
461
 
462
  if not pending_images:
463
+ print(f"All images already processed or failed for course {course}")
464
+ print(f"- Processed: {len(processed_images[course])}, Failed: {len(failed_images[course])}")
465
  return
466
 
467
+ print(f"Images to process: {len(pending_images)} (already processed: {len(processed_images[course])}, failed: {len(failed_images[course])})")
468
+
469
+ batch_size = len([s for s in servers if not s.busy])
470
+ processed_in_this_run = 0
471
 
472
  while pending_images and is_processing:
473
  # Create tasks for each available server
 
482
 
483
  # Assign this image to the server
484
  tasks.append(process_image(server, course, img))
485
+ assigned_images.append((filename, img, img_data['retries']))
486
  # Remove from pending temporarily while it's being processed
487
  del pending_images[filename]
488
 
 
496
 
497
  # Handle results and retry logic
498
  has_new_results = False
499
+ for (filename, img, current_retries), result in zip(assigned_images, results):
500
  if result:
501
  # Success - image was processed
502
  processed_images[course].add(filename)
503
  course_captions[course].append(result)
504
  has_new_results = True
505
+ processed_in_this_run += 1
506
  print(f"βœ“ Successfully processed {filename}")
507
  else:
508
  # Failure - check if we should retry
509
+ if current_retries < 5: # max_retries
510
+ # Put back in pending for retry with incremented retry count
511
+ pending_images[filename] = {
512
+ 'image': img,
513
+ 'retries': current_retries + 1,
514
+ 'max_retries': 5
515
+ }
516
+ print(f"↻ Retry {current_retries + 1}/5 for {filename}")
517
  else:
518
  # Max retries exceeded, mark as failed
519
+ failed_images[course].add(filename)
520
+ print(f"βœ— Failed to process {filename} after 5 retries")
521
 
522
  # Save progress after each batch with new results
523
  if has_new_results:
524
  save_captions_to_file(course, course_captions[course])
525
 
526
  # Show progress
527
+ total = len(images)
528
  done = len(processed_images[course])
529
+ failed_count = len(failed_images[course])
530
  pending_count = len(pending_images)
531
+ progress_percent = (done / total * 100) if total > 0 else 0
532
+
533
+ print(f"\rProgress: {done}/{total} ({progress_percent:.1f}%) - {pending_count} pending, {failed_count} failed, {processed_in_this_run} new", end="", flush=True)
534
 
 
 
 
 
 
 
 
 
 
535
  # Small delay to prevent overwhelming the servers
536
  await asyncio.sleep(0.5)
537
+
538
+ # Final status for this course
539
+ total = len(images)
540
+ done = len(processed_images[course])
541
+ failed_count = len(failed_images[course])
542
+
543
+ if done + failed_count >= total:
544
+ if failed_count > 0:
545
+ print(f"\nβœ“ Course {course} completed with {failed_count} failed images")
546
+ else:
547
+ print(f"\nβœ“ Course {course} fully completed")
548
+ # Submit to dataset only when fully completed
549
+ print(f"Submitting {len(course_captions[course])} captions to dataset...")
550
+ await submit_to_dataset(course, course_captions[course])
551
+ else:
552
+ print(f"\n→ Course {course} partially completed: {done}/{total} processed, {failed_count} failed")
553
 
554
  async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
555
+ """Main processing loop with proper error handling"""
556
  global is_processing
557
 
558
  # Get model information and verify Florence-2-large availability
 
591
  print()
592
 
593
  start_time = time.time()
594
+ iteration = 0
595
 
596
  while is_processing:
597
  try:
598
+ iteration += 1
599
+ print(f"\n{'='*50}")
600
+ print(f"Processing Iteration {iteration}")
601
+ print(f"{'='*50}")
602
+
603
  # Get available courses
604
  if specific_courses:
605
  courses = specific_courses
606
+ print(f"Processing specific courses: {courses}")
607
  else:
608
  courses = await fetch_courses()
609
+ print(f"Found {len(courses)} courses")
610
 
611
  if not courses:
612
  print("No courses found, waiting...")
 
615
  await asyncio.sleep(10)
616
  continue
617
 
 
 
618
  # Process each course with all available servers
619
  for course in courses:
620
  if not is_processing:
621
  break
622
+
623
+ print(f"\n--- Processing course: {course} ---")
624
  await process_course(course, processing_servers)
625
 
626
+ # Show server stats
627
+ print("\nServer Stats:")
628
+ total_processed = sum(s.total_processed for s in processing_servers)
629
+ elapsed = time.time() - start_time
630
+ if elapsed > 0:
631
+ print(f"Total images processed: {total_processed}")
632
+ print(f"Overall speed: {total_processed/elapsed:.2f} fps")
633
+ for s in processing_servers:
634
+ print(f"- {s.url}: {s.total_processed} images, {s.fps:.2f} fps")
635
+ print()
636
 
637
  if not continuous:
638
  print("One-time processing completed")
 
646
  print("Processing cancelled")
647
  break
648
  except Exception as e:
649
+ print(f"Error in processing loop: {str(e)}")
650
+ import traceback
651
+ traceback.print_exc()
652
  await asyncio.sleep(10)
653
 
654
  is_processing = False