Fred808 committed on
Commit
6beb7ba
·
verified ·
1 Parent(s): a1e320e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +70 -22
app.py CHANGED
@@ -120,7 +120,7 @@ class ProcessingStatus(BaseModel):
120
 
121
  class StartProcessingRequest(BaseModel):
122
  courses: Optional[List[str]] = None # If None, process all courses
123
- continuous: bool = False
124
 
125
  # FastAPI App
126
  app = FastAPI(
@@ -135,6 +135,7 @@ course_captions: Dict[str, List[Dict]] = {} # {course: [{image, caption, metada
135
  servers = []
136
  is_processing = False
137
  current_processing_task = None
 
138
 
139
  class CaptionServer:
140
  def __init__(self, url):
@@ -156,7 +157,12 @@ def initialize_servers():
156
  # API Routes
157
  @app.get("/")
158
  async def root():
159
- return {"message": "Caption Coordinator API", "status": "running"}
 
 
 
 
 
160
 
161
  @app.get("/health")
162
  async def health():
@@ -164,7 +170,8 @@ async def health():
164
  "status": "healthy",
165
  "servers_available": len([s for s in servers if not s.busy]),
166
  "total_servers": len(servers),
167
- "is_processing": is_processing
 
168
  }
169
 
170
  @app.get("/courses")
@@ -227,7 +234,7 @@ async def get_processing_status():
227
  return status_info
228
 
229
  @app.post("/processing/start")
230
- async def start_processing(request: StartProcessingRequest, background_tasks: BackgroundTasks):
231
  """Start caption processing"""
232
  global is_processing, current_processing_task
233
 
@@ -235,9 +242,15 @@ async def start_processing(request: StartProcessingRequest, background_tasks: Ba
235
  raise HTTPException(status_code=400, detail="Processing is already running")
236
 
237
  is_processing = True
238
- current_processing_task = asyncio.create_task(processing_loop(request.courses, request.continuous))
 
 
239
 
240
- return {"message": "Processing started", "continuous": request.continuous}
 
 
 
 
241
 
242
  @app.post("/processing/stop")
243
  async def stop_processing():
@@ -250,6 +263,10 @@ async def stop_processing():
250
  is_processing = False
251
  if current_processing_task:
252
  current_processing_task.cancel()
 
 
 
 
253
  current_processing_task = None
254
 
255
  return {"message": "Processing stopped"}
@@ -281,7 +298,7 @@ async def delete_captions(course: str):
281
  except Exception as e:
282
  raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")
283
 
284
- # Core processing functions (same as original)
285
  async def fetch_courses() -> List[str]:
286
  """Fetch available courses from source server"""
287
  async with aiohttp.ClientSession() as session:
@@ -354,7 +371,10 @@ async def process_image(server: CaptionServer, course: str, image: Dict) -> Dict
354
  server.total_processed += 1
355
  metadata = {
356
  "image": image['filename'],
357
- "caption": result['caption']
 
 
 
358
  }
359
  print(f"Server {server.url} processed {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)")
360
  return metadata
@@ -378,7 +398,8 @@ async def submit_to_dataset(course: str, metadata_list: List[Dict]):
378
  "course": parent_folder,
379
  "metadata": {
380
  "course_name": course,
381
- "image_count": len(metadata_list)
 
382
  },
383
  "captions": metadata_list
384
  }
@@ -409,11 +430,16 @@ async def process_course(course: str, servers: List[CaptionServer]):
409
  # Get list of images
410
  images = await fetch_course_images(course)
411
  if not images:
 
412
  return
413
 
414
  print(f"\nProcessing {len(images)} images for course {course}")
415
  remaining_images = [img for img in images if img['filename'] not in processed_images[course]]
416
 
 
 
 
 
417
  while remaining_images and is_processing:
418
  # Create tasks for each available server
419
  tasks = []
@@ -445,17 +471,20 @@ async def process_course(course: str, servers: List[CaptionServer]):
445
  # Show progress
446
  total = len(images)
447
  done = len(processed_images[course])
448
- print(f"\rProgress: {done}/{total} images ({done/total*100:.1f}%)", end="")
 
449
 
450
  if not remaining_images and len(processed_images[course]) == len(images):
451
  print(f"\nCourse {course} complete, submitting to dataset...")
452
  await submit_to_dataset(course, course_captions[course])
453
- processed_images[course].clear()
454
- course_captions[course].clear()
455
  break
456
 
457
- async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = False):
458
- """Main processing loop"""
 
 
 
459
  global is_processing
460
 
461
  # Get model information and verify Florence-2-large availability
@@ -478,7 +507,20 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
478
  # Update servers list to only use those with large model
479
  processing_servers = available_servers
480
  print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")
481
- print()
 
 
 
 
 
 
 
 
 
 
 
 
 
482
 
483
  start_time = time.time()
484
 
@@ -517,9 +559,11 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
517
  print()
518
 
519
  if not continuous:
 
520
  break
521
 
522
  # Wait before next check
 
523
  await asyncio.sleep(5)
524
 
525
  except asyncio.CancelledError:
@@ -530,21 +574,25 @@ async def processing_loop(specific_courses: Optional[List[str]] = None, continuo
530
  await asyncio.sleep(10)
531
 
532
  is_processing = False
 
533
 
534
  # Startup event
535
  @app.on_event("startup")
536
  async def startup_event():
537
- """Initialize servers on startup"""
538
  initialize_servers()
539
  print("Caption Coordinator API started")
540
  print(f"Source server: {SOURCE_SERVER}")
541
  print(f"Caption servers: {len(CAPTION_SERVERS)}")
542
  print(f"Dataset server: {DATA_COLLECTION_SERVER}")
 
 
 
 
 
 
 
 
543
 
544
  if __name__ == "__main__":
545
- uvicorn.run(
546
- "app:app",
547
- host="0.0.0.0",
548
- port=8000,
549
- reload=True
550
- )
 
120
 
121
class StartProcessingRequest(BaseModel):
    """Request body for POST /processing/start."""
    courses: Optional[List[str]] = None  # If None, process all courses
    continuous: bool = True  # Default to continuous like original
124
 
125
  # FastAPI App
126
  app = FastAPI(
 
135
# Mutable module-level state shared by the API routes and the processing loop.
servers = []  # CaptionServer instances; presumably filled by initialize_servers() — confirm
is_processing = False  # True while a processing loop is running
current_processing_task = None  # asyncio.Task for the active processing_loop, else None
auto_start_processing = True  # Set to False if you don't want auto-start
139
 
140
  class CaptionServer:
141
  def __init__(self, url):
 
157
  # API Routes
158
@app.get("/")
async def root():
    """Return the service identity together with the live processing flags."""
    payload = {"message": "Caption Coordinator API", "status": "running"}
    payload["auto_processing"] = auto_start_processing
    payload["is_processing"] = is_processing
    return payload
166
 
167
  @app.get("/health")
168
  async def health():
 
170
  "status": "healthy",
171
  "servers_available": len([s for s in servers if not s.busy]),
172
  "total_servers": len(servers),
173
+ "is_processing": is_processing,
174
+ "auto_processing": auto_start_processing
175
  }
176
 
177
  @app.get("/courses")
 
234
  return status_info
235
 
236
  @app.post("/processing/start")
237
+ async def start_processing(request: StartProcessingRequest = StartProcessingRequest()):
238
  """Start caption processing"""
239
  global is_processing, current_processing_task
240
 
 
242
  raise HTTPException(status_code=400, detail="Processing is already running")
243
 
244
  is_processing = True
245
+ current_processing_task = asyncio.create_task(
246
+ processing_loop(request.courses, request.continuous)
247
+ )
248
 
249
+ return {
250
+ "message": "Processing started",
251
+ "continuous": request.continuous,
252
+ "specific_courses": request.courses
253
+ }
254
 
255
  @app.post("/processing/stop")
256
  async def stop_processing():
 
263
  is_processing = False
264
  if current_processing_task:
265
  current_processing_task.cancel()
266
+ try:
267
+ await current_processing_task
268
+ except asyncio.CancelledError:
269
+ pass
270
  current_processing_task = None
271
 
272
  return {"message": "Processing stopped"}
 
298
  except Exception as e:
299
  raise HTTPException(status_code=500, detail=f"Error deleting captions: {e}")
300
 
301
+ # Core processing functions
302
  async def fetch_courses() -> List[str]:
303
  """Fetch available courses from source server"""
304
  async with aiohttp.ClientSession() as session:
 
371
  server.total_processed += 1
372
  metadata = {
373
  "image": image['filename'],
374
+ "caption": result['caption'],
375
+ "server": server.url,
376
+ "processing_time": processing_time,
377
+ "timestamp": datetime.now().isoformat()
378
  }
379
  print(f"Server {server.url} processed {image['filename']} in {processing_time:.2f}s ({server.fps:.2f} fps)")
380
  return metadata
 
398
  "course": parent_folder,
399
  "metadata": {
400
  "course_name": course,
401
+ "image_count": len(metadata_list),
402
+ "completed_at": datetime.now().isoformat()
403
  },
404
  "captions": metadata_list
405
  }
 
430
  # Get list of images
431
  images = await fetch_course_images(course)
432
  if not images:
433
+ print(f"No images found for course {course}")
434
  return
435
 
436
  print(f"\nProcessing {len(images)} images for course {course}")
437
  remaining_images = [img for img in images if img['filename'] not in processed_images[course]]
438
 
439
+ if not remaining_images:
440
+ print(f"All images already processed for course {course}")
441
+ return
442
+
443
  while remaining_images and is_processing:
444
  # Create tasks for each available server
445
  tasks = []
 
471
  # Show progress
472
  total = len(images)
473
  done = len(processed_images[course])
474
+ progress_percent = (done / total * 100) if total > 0 else 0
475
+ print(f"\rProgress: {done}/{total} images ({progress_percent:.1f}%)", end="")
476
 
477
  if not remaining_images and len(processed_images[course]) == len(images):
478
  print(f"\nCourse {course} complete, submitting to dataset...")
479
  await submit_to_dataset(course, course_captions[course])
480
+ # Don't clear the data, keep it for API queries
 
481
  break
482
 
483
+ # Small delay to prevent overwhelming the servers
484
+ await asyncio.sleep(0.5)
485
+
486
+ async def processing_loop(specific_courses: Optional[List[str]] = None, continuous: bool = True):
487
+ """Main processing loop - same as original main() function"""
488
  global is_processing
489
 
490
  # Get model information and verify Florence-2-large availability
 
507
  # Update servers list to only use those with large model
508
  processing_servers = available_servers
509
  print(f"\nUsing {len(processing_servers)} servers with {MODEL_TYPE}")
510
+
511
+ # Check for existing caption files and report
512
+ existing_captions = list(CAPTIONS_DIR.glob("*_captions.json"))
513
+ if existing_captions:
514
+ print("\nFound existing caption files:")
515
+ for cap_file in existing_captions:
516
+ course = cap_file.stem.replace("_captions", "")
517
+ try:
518
+ with open(cap_file, 'r', encoding='utf-8') as f:
519
+ captions = json.load(f)
520
+ print(f"- {course}: {len(captions)} captions")
521
+ except Exception as e:
522
+ print(f"- Error reading {cap_file.name}: {e}")
523
+ print()
524
 
525
  start_time = time.time()
526
 
 
559
  print()
560
 
561
  if not continuous:
562
+ print("One-time processing completed")
563
  break
564
 
565
  # Wait before next check
566
+ print("Waiting for new courses...")
567
  await asyncio.sleep(5)
568
 
569
  except asyncio.CancelledError:
 
574
  await asyncio.sleep(10)
575
 
576
  is_processing = False
577
+ print("Processing loop stopped")
578
 
579
  # Startup event
580
@app.on_event("startup")
async def startup_event():
    """Bring up the caption servers and, when configured, kick off processing."""
    global is_processing, current_processing_task

    initialize_servers()
    print("Caption Coordinator API started")
    print(f"Source server: {SOURCE_SERVER}")
    print(f"Caption servers: {len(CAPTION_SERVERS)}")
    print(f"Dataset server: {DATA_COLLECTION_SERVER}")

    # Mirror the original script's main(): begin captioning without an API call.
    if auto_start_processing:
        print("Auto-starting processing loop...")
        is_processing = True
        current_processing_task = asyncio.create_task(processing_loop())
595
+
596
 
597
if __name__ == "__main__":
    # uvicorn requires the application as an import string ("module:attr")
    # to enable reload; passing the app object with reload=True makes uvicorn
    # warn and refuse to start the reloader.
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)